hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1153435 - in /hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager: MockAM.java MockNM.java MockRM.java TestApplicationCleanup.java TestRM.java
Date Wed, 03 Aug 2011 11:36:28 GMT
Author: vinodkv
Date: Wed Aug  3 11:36:27 2011
New Revision: 1153435

URL: http://svn.apache.org/viewvc?rev=1153435&view=rev
Log:
Introduced MockAM and MockNM in RM for better testability.

Added:
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
Modified:
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java?rev=1153435&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java Wed Aug  3 11:36:27 2011
@@ -0,0 +1,125 @@
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.util.Records;
+
+public class MockAM {
+
+  private volatile int responseId = 0;
+  private final ApplicationAttemptId attemptId;
+  private final RMContext context;
+  private final AMRMProtocol amRMProtocol;
+
+  MockAM(RMContext context, AMRMProtocol amRMProtocol, 
+      ApplicationAttemptId attemptId) {
+    this.context = context;
+    this.amRMProtocol = amRMProtocol;
+    this.attemptId = attemptId;
+  }
+
+  public void waitForState(RMAppAttemptState finalState) throws Exception {
+    RMApp app = context.getRMApps().get(attemptId.getApplicationId());
+    RMAppAttempt attempt = app.getRMAppAttempt(attemptId);
+    int timeoutSecs = 0;
+    while (!finalState.equals(attempt.getAppAttemptState())
+        && timeoutSecs++ < 20) {
+      System.out
+          .println("AppAttempt State is : " + attempt.getAppAttemptState()
+              + " Waiting for state : " + finalState);
+      Thread.sleep(500);
+    }
+    System.out.println("AppAttempt State is : " + attempt.getAppAttemptState());
+    Assert.assertEquals("AppAttempt state is not correct (timedout)",
+        finalState, attempt.getAppAttemptState());
+  }
+
+  public void registerAppAttempt() throws Exception {
+    waitForState(RMAppAttemptState.LAUNCHED);
+    responseId = 0;
+    RegisterApplicationMasterRequest req = Records.newRecord(RegisterApplicationMasterRequest.class);
+    req.setApplicationAttemptId(attemptId);
+    req.setHost("");
+    req.setRpcPort(1);
+    req.setTrackingUrl("");
+    amRMProtocol.registerApplicationMaster(req);
+  }
+
+  public List<Container> allocate( 
+      String host, int memory, int numContainers, 
+      List<ContainerId> releases) throws Exception {
+    List reqs = createReq(host, memory, 1, numContainers);
+    List<Container> toRelease = new ArrayList<Container>();
+    for (ContainerId id : releases) {
+      Container cont = Records.newRecord(Container.class);
+      cont.setId(id);
+      //TOOD: set all fields
+    }
+    return allocate(toRelease, reqs);
+  }
+
+  private List<ResourceRequest> createReq(String host, int memory, int priority, 
+      int containers) throws Exception {
+    ResourceRequest hostReq = createResourceReq(host, memory, priority, 
+        containers);
+    ResourceRequest rackReq = createResourceReq("default-rack", memory, 
+        priority, containers);
+    ResourceRequest offRackReq = createResourceReq("*", memory, priority, 
+        containers);
+    return Arrays.asList(new ResourceRequest[] {hostReq, rackReq, offRackReq});
+    
+  }
+  private ResourceRequest createResourceReq(String resource, int memory, int priority, 
+      int containers) throws Exception {
+    ResourceRequest req = Records.newRecord(ResourceRequest.class);
+    req.setHostName(resource);
+    req.setNumContainers(containers);
+    Priority pri = Records.newRecord(Priority.class);
+    pri.setPriority(1);
+    req.setPriority(pri);
+    Resource capability = Records.newRecord(Resource.class);
+    capability.setMemory(memory);
+    req.setCapability(capability);
+    return req;
+  }
+
+  public List<Container> allocate(
+      List<Container> releases, List<ResourceRequest> resourceRequest) 
+      throws Exception {
+    AllocateRequest req = Records.newRecord(AllocateRequest.class);
+    req.setResponseId(++responseId);
+    req.setApplicationAttemptId(attemptId);
+    req.addAllAsks(resourceRequest);
+    req.addAllReleases(releases);
+    AllocateResponse resp = amRMProtocol.allocate(req);
+    return resp.getAMResponse().getContainerList();
+  }
+
+  public void unregisterAppAttempt() throws Exception {
+    waitForState(RMAppAttemptState.RUNNING);
+    FinishApplicationMasterRequest req = Records.newRecord(FinishApplicationMasterRequest.class);
+    req.setAppAttemptId(attemptId);
+    req.setDiagnostics("");
+    req.setFinalState("");
+    req.setTrackingUrl("");
+    amRMProtocol.finishApplicationMaster(req);
+  }
+}

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java?rev=1153435&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java Wed Aug  3 11:36:27 2011
@@ -0,0 +1,77 @@
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
+import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.util.Records;
+
+public class MockNM {
+
+  private int responseId;
+  private NodeId nodeId;
+  private final String nodeIdStr;
+  private final int memory;
+  private final ResourceTrackerService resourceTracker;
+
+  MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) {
+    this.nodeIdStr = nodeIdStr;
+    this.memory = memory;
+    this.resourceTracker = resourceTracker;
+  }
+
+  public void containerStatus(Container container) throws Exception {
+    Map<ApplicationId, List<Container>> conts = new HashMap<ApplicationId, List<Container>>();
+    conts.put(container.getId().getAppId(), Arrays.asList(new Container[]{}));
+    nodeHeartbeat(conts, true);
+  }
+
+  public NodeId registerNode() throws Exception {
+    String[] splits = nodeIdStr.split(":");
+    nodeId = Records.newRecord(NodeId.class);
+    nodeId.setHost(splits[0]);
+    nodeId.setPort(Integer.parseInt(splits[1]));
+    RegisterNodeManagerRequest req = Records.newRecord(
+        RegisterNodeManagerRequest.class);
+    req.setNodeId(nodeId);
+    req.setHttpPort(2);
+    Resource resource = Records.newRecord(Resource.class);
+    resource.setMemory(memory);
+    req.setResource(resource);
+    resourceTracker.registerNodeManager(req);
+    return nodeId;
+  }
+
+  public HeartbeatResponse nodeHeartbeat(boolean b) throws Exception {
+    return nodeHeartbeat(new HashMap<ApplicationId, List<Container>>(), b);
+  }
+
+  public HeartbeatResponse nodeHeartbeat(Map<ApplicationId, 
+      List<Container>> conts, boolean isHealthy) throws Exception {
+    NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class);
+    NodeStatus status = Records.newRecord(NodeStatus.class);
+    status.setNodeId(nodeId);
+    for (Map.Entry<ApplicationId, List<Container>> entry : conts.entrySet()) {
+      status.setContainers(entry.getKey(), entry.getValue());
+    }
+    NodeHealthStatus healthStatus = Records.newRecord(NodeHealthStatus.class);
+    healthStatus.setHealthReport("");
+    healthStatus.setIsNodeHealthy(isHealthy);
+    healthStatus.setLastHealthReportTime(1);
+    status.setNodeHealthStatus(healthStatus);
+    status.setResponseId(++responseId);
+    req.setNodeStatus(status);
+    return resourceTracker.nodeHeartbeat(req).getHeartbeatResponse();
+  }
+
+}

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1153435&r1=1153434&r2=1153435&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Wed Aug  3 11:36:27 2011
@@ -1,50 +1,28 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationIdRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationIdResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
-import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
-import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
 import org.apache.hadoop.yarn.server.resourcemanager.ams.ApplicationMasterService;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
+import org.apache.hadoop.yarn.util.Records;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -52,10 +30,6 @@ import org.apache.log4j.Logger;
 
 public class MockRM extends ResourceManager {
 
-  private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-  private Map<NodeId, Integer> responseIds = new HashMap<NodeId, Integer>();
-  private Map<ApplicationAttemptId, Integer> AMResponseIds = new HashMap<ApplicationAttemptId, Integer>();
-
   public MockRM() {
     this(new Configuration());
   }
@@ -82,35 +56,18 @@ public class MockRM extends ResourceMana
         finalState, app.getState());
   }
 
-  public void waitForState(ApplicationAttemptId attemptId, RMAppAttemptState finalState)
-      throws Exception {
-    RMApp app = getRMContext().getRMApps().get(attemptId.getApplicationId());
-    RMAppAttempt attempt = app.getRMAppAttempt(attemptId);
-    int timeoutSecs = 0;
-    while (!finalState.equals(attempt.getAppAttemptState())
-        && timeoutSecs++ < 20) {
-      System.out
-          .println("AppAttempt State is : " + attempt.getAppAttemptState()
-              + " Waiting for state : " + finalState);
-      Thread.sleep(500);
-    }
-    System.out.println("AppAttempt State is : " + attempt.getAppAttemptState());
-    Assert.assertEquals("AppAttempt state is not correct (timedout)",
-        finalState, attempt.getAppAttemptState());
-  }
-
   //client
   public RMApp submitApp(int masterMemory) throws Exception {
     ClientRMProtocol client = getClientRMService();
-    GetNewApplicationIdResponse resp = client.getNewApplicationId(recordFactory.newRecordInstance(GetNewApplicationIdRequest.class));
+    GetNewApplicationIdResponse resp = client.getNewApplicationId(Records.newRecord(GetNewApplicationIdRequest.class));
     ApplicationId appId = resp.getApplicationId();
     
-    SubmitApplicationRequest req = recordFactory.newRecordInstance(SubmitApplicationRequest.class);
-    ApplicationSubmissionContext sub = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
+    SubmitApplicationRequest req = Records.newRecord(SubmitApplicationRequest.class);
+    ApplicationSubmissionContext sub = Records.newRecord(ApplicationSubmissionContext.class);
     sub.setApplicationId(appId);
     sub.setApplicationName("");
     sub.setUser("");
-    Resource capability = recordFactory.newRecordInstance(Resource.class);
+    Resource capability = Records.newRecord(Resource.class);
     capability.setMemory(masterMemory);
     sub.setMasterCapability(capability);
     req.setApplicationSubmissionContext(sub);
@@ -120,152 +77,36 @@ public class MockRM extends ResourceMana
     return getRMContext().getRMApps().get(appId);
   }
 
+  public MockNM registerNode(String nodeIdStr, int memory) throws Exception {
+    MockNM nm = new MockNM(nodeIdStr, memory, getResourceTrackerService());
+    nm.registerNode();
+    return nm;
+  }
+
   public void killApp(ApplicationId appId) throws Exception {
     ClientRMProtocol client = getClientRMService();
-    FinishApplicationRequest req = recordFactory.newRecordInstance(FinishApplicationRequest.class);
+    FinishApplicationRequest req = Records.newRecord(FinishApplicationRequest.class);
     req.setApplicationId(appId);
     client.finishApplication(req);
   }
 
   //from AMLauncher
-  public void sendAMLaunched(ApplicationAttemptId appAttemptId) throws Exception {
-    waitForState(appAttemptId, RMAppAttemptState.ALLOCATED);
+  public MockAM sendAMLaunched(ApplicationAttemptId appAttemptId) throws Exception {
+    MockAM am = new MockAM(getRMContext(), masterService, appAttemptId);
+    am.waitForState(RMAppAttemptState.ALLOCATED);
     getRMContext().getDispatcher().getEventHandler().handle(
         new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.LAUNCHED));
+    return am;
   }
 
+  
   public void sendAMLaunchFailed(ApplicationAttemptId appAttemptId) throws Exception {
-    waitForState(appAttemptId, RMAppAttemptState.ALLOCATED);
+    MockAM am = new MockAM(getRMContext(), masterService, appAttemptId);
+    am.waitForState(RMAppAttemptState.ALLOCATED);
     getRMContext().getDispatcher().getEventHandler().handle(
         new RMAppAttemptLaunchFailedEvent(appAttemptId, "Failed"));
   }
 
-  //from AMS
-  public void registerAppAttempt(ApplicationAttemptId attemptId) throws Exception {
-    waitForState(attemptId, RMAppAttemptState.LAUNCHED);
-    AMResponseIds.put(attemptId, 0);
-    RegisterApplicationMasterRequest req = recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class);
-    req.setApplicationAttemptId(attemptId);
-    req.setHost("");
-    req.setRpcPort(1);
-    req.setTrackingUrl("");
-    masterService.registerApplicationMaster(req);
-  }
-
-  public List<Container> allocate(ApplicationAttemptId attemptId, 
-      String host, int memory, int numContainers, 
-      List<ContainerId> releases) throws Exception {
-    List reqs = createReq(host, memory, 1, numContainers);
-    List<Container> toRelease = new ArrayList<Container>();
-    for (ContainerId id : releases) {
-      Container cont = recordFactory.newRecordInstance(Container.class);
-      cont.setId(id);
-      //TOOD: set all fields
-    }
-    return allocate(attemptId, toRelease, reqs);
-  }
-
-  private List<ResourceRequest> createReq(String host, int memory, int priority, 
-      int containers) throws Exception {
-    ResourceRequest hostReq = createResourceReq(host, memory, priority, 
-        containers);
-    ResourceRequest rackReq = createResourceReq("default-rack", memory, 
-        priority, containers);
-    ResourceRequest offRackReq = createResourceReq("*", memory, priority, 
-        containers);
-    return Arrays.asList(new ResourceRequest[] {hostReq, rackReq, offRackReq});
-    
-  }
-  private ResourceRequest createResourceReq(String resource, int memory, int priority, 
-      int containers) throws Exception {
-    ResourceRequest req = recordFactory.newRecordInstance(ResourceRequest.class);
-    req.setHostName(resource);
-    req.setNumContainers(containers);
-    Priority pri = recordFactory.newRecordInstance(Priority.class);
-    pri.setPriority(1);
-    req.setPriority(pri);
-    Resource capability = recordFactory.newRecordInstance(Resource.class);
-    capability.setMemory(memory);
-    req.setCapability(capability);
-    return req;
-  }
-
-  public List<Container> allocate(ApplicationAttemptId attemptId, 
-      List<Container> releases, List<ResourceRequest> resourceRequest) 
-      throws Exception {
-    AllocateRequest req = recordFactory.newRecordInstance(AllocateRequest.class);
-    int responseId = AMResponseIds.remove(attemptId) + 1;
-    AMResponseIds.put(attemptId, responseId);
-    req.setResponseId(responseId);
-    req.setApplicationAttemptId(attemptId);
-    req.addAllAsks(resourceRequest);
-    req.addAllReleases(releases);
-    AllocateResponse resp = masterService.allocate(req);
-    return resp.getAMResponse().getContainerList();
-  }
-
-  public void unregisterAppAttempt(ApplicationAttemptId attemptId) throws Exception {
-    AMResponseIds.remove(attemptId);
-    waitForState(attemptId, RMAppAttemptState.RUNNING);
-    FinishApplicationMasterRequest req = recordFactory.newRecordInstance(FinishApplicationMasterRequest.class);
-    req.setAppAttemptId(attemptId);
-    req.setDiagnostics("");
-    req.setFinalState("");
-    req.setTrackingUrl("");
-    masterService.finishApplicationMaster(req);
-  }
-
-  //from Node
-  public void containerStatus(Container container, NodeId nodeId) throws Exception {
-    Map<ApplicationId, List<Container>> conts = new HashMap<ApplicationId, List<Container>>();
-    conts.put(container.getId().getAppId(), Arrays.asList(new Container[]{}));
-    nodeHeartbeat(nodeId, conts, true);
-  }
-
-  public void registerNode(String nodeIdStr, int memory) throws Exception {
-    String[] splits = nodeIdStr.split(":");
-    NodeId nodeId = recordFactory.newRecordInstance(NodeId.class);
-    nodeId.setHost(splits[0]);
-    nodeId.setPort(Integer.parseInt(splits[1]));
-    RegisterNodeManagerRequest req = recordFactory
-        .newRecordInstance(RegisterNodeManagerRequest.class);
-    req.setNodeId(nodeId);
-    req.setHttpPort(2);
-    Resource resource = recordFactory.newRecordInstance(Resource.class);
-    resource.setMemory(memory);
-    req.setResource(resource);
-    getResourceTrackerService().registerNodeManager(req);
-    responseIds.put(nodeId, 0);
-  }
-
-  public HeartbeatResponse nodeHeartbeat(String nodeIdStr, boolean b) throws Exception {
-    String[] splits = nodeIdStr.split(":");
-    NodeId nodeId = recordFactory.newRecordInstance(NodeId.class);
-    nodeId.setHost(splits[0]);
-    nodeId.setPort(Integer.parseInt(splits[1]));
-    return nodeHeartbeat(nodeId, new HashMap<ApplicationId, List<Container>>(), b);
-  }
-
-  public HeartbeatResponse nodeHeartbeat(NodeId nodeId, Map<ApplicationId, 
-      List<Container>> conts, boolean isHealthy) throws Exception {
-    NodeHeartbeatRequest req = recordFactory.newRecordInstance(NodeHeartbeatRequest.class);
-    NodeStatus status = recordFactory.newRecordInstance(NodeStatus.class);
-    status.setNodeId(nodeId);
-    for (Map.Entry<ApplicationId, List<Container>> entry : conts.entrySet()) {
-      status.setContainers(entry.getKey(), entry.getValue());
-    }
-    NodeHealthStatus healthStatus = recordFactory.newRecordInstance(NodeHealthStatus.class);
-    healthStatus.setHealthReport("");
-    healthStatus.setIsNodeHealthy(isHealthy);
-    healthStatus.setLastHealthReportTime(1);
-    status.setNodeHealthStatus(healthStatus);
-    int responseId = responseIds.remove(nodeId) + 1;
-    responseIds.put(nodeId, responseId);
-    status.setResponseId(responseId);
-    req.setNodeStatus(status);
-    return getResourceTrackerService().nodeHeartbeat(req).getHeartbeatResponse();
-  }
-
   @Override
   protected ClientRMService createClientRMService() {
     return new ClientRMService(getRMContext(), amLivelinessMonitor,

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java?rev=1153435&r1=1153434&r2=1153435&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java Wed Aug  3 11:36:27 2011
@@ -7,7 +7,6 @@ import junit.framework.Assert;
 
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -26,29 +25,29 @@ public class TestApplicationCleanup {
     rootLogger.setLevel(Level.DEBUG);
     MockRM rm = new MockRM();
     rm.start();
-    NodeId node1 = rm.registerNode("h1", 5000);
+    MockNM nm1 = rm.registerNode("h1:1234", 5000);
     
     RMApp app = rm.submitApp(2000);
 
     //kick the scheduling
-    rm.nodeHeartbeat(node1, true);
+    nm1.nodeHeartbeat(true);
 
     RMAppAttempt attempt = app.getCurrentAppAttempt();
-    rm.sendAMLaunched(attempt.getAppAttemptId());
-    rm.registerAppAttempt(attempt.getAppAttemptId());
+    MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
+    am.registerAppAttempt();
     
     //request for containers
     int request = 2;
-    rm.allocate(attempt.getAppAttemptId(), "h1" , 1000, request, 
+    am.allocate("h1" , 1000, request, 
         new ArrayList<ContainerId>());
     
     //kick the scheduler
-    rm.nodeHeartbeat(node1, true);
-    List<Container> conts = rm.allocate(attempt.getAppAttemptId(), new ArrayList<Container>(),
+    nm1.nodeHeartbeat(true);
+    List<Container> conts = am.allocate(new ArrayList<Container>(),
         new ArrayList<ResourceRequest>());
     int contReceived = conts.size();
     while (contReceived < request) {
-      conts = rm.allocate(attempt.getAppAttemptId(), new ArrayList<Container>(),
+      conts = am.allocate(new ArrayList<Container>(),
           new ArrayList<ResourceRequest>());
       contReceived += conts.size();
       Log.info("Got " + contReceived + " containers. Waiting to get " + request);
@@ -56,14 +55,14 @@ public class TestApplicationCleanup {
     }
     Assert.assertEquals(request, conts.size());
     
-    rm.unregisterAppAttempt(attempt.getAppAttemptId());
-    rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.FINISHED);
+    am.unregisterAppAttempt();
+    am.waitForState(RMAppAttemptState.FINISHED);
 
-    int size = rm.nodeHeartbeat(node1, true).getApplicationsToCleanupList().size();
+    int size = nm1.nodeHeartbeat(true).getApplicationsToCleanupList().size();
     while(size < 1) {
       Thread.sleep(1000);
       Log.info("Waiting to get application cleanup..");
-      size = rm.nodeHeartbeat(node1, true).getApplicationsToCleanupList().size();
+      size = nm1.nodeHeartbeat(true).getApplicationsToCleanupList().size();
     }
     Assert.assertEquals(1, size);
 

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java?rev=1153435&r1=1153434&r2=1153435&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java Wed Aug  3 11:36:27 2011
@@ -9,7 +9,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -29,18 +28,18 @@ public class TestRM {
     rootLogger.setLevel(Level.DEBUG);
     MockRM rm = new MockRM();
     rm.start();
-    rm.registerNode("h1:1234", 5000);
+    MockNM nm1 = rm.registerNode("h1:1234", 5000);
     
     RMApp app = rm.submitApp(2000);
 
     //kick the scheduling
-    rm.nodeHeartbeat("h1:1234", true);
+    nm1.nodeHeartbeat(true);
 
     RMAppAttempt attempt = app.getCurrentAppAttempt();
-    rm.sendAMLaunched(attempt.getAppAttemptId());
-    rm.registerAppAttempt(attempt.getAppAttemptId());
-    rm.unregisterAppAttempt(attempt.getAppAttemptId());
-    rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.FINISHED);
+    MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
+    am.registerAppAttempt();
+    am.unregisterAppAttempt();
+    am.waitForState(RMAppAttemptState.FINISHED);
     rm.stop();
   }
 
@@ -50,30 +49,29 @@ public class TestRM {
     rootLogger.setLevel(Level.DEBUG);
     MockRM rm = new MockRM();
     rm.start();
-    rm.registerNode("h1:1234", 5000);
-    rm.registerNode("h2:5678", 10000);
+    MockNM nm1 = rm.registerNode("h1:1234", 5000);
+    MockNM nm2 = rm.registerNode("h2:5678", 10000);
     
     RMApp app = rm.submitApp(2000);
 
     //kick the scheduling
-    rm.nodeHeartbeat("h1:1234", true);
+    nm1.nodeHeartbeat(true);
 
     RMAppAttempt attempt = app.getCurrentAppAttempt();
-    rm.sendAMLaunched(attempt.getAppAttemptId());
-    rm.registerAppAttempt(attempt.getAppAttemptId());
+    MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
+    am.registerAppAttempt();
     
     //request for containers
     int request = 13;
-    rm.allocate(attempt.getAppAttemptId(), "h1" , 1000, request, 
-        new ArrayList<ContainerId>());
+    am.allocate("h1" , 1000, request, new ArrayList<ContainerId>());
     
     //kick the scheduler
-    rm.nodeHeartbeat("h1:1234", true);
-    List<Container> conts = rm.allocate(attempt.getAppAttemptId(), new ArrayList<Container>(),
+    nm1.nodeHeartbeat(true);
+    List<Container> conts = am.allocate(new ArrayList<Container>(),
         new ArrayList<ResourceRequest>());
     int contReceived = conts.size();
     while (contReceived < 3) {//only 3 containers are available on node1
-      conts = rm.allocate(attempt.getAppAttemptId(), new ArrayList<Container>(),
+      conts = am.allocate(new ArrayList<Container>(),
           new ArrayList<ResourceRequest>());
       contReceived += conts.size();
       LOG.info("Got " + contReceived + " containers. Waiting to get " + 3);
@@ -82,12 +80,12 @@ public class TestRM {
     Assert.assertEquals(3, conts.size());
 
     //send node2 heartbeat
-    rm.nodeHeartbeat("h2:5678", true);
-    conts = rm.allocate(attempt.getAppAttemptId(), new ArrayList<Container>(),
+    nm2.nodeHeartbeat(true);
+    conts = am.allocate(new ArrayList<Container>(),
         new ArrayList<ResourceRequest>());
     contReceived = conts.size();
     while (contReceived < 10) {
-      conts = rm.allocate(attempt.getAppAttemptId(), new ArrayList<Container>(),
+      conts = am.allocate(new ArrayList<Container>(),
           new ArrayList<ResourceRequest>());
       contReceived += conts.size();
       LOG.info("Got " + contReceived + " containers. Waiting to get " + 10);
@@ -95,8 +93,8 @@ public class TestRM {
     }
     Assert.assertEquals(10, conts.size());
 
-    rm.unregisterAppAttempt(attempt.getAppAttemptId());
-    rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.FINISHED);
+    am.unregisterAppAttempt();
+    am.waitForState(RMAppAttemptState.FINISHED);
 
     rm.stop();
   }



Mime
View raw message