hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1153434 - in /hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src: main/java/org/apache/hadoop/yarn/server/resourcemanager/ams/ main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/ main/java/o...
Date Wed, 03 Aug 2011 11:36:07 GMT
Author: vinodkv
Date: Wed Aug  3 11:36:05 2011
New Revision: 1153434

URL: http://svn.apache.org/viewvc?rev=1153434&view=rev
Log:
More tests from Sharad ; multinode scheduling and application-cleanup.

Added:
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
Modified:
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ams/ApplicationMasterService.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ams/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ams/ApplicationMasterService.java?rev=1153434&r1=1153433&r2=1153434&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ams/ApplicationMasterService.java
(original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ams/ApplicationMasterService.java
Wed Aug  3 11:36:05 2011
@@ -190,16 +190,15 @@ AMRMProtocol, EventHandler<ApplicationMa
       throws YarnRemoteException {
 
     ApplicationAttemptId appAttemptId = request.getApplicationAttemptId();
-    ApplicationId applicationId = appAttemptId.getApplicationId();
 
     this.amLivelinessMonitor.receivedPing(appAttemptId);
 
     /* check if its in cache */
     AllocateResponse allocateResponse = recordFactory
         .newRecordInstance(AllocateResponse.class);
-    AMResponse lastResponse = responseMap.get(applicationId);
+    AMResponse lastResponse = responseMap.get(appAttemptId);
     if (lastResponse == null) {
-      LOG.error("Application doesnt exist in cache " + applicationId);
+      LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId);
       allocateResponse.setAMResponse(reboot);
       return allocateResponse;
     }
@@ -208,7 +207,7 @@ AMRMProtocol, EventHandler<ApplicationMa
       allocateResponse.setAMResponse(lastResponse);
       return allocateResponse;
     } else if (request.getResponseId() + 1 < lastResponse.getResponseId()) {
-      LOG.error("Invalid responseid from application " + applicationId);
+      LOG.error("Invalid responseid from appAttemptId " + appAttemptId);
       // Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO:
       allocateResponse.setAMResponse(reboot);
       return allocateResponse;
@@ -237,7 +236,7 @@ AMRMProtocol, EventHandler<ApplicationMa
                 RMContainerEventType.RELEASED));
       }
 
-      RMApp app = this.rmContext.getRMApps().get(applicationId);
+      RMApp app = this.rmContext.getRMApps().get(appAttemptId.getApplicationId());
       RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
 
       // Get the list of finished containers.

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java?rev=1153434&r1=1153433&r2=1153434&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
(original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
Wed Aug  3 11:36:05 2011
@@ -1,8 +1,11 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
 
+import java.util.Set;
+
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -37,4 +40,5 @@ public interface RMApp extends EventHand
 
   StringBuilder getDiagnostics();
 
+  Set<NodeId> getRanNodes();
 }

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1153434&r1=1153433&r2=1153434&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
(original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
Wed Aug  3 11:36:05 2011
@@ -1,8 +1,10 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
 
 import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -16,6 +18,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationState;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.RMConfig;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -25,6 +28,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -96,7 +100,7 @@ public class RMAppImpl implements RMApp 
 
      // Transitions from RUNNING state
     .addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
-        RMAppEventType.ATTEMPT_FINISHED)
+        RMAppEventType.ATTEMPT_FINISHED, new FinalTransition())
     .addTransition(RMAppState.RUNNING,
         EnumSet.of(RMAppState.RESTARTING, RMAppState.FAILED),
         RMAppEventType.ATTEMPT_FAILED,
@@ -317,6 +321,21 @@ public class RMAppImpl implements RMApp 
   }
 
   @Override
+  public Set<NodeId> getRanNodes() {
+    this.readLock.lock();
+
+    try {
+      Set<NodeId> ranNodes = new HashSet<NodeId>();
+      for (RMAppAttempt attempt : attempts.values()) {
+        ranNodes.addAll(attempt.getRanNodes());
+      }
+      return ranNodes;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Override
   public void handle(RMAppEvent event) {
 
     this.writeLock.lock();
@@ -381,8 +400,19 @@ public class RMAppImpl implements RMApp 
     };
   }
 
-  private static final class AttemptFailedTransition implements
-      MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
+  private static class FinalTransition extends RMAppTransition {
+    public void transition(RMAppImpl app, RMAppEvent event) {
+      Set<NodeId> ranNodes = app.getRanNodes();
+      for (NodeId nodeId : ranNodes) {
+        app.dispatcher.getEventHandler().handle(
+            new RMNodeCleanAppEvent(nodeId, app.applicationId));
+      }
+    };
+  }
+
+  private static final class AttemptFailedTransition extends FinalTransition 
+    implements
+      MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState>  {
 
     private final RMAppState initialState;
 

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java?rev=1153434&r1=1153433&r2=1153434&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
(original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
Wed Aug  3 11:36:05 2011
@@ -1,10 +1,12 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
 
 import java.util.List;
+import java.util.Set;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.event.EventHandler;
 
 public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent>{
@@ -25,6 +27,8 @@ public interface RMAppAttempt extends Ev
 
   float getProgress();
 
+  Set<NodeId> getRanNodes();
+
   List<Container> pullJustFinishedContainers();
 
   List<Container> pullNewlyAllocatedContainers();

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1153434&r1=1153433&r2=1153434&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
(original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
Wed Aug  3 11:36:05 2011
@@ -4,8 +4,10 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -16,6 +18,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -79,8 +82,13 @@ public class RMAppAttemptImpl implements
   private Map<ContainerId, ContainerId> liveContainers
     = new HashMap<ContainerId, ContainerId>();
 
-  private List<Container> newlyAllocatedContainers;
-  private List<ContainerId> justFinishedContainers;
+  //nodes on while this attempt's containers ran
+  private final Set<NodeId> ranNodes = 
+    new HashSet<NodeId>();
+  private final List<Container> newlyAllocatedContainers = 
+    new ArrayList<Container>();
+  private final List<ContainerId> justFinishedContainers = 
+    new ArrayList<ContainerId>();
   private Container masterContainer;
 
   private float progress = 0;
@@ -317,7 +325,7 @@ public class RMAppAttemptImpl implements
     this.writeLock.lock();
 
     try {
-      return null;  // TODO: Should just be ContainerId 
+      return new ArrayList<Container>();  // TODO: Should just be ContainerId 
 //      List<Container> returnList = new ArrayList<Container>(
 //          this.justFinishedContainers.size());
 //      returnList.addAll(this.justFinishedContainers);
@@ -329,6 +337,11 @@ public class RMAppAttemptImpl implements
   }
 
   @Override
+  public Set<NodeId> getRanNodes() {
+    return ranNodes;
+  }
+
+  @Override
   public List<Container> pullNewlyAllocatedContainers() {
     this.writeLock.lock();
 
@@ -336,6 +349,10 @@ public class RMAppAttemptImpl implements
       List<Container> returnList = new ArrayList<Container>(
           this.newlyAllocatedContainers.size());
       returnList.addAll(this.newlyAllocatedContainers);
+      for (Container cont : newlyAllocatedContainers) {
+        ranNodes.add(cont.getNodeId());//add to the nodes set when these containers
+        //are pulled by AM
+      }
       this.newlyAllocatedContainers.clear();
       return returnList;
     } finally {

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1153434&r1=1153433&r2=1153434&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
(original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
Wed Aug  3 11:36:05 2011
@@ -373,11 +373,12 @@ public class AppSchedulingInfo {
     for (Container container : containers) {
 
       allocated.add(container);
-      try {
+      //TODO: fixme sharad
+     /* try {
         store.storeContainer(container);
       } catch (IOException ie) {
         // TODO fix this. we shouldnt ignore
-      }
+      }*/
       LOG.debug("allocate: applicationId=" + applicationId + " container="
           + container.getId() + " host="
           + container.getNodeId().toString());

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java?rev=1153434&r1=1153433&r2=1153434&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java
(original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java
Wed Aug  3 11:36:05 2011
@@ -8,7 +8,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationState;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -16,9 +15,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 
 public class SchedulerApp {
 
@@ -81,12 +78,6 @@ public class SchedulerApp {
     return this.appSchedulingInfo.getResource(priority);
   }
 
-  public void allocate(NodeType type, SchedulerNode node, Priority priority,
-      ResourceRequest request, List<Container> containers) {
-    this.appSchedulingInfo
-        .allocate(type, node, priority, request, containers);
-  }
-
   public boolean isPending() {
     return this.appSchedulingInfo.isPending();
   }
@@ -113,7 +104,8 @@ public class SchedulerApp {
     Resources.subtractFrom(currentConsumption, containerResource);
   }
 
-  synchronized public void allocate(List<Container> containers) {
+  synchronized public void allocate(NodeType type, SchedulerNode node,
+      Priority priority, ResourceRequest request, List<Container> containers) {
     // Update consumption and track allocations
     for (Container container : containers) {
       Resources.addTo(currentConsumption, container.getResource());
@@ -121,6 +113,7 @@ public class SchedulerApp {
           + " container=" + container.getId() + " host="
           + container.getNodeId().toString());
     }
+    appSchedulingInfo.allocate(type, node, priority, request, containers);
   }
 
   public Resource getCurrentConsumption() {
@@ -133,8 +126,8 @@ public class SchedulerApp {
         Map<String, ResourceRequest> requests = getResourceRequests(priority);
         if (requests != null) {
           LOG.debug("showRequests:" + " application=" + getApplicationId() + 
-              " available=" + getHeadroom() + 
-              " current=" + currentConsumption);
+              " headRoom=" + getHeadroom() + 
+              " currentConsumption=" + currentConsumption.getMemory());
           for (ResourceRequest request : requests.values()) {
             LOG.debug("showRequests:" + " application=" + getApplicationId()
                 + " request=" + request);

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1153434&r1=1153433&r2=1153434&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
(original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
Wed Aug  3 11:36:05 2011
@@ -494,7 +494,7 @@ public class FifoScheduler implements Re
         }
         containers.add(container);
       }
-      application.allocate(containers);
+      application.allocate(type, node, priority, request, containers);
       addAllocatedContainers(node, application.getApplicationAttemptId(),
           containers);
       Resources.addTo(usedResource,

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1153434&r1=1153433&r2=1153434&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
(original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
Wed Aug  3 11:36:05 2011
@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.factories.
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
+import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
@@ -52,6 +53,8 @@ import org.apache.log4j.Logger;
 public class MockRM extends ResourceManager {
 
   private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+  private Map<NodeId, Integer> responseIds = new HashMap<NodeId, Integer>();
+  private Map<ApplicationAttemptId, Integer> AMResponseIds = new HashMap<ApplicationAttemptId,
Integer>();
 
   public MockRM() {
     this(new Configuration());
@@ -140,6 +143,7 @@ public class MockRM extends ResourceMana
   //from AMS
   public void registerAppAttempt(ApplicationAttemptId attemptId) throws Exception {
     waitForState(attemptId, RMAppAttemptState.LAUNCHED);
+    AMResponseIds.put(attemptId, 0);
     RegisterApplicationMasterRequest req = recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class);
     req.setApplicationAttemptId(attemptId);
     req.setHost("");
@@ -148,32 +152,51 @@ public class MockRM extends ResourceMana
     masterService.registerApplicationMaster(req);
   }
 
-  public List<Container> allocateFromAM(ApplicationAttemptId attemptId, 
+  public List<Container> allocate(ApplicationAttemptId attemptId, 
       String host, int memory, int numContainers, 
       List<ContainerId> releases) throws Exception {
+    List reqs = createReq(host, memory, 1, numContainers);
+    List<Container> toRelease = new ArrayList<Container>();
+    for (ContainerId id : releases) {
+      Container cont = recordFactory.newRecordInstance(Container.class);
+      cont.setId(id);
+      //TOOD: set all fields
+    }
+    return allocate(attemptId, toRelease, reqs);
+  }
+
+  private List<ResourceRequest> createReq(String host, int memory, int priority, 
+      int containers) throws Exception {
+    ResourceRequest hostReq = createResourceReq(host, memory, priority, 
+        containers);
+    ResourceRequest rackReq = createResourceReq("default-rack", memory, 
+        priority, containers);
+    ResourceRequest offRackReq = createResourceReq("*", memory, priority, 
+        containers);
+    return Arrays.asList(new ResourceRequest[] {hostReq, rackReq, offRackReq});
+    
+  }
+  private ResourceRequest createResourceReq(String resource, int memory, int priority, 
+      int containers) throws Exception {
     ResourceRequest req = recordFactory.newRecordInstance(ResourceRequest.class);
-    req.setHostName(host);
-    req.setNumContainers(numContainers);
+    req.setHostName(resource);
+    req.setNumContainers(containers);
     Priority pri = recordFactory.newRecordInstance(Priority.class);
     pri.setPriority(1);
     req.setPriority(pri);
     Resource capability = recordFactory.newRecordInstance(Resource.class);
     capability.setMemory(memory);
     req.setCapability(capability);
-    List<Container> toRelease = new ArrayList<Container>();
-    for (ContainerId id : releases) {
-      Container cont = recordFactory.newRecordInstance(Container.class);
-      cont.setId(id);
-      //TOOD: set all fields
-    }
-    return allocateFromAM(attemptId, toRelease, 
-        Arrays.asList(new ResourceRequest[] {req}));
+    return req;
   }
 
-  public List<Container> allocateFromAM(ApplicationAttemptId attemptId, 
+  public List<Container> allocate(ApplicationAttemptId attemptId, 
       List<Container> releases, List<ResourceRequest> resourceRequest) 
       throws Exception {
     AllocateRequest req = recordFactory.newRecordInstance(AllocateRequest.class);
+    int responseId = AMResponseIds.remove(attemptId) + 1;
+    AMResponseIds.put(attemptId, responseId);
+    req.setResponseId(responseId);
     req.setApplicationAttemptId(attemptId);
     req.addAllAsks(resourceRequest);
     req.addAllReleases(releases);
@@ -182,6 +205,7 @@ public class MockRM extends ResourceMana
   }
 
   public void unregisterAppAttempt(ApplicationAttemptId attemptId) throws Exception {
+    AMResponseIds.remove(attemptId);
     waitForState(attemptId, RMAppAttemptState.RUNNING);
     FinishApplicationMasterRequest req = recordFactory.newRecordInstance(FinishApplicationMasterRequest.class);
     req.setAppAttemptId(attemptId);
@@ -211,17 +235,18 @@ public class MockRM extends ResourceMana
     resource.setMemory(memory);
     req.setResource(resource);
     getResourceTrackerService().registerNodeManager(req);
+    responseIds.put(nodeId, 0);
   }
 
-  public void nodeHeartbeat(String nodeIdStr, boolean b) throws Exception {
+  public HeartbeatResponse nodeHeartbeat(String nodeIdStr, boolean b) throws Exception {
     String[] splits = nodeIdStr.split(":");
     NodeId nodeId = recordFactory.newRecordInstance(NodeId.class);
     nodeId.setHost(splits[0]);
     nodeId.setPort(Integer.parseInt(splits[1]));
-    nodeHeartbeat(nodeId, new HashMap<ApplicationId, List<Container>>(), b);
+    return nodeHeartbeat(nodeId, new HashMap<ApplicationId, List<Container>>(),
b);
   }
 
-  public void nodeHeartbeat(NodeId nodeId, Map<ApplicationId, 
+  public HeartbeatResponse nodeHeartbeat(NodeId nodeId, Map<ApplicationId, 
       List<Container>> conts, boolean isHealthy) throws Exception {
     NodeHeartbeatRequest req = recordFactory.newRecordInstance(NodeHeartbeatRequest.class);
     NodeStatus status = recordFactory.newRecordInstance(NodeStatus.class);
@@ -234,9 +259,11 @@ public class MockRM extends ResourceMana
     healthStatus.setIsNodeHealthy(isHealthy);
     healthStatus.setLastHealthReportTime(1);
     status.setNodeHealthStatus(healthStatus);
-    status.setResponseId(1);
+    int responseId = responseIds.remove(nodeId) + 1;
+    responseIds.put(nodeId, responseId);
+    status.setResponseId(responseId);
     req.setNodeStatus(status);
-    getResourceTrackerService().nodeHeartbeat(req);
+    return getResourceTrackerService().nodeHeartbeat(req).getHeartbeatResponse();
   }
 
   @Override

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java?rev=1153434&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
(added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
Wed Aug  3 11:36:05 2011
@@ -0,0 +1,77 @@
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.junit.Test;
+import org.mortbay.log.Log;
+
+public class TestApplicationCleanup {
+
+  @Test
+  public void testAppCleanup() throws Exception {
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.DEBUG);
+    MockRM rm = new MockRM();
+    rm.start();
+    NodeId node1 = rm.registerNode("h1", 5000);
+    
+    RMApp app = rm.submitApp(2000);
+
+    //kick the scheduling
+    rm.nodeHeartbeat(node1, true);
+
+    RMAppAttempt attempt = app.getCurrentAppAttempt();
+    rm.sendAMLaunched(attempt.getAppAttemptId());
+    rm.registerAppAttempt(attempt.getAppAttemptId());
+    
+    //request for containers
+    int request = 2;
+    rm.allocate(attempt.getAppAttemptId(), "h1" , 1000, request, 
+        new ArrayList<ContainerId>());
+    
+    //kick the scheduler
+    rm.nodeHeartbeat(node1, true);
+    List<Container> conts = rm.allocate(attempt.getAppAttemptId(), new ArrayList<Container>(),
+        new ArrayList<ResourceRequest>());
+    int contReceived = conts.size();
+    while (contReceived < request) {
+      conts = rm.allocate(attempt.getAppAttemptId(), new ArrayList<Container>(),
+          new ArrayList<ResourceRequest>());
+      contReceived += conts.size();
+      Log.info("Got " + contReceived + " containers. Waiting to get " + request);
+      Thread.sleep(2000);
+    }
+    Assert.assertEquals(request, conts.size());
+    
+    rm.unregisterAppAttempt(attempt.getAppAttemptId());
+    rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.FINISHED);
+
+    int size = rm.nodeHeartbeat(node1, true).getApplicationsToCleanupList().size();
+    while(size < 1) {
+      Thread.sleep(1000);
+      Log.info("Waiting to get application cleanup..");
+      size = rm.nodeHeartbeat(node1, true).getApplicationsToCleanupList().size();
+    }
+    Assert.assertEquals(1, size);
+
+    rm.stop();
+  }
+
+  public static void main(String[] args) throws Exception {
+    TestApplicationCleanup t = new TestApplicationCleanup();
+    t.testAppCleanup();
+  }
+}
\ No newline at end of file

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java?rev=1153434&r1=1153433&r2=1153434&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
(original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
Wed Aug  3 11:36:05 2011
@@ -1,5 +1,16 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -10,8 +21,31 @@ import org.junit.Test;
 
 public class TestRM {
 
+  private static final Log LOG = LogFactory.getLog(TestRM.class);
+
   @Test
-  public void testApp() throws Exception {
+  public void testAppWithNoContainers() throws Exception {
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.DEBUG);
+    MockRM rm = new MockRM();
+    rm.start();
+    rm.registerNode("h1:1234", 5000);
+    
+    RMApp app = rm.submitApp(2000);
+
+    //kick the scheduling
+    rm.nodeHeartbeat("h1:1234", true);
+
+    RMAppAttempt attempt = app.getCurrentAppAttempt();
+    rm.sendAMLaunched(attempt.getAppAttemptId());
+    rm.registerAppAttempt(attempt.getAppAttemptId());
+    rm.unregisterAppAttempt(attempt.getAppAttemptId());
+    rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.FINISHED);
+    rm.stop();
+  }
+
+  @Test
+  public void testAppOnMultiNode() throws Exception {
     Logger rootLogger = LogManager.getRootLogger();
     rootLogger.setLevel(Level.DEBUG);
     MockRM rm = new MockRM();
@@ -27,6 +61,40 @@ public class TestRM {
     RMAppAttempt attempt = app.getCurrentAppAttempt();
     rm.sendAMLaunched(attempt.getAppAttemptId());
     rm.registerAppAttempt(attempt.getAppAttemptId());
+    
+    //request for containers
+    int request = 13;
+    rm.allocate(attempt.getAppAttemptId(), "h1" , 1000, request, 
+        new ArrayList<ContainerId>());
+    
+    //kick the scheduler
+    rm.nodeHeartbeat("h1:1234", true);
+    List<Container> conts = rm.allocate(attempt.getAppAttemptId(), new ArrayList<Container>(),
+        new ArrayList<ResourceRequest>());
+    int contReceived = conts.size();
+    while (contReceived < 3) {//only 3 containers are available on node1
+      conts = rm.allocate(attempt.getAppAttemptId(), new ArrayList<Container>(),
+          new ArrayList<ResourceRequest>());
+      contReceived += conts.size();
+      LOG.info("Got " + contReceived + " containers. Waiting to get " + 3);
+      Thread.sleep(2000);
+    }
+    Assert.assertEquals(3, conts.size());
+
+    //send node2 heartbeat
+    rm.nodeHeartbeat("h2:5678", true);
+    conts = rm.allocate(attempt.getAppAttemptId(), new ArrayList<Container>(),
+        new ArrayList<ResourceRequest>());
+    contReceived = conts.size();
+    while (contReceived < 10) {
+      conts = rm.allocate(attempt.getAppAttemptId(), new ArrayList<Container>(),
+          new ArrayList<ResourceRequest>());
+      contReceived += conts.size();
+      LOG.info("Got " + contReceived + " containers. Waiting to get " + 10);
+      Thread.sleep(2000);
+    }
+    Assert.assertEquals(10, conts.size());
+
     rm.unregisterAppAttempt(attempt.getAppAttemptId());
     rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.FINISHED);
 
@@ -35,6 +103,7 @@ public class TestRM {
 
   public static void main(String[] args) throws Exception {
     TestRM t = new TestRM();
-    t.testApp();
+    t.testAppWithNoContainers();
+    t.testAppOnMultiNode();
   }
 }



Mime
View raw message