Return-Path: X-Original-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DBC047FA4 for ; Wed, 3 Aug 2011 11:36:59 +0000 (UTC) Received: (qmail 77133 invoked by uid 500); 3 Aug 2011 11:36:59 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 77036 invoked by uid 500); 3 Aug 2011 11:36:57 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 77025 invoked by uid 99); 3 Aug 2011 11:36:55 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 03 Aug 2011 11:36:55 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 03 Aug 2011 11:36:50 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 54AE1238885D; Wed, 3 Aug 2011 11:36:28 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1153435 - in /hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager: MockAM.java MockNM.java MockRM.java TestApplicationCleanup.java TestRM.java Date: Wed, 03 Aug 2011 11:36:28 -0000 To: mapreduce-commits@hadoop.apache.org From: vinodkv@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110803113628.54AE1238885D@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: vinodkv Date: Wed Aug 3 11:36:27 2011 New Revision: 1153435 URL: http://svn.apache.org/viewvc?rev=1153435&view=rev Log: Introduced MockAM and MockNM in RM for better testability. Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java?rev=1153435&view=auto ============================================================================== --- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java (added) +++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java Wed Aug 3 11:36:27 2011 @@ -0,0 +1,125 @@ +package org.apache.hadoop.yarn.server.resourcemanager; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import junit.framework.Assert; + +import org.apache.hadoop.yarn.api.AMRMProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.util.Records; + +public class MockAM { + + private volatile int responseId = 0; + private final ApplicationAttemptId attemptId; + private final RMContext context; + private final AMRMProtocol amRMProtocol; + + MockAM(RMContext context, AMRMProtocol amRMProtocol, + ApplicationAttemptId attemptId) { + this.context = context; + this.amRMProtocol = amRMProtocol; + this.attemptId = attemptId; + } + + public void waitForState(RMAppAttemptState finalState) throws Exception { + RMApp app = context.getRMApps().get(attemptId.getApplicationId()); + RMAppAttempt attempt = app.getRMAppAttempt(attemptId); + int timeoutSecs = 0; + while (!finalState.equals(attempt.getAppAttemptState()) + && timeoutSecs++ < 20) { + System.out + .println("AppAttempt State is : " + attempt.getAppAttemptState() + + " Waiting for state : " + finalState); + Thread.sleep(500); + } + System.out.println("AppAttempt State is : " + attempt.getAppAttemptState()); + Assert.assertEquals("AppAttempt state is not correct (timedout)", + finalState, attempt.getAppAttemptState()); + } + + public void registerAppAttempt() throws Exception { + waitForState(RMAppAttemptState.LAUNCHED); + responseId = 0; + RegisterApplicationMasterRequest req = Records.newRecord(RegisterApplicationMasterRequest.class); + req.setApplicationAttemptId(attemptId); + req.setHost(""); + req.setRpcPort(1); + req.setTrackingUrl(""); + amRMProtocol.registerApplicationMaster(req); + } + + public List allocate( + String host, int memory, int numContainers, + List releases) throws Exception { + List reqs = createReq(host, memory, 1, numContainers); + List toRelease = new ArrayList(); + for (ContainerId id : releases) { + Container cont = Records.newRecord(Container.class); + cont.setId(id); + //TOOD: set all fields + } + return allocate(toRelease, reqs); + } + + private List createReq(String host, int memory, int priority, + int containers) throws Exception { + ResourceRequest hostReq = createResourceReq(host, memory, priority, + containers); + ResourceRequest rackReq = createResourceReq("default-rack", memory, + priority, containers); + ResourceRequest offRackReq = createResourceReq("*", memory, priority, + containers); + return Arrays.asList(new ResourceRequest[] {hostReq, rackReq, offRackReq}); + + } + private ResourceRequest createResourceReq(String resource, int memory, int priority, + int containers) throws Exception { + ResourceRequest req = Records.newRecord(ResourceRequest.class); + req.setHostName(resource); + req.setNumContainers(containers); + Priority pri = Records.newRecord(Priority.class); + pri.setPriority(1); + req.setPriority(pri); + Resource capability = Records.newRecord(Resource.class); + capability.setMemory(memory); + req.setCapability(capability); + return req; + } + + public List allocate( + List releases, List resourceRequest) + throws Exception { + AllocateRequest req = Records.newRecord(AllocateRequest.class); + req.setResponseId(++responseId); + req.setApplicationAttemptId(attemptId); + req.addAllAsks(resourceRequest); + req.addAllReleases(releases); + AllocateResponse resp = amRMProtocol.allocate(req); + return resp.getAMResponse().getContainerList(); + } + + public void unregisterAppAttempt() throws Exception { + waitForState(RMAppAttemptState.RUNNING); + FinishApplicationMasterRequest req = Records.newRecord(FinishApplicationMasterRequest.class); + req.setAppAttemptId(attemptId); + req.setDiagnostics(""); + req.setFinalState(""); + req.setTrackingUrl(""); + amRMProtocol.finishApplicationMaster(req); + } +} Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java?rev=1153435&view=auto ============================================================================== --- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java (added) +++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java Wed Aug 3 11:36:27 2011 @@ -0,0 +1,77 @@ +package org.apache.hadoop.yarn.server.resourcemanager; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; +import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.util.Records; + +public class MockNM { + + private int responseId; + private NodeId nodeId; + private final String nodeIdStr; + private final int memory; + private final ResourceTrackerService resourceTracker; + + MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) { + this.nodeIdStr = nodeIdStr; + this.memory = memory; + this.resourceTracker = resourceTracker; + } + + public void containerStatus(Container container) throws Exception { + Map> conts = new HashMap>(); + conts.put(container.getId().getAppId(), Arrays.asList(new Container[]{})); + nodeHeartbeat(conts, true); + } + + public NodeId registerNode() throws Exception { + String[] splits = nodeIdStr.split(":"); + nodeId = Records.newRecord(NodeId.class); + nodeId.setHost(splits[0]); + nodeId.setPort(Integer.parseInt(splits[1])); + RegisterNodeManagerRequest req = Records.newRecord( + RegisterNodeManagerRequest.class); + req.setNodeId(nodeId); + req.setHttpPort(2); + Resource resource = Records.newRecord(Resource.class); + resource.setMemory(memory); + req.setResource(resource); + resourceTracker.registerNodeManager(req); + return nodeId; + } + + public HeartbeatResponse nodeHeartbeat(boolean b) throws Exception { + return nodeHeartbeat(new HashMap>(), b); + } + + public HeartbeatResponse nodeHeartbeat(Map> conts, boolean isHealthy) throws Exception { + NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class); + NodeStatus status = Records.newRecord(NodeStatus.class); + status.setNodeId(nodeId); + for (Map.Entry> entry : conts.entrySet()) { + status.setContainers(entry.getKey(), entry.getValue()); + } + NodeHealthStatus healthStatus = Records.newRecord(NodeHealthStatus.class); + healthStatus.setHealthReport(""); + healthStatus.setIsNodeHealthy(isHealthy); + healthStatus.setLastHealthReportTime(1); + status.setNodeHealthStatus(healthStatus); + status.setResponseId(++responseId); + req.setNodeStatus(status); + return resourceTracker.nodeHeartbeat(req).getHeartbeatResponse(); + } + +} Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1153435&r1=1153434&r2=1153435&view=diff ============================================================================== --- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original) +++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Wed Aug 3 11:36:27 2011 @@ -1,50 +1,28 @@ package org.apache.hadoop.yarn.server.resourcemanager; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - import junit.framework.Assert; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.ClientRMProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationIdRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationIdResponse; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.NodeHealthStatus; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; -import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; -import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; -import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; import org.apache.hadoop.yarn.server.resourcemanager.ams.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; +import org.apache.hadoop.yarn.util.Records; import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -52,10 +30,6 @@ import org.apache.log4j.Logger; public class MockRM extends ResourceManager { - private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - private Map responseIds = new HashMap(); - private Map AMResponseIds = new HashMap(); - public MockRM() { this(new Configuration()); } @@ -82,35 +56,18 @@ public class MockRM extends ResourceMana finalState, app.getState()); } - public void waitForState(ApplicationAttemptId attemptId, RMAppAttemptState finalState) - throws Exception { - RMApp app = getRMContext().getRMApps().get(attemptId.getApplicationId()); - RMAppAttempt attempt = app.getRMAppAttempt(attemptId); - int timeoutSecs = 0; - while (!finalState.equals(attempt.getAppAttemptState()) - && timeoutSecs++ < 20) { - System.out - .println("AppAttempt State is : " + attempt.getAppAttemptState() - + " Waiting for state : " + finalState); - Thread.sleep(500); - } - System.out.println("AppAttempt State is : " + attempt.getAppAttemptState()); - Assert.assertEquals("AppAttempt state is not correct (timedout)", - finalState, attempt.getAppAttemptState()); - } - //client public RMApp submitApp(int masterMemory) throws Exception { ClientRMProtocol client = getClientRMService(); - GetNewApplicationIdResponse resp = client.getNewApplicationId(recordFactory.newRecordInstance(GetNewApplicationIdRequest.class)); + GetNewApplicationIdResponse resp = client.getNewApplicationId(Records.newRecord(GetNewApplicationIdRequest.class)); ApplicationId appId = resp.getApplicationId(); - SubmitApplicationRequest req = recordFactory.newRecordInstance(SubmitApplicationRequest.class); - ApplicationSubmissionContext sub = recordFactory.newRecordInstance(ApplicationSubmissionContext.class); + SubmitApplicationRequest req = Records.newRecord(SubmitApplicationRequest.class); + ApplicationSubmissionContext sub = Records.newRecord(ApplicationSubmissionContext.class); sub.setApplicationId(appId); sub.setApplicationName(""); sub.setUser(""); - Resource capability = recordFactory.newRecordInstance(Resource.class); + Resource capability = Records.newRecord(Resource.class); capability.setMemory(masterMemory); sub.setMasterCapability(capability); req.setApplicationSubmissionContext(sub); @@ -120,152 +77,36 @@ public class MockRM extends ResourceMana return getRMContext().getRMApps().get(appId); } + public MockNM registerNode(String nodeIdStr, int memory) throws Exception { + MockNM nm = new MockNM(nodeIdStr, memory, getResourceTrackerService()); + nm.registerNode(); + return nm; + } + public void killApp(ApplicationId appId) throws Exception { ClientRMProtocol client = getClientRMService(); - FinishApplicationRequest req = recordFactory.newRecordInstance(FinishApplicationRequest.class); + FinishApplicationRequest req = Records.newRecord(FinishApplicationRequest.class); req.setApplicationId(appId); client.finishApplication(req); } //from AMLauncher - public void sendAMLaunched(ApplicationAttemptId appAttemptId) throws Exception { - waitForState(appAttemptId, RMAppAttemptState.ALLOCATED); + public MockAM sendAMLaunched(ApplicationAttemptId appAttemptId) throws Exception { + MockAM am = new MockAM(getRMContext(), masterService, appAttemptId); + am.waitForState(RMAppAttemptState.ALLOCATED); getRMContext().getDispatcher().getEventHandler().handle( new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.LAUNCHED)); + return am; } + public void sendAMLaunchFailed(ApplicationAttemptId appAttemptId) throws Exception { - waitForState(appAttemptId, RMAppAttemptState.ALLOCATED); + MockAM am = new MockAM(getRMContext(), masterService, appAttemptId); + am.waitForState(RMAppAttemptState.ALLOCATED); getRMContext().getDispatcher().getEventHandler().handle( new RMAppAttemptLaunchFailedEvent(appAttemptId, "Failed")); } - //from AMS - public void registerAppAttempt(ApplicationAttemptId attemptId) throws Exception { - waitForState(attemptId, RMAppAttemptState.LAUNCHED); - AMResponseIds.put(attemptId, 0); - RegisterApplicationMasterRequest req = recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class); - req.setApplicationAttemptId(attemptId); - req.setHost(""); - req.setRpcPort(1); - req.setTrackingUrl(""); - masterService.registerApplicationMaster(req); - } - - public List allocate(ApplicationAttemptId attemptId, - String host, int memory, int numContainers, - List releases) throws Exception { - List reqs = createReq(host, memory, 1, numContainers); - List toRelease = new ArrayList(); - for (ContainerId id : releases) { - Container cont = recordFactory.newRecordInstance(Container.class); - cont.setId(id); - //TOOD: set all fields - } - return allocate(attemptId, toRelease, reqs); - } - - private List createReq(String host, int memory, int priority, - int containers) throws Exception { - ResourceRequest hostReq = createResourceReq(host, memory, priority, - containers); - ResourceRequest rackReq = createResourceReq("default-rack", memory, - priority, containers); - ResourceRequest offRackReq = createResourceReq("*", memory, priority, - containers); - return Arrays.asList(new ResourceRequest[] {hostReq, rackReq, offRackReq}); - - } - private ResourceRequest createResourceReq(String resource, int memory, int priority, - int containers) throws Exception { - ResourceRequest req = recordFactory.newRecordInstance(ResourceRequest.class); - req.setHostName(resource); - req.setNumContainers(containers); - Priority pri = recordFactory.newRecordInstance(Priority.class); - pri.setPriority(1); - req.setPriority(pri); - Resource capability = recordFactory.newRecordInstance(Resource.class); - capability.setMemory(memory); - req.setCapability(capability); - return req; - } - - public List allocate(ApplicationAttemptId attemptId, - List releases, List resourceRequest) - throws Exception { - AllocateRequest req = recordFactory.newRecordInstance(AllocateRequest.class); - int responseId = AMResponseIds.remove(attemptId) + 1; - AMResponseIds.put(attemptId, responseId); - req.setResponseId(responseId); - req.setApplicationAttemptId(attemptId); - req.addAllAsks(resourceRequest); - req.addAllReleases(releases); - AllocateResponse resp = masterService.allocate(req); - return resp.getAMResponse().getContainerList(); - } - - public void unregisterAppAttempt(ApplicationAttemptId attemptId) throws Exception { - AMResponseIds.remove(attemptId); - waitForState(attemptId, RMAppAttemptState.RUNNING); - FinishApplicationMasterRequest req = recordFactory.newRecordInstance(FinishApplicationMasterRequest.class); - req.setAppAttemptId(attemptId); - req.setDiagnostics(""); - req.setFinalState(""); - req.setTrackingUrl(""); - masterService.finishApplicationMaster(req); - } - - //from Node - public void containerStatus(Container container, NodeId nodeId) throws Exception { - Map> conts = new HashMap>(); - conts.put(container.getId().getAppId(), Arrays.asList(new Container[]{})); - nodeHeartbeat(nodeId, conts, true); - } - - public void registerNode(String nodeIdStr, int memory) throws Exception { - String[] splits = nodeIdStr.split(":"); - NodeId nodeId = recordFactory.newRecordInstance(NodeId.class); - nodeId.setHost(splits[0]); - nodeId.setPort(Integer.parseInt(splits[1])); - RegisterNodeManagerRequest req = recordFactory - .newRecordInstance(RegisterNodeManagerRequest.class); - req.setNodeId(nodeId); - req.setHttpPort(2); - Resource resource = recordFactory.newRecordInstance(Resource.class); - resource.setMemory(memory); - req.setResource(resource); - getResourceTrackerService().registerNodeManager(req); - responseIds.put(nodeId, 0); - } - - public HeartbeatResponse nodeHeartbeat(String nodeIdStr, boolean b) throws Exception { - String[] splits = nodeIdStr.split(":"); - NodeId nodeId = recordFactory.newRecordInstance(NodeId.class); - nodeId.setHost(splits[0]); - nodeId.setPort(Integer.parseInt(splits[1])); - return nodeHeartbeat(nodeId, new HashMap>(), b); - } - - public HeartbeatResponse nodeHeartbeat(NodeId nodeId, Map> conts, boolean isHealthy) throws Exception { - NodeHeartbeatRequest req = recordFactory.newRecordInstance(NodeHeartbeatRequest.class); - NodeStatus status = recordFactory.newRecordInstance(NodeStatus.class); - status.setNodeId(nodeId); - for (Map.Entry> entry : conts.entrySet()) { - status.setContainers(entry.getKey(), entry.getValue()); - } - NodeHealthStatus healthStatus = recordFactory.newRecordInstance(NodeHealthStatus.class); - healthStatus.setHealthReport(""); - healthStatus.setIsNodeHealthy(isHealthy); - healthStatus.setLastHealthReportTime(1); - status.setNodeHealthStatus(healthStatus); - int responseId = responseIds.remove(nodeId) + 1; - responseIds.put(nodeId, responseId); - status.setResponseId(responseId); - req.setNodeStatus(status); - return getResourceTrackerService().nodeHeartbeat(req).getHeartbeatResponse(); - } - @Override protected ClientRMService createClientRMService() { return new ClientRMService(getRMContext(), amLivelinessMonitor, Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java?rev=1153435&r1=1153434&r2=1153435&view=diff ============================================================================== --- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java (original) +++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java Wed Aug 3 11:36:27 2011 @@ -7,7 +7,6 @@ import junit.framework.Assert; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -26,29 +25,29 @@ public class TestApplicationCleanup { rootLogger.setLevel(Level.DEBUG); MockRM rm = new MockRM(); rm.start(); - NodeId node1 = rm.registerNode("h1", 5000); + MockNM nm1 = rm.registerNode("h1:1234", 5000); RMApp app = rm.submitApp(2000); //kick the scheduling - rm.nodeHeartbeat(node1, true); + nm1.nodeHeartbeat(true); RMAppAttempt attempt = app.getCurrentAppAttempt(); - rm.sendAMLaunched(attempt.getAppAttemptId()); - rm.registerAppAttempt(attempt.getAppAttemptId()); + MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); + am.registerAppAttempt(); //request for containers int request = 2; - rm.allocate(attempt.getAppAttemptId(), "h1" , 1000, request, + am.allocate("h1" , 1000, request, new ArrayList()); //kick the scheduler - rm.nodeHeartbeat(node1, true); - List conts = rm.allocate(attempt.getAppAttemptId(), new ArrayList(), + nm1.nodeHeartbeat(true); + List conts = am.allocate(new ArrayList(), new ArrayList()); int contReceived = conts.size(); while (contReceived < request) { - conts = rm.allocate(attempt.getAppAttemptId(), new ArrayList(), + conts = am.allocate(new ArrayList(), new ArrayList()); contReceived += conts.size(); Log.info("Got " + contReceived + " containers. Waiting to get " + request); @@ -56,14 +55,14 @@ public class TestApplicationCleanup { } Assert.assertEquals(request, conts.size()); - rm.unregisterAppAttempt(attempt.getAppAttemptId()); - rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.FINISHED); + am.unregisterAppAttempt(); + am.waitForState(RMAppAttemptState.FINISHED); - int size = rm.nodeHeartbeat(node1, true).getApplicationsToCleanupList().size(); + int size = nm1.nodeHeartbeat(true).getApplicationsToCleanupList().size(); while(size < 1) { Thread.sleep(1000); Log.info("Waiting to get application cleanup.."); - size = rm.nodeHeartbeat(node1, true).getApplicationsToCleanupList().size(); + size = nm1.nodeHeartbeat(true).getApplicationsToCleanupList().size(); } Assert.assertEquals(1, size); Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java?rev=1153435&r1=1153434&r2=1153435&view=diff ============================================================================== --- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java (original) +++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java Wed Aug 3 11:36:27 2011 @@ -9,7 +9,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -29,18 +28,18 @@ public class TestRM { rootLogger.setLevel(Level.DEBUG); MockRM rm = new MockRM(); rm.start(); - rm.registerNode("h1:1234", 5000); + MockNM nm1 = rm.registerNode("h1:1234", 5000); RMApp app = rm.submitApp(2000); //kick the scheduling - rm.nodeHeartbeat("h1:1234", true); + nm1.nodeHeartbeat(true); RMAppAttempt attempt = app.getCurrentAppAttempt(); - rm.sendAMLaunched(attempt.getAppAttemptId()); - rm.registerAppAttempt(attempt.getAppAttemptId()); - rm.unregisterAppAttempt(attempt.getAppAttemptId()); - rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.FINISHED); + MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); + am.registerAppAttempt(); + am.unregisterAppAttempt(); + am.waitForState(RMAppAttemptState.FINISHED); rm.stop(); } @@ -50,30 +49,29 @@ public class TestRM { rootLogger.setLevel(Level.DEBUG); MockRM rm = new MockRM(); rm.start(); - rm.registerNode("h1:1234", 5000); - rm.registerNode("h2:5678", 10000); + MockNM nm1 = rm.registerNode("h1:1234", 5000); + MockNM nm2 = rm.registerNode("h2:5678", 10000); RMApp app = rm.submitApp(2000); //kick the scheduling - rm.nodeHeartbeat("h1:1234", true); + nm1.nodeHeartbeat(true); RMAppAttempt attempt = app.getCurrentAppAttempt(); - rm.sendAMLaunched(attempt.getAppAttemptId()); - rm.registerAppAttempt(attempt.getAppAttemptId()); + MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); + am.registerAppAttempt(); //request for containers int request = 13; - rm.allocate(attempt.getAppAttemptId(), "h1" , 1000, request, - new ArrayList()); + am.allocate("h1" , 1000, request, new ArrayList()); //kick the scheduler - rm.nodeHeartbeat("h1:1234", true); - List conts = rm.allocate(attempt.getAppAttemptId(), new ArrayList(), + nm1.nodeHeartbeat(true); + List conts = am.allocate(new ArrayList(), new ArrayList()); int contReceived = conts.size(); while (contReceived < 3) {//only 3 containers are available on node1 - conts = rm.allocate(attempt.getAppAttemptId(), new ArrayList(), + conts = am.allocate(new ArrayList(), new ArrayList()); contReceived += conts.size(); LOG.info("Got " + contReceived + " containers. Waiting to get " + 3); @@ -82,12 +80,12 @@ public class TestRM { Assert.assertEquals(3, conts.size()); //send node2 heartbeat - rm.nodeHeartbeat("h2:5678", true); - conts = rm.allocate(attempt.getAppAttemptId(), new ArrayList(), + nm2.nodeHeartbeat(true); + conts = am.allocate(new ArrayList(), new ArrayList()); contReceived = conts.size(); while (contReceived < 10) { - conts = rm.allocate(attempt.getAppAttemptId(), new ArrayList(), + conts = am.allocate(new ArrayList(), new ArrayList()); contReceived += conts.size(); LOG.info("Got " + contReceived + " containers. Waiting to get " + 10); @@ -95,8 +93,8 @@ public class TestRM { } Assert.assertEquals(10, conts.size()); - rm.unregisterAppAttempt(attempt.getAppAttemptId()); - rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.FINISHED); + am.unregisterAppAttempt(); + am.waitForState(RMAppAttemptState.FINISHED); rm.stop(); }