Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java Fri Aug 3 19:00:15 2012 @@ -92,6 +92,8 @@ public class TestApplicationCleanup { Assert.assertEquals(request, conts.size()); am.unregisterAppAttempt(); + HeartbeatResponse resp = nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, + ContainerState.COMPLETE); am.waitForState(RMAppAttemptState.FINISHED); int cleanedConts = 0; @@ -102,8 +104,7 @@ public class TestApplicationCleanup { //currently only containers are cleaned via this //AM container is cleaned via container launcher waitCount = 0; - while ((cleanedConts < 3 || cleanedApps < 1) && waitCount++ < 20) { - HeartbeatResponse resp = nm1.nodeHeartbeat(true); + while ((cleanedConts < 2 || cleanedApps < 1) && waitCount++ < 20) { contsToClean = resp.getContainersToCleanupList(); apps = resp.getApplicationsToCleanupList(); LOG.info("Waiting to get cleanup events.. 
cleanedConts: " @@ -111,12 +112,13 @@ public class TestApplicationCleanup { cleanedConts += contsToClean.size(); cleanedApps += apps.size(); Thread.sleep(1000); + resp = nm1.nodeHeartbeat(true); } Assert.assertEquals(1, apps.size()); Assert.assertEquals(app.getApplicationId(), apps.get(0)); Assert.assertEquals(1, cleanedApps); - Assert.assertEquals(3, cleanedConts); + Assert.assertEquals(2, cleanedConts); rm.stop(); } Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java Fri Aug 3 19:00:15 2012 @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.protoc import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher; import 
org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; @@ -182,6 +183,10 @@ public class TestApplicationMasterLaunch am.registerAppAttempt(); am.unregisterAppAttempt(); + //complete the AM container to finish the app normally + nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, ContainerState.COMPLETE); + am.waitForState(RMAppAttemptState.FINISHED); + waitCount = 0; while (containerManager.cleanedup == false && waitCount++ < 20) { LOG.info("Waiting for AM Cleanup to happen.."); Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java Fri Aug 3 19:00:15 2012 @@ -20,7 +20,10 @@ package org.apache.hadoop.yarn.server.re import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyString; +import java.io.IOException; import java.net.InetSocketAddress; import java.util.List; import java.util.concurrent.ConcurrentHashMap; @@ -34,13 +37,21 @@ import 
org.apache.hadoop.yarn.api.Client import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.util.Records; import org.junit.Test; @@ -49,6 +60,9 @@ public class TestClientRMService { private static final Log LOG = LogFactory.getLog(TestClientRMService.class); + private RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + @Test public void testGetClusterNodes() throws Exception { MockRM rm = new MockRM() { @@ -109,4 +123,66 @@ public class TestClientRMService { Assert.assertNull("It should return null as application report for absent application.", applicationReport.getApplicationReport()); } + + @Test + public void testGetQueueInfo() throws Exception { + YarnScheduler yarnScheduler = mock(YarnScheduler.class); + RMContext rmContext = mock(RMContext.class); + mockRMContext(yarnScheduler, rmContext); + ClientRMService rmService = new 
ClientRMService(rmContext, yarnScheduler, + null, null, null); + GetQueueInfoRequest request = recordFactory + .newRecordInstance(GetQueueInfoRequest.class); + request.setQueueName("testqueue"); + request.setIncludeApplications(true); + GetQueueInfoResponse queueInfo = rmService.getQueueInfo(request); + List<ApplicationReport> applications = queueInfo.getQueueInfo() + .getApplications(); + Assert.assertEquals(2, applications.size()); + } + + private void mockRMContext(YarnScheduler yarnScheduler, RMContext rmContext) + throws IOException { + Dispatcher dispatcher = mock(Dispatcher.class); + when(rmContext.getDispatcher()).thenReturn(dispatcher); + QueueInfo queInfo = recordFactory.newRecordInstance(QueueInfo.class); + queInfo.setQueueName("testqueue"); + when(yarnScheduler.getQueueInfo(anyString(), anyBoolean(), anyBoolean())) + .thenReturn(queInfo); + ConcurrentHashMap<ApplicationId, RMApp> apps = getRMApps(rmContext, + yarnScheduler); + when(rmContext.getRMApps()).thenReturn(apps); + } + + private ConcurrentHashMap<ApplicationId, RMApp> getRMApps( + RMContext rmContext, YarnScheduler yarnScheduler) { + ConcurrentHashMap<ApplicationId, RMApp> apps = + new ConcurrentHashMap<ApplicationId, RMApp>(); + ApplicationId applicationId1 = getApplicationId(1); + ApplicationId applicationId2 = getApplicationId(2); + ApplicationId applicationId3 = getApplicationId(3); + YarnConfiguration config = new YarnConfiguration(); + apps.put(applicationId1, getRMApp(rmContext, yarnScheduler, applicationId1, + config, "testqueue")); + apps.put(applicationId2, getRMApp(rmContext, yarnScheduler, applicationId2, + config, "a")); + apps.put(applicationId3, getRMApp(rmContext, yarnScheduler, applicationId3, + config, "testqueue")); + return apps; + } + + private ApplicationId getApplicationId(int id) { + ApplicationId applicationId = recordFactory + .newRecordInstance(ApplicationId.class); + applicationId.setClusterTimestamp(123456); + applicationId.setId(id); + return applicationId; + } + + private RMAppImpl getRMApp(RMContext rmContext, YarnScheduler yarnScheduler, + ApplicationId 
applicationId3, YarnConfiguration config, String queueName) { + return new RMAppImpl(applicationId3, rmContext, config, null, null, + queueName, null, null, null, yarnScheduler, null, System + .currentTimeMillis()); + } } Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Fri Aug 3 19:00:15 2012 @@ -19,26 +19,39 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import junit.framework.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.AMResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import 
org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; +import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -207,6 +220,68 @@ public class TestFifoScheduler { Assert.assertEquals(4 * GB, fs.getRootQueueMetrics().getAvailableMB()); } + + @Test + public void testHeadroom() throws Exception { + + Configuration conf = new Configuration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, + ResourceScheduler.class); + MockRM rm = new MockRM(conf); + rm.start(); + FifoScheduler fs = (FifoScheduler) 
rm.getResourceScheduler(); + + // Add a node + RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1); + fs.handle(new NodeAddedSchedulerEvent(n1)); + + // Add two applications + ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1); + ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId( + appId1, 1); + SchedulerEvent event1 = new AppAddedSchedulerEvent(appAttemptId1, "queue", + "user"); + fs.handle(event1); + + ApplicationId appId2 = BuilderUtils.newApplicationId(200, 2); + ApplicationAttemptId appAttemptId2 = BuilderUtils.newApplicationAttemptId( + appId2, 1); + SchedulerEvent event2 = new AppAddedSchedulerEvent(appAttemptId2, "queue", + "user"); + fs.handle(event2); + + List<ContainerStatus> emptyStatus = new ArrayList<ContainerStatus>(); + List<ContainerId> emptyId = new ArrayList<ContainerId>(); + List<ResourceRequest> emptyAsk = new ArrayList<ResourceRequest>(); + + // Set up resource requests + + // Ask for a 1 GB container for app 1 + List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>(); + ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), "*", + BuilderUtils.newResource(GB), 1)); + fs.allocate(appAttemptId1, ask1, emptyId); + + // Ask for a 2 GB container for app 2 + List<ResourceRequest> ask2 = new ArrayList<ResourceRequest>(); + ask2.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), "*", + BuilderUtils.newResource(2 * GB), 1)); + fs.allocate(appAttemptId2, ask2, emptyId); + + // Trigger container assignment + fs.handle(new NodeUpdateSchedulerEvent(n1, emptyStatus, emptyStatus)); + + // Get the allocation for the applications and verify headroom + Allocation allocation1 = fs.allocate(appAttemptId1, emptyAsk, emptyId); + Assert.assertEquals("Allocation headroom", 1 * GB, + allocation1.getResourceLimit().getMemory()); + + Allocation allocation2 = fs.allocate(appAttemptId2, emptyAsk, emptyId); + Assert.assertEquals("Allocation headroom", 1 * GB, + allocation2.getResourceLimit().getMemory()); + + } + public static void main(String[] args) throws Exception { TestFifoScheduler t = new TestFifoScheduler(); Modified: 
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java Fri Aug 3 19:00:15 2012 @@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -72,6 +73,7 @@ public class TestRM { MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); am.registerAppAttempt(); am.unregisterAppAttempt(); + nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, ContainerState.COMPLETE); am.waitForState(RMAppAttemptState.FINISHED); rm.stop(); } @@ -127,6 +129,7 @@ public class TestRM { Assert.assertEquals(10, conts.size()); am.unregisterAppAttempt(); + nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, ContainerState.COMPLETE); 
am.waitForState(RMAppAttemptState.FINISHED); rm.stop(); Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java Fri Aug 3 19:00:15 2012 @@ -77,7 +77,7 @@ public class TestRMNodeTransitions { InlineDispatcher rmDispatcher = new InlineDispatcher(); rmContext = - new RMContextImpl(new MemStore(), rmDispatcher, null, null, + new RMContextImpl(new MemStore(), rmDispatcher, null, null, null, mock(DelegationTokenRenewer.class), null); scheduler = mock(YarnScheduler.class); doAnswer( Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java Fri Aug 3 19:00:15 2012 @@ -52,8 +52,6 @@ public class TestNMExpiry { private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); ResourceTrackerService resourceTrackerService; - ContainerTokenSecretManager containerTokenSecretManager = - new ContainerTokenSecretManager(); private class TestNmLivelinessMonitor extends NMLivelinessMonitor { public TestNmLivelinessMonitor(Dispatcher dispatcher) { @@ -73,7 +71,7 @@ public class TestNMExpiry { // Dispatcher that processes events inline Dispatcher dispatcher = new InlineDispatcher(); RMContext context = new RMContextImpl(new MemStore(), dispatcher, null, - null, null, null); + null, null, null, null); dispatcher.register(SchedulerEventType.class, new InlineDispatcher.EmptyEventHandler()); dispatcher.register(RMNodeEventType.class, @@ -84,6 +82,8 @@ public class TestNMExpiry { nmLivelinessMonitor.start(); NodesListManager nodesListManager = new NodesListManager(context); nodesListManager.init(conf); + ContainerTokenSecretManager containerTokenSecretManager = + new ContainerTokenSecretManager(conf); resourceTrackerService = new 
ResourceTrackerService(context, nodesListManager, nmLivelinessMonitor, containerTokenSecretManager); Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java Fri Aug 3 19:00:15 2012 @@ -52,8 +52,6 @@ import org.junit.Test; public class TestRMNMRPCResponseId { private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); ResourceTrackerService resourceTrackerService; - ContainerTokenSecretManager containerTokenSecretManager = - new ContainerTokenSecretManager(); private NodeId nodeId; @Before @@ -67,12 +65,15 @@ public class TestRMNMRPCResponseId { } }); RMContext context = - new RMContextImpl(new MemStore(), dispatcher, null, null, null, null); + new RMContextImpl(new MemStore(), dispatcher, null, null, null, + null, null); dispatcher.register(RMNodeEventType.class, new ResourceManager.NodeEventDispatcher(context)); NodesListManager nodesListManager = new NodesListManager(context); 
Configuration conf = new Configuration(); nodesListManager.init(conf); + ContainerTokenSecretManager containerTokenSecretManager = + new ContainerTokenSecretManager(conf); resourceTrackerService = new ResourceTrackerService(context, nodesListManager, new NMLivelinessMonitor(dispatcher), containerTokenSecretManager); Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java Fri Aug 3 19:00:15 2012 @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -117,10 +118,10 @@ public class TestRMAppTransitions { 
ContainerAllocationExpirer containerAllocationExpirer = mock(ContainerAllocationExpirer.class); AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class); - this.rmContext = - new RMContextImpl(new MemStore(), rmDispatcher, - containerAllocationExpirer, amLivelinessMonitor, null, - new ApplicationTokenSecretManager(conf)); + AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class); + this.rmContext = new RMContextImpl(new MemStore(), rmDispatcher, + containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, + null, new ApplicationTokenSecretManager(conf)); rmDispatcher.register(RMAppAttemptEventType.class, new TestApplicationAttemptEventDispatcher(this.rmContext)); @@ -131,7 +132,7 @@ public class TestRMAppTransitions { rmDispatcher.start(); } - protected RMApp createNewTestApp() { + protected RMApp createNewTestApp(ApplicationSubmissionContext submissionContext) { ApplicationId applicationId = MockApps.newAppID(appId++); String user = MockApps.newUserName(); String name = MockApps.newAppName(); @@ -139,12 +140,15 @@ public class TestRMAppTransitions { Configuration conf = new YarnConfiguration(); // ensure max retries set to known value conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, maxRetries); - ApplicationSubmissionContext submissionContext = null; String clientTokenStr = "bogusstring"; ApplicationStore appStore = mock(ApplicationStore.class); YarnScheduler scheduler = mock(YarnScheduler.class); ApplicationMasterService masterService = new ApplicationMasterService(rmContext, scheduler); + + if(submissionContext == null) { + submissionContext = new ApplicationSubmissionContextPBImpl(); + } RMApp application = new RMAppImpl(applicationId, rmContext, conf, name, user, @@ -235,8 +239,9 @@ public class TestRMAppTransitions { diag.toString().matches(regex)); } - protected RMApp testCreateAppSubmitted() throws IOException { - RMApp application = createNewTestApp(); + protected RMApp testCreateAppSubmitted( + 
ApplicationSubmissionContext submissionContext) throws IOException { + RMApp application = createNewTestApp(submissionContext); // NEW => SUBMITTED event RMAppEventType.START RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.START); @@ -246,9 +251,10 @@ public class TestRMAppTransitions { return application; } - protected RMApp testCreateAppAccepted() throws IOException { - RMApp application = testCreateAppSubmitted(); - // SUBMITTED => ACCEPTED event RMAppEventType.APP_ACCEPTED + protected RMApp testCreateAppAccepted( + ApplicationSubmissionContext submissionContext) throws IOException { + RMApp application = testCreateAppSubmitted(submissionContext); + // SUBMITTED => ACCEPTED event RMAppEventType.APP_ACCEPTED RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_ACCEPTED); @@ -258,8 +264,9 @@ public class TestRMAppTransitions { return application; } - protected RMApp testCreateAppRunning() throws IOException { - RMApp application = testCreateAppAccepted(); + protected RMApp testCreateAppRunning( + ApplicationSubmissionContext submissionContext) throws IOException { + RMApp application = testCreateAppAccepted(submissionContext); // ACCEPTED => RUNNING event RMAppEventType.ATTEMPT_REGISTERED RMAppEvent event = new RMAppEvent(application.getApplicationId(), @@ -271,13 +278,35 @@ public class TestRMAppTransitions { return application; } - protected RMApp testCreateAppFinished() throws IOException { - RMApp application = testCreateAppRunning(); - // RUNNING => FINISHED event RMAppEventType.ATTEMPT_FINISHED - RMAppEvent event = + protected RMApp testCreateAppFinishing( + ApplicationSubmissionContext submissionContext) throws IOException { + // unmanaged AMs don't use the FINISHING state + assert submissionContext == null || !submissionContext.getUnmanagedAM(); + RMApp application = testCreateAppRunning(submissionContext); + // RUNNING => FINISHING event RMAppEventType.ATTEMPT_FINISHING + RMAppEvent 
finishingEvent = + new RMAppEvent(application.getApplicationId(), + RMAppEventType.ATTEMPT_FINISHING); + application.handle(finishingEvent); + assertAppState(RMAppState.FINISHING, application); + assertTimesAtFinish(application); + return application; + } + + protected RMApp testCreateAppFinished( + ApplicationSubmissionContext submissionContext) throws IOException { + // unmanaged AMs don't use the FINISHING state + RMApp application = null; + if (submissionContext != null && submissionContext.getUnmanagedAM()) { + application = testCreateAppRunning(submissionContext); + } else { + application = testCreateAppFinishing(submissionContext); + } + // RUNNING/FINISHING => FINISHED event RMAppEventType.ATTEMPT_FINISHED + RMAppEvent finishedEvent = new RMAppEvent(application.getApplicationId(), RMAppEventType.ATTEMPT_FINISHED); - application.handle(event); + application.handle(finishedEvent); assertAppState(RMAppState.FINISHED, application); assertTimesAtFinish(application); // finished without a proper unregister implies failed @@ -286,16 +315,37 @@ public class TestRMAppTransitions { } @Test + public void testUnmanagedApp() throws IOException { + ApplicationSubmissionContext subContext = new ApplicationSubmissionContextPBImpl(); + subContext.setUnmanagedAM(true); + + // test success path + LOG.info("--- START: testUnmanagedAppSuccessPath ---"); + testCreateAppFinished(subContext); + + // test app fails after 1 app attempt failure + LOG.info("--- START: testUnmanagedAppFailPath ---"); + RMApp application = testCreateAppRunning(subContext); + RMAppEvent event = new RMAppFailedAttemptEvent( + application.getApplicationId(), RMAppEventType.ATTEMPT_FAILED, ""); + application.handle(event); + RMAppAttempt appAttempt = application.getCurrentAppAttempt(); + Assert.assertEquals(1, appAttempt.getAppAttemptId().getAttemptId()); + assertFailed(application, + ".*Unmanaged application.*Failing the application.*"); + } + + @Test public void testAppSuccessPath() throws IOException { 
LOG.info("--- START: testAppSuccessPath ---"); - testCreateAppFinished(); + testCreateAppFinished(null); } @Test public void testAppNewKill() throws IOException { LOG.info("--- START: testAppNewKill ---"); - RMApp application = createNewTestApp(); + RMApp application = createNewTestApp(null); // NEW => KILLED event RMAppEventType.KILL RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); @@ -307,7 +357,7 @@ public class TestRMAppTransitions { public void testAppNewReject() throws IOException { LOG.info("--- START: testAppNewReject ---"); - RMApp application = createNewTestApp(); + RMApp application = createNewTestApp(null); // NEW => FAILED event RMAppEventType.APP_REJECTED String rejectedText = "Test Application Rejected"; RMAppEvent event = @@ -320,7 +370,7 @@ public class TestRMAppTransitions { public void testAppSubmittedRejected() throws IOException { LOG.info("--- START: testAppSubmittedRejected ---"); - RMApp application = testCreateAppSubmitted(); + RMApp application = testCreateAppSubmitted(null); // SUBMITTED => FAILED event RMAppEventType.APP_REJECTED String rejectedText = "app rejected"; RMAppEvent event = @@ -333,7 +383,7 @@ public class TestRMAppTransitions { public void testAppSubmittedKill() throws IOException { LOG.info("--- START: testAppSubmittedKill---"); - RMApp application = testCreateAppAccepted(); + RMApp application = testCreateAppAccepted(null); // SUBMITTED => KILLED event RMAppEventType.KILL RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); this.rmContext.getRMApps().putIfAbsent(application.getApplicationId(), application); @@ -345,7 +395,7 @@ public class TestRMAppTransitions { public void testAppAcceptedFailed() throws IOException { LOG.info("--- START: testAppAcceptedFailed ---"); - RMApp application = testCreateAppAccepted(); + RMApp application = testCreateAppAccepted(null); // ACCEPTED => ACCEPTED event RMAppEventType.RMAppEventType.ATTEMPT_FAILED for (int i=1; 
i KILLED event RMAppEventType.KILL RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); @@ -386,7 +436,7 @@ public class TestRMAppTransitions { public void testAppRunningKill() throws IOException { LOG.info("--- START: testAppRunningKill ---"); - RMApp application = testCreateAppRunning(); + RMApp application = testCreateAppRunning(null); // RUNNING => KILLED event RMAppEventType.KILL RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); @@ -398,7 +448,7 @@ public class TestRMAppTransitions { public void testAppRunningFailed() throws IOException { LOG.info("--- START: testAppRunningFailed ---"); - RMApp application = testCreateAppRunning(); + RMApp application = testCreateAppRunning(null); RMAppAttempt appAttempt = application.getCurrentAppAttempt(); int expectedAttemptId = 1; Assert.assertEquals(expectedAttemptId, @@ -439,12 +489,23 @@ public class TestRMAppTransitions { assertFailed(application, ".*Failing the application.*"); } + @Test + public void testAppFinishingKill() throws IOException { + LOG.info("--- START: testAppFinishedFinished ---"); + + RMApp application = testCreateAppFinishing(null); + // FINISHING => FINISHED event RMAppEventType.KILL + RMAppEvent event = + new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + application.handle(event); + assertAppState(RMAppState.FINISHED, application); + } @Test public void testAppFinishedFinished() throws IOException { LOG.info("--- START: testAppFinishedFinished ---"); - RMApp application = testCreateAppFinished(); + RMApp application = testCreateAppFinished(null); // FINISHED => FINISHED event RMAppEventType.KILL RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); @@ -460,7 +521,7 @@ public class TestRMAppTransitions { public void testAppKilledKilled() throws IOException { LOG.info("--- START: testAppKilledKilled ---"); - RMApp application = testCreateAppRunning(); + RMApp application = 
testCreateAppRunning(null); // RUNNING => KILLED event RMAppEventType.KILL RMAppEvent event = Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java Fri Aug 3 19:00:15 2012 @@ -39,6 +39,8 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.AsyncDispatcher; @@ -56,7 +58,9 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent; @@ -68,6 +72,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager; +import org.apache.hadoop.yarn.util.BuilderUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -83,6 +88,8 @@ public class TestRMAppAttemptTransitions private YarnScheduler scheduler; private ApplicationMasterService masterService; private ApplicationMasterLauncher applicationMasterLauncher; + private AMLivelinessMonitor amLivelinessMonitor; + private AMLivelinessMonitor amFinishingMonitor; private RMApp application; private RMAppAttempt applicationAttempt; @@ -135,6 +142,9 @@ public class TestRMAppAttemptTransitions } private static int appId = 1; + + private ApplicationSubmissionContext submissionContext = null; + private boolean unmanagedAM; @Before public void setUp() throws Exception { @@ -142,11 +152,11 @@ public class TestRMAppAttemptTransitions ContainerAllocationExpirer containerAllocationExpirer = 
mock(ContainerAllocationExpirer.class); - AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class); - rmContext = - new RMContextImpl(new MemStore(), rmDispatcher, - containerAllocationExpirer, amLivelinessMonitor, null, - new ApplicationTokenSecretManager(new Configuration())); + amLivelinessMonitor = mock(AMLivelinessMonitor.class); + amFinishingMonitor = mock(AMLivelinessMonitor.class); + rmContext = new RMContextImpl(new MemStore(), rmDispatcher, + containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, + null, new ApplicationTokenSecretManager(new Configuration())); scheduler = mock(YarnScheduler.class); masterService = mock(ApplicationMasterService.class); @@ -174,8 +184,7 @@ public class TestRMAppAttemptTransitions final String user = MockApps.newUserName(); final String queue = MockApps.newQueue(); - ApplicationSubmissionContext submissionContext = - mock(ApplicationSubmissionContext.class); + submissionContext = mock(ApplicationSubmissionContext.class); when(submissionContext.getUser()).thenReturn(user); when(submissionContext.getQueue()).thenReturn(queue); ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class); @@ -183,6 +192,8 @@ public class TestRMAppAttemptTransitions when(amContainerSpec.getResource()).thenReturn(resource); when(submissionContext.getAMContainerSpec()).thenReturn(amContainerSpec); + unmanagedAM = false; + application = mock(RMApp.class); applicationAttempt = new RMAppAttemptImpl(applicationAttemptId, null, rmContext, scheduler, @@ -247,7 +258,8 @@ public class TestRMAppAttemptTransitions assertEquals(0, applicationAttempt.getRanNodes().size()); assertNull(applicationAttempt.getFinalApplicationStatus()); - // Check events + // this works for unmanaged and managed AM's because this is actually doing + // verify(application).handle(anyObject()); verify(application).handle(any(RMAppRejectedEvent.class)); } @@ -269,9 +281,24 @@ public class TestRMAppAttemptTransitions /** * {@link 
RMAppAttemptState#SCHEDULED} */ + @SuppressWarnings("unchecked") private void testAppAttemptScheduledState() { - assertEquals(RMAppAttemptState.SCHEDULED, + RMAppAttemptState expectedState; + int expectedAllocateCount; + if(unmanagedAM) { + expectedState = RMAppAttemptState.LAUNCHED; + expectedAllocateCount = 0; + } else { + expectedState = RMAppAttemptState.SCHEDULED; + expectedAllocateCount = 1; + } + + assertEquals(expectedState, applicationAttempt.getAppAttemptState()); + verify(scheduler, times(expectedAllocateCount)). + allocate(any(ApplicationAttemptId.class), + any(List.class), any(List.class)); + assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertNull(applicationAttempt.getMasterContainer()); assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); @@ -280,9 +307,6 @@ public class TestRMAppAttemptTransitions // Check events verify(application).handle(any(RMAppEvent.class)); - verify(scheduler). - allocate(any(ApplicationAttemptId.class), - any(List.class), any(List.class)); } /** @@ -346,19 +370,38 @@ public class TestRMAppAttemptTransitions } /** + * {@link RMAppAttemptState#FINISHING} + */ + private void testAppAttemptFinishingState(Container container, + FinalApplicationStatus finalStatus, + String trackingUrl, + String diagnostics) { + assertEquals(RMAppAttemptState.FINISHING, + applicationAttempt.getAppAttemptState()); + assertEquals(diagnostics, applicationAttempt.getDiagnostics()); + assertEquals(trackingUrl, applicationAttempt.getOriginalTrackingUrl()); + assertEquals("null/proxy/"+applicationAttempt.getAppAttemptId(). 
+ getApplicationId()+"/", applicationAttempt.getTrackingUrl()); + assertEquals(container, applicationAttempt.getMasterContainer()); + assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus()); + } + + /** * {@link RMAppAttemptState#FINISHED} */ private void testAppAttemptFinishedState(Container container, FinalApplicationStatus finalStatus, String trackingUrl, - String diagnostics) { + String diagnostics, + int finishedContainerCount) { assertEquals(RMAppAttemptState.FINISHED, applicationAttempt.getAppAttemptState()); assertEquals(diagnostics, applicationAttempt.getDiagnostics()); assertEquals(trackingUrl, applicationAttempt.getOriginalTrackingUrl()); assertEquals("null/proxy/"+applicationAttempt.getAppAttemptId(). getApplicationId()+"/", applicationAttempt.getTrackingUrl()); - assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); + assertEquals(finishedContainerCount, applicationAttempt + .getJustFinishedContainers().size()); assertEquals(container, applicationAttempt.getMasterContainer()); assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus()); } @@ -385,6 +428,8 @@ public class TestRMAppAttemptTransitions // Mock the allocation of AM container Container container = mock(Container.class); + when(container.getId()).thenReturn( + BuilderUtils.newContainerId(applicationAttempt.getAppAttemptId(), 1)); Allocation allocation = mock(Allocation.class); when(allocation.getContainers()). 
thenReturn(Collections.singletonList(container)); @@ -424,7 +469,62 @@ public class TestRMAppAttemptTransitions testAppAttemptRunningState(container, host, rpcPort, trackingUrl); } - + + private void unregisterApplicationAttempt(Container container, + FinalApplicationStatus finalStatus, String trackingUrl, + String diagnostics) { + applicationAttempt.handle( + new RMAppAttemptUnregistrationEvent( + applicationAttempt.getAppAttemptId(), + trackingUrl, finalStatus, diagnostics)); + testAppAttemptFinishingState(container, finalStatus, + trackingUrl, diagnostics); + } + + + @Test + public void testUnmanagedAMSuccess() { + unmanagedAM = true; + when(submissionContext.getUnmanagedAM()).thenReturn(true); + // submit AM and check it goes to LAUNCHED state + scheduleApplicationAttempt(); + testAppAttemptLaunchedState(null); + verify(amLivelinessMonitor, times(1)).register( + applicationAttempt.getAppAttemptId()); + + // launch AM + runApplicationAttempt(null, "host", 8042, "oldtrackingurl"); + + // complete a container + applicationAttempt.handle(new RMAppAttemptContainerAcquiredEvent( + applicationAttempt.getAppAttemptId(), mock(Container.class))); + applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( + applicationAttempt.getAppAttemptId(), mock(ContainerStatus.class))); + // complete AM + String trackingUrl = "mytrackingurl"; + String diagnostics = "Successful"; + FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED; + applicationAttempt.handle(new RMAppAttemptUnregistrationEvent( + applicationAttempt.getAppAttemptId(), trackingUrl, finalStatus, + diagnostics)); + testAppAttemptFinishedState(null, finalStatus, trackingUrl, diagnostics, 1); + } + + @Test + public void testUnmanagedAMUnexpectedRegistration() { + unmanagedAM = true; + when(submissionContext.getUnmanagedAM()).thenReturn(true); + + // submit AM and check it goes to SUBMITTED state + submitApplicationAttempt(); + assertEquals(RMAppAttemptState.SUBMITTED, + 
applicationAttempt.getAppAttemptState()); + + // launch AM and verify attempt failed + applicationAttempt.handle(new RMAppAttemptRegistrationEvent( + applicationAttempt.getAppAttemptId(), "host", 8042, "oldtrackingurl")); + testAppAttemptSubmittedToFailedState("Unmanaged AM must register after AM attempt reaches LAUNCHED state."); + } @Test public void testNewToKilled() { @@ -487,36 +587,99 @@ public class TestRMAppAttemptTransitions } @Test - public void testUnregisterToKilledFinish() { + public void testUnregisterToKilledFinishing() { + Container amContainer = allocateApplicationAttempt(); + launchApplicationAttempt(amContainer); + runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl"); + unregisterApplicationAttempt(amContainer, + FinalApplicationStatus.KILLED, "newtrackingurl", + "Killed by user"); + } + + @Test + public void testUnregisterToSuccessfulFinishing() { + Container amContainer = allocateApplicationAttempt(); + launchApplicationAttempt(amContainer); + runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl"); + unregisterApplicationAttempt(amContainer, + FinalApplicationStatus.SUCCEEDED, "mytrackingurl", "Successful"); + } + + @Test + public void testFinishingKill() { Container amContainer = allocateApplicationAttempt(); launchApplicationAttempt(amContainer); runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl"); + FinalApplicationStatus finalStatus = FinalApplicationStatus.FAILED; String trackingUrl = "newtrackingurl"; - String diagnostics = "Killed by user"; - FinalApplicationStatus finalStatus = FinalApplicationStatus.KILLED; + String diagnostics = "Job failed"; + unregisterApplicationAttempt(amContainer, finalStatus, trackingUrl, + diagnostics); applicationAttempt.handle( - new RMAppAttemptUnregistrationEvent( + new RMAppAttemptEvent( applicationAttempt.getAppAttemptId(), - trackingUrl, finalStatus, diagnostics)); - testAppAttemptFinishedState(amContainer, finalStatus, - trackingUrl, diagnostics); + 
RMAppAttemptEventType.KILL)); + testAppAttemptFinishingState(amContainer, finalStatus, trackingUrl, + diagnostics); } - - - @Test - public void testUnregisterToSuccessfulFinish() { + + @Test + public void testFinishingExpire() { Container amContainer = allocateApplicationAttempt(); launchApplicationAttempt(amContainer); runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl"); + FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED; String trackingUrl = "mytrackingurl"; String diagnostics = "Successful"; + unregisterApplicationAttempt(amContainer, finalStatus, trackingUrl, + diagnostics); + applicationAttempt.handle( + new RMAppAttemptEvent( + applicationAttempt.getAppAttemptId(), + RMAppAttemptEventType.EXPIRE)); + testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl, + diagnostics, 0); + } + + @Test + public void testFinishingToFinishing() { + Container amContainer = allocateApplicationAttempt(); + launchApplicationAttempt(amContainer); + runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl"); FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED; + String trackingUrl = "mytrackingurl"; + String diagnostics = "Successful"; + unregisterApplicationAttempt(amContainer, finalStatus, trackingUrl, + diagnostics); + // container must be AM container to move from FINISHING to FINISHED applicationAttempt.handle( - new RMAppAttemptUnregistrationEvent( - applicationAttempt.getAppAttemptId(), - trackingUrl, finalStatus, diagnostics)); - testAppAttemptFinishedState(amContainer, finalStatus, - trackingUrl, diagnostics); + new RMAppAttemptContainerFinishedEvent( + applicationAttempt.getAppAttemptId(), + BuilderUtils.newContainerStatus( + BuilderUtils.newContainerId( + applicationAttempt.getAppAttemptId(), 42), + ContainerState.COMPLETE, "", 0))); + testAppAttemptFinishingState(amContainer, finalStatus, trackingUrl, + diagnostics); + } + + @Test + public void testSuccessfulFinishingToFinished() { + Container 
amContainer = allocateApplicationAttempt(); + launchApplicationAttempt(amContainer); + runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl"); + FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED; + String trackingUrl = "mytrackingurl"; + String diagnostics = "Successful"; + unregisterApplicationAttempt(amContainer, finalStatus, trackingUrl, + diagnostics); + applicationAttempt.handle( + new RMAppAttemptContainerFinishedEvent( + applicationAttempt.getAppAttemptId(), + BuilderUtils.newContainerStatus(amContainer.getId(), + ContainerState.COMPLETE, "", 0))); + testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl, + diagnostics, 0); } } Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java Fri Aug 3 19:00:15 2012 @@ -40,8 +40,8 @@ import org.apache.hadoop.yarn.factory.pr import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import 
org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -111,8 +111,8 @@ public class TestApplicationLimits { LOG.info("Setup top-level queues a and b"); } - private SchedulerApp getMockApplication(int appId, String user) { - SchedulerApp application = mock(SchedulerApp.class); + private FiCaSchedulerApp getMockApplication(int appId, String user) { + FiCaSchedulerApp application = mock(FiCaSchedulerApp.class); ApplicationAttemptId applicationAttemptId = TestUtils.getMockApplicationAttemptId(appId, 0); doReturn(applicationAttemptId.getApplicationId()). @@ -158,7 +158,9 @@ public class TestApplicationLimits { int expectedMaxActiveApps = Math.max(1, (int)Math.ceil(((float)clusterResource.getMemory() / (1*GB)) * - csConf.getMaximumApplicationMasterResourcePercent() * + csConf. + getMaximumApplicationMasterResourcePerQueuePercent( + queue.getQueuePath()) * queue.getAbsoluteMaximumCapacity())); assertEquals(expectedMaxActiveApps, queue.getMaximumActiveApplications()); @@ -183,7 +185,9 @@ public class TestApplicationLimits { expectedMaxActiveApps = Math.max(1, (int)Math.ceil(((float)clusterResource.getMemory() / (1*GB)) * - csConf.getMaximumApplicationMasterResourcePercent() * + csConf. 
+ getMaximumApplicationMasterResourcePerQueuePercent( + queue.getQueuePath()) * queue.getAbsoluteMaximumCapacity())); assertEquals(expectedMaxActiveApps, queue.getMaximumActiveApplications()); @@ -200,6 +204,72 @@ public class TestApplicationLimits { (int)(clusterResource.getMemory() * queue.getAbsoluteCapacity()), queue.getMetrics().getAvailableMB() ); + + // should return -1 if per queue setting not set + assertEquals((int)csConf.UNDEFINED, csConf.getMaximumApplicationsPerQueue(queue.getQueuePath())); + int expectedMaxApps = (int)(csConf.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS * + queue.getAbsoluteCapacity()); + assertEquals(expectedMaxApps, queue.getMaxApplications()); + + int expectedMaxAppsPerUser = (int)(expectedMaxApps * + (queue.getUserLimit()/100.0f) * queue.getUserLimitFactor()); + assertEquals(expectedMaxAppsPerUser, queue.getMaxApplicationsPerUser()); + + // should default to global setting if per queue setting not set + assertEquals((long) csConf.DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT, + (long) csConf.getMaximumApplicationMasterResourcePerQueuePercent(queue.getQueuePath())); + + // Change the per-queue max AM resources percentage. + csConf.setFloat( + "yarn.scheduler.capacity." + + queue.getQueuePath() + + ".maximum-am-resource-percent", + 0.5f); + // Re-create queues to get new configs. + queues = new HashMap(); + root = + CapacityScheduler.parseQueue(csContext, csConf, null, "root", + queues, queues, + CapacityScheduler.queueComparator, + CapacityScheduler.applicationComparator, + TestUtils.spyHook); + clusterResource = Resources.createResource(100 * 16 * GB); + + queue = (LeafQueue)queues.get(A); + expectedMaxActiveApps = + Math.max(1, + (int)Math.ceil(((float)clusterResource.getMemory() / (1*GB)) * + csConf. 
+ getMaximumApplicationMasterResourcePerQueuePercent( + queue.getQueuePath()) * + queue.getAbsoluteMaximumCapacity())); + + assertEquals((long) 0.5, + (long) csConf.getMaximumApplicationMasterResourcePerQueuePercent(queue.getQueuePath())); + assertEquals(expectedMaxActiveApps, + queue.getMaximumActiveApplications()); + + // Change the per-queue max applications. + csConf.setInt( + "yarn.scheduler.capacity." + + queue.getQueuePath() + + ".maximum-applications", 9999); + // Re-create queues to get new configs. + queues = new HashMap(); + root = + CapacityScheduler.parseQueue(csContext, csConf, null, "root", + queues, queues, + CapacityScheduler.queueComparator, + CapacityScheduler.applicationComparator, + TestUtils.spyHook); + + queue = (LeafQueue)queues.get(A); + assertEquals(9999, (int)csConf.getMaximumApplicationsPerQueue(queue.getQueuePath())); + assertEquals(9999, queue.getMaxApplications()); + + expectedMaxAppsPerUser = (int)(9999 * + (queue.getUserLimit()/100.0f) * queue.getUserLimitFactor()); + assertEquals(expectedMaxAppsPerUser, queue.getMaxApplicationsPerUser()); } @Test @@ -209,7 +279,7 @@ public class TestApplicationLimits { int APPLICATION_ID = 0; // Submit first application - SchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0); queue.submitApplication(app_0, user_0, A); assertEquals(1, queue.getNumActiveApplications()); assertEquals(0, queue.getNumPendingApplications()); @@ -217,7 +287,7 @@ public class TestApplicationLimits { assertEquals(0, queue.getNumPendingApplications(user_0)); // Submit second application - SchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0); queue.submitApplication(app_1, user_0, A); assertEquals(2, queue.getNumActiveApplications()); assertEquals(0, queue.getNumPendingApplications()); @@ -225,7 +295,7 @@ public class TestApplicationLimits { assertEquals(0, 
queue.getNumPendingApplications(user_0)); // Submit third application, should remain pending - SchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0); queue.submitApplication(app_2, user_0, A); assertEquals(2, queue.getNumActiveApplications()); assertEquals(1, queue.getNumPendingApplications()); @@ -240,7 +310,7 @@ public class TestApplicationLimits { assertEquals(0, queue.getNumPendingApplications(user_0)); // Submit another one for user_0 - SchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0); queue.submitApplication(app_3, user_0, A); assertEquals(2, queue.getNumActiveApplications()); assertEquals(1, queue.getNumPendingApplications()); @@ -251,7 +321,7 @@ public class TestApplicationLimits { doReturn(3).when(queue).getMaximumActiveApplications(); // Submit first app for user_1 - SchedulerApp app_4 = getMockApplication(APPLICATION_ID++, user_1); + FiCaSchedulerApp app_4 = getMockApplication(APPLICATION_ID++, user_1); queue.submitApplication(app_4, user_1, A); assertEquals(3, queue.getNumActiveApplications()); assertEquals(1, queue.getNumPendingApplications()); @@ -261,7 +331,7 @@ public class TestApplicationLimits { assertEquals(0, queue.getNumPendingApplications(user_1)); // Submit second app for user_1, should block due to queue-limit - SchedulerApp app_5 = getMockApplication(APPLICATION_ID++, user_1); + FiCaSchedulerApp app_5 = getMockApplication(APPLICATION_ID++, user_1); queue.submitApplication(app_5, user_1, A); assertEquals(3, queue.getNumActiveApplications()); assertEquals(2, queue.getNumPendingApplications()); @@ -290,7 +360,7 @@ public class TestApplicationLimits { doReturn(2).when(queue).getMaximumActiveApplications(); // Submit first application - SchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0); 
queue.submitApplication(app_0, user_0, A); assertEquals(1, queue.getNumActiveApplications()); assertEquals(0, queue.getNumPendingApplications()); @@ -299,7 +369,7 @@ public class TestApplicationLimits { assertTrue(queue.activeApplications.contains(app_0)); // Submit second application - SchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0); queue.submitApplication(app_1, user_0, A); assertEquals(2, queue.getNumActiveApplications()); assertEquals(0, queue.getNumPendingApplications()); @@ -308,7 +378,7 @@ public class TestApplicationLimits { assertTrue(queue.activeApplications.contains(app_1)); // Submit third application, should remain pending - SchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0); queue.submitApplication(app_2, user_0, A); assertEquals(2, queue.getNumActiveApplications()); assertEquals(1, queue.getNumPendingApplications()); @@ -317,7 +387,7 @@ public class TestApplicationLimits { assertTrue(queue.pendingApplications.contains(app_2)); // Submit fourth application, should remain pending - SchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0); queue.submitApplication(app_3, user_0, A); assertEquals(2, queue.getNumActiveApplications()); assertEquals(2, queue.getNumPendingApplications()); @@ -393,7 +463,7 @@ public class TestApplicationLimits { String host_0 = "host_0"; String rack_0 = "rack_0"; - SchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 16*GB); + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 16*GB); final String user_0 = "user_0"; final String user_1 = "user_1"; @@ -408,8 +478,8 @@ public class TestApplicationLimits { // and check headroom final ApplicationAttemptId appAttemptId_0_0 = TestUtils.getMockApplicationAttemptId(0, 0); - SchedulerApp app_0_0 = - 
spy(new SchedulerApp(appAttemptId_0_0, user_0, queue, + FiCaSchedulerApp app_0_0 = + spy(new FiCaSchedulerApp(appAttemptId_0_0, user_0, queue, queue.getActiveUsersManager(), rmContext, null)); queue.submitApplication(app_0_0, user_0, A); @@ -427,8 +497,8 @@ public class TestApplicationLimits { // Submit second application from user_0, check headroom final ApplicationAttemptId appAttemptId_0_1 = TestUtils.getMockApplicationAttemptId(1, 0); - SchedulerApp app_0_1 = - spy(new SchedulerApp(appAttemptId_0_1, user_0, queue, + FiCaSchedulerApp app_0_1 = + spy(new FiCaSchedulerApp(appAttemptId_0_1, user_0, queue, queue.getActiveUsersManager(), rmContext, null)); queue.submitApplication(app_0_1, user_0, A); @@ -446,8 +516,8 @@ public class TestApplicationLimits { // Submit first application from user_1, check for new headroom final ApplicationAttemptId appAttemptId_1_0 = TestUtils.getMockApplicationAttemptId(2, 0); - SchedulerApp app_1_0 = - spy(new SchedulerApp(appAttemptId_1_0, user_1, queue, + FiCaSchedulerApp app_1_0 = + spy(new FiCaSchedulerApp(appAttemptId_1_0, user_1, queue, queue.getActiveUsersManager(), rmContext, null)); queue.submitApplication(app_1_0, user_1, A); Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- 
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java Fri Aug 3 19:00:15 2012 @@ -62,8 +62,8 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.junit.After; import org.junit.Before; @@ -171,14 +171,14 @@ public class TestLeafQueue { @Override public Container answer(InvocationOnMock invocation) throws Throwable { - final SchedulerApp application = - (SchedulerApp)(invocation.getArguments()[0]); + final FiCaSchedulerApp application = + (FiCaSchedulerApp)(invocation.getArguments()[0]); final ContainerId containerId = TestUtils.getMockContainerId(application); Container container = TestUtils.getMockContainer( containerId, - ((SchedulerNode)(invocation.getArguments()[1])).getNodeID(), + ((FiCaSchedulerNode)(invocation.getArguments()[1])).getNodeID(), (Resource)(invocation.getArguments()[2]), ((Priority)invocation.getArguments()[3])); return container; @@ -186,8 +186,8 @@ public class 
TestLeafQueue { } ). when(queue).createContainer( - any(SchedulerApp.class), - any(SchedulerNode.class), + any(FiCaSchedulerApp.class), + any(FiCaSchedulerNode.class), any(Resource.class), any(Priority.class) ); @@ -195,7 +195,7 @@ public class TestLeafQueue { // 2. Stub out LeafQueue.parent.completedContainer CSQueue parent = queue.getParent(); doNothing().when(parent).completedContainer( - any(Resource.class), any(SchedulerApp.class), any(SchedulerNode.class), + any(Resource.class), any(FiCaSchedulerApp.class), any(FiCaSchedulerNode.class), any(RMContainer.class), any(ContainerStatus.class), any(RMContainerEventType.class)); @@ -238,22 +238,22 @@ public class TestLeafQueue { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); - SchedulerApp app_0 = - new SchedulerApp(appAttemptId_0, user_0, a, + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_0, user_0, B); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); - SchedulerApp app_1 = - new SchedulerApp(appAttemptId_1, user_0, a, + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_0, a, mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_1, user_0, B); // same user // Setup some nodes String host_0 = "host_0"; - SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); final int numNodes = 1; Resource clusterResource = Resources.createResource(numNodes * (8*GB)); @@ -284,14 +284,14 @@ public class TestLeafQueue { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils .getMockApplicationAttemptId(0, 1); - SchedulerApp app_0 = new SchedulerApp(appAttemptId_0, user_d, d, null, + FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_d, d, null, 
rmContext, null); d.submitApplication(app_0, user_d, D); // Attempt the same application again final ApplicationAttemptId appAttemptId_1 = TestUtils .getMockApplicationAttemptId(0, 2); - SchedulerApp app_1 = new SchedulerApp(appAttemptId_1, user_d, d, null, + FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_d, d, null, rmContext, null); d.submitApplication(app_1, user_d, D); // same user } @@ -309,7 +309,7 @@ public class TestLeafQueue { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils .getMockApplicationAttemptId(0, 1); - SchedulerApp app_0 = new SchedulerApp(appAttemptId_0, user_0, a, null, + FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, null, rmContext, null); a.submitApplication(app_0, user_0, B); @@ -324,7 +324,7 @@ public class TestLeafQueue { // Attempt the same application again final ApplicationAttemptId appAttemptId_1 = TestUtils .getMockApplicationAttemptId(0, 2); - SchedulerApp app_1 = new SchedulerApp(appAttemptId_1, user_0, a, null, + FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, null, rmContext, null); a.submitApplication(app_1, user_0, B); // same user @@ -359,22 +359,22 @@ public class TestLeafQueue { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); - SchedulerApp app_0 = - new SchedulerApp(appAttemptId_0, user_0, a, + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_0, user_0, A); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); - SchedulerApp app_1 = - new SchedulerApp(appAttemptId_1, user_0, a, + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_0, a, mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_1, user_0, A); // same user // Setup some nodes String host_0 = "host_0"; - SchedulerNode node_0 = 
TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); final int numNodes = 1; Resource clusterResource = Resources.createResource(numNodes * (8*GB)); @@ -483,30 +483,30 @@ public class TestLeafQueue { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); - SchedulerApp app_0 = - new SchedulerApp(appAttemptId_0, user_0, a, + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, a.getActiveUsersManager(), rmContext, null); a.submitApplication(app_0, user_0, A); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); - SchedulerApp app_1 = - new SchedulerApp(appAttemptId_1, user_0, a, + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_0, a, a.getActiveUsersManager(), rmContext, null); a.submitApplication(app_1, user_0, A); // same user final ApplicationAttemptId appAttemptId_2 = TestUtils.getMockApplicationAttemptId(2, 0); - SchedulerApp app_2 = - new SchedulerApp(appAttemptId_2, user_1, a, + FiCaSchedulerApp app_2 = + new FiCaSchedulerApp(appAttemptId_2, user_1, a, a.getActiveUsersManager(), rmContext, null); a.submitApplication(app_2, user_1, A); // Setup some nodes String host_0 = "host_0"; - SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); String host_1 = "host_1"; - SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB); + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB); final int numNodes = 2; Resource clusterResource = Resources.createResource(numNodes * (8*GB)); @@ -576,30 +576,30 @@ public class TestLeafQueue { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); - SchedulerApp app_0 = - new SchedulerApp(appAttemptId_0, user_0, 
a, + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, a.getActiveUsersManager(), rmContext, null); a.submitApplication(app_0, user_0, A); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); - SchedulerApp app_1 = - new SchedulerApp(appAttemptId_1, user_0, a, + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_0, a, a.getActiveUsersManager(), rmContext, null); a.submitApplication(app_1, user_0, A); // same user final ApplicationAttemptId appAttemptId_2 = TestUtils.getMockApplicationAttemptId(2, 0); - SchedulerApp app_2 = - new SchedulerApp(appAttemptId_2, user_1, a, + FiCaSchedulerApp app_2 = + new FiCaSchedulerApp(appAttemptId_2, user_1, a, a.getActiveUsersManager(), rmContext, null); a.submitApplication(app_2, user_1, A); // Setup some nodes String host_0 = "host_0"; - SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); String host_1 = "host_1"; - SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB); + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB); final int numNodes = 2; Resource clusterResource = Resources.createResource(numNodes * (8*GB)); @@ -687,35 +687,35 @@ public class TestLeafQueue { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); - SchedulerApp app_0 = - new SchedulerApp(appAttemptId_0, user_0, a, + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, a.getActiveUsersManager(), rmContext, null); a.submitApplication(app_0, user_0, A); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); - SchedulerApp app_1 = - new SchedulerApp(appAttemptId_1, user_0, a, + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_0, a, a.getActiveUsersManager(), rmContext, null); 
a.submitApplication(app_1, user_0, A); // same user final ApplicationAttemptId appAttemptId_2 = TestUtils.getMockApplicationAttemptId(2, 0); - SchedulerApp app_2 = - new SchedulerApp(appAttemptId_2, user_1, a, + FiCaSchedulerApp app_2 = + new FiCaSchedulerApp(appAttemptId_2, user_1, a, a.getActiveUsersManager(), rmContext, null); a.submitApplication(app_2, user_1, A); final ApplicationAttemptId appAttemptId_3 = TestUtils.getMockApplicationAttemptId(3, 0); - SchedulerApp app_3 = - new SchedulerApp(appAttemptId_3, user_2, a, + FiCaSchedulerApp app_3 = + new FiCaSchedulerApp(appAttemptId_3, user_2, a, a.getActiveUsersManager(), rmContext, null); a.submitApplication(app_3, user_2, A); // Setup some nodes String host_0 = "host_0"; - SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); final int numNodes = 1; Resource clusterResource = Resources.createResource(numNodes * (8*GB)); @@ -862,21 +862,21 @@ public class TestLeafQueue { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); - SchedulerApp app_0 = - new SchedulerApp(appAttemptId_0, user_0, a, + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_0, user_0, A); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); - SchedulerApp app_1 = - new SchedulerApp(appAttemptId_1, user_1, a, + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_1, a, mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_1, user_1, A); // Setup some nodes String host_0 = "host_0"; - SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB); + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB); final int numNodes = 2; Resource clusterResource = 
Resources.createResource(numNodes * (4*GB)); @@ -961,23 +961,23 @@ public class TestLeafQueue { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); - SchedulerApp app_0 = - new SchedulerApp(appAttemptId_0, user_0, a, + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_0, user_0, A); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); - SchedulerApp app_1 = - new SchedulerApp(appAttemptId_1, user_1, a, + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_1, a, mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_1, user_1, A); // Setup some nodes String host_0 = "host_0"; - SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB); + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB); String host_1 = "host_1"; - SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 4*GB); + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 4*GB); final int numNodes = 3; Resource clusterResource = Resources.createResource(numNodes * (4*GB)); @@ -1060,24 +1060,24 @@ public class TestLeafQueue { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); - SchedulerApp app_0 = - new SchedulerApp(appAttemptId_0, user_0, a, + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_0, user_0, A); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); - SchedulerApp app_1 = - new SchedulerApp(appAttemptId_1, user_1, a, + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_1, a, mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_1, user_1, A); // Setup some 
nodes String host_0 = "host_0"; - SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB); + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB); String host_1 = "host_1"; - SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 4*GB); + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 4*GB); final int numNodes = 3; Resource clusterResource = Resources.createResource(numNodes * (4*GB)); @@ -1175,23 +1175,23 @@ public class TestLeafQueue { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); - SchedulerApp app_0 = - spy(new SchedulerApp(appAttemptId_0, user_0, a, + FiCaSchedulerApp app_0 = + spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, mock(ActiveUsersManager.class), rmContext, null)); a.submitApplication(app_0, user_0, A); // Setup some nodes and racks String host_0 = "host_0"; String rack_0 = "rack_0"; - SchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB); + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB); String host_1 = "host_1"; String rack_1 = "rack_1"; - SchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB); + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB); String host_2 = "host_2"; String rack_2 = "rack_2"; - SchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB); + FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB); final int numNodes = 3; Resource clusterResource = Resources.createResource(numNodes * (8*GB)); @@ -1284,7 +1284,7 @@ public class TestLeafQueue { assertEquals(1, app_0.getTotalRequiredResources(priority)); String host_3 = "host_3"; // on rack_1 - SchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB); + FiCaSchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB); assignment = a.assignContainers(clusterResource, node_3); 
verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3), @@ -1305,23 +1305,23 @@ public class TestLeafQueue { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); - SchedulerApp app_0 = - spy(new SchedulerApp(appAttemptId_0, user_0, a, + FiCaSchedulerApp app_0 = + spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, mock(ActiveUsersManager.class), rmContext, null)); a.submitApplication(app_0, user_0, A); // Setup some nodes and racks String host_0 = "host_0"; String rack_0 = "rack_0"; - SchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB); + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB); String host_1 = "host_1"; String rack_1 = "rack_1"; - SchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB); + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB); String host_2 = "host_2"; String rack_2 = "rack_2"; - SchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB); + FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB); final int numNodes = 3; Resource clusterResource = Resources.createResource(numNodes * (8*GB)); @@ -1435,22 +1435,22 @@ public class TestLeafQueue { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); - SchedulerApp app_0 = - spy(new SchedulerApp(appAttemptId_0, user_0, a, + FiCaSchedulerApp app_0 = + spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, mock(ActiveUsersManager.class), rmContext, null)); a.submitApplication(app_0, user_0, A); // Setup some nodes and racks String host_0_0 = "host_0_0"; String rack_0 = "rack_0"; - SchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 8*GB); + FiCaSchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 8*GB); String host_0_1 = "host_0_1"; - SchedulerNode node_0_1 = TestUtils.getMockNode(host_0_1, rack_0, 0, 8*GB); + FiCaSchedulerNode node_0_1 = 
TestUtils.getMockNode(host_0_1, rack_0, 0, 8*GB); String host_1_0 = "host_1_0"; String rack_1 = "rack_1"; - SchedulerNode node_1_0 = TestUtils.getMockNode(host_1_0, rack_1, 0, 8*GB); + FiCaSchedulerNode node_1_0 = TestUtils.getMockNode(host_1_0, rack_1, 0, 8*GB); final int numNodes = 3; Resource clusterResource = Resources.createResource(numNodes * (8*GB));