hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1153446 - in /hadoop/common/branches/MR-279/mapreduce/yarn: yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ yarn-api/src/main/proto/ yarn-common/src/main/java/org/apache/hadoop/yarn/util/ yarn-server/yarn-server-nodemanager/src/...
Date Wed, 03 Aug 2011 11:48:14 GMT
Author: vinodkv
Date: Wed Aug  3 11:48:09 2011
New Revision: 1153446

URL: http://svn.apache.org/viewvc?rev=1153446&view=rev
Log:
Fixing tests: compilation and execution.

Added:
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerAcquiredEvent.java
Removed:
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerFinishedSchedulerEvent.java
Modified:
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/proto/yarn_protos.proto
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java Wed Aug  3 11:48:09 2011
@@ -1,5 +1,5 @@
 package org.apache.hadoop.yarn.api.records;
 
 public enum ContainerState {
-  INITIALIZING, RUNNING, COMPLETE
-}
+  NEW, RUNNING, COMPLETE
+}
\ No newline at end of file

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/proto/yarn_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/proto/yarn_protos.proto?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/proto/yarn_protos.proto (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/proto/yarn_protos.proto Wed Aug  3 11:48:09 2011
@@ -31,7 +31,7 @@ message ResourceProto {
 }
 
 enum ContainerStateProto {
-  C_INITIALIZING = 1;
+  C_NEW = 1;
   C_RUNNING = 2;
   C_COMPLETE = 3;
 }

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java Wed Aug  3 11:48:09 2011
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -164,23 +165,24 @@ public class BuilderUtils {
 
   public static Container newContainer(RecordFactory recordFactory,
       ApplicationAttemptId appAttemptId, int containerId, NodeId nodeId,
-      String containerManagerAddress, String nodeHttpAddress,
-      Resource resource) {
+      String nodeHttpAddress, Resource resource) {
     ContainerId containerID =
         newContainerId(recordFactory, appAttemptId, containerId);
-    return newContainer(containerID, nodeId, containerManagerAddress,
-        nodeHttpAddress, resource);
+    return newContainer(containerID, nodeId, nodeHttpAddress, resource);
   }
 
-  public static Container newContainer(ContainerId containerId, NodeId nodeId,
-      String containerManagerAddress, String nodeHttpAddress,
-      Resource resource) {
+  public static Container newContainer(ContainerId containerId,
+      NodeId nodeId, String nodeHttpAddress, Resource resource) {
     Container container = recordFactory.newRecordInstance(Container.class);
     container.setId(containerId);
     container.setNodeId(nodeId);
     container.setNodeHttpAddress(nodeHttpAddress);
     container.setResource(resource);
-    container.setState(ContainerState.INITIALIZING);
+    container.setState(ContainerState.NEW);
+    ContainerStatus containerStatus = Records.newRecord(ContainerStatus.class);
+    containerStatus.setContainerId(containerId);
+    containerStatus.setState(ContainerState.NEW);
+    container.setContainerStatus(containerStatus);
     return container;
   }
 

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java Wed Aug  3 11:48:09 2011
@@ -251,7 +251,6 @@ public class ContainerImpl implements Co
     case LOCALIZING:
     case LOCALIZATION_FAILED:
     case LOCALIZED:
-      return org.apache.hadoop.yarn.api.records.ContainerState.INITIALIZING;
     case RUNNING:
     case EXITED_WITH_SUCCESS:
     case EXITED_WITH_FAILURE:

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java Wed Aug  3 11:48:09 2011
@@ -18,6 +18,7 @@ public enum RMAppAttemptEventType {
   UNREGISTERED,
 
   // Source: Containers
+  CONTAINER_ACQUIRED,
   CONTAINER_ALLOCATED,
   CONTAINER_FINISHED,
 

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Wed Aug  3 11:48:09 2011
@@ -3,10 +3,8 @@ package org.apache.hadoop.yarn.server.re
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
@@ -17,7 +15,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -31,15 +28,14 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
@@ -93,9 +89,6 @@ public class RMAppAttemptImpl implements
   private String finalState;
   private final StringBuilder diagnostics = new StringBuilder();
 
-  private static final CancelContainerTransition CANCEL_CONTAINER_TRANSITION
-      = new CancelContainerTransition();
-
   private static final StateMachineFactory<RMAppAttemptImpl,
                                            RMAppAttemptState,
                                            RMAppAttemptEventType,
@@ -130,6 +123,10 @@ public class RMAppAttemptImpl implements
           new BaseFinalTransition(RMAppAttemptState.KILLED))
 
        // Transitions from ALLOCATED State
+      .addTransition(RMAppAttemptState.ALLOCATED,
+          RMAppAttemptState.ALLOCATED,
+          RMAppAttemptEventType.CONTAINER_ACQUIRED,
+          new ContainerAcquiredTransition())       
       .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.LAUNCHED,
           RMAppAttemptEventType.LAUNCHED, new AMLaunchedTransition())
       .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FAILED,
@@ -159,6 +156,10 @@ public class RMAppAttemptImpl implements
       .addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
           RMAppAttemptEventType.CONTAINER_ALLOCATED)
       .addTransition(
+                RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
+                RMAppAttemptEventType.CONTAINER_ACQUIRED,
+                new ContainerAcquiredTransition())
+      .addTransition(
           RMAppAttemptState.RUNNING,
           EnumSet.of(RMAppAttemptState.RUNNING, RMAppAttemptState.FAILED),
           RMAppAttemptEventType.CONTAINER_FINISHED,
@@ -173,34 +174,29 @@ public class RMAppAttemptImpl implements
           new FinalTransition(RMAppAttemptState.KILLED))
 
       // Transitions from FAILED State
-      .addTransition(RMAppAttemptState.FAILED, RMAppAttemptState.FAILED,
-          RMAppAttemptEventType.CONTAINER_ALLOCATED,
-          CANCEL_CONTAINER_TRANSITION)
       .addTransition(
           RMAppAttemptState.FAILED,
           RMAppAttemptState.FAILED,
-          EnumSet.of(RMAppAttemptEventType.EXPIRE,
+          EnumSet.of(
+              RMAppAttemptEventType.EXPIRE,
               RMAppAttemptEventType.KILL,
               RMAppAttemptEventType.UNREGISTERED,
               RMAppAttemptEventType.STATUS_UPDATE,
+              RMAppAttemptEventType.CONTAINER_ALLOCATED,
               RMAppAttemptEventType.CONTAINER_FINISHED))
 
       // Transitions from FINISHED State
-      .addTransition(RMAppAttemptState.FINISHED, RMAppAttemptState.FINISHED,
-          RMAppAttemptEventType.CONTAINER_ALLOCATED,
-          CANCEL_CONTAINER_TRANSITION)
       .addTransition(
           RMAppAttemptState.FINISHED,
           RMAppAttemptState.FINISHED,
-          EnumSet.of(RMAppAttemptEventType.EXPIRE,
+          EnumSet.of(
+              RMAppAttemptEventType.EXPIRE,
               RMAppAttemptEventType.UNREGISTERED,
+              RMAppAttemptEventType.CONTAINER_ALLOCATED,
               RMAppAttemptEventType.CONTAINER_FINISHED,
               RMAppAttemptEventType.KILL))
 
       // Transitions from KILLED State
-      .addTransition(RMAppAttemptState.KILLED, RMAppAttemptState.KILLED,
-          RMAppAttemptEventType.CONTAINER_ALLOCATED,
-          CANCEL_CONTAINER_TRANSITION)
       .addTransition(
           RMAppAttemptState.KILLED,
           RMAppAttemptState.KILLED,
@@ -209,6 +205,7 @@ public class RMAppAttemptImpl implements
               RMAppAttemptEventType.LAUNCH_FAILED,
               RMAppAttemptEventType.EXPIRE,
               RMAppAttemptEventType.REGISTERED,
+              RMAppAttemptEventType.CONTAINER_ALLOCATED,
               RMAppAttemptEventType.CONTAINER_FINISHED,
               RMAppAttemptEventType.UNREGISTERED,
               RMAppAttemptEventType.KILL,
@@ -424,8 +421,11 @@ public class RMAppAttemptImpl implements
     }
   }
 
-  private static final List<Container> EMPTY_CONTAINER_LIST = 
+  private static final List<Container> EMPTY_CONTAINER_RELEASE_LIST = 
       new ArrayList<Container>();
+  private static final List<ResourceRequest> EMPTY_CONTAINER_REQUEST_LIST = 
+    new ArrayList<ResourceRequest>();
+
   private static final class ScheduleTransition extends BaseTransition {
     @Override
     public void transition(RMAppAttemptImpl appAttempt,
@@ -444,7 +444,7 @@ public class RMAppAttemptImpl implements
           + appAttempt.applicationAttemptId + " required " + request);
 
       appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
-          Collections.singletonList(request), EMPTY_CONTAINER_LIST);
+          Collections.singletonList(request), EMPTY_CONTAINER_RELEASE_LIST);
     }
   }
 
@@ -453,14 +453,14 @@ public class RMAppAttemptImpl implements
     public void transition(RMAppAttemptImpl appAttempt,
         RMAppAttemptEvent event) {
 
+      // Acquire the AM container from the scheduler.
+      Allocation amContainerAllocation = appAttempt.scheduler.allocate(
+          appAttempt.applicationAttemptId, EMPTY_CONTAINER_REQUEST_LIST,
+          EMPTY_CONTAINER_RELEASE_LIST);
+
       // Set the masterContainer
-      RMAppAttemptContainerAllocatedEvent allocatedEvent
-        = (RMAppAttemptContainerAllocatedEvent) event;
-      appAttempt.masterContainer = allocatedEvent.getContainer();
-
-      // Make the AM container as acquired.
-      appAttempt.eventHandler.handle(new RMContainerEvent(allocatedEvent
-          .getContainer().getId(), RMContainerEventType.ACQUIRED));
+      appAttempt.masterContainer = amContainerAllocation.getContainers().get(
+          0);
 
       // Send event to launch the AM Container
       appAttempt.eventHandler.handle(new AMLauncherEvent(
@@ -665,6 +665,17 @@ public class RMAppAttemptImpl implements
     }
   }
 
+  private static final class ContainerAcquiredTransition extends
+      BaseTransition {
+    @Override
+    public void transition(RMAppAttemptImpl appAttempt,
+        RMAppAttemptEvent event) {
+      RMAppAttemptContainerAcquiredEvent acquiredEvent
+        = (RMAppAttemptContainerAcquiredEvent) event;
+      appAttempt.ranNodes.add(acquiredEvent.getContainer().getNodeId());
+    }
+  }
+
   private static final class ContainerFinishedTransition
       implements
       MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
@@ -692,18 +703,4 @@ public class RMAppAttemptImpl implements
       return RMAppAttemptState.RUNNING;
     }
   }
-
-  private static final class CancelContainerTransition extends BaseTransition {
-    @Override
-    public void transition(RMAppAttemptImpl appAttempt,
-        RMAppAttemptEvent event) {
-      RMAppAttemptContainerAllocatedEvent containerAllocatedEvent
-          = (RMAppAttemptContainerAllocatedEvent) event;
-      // Kill this container.
-      appAttempt.eventHandler.handle(new RMContainerEvent(
-          containerAllocatedEvent.getContainer().getId(),
-          RMContainerEventType.KILL));
-    }
-  }
-
 }

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerAcquiredEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerAcquiredEvent.java?rev=1153446&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerAcquiredEvent.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerAcquiredEvent.java Wed Aug  3 11:48:09 2011
@@ -0,0 +1,22 @@
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
+
+public class RMAppAttemptContainerAcquiredEvent extends RMAppAttemptEvent {
+
+  private final Container container;
+
+  public RMAppAttemptContainerAcquiredEvent(ApplicationAttemptId appAttemptId, 
+      Container container) {
+    super(appAttemptId, RMAppAttemptEventType.CONTAINER_ACQUIRED);
+    this.container = container;
+  }
+
+  public Container getContainer() {
+    return this.container;
+  }
+
+}

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java Wed Aug  3 11:48:09 2011
@@ -5,14 +5,14 @@ public enum RMContainerEventType {
   // Source: scheduler
   START,
 
-  // Source: App
+  // Source: SchedulerApp
   ACQUIRED,
   KILL, // Also from Node on NodeRemoval
 
   LAUNCHED,
   FINISHED,
 
-  // Source: ApplicationMasterService
+  // Source: ApplicationMasterService->Scheduler
   RELEASED,
 
   // Source: ContainerAllocationExpirer  

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java Wed Aug  3 11:48:09 2011
@@ -12,6 +12,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
@@ -58,7 +59,7 @@ public class RMContainerImpl implements 
 
     // Transitions from RUNNING state
     .addTransition(RMContainerState.RUNNING, RMContainerState.COMPLETED,
-        RMContainerEventType.FINISHED, new FinishedTransition())
+        RMContainerEventType.FINISHED, new ContainerCompletedTransition())
     .addTransition(RMContainerState.RUNNING, RMContainerState.KILLED,
         RMContainerEventType.KILL, new KillTransition())
 
@@ -147,8 +148,8 @@ public class RMContainerImpl implements 
             " on container " + this.containerId);
       }
       if (oldState != getState()) {
-        LOG.info(nodeId + " Container Transitioned from " + oldState + " to "
-                 + getState());
+        LOG.info(event.getContainerId() + " Container Transitioned from "
+            + oldState + " to " + getState());
       }
     }
     
@@ -157,7 +158,7 @@ public class RMContainerImpl implements 
     }
   }
 
-  private static class RMContainerTransition implements
+  private static class BaseTransition implements
       SingleArcTransition<RMContainerImpl, RMContainerEvent> {
 
     @Override
@@ -167,7 +168,7 @@ public class RMContainerImpl implements 
   }
 
   private static final class ContainerStartedTransition extends
-      RMContainerTransition {
+      BaseTransition {
 
     @Override
     public void transition(RMContainerImpl container, RMContainerEvent event) {
@@ -176,16 +177,20 @@ public class RMContainerImpl implements 
     }
 }
 
-  private static final class AcquiredTransition extends RMContainerTransition {
+  private static final class AcquiredTransition extends BaseTransition {
 
     @Override
     public void transition(RMContainerImpl container, RMContainerEvent event) {
       // Register with containerAllocationExpirer.
       container.containerAllocationExpirer.register(container.getContainer());
+
+      // Tell the appAttempt
+      container.eventHandler.handle(new RMAppAttemptContainerAcquiredEvent(
+          container.getApplicationAttemptId(), container.getContainer()));
     }
   }
 
-  private static final class LaunchedTransition extends RMContainerTransition {
+  private static final class LaunchedTransition extends BaseTransition {
 
     @Override
     public void transition(RMContainerImpl container, RMContainerEvent event) {
@@ -194,19 +199,11 @@ public class RMContainerImpl implements 
     }
   }
 
-  private static class FinishedTransition extends RMContainerTransition {
+  private static class FinishedTransition extends BaseTransition {
 
     @Override
     public void transition(RMContainerImpl container, RMContainerEvent event) {
 
-      RMContainerFinishedEvent finishedEvent = (RMContainerFinishedEvent) event;
-
-      // Update container-status for diagnostics. Today we completely
-      // replace it on finish. We may just need to update diagnostics.
-      // ^TODO
-      container.container.setContainerStatus(finishedEvent
-          .getRemoteContainerStatus());
-
       // Inform AppAttempt
       container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent(
           container.appAttemptId, container.container));
@@ -221,7 +218,7 @@ public class RMContainerImpl implements 
       // Unregister from containerAllocationExpirer.
       container.containerAllocationExpirer.unregister(container.getContainer());
 
-      // Inform AppAttempt, scheduler etc.
+      // Inform AppAttempt
       super.transition(container, event);
     }
   }
@@ -238,11 +235,27 @@ public class RMContainerImpl implements 
       container.eventHandler.handle(new RMNodeCleanContainerEvent(
           container.nodeId, container.containerId));
 
-      // Inform appAttempt and scheduler
+      // Inform appAttempt
       super.transition(container, event);
     }
   }
 
-  
+  private static final class ContainerCompletedTransition extends
+      FinishedTransition {
 
+    @Override
+    public void transition(RMContainerImpl container, RMContainerEvent event) {
+
+      RMContainerFinishedEvent finishedEvent = (RMContainerFinishedEvent) event;
+
+      // Update container-status for diagnostics. Today we completely
+      // replace it on finish. We may just need to update diagnostics.
+      // ^TODO
+      container.container.setContainerStatus(finishedEvent
+          .getRemoteContainerStatus());
+
+      // Inform appAttempt
+      super.transition(container, event);
+    }
+  }
 }

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Wed Aug  3 11:48:09 2011
@@ -20,7 +20,10 @@ package org.apache.hadoop.yarn.server.re
 
 import java.util.ArrayList;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -33,7 +36,9 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -42,6 +47,9 @@ import org.apache.hadoop.yarn.factories.
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -81,6 +89,11 @@ public class RMNodeImpl implements RMNod
   private final NodeHealthStatus nodeHealthStatus = recordFactory
       .newRecordInstance(NodeHealthStatus.class);
   
+  /* containers that have just launched, keyed by container id */
+  private final Map<ContainerId, Container> justLaunchedContainers = 
+    new HashMap<ContainerId, Container>();
+  
+
   /* set of containers that need to be cleaned */
   private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>(
       new ContainerIdComparator());
@@ -338,9 +351,45 @@ public class RMNodeImpl implements RMNod
         return RMNodeState.UNHEALTHY;
       }
 
+      // Filter the map to only obtain just launched containers and finished
+      // containers.
+      Map<ApplicationId, List<Container>> remoteAppContainersMap = statusEvent
+          .getContainersCollection();
+      Map<ApplicationId, List<Container>> containersMapForScheduler = new HashMap<ApplicationId, List<Container>>(
+          remoteAppContainersMap.size());
+      for (Entry<ApplicationId, List<Container>> entrySet : remoteAppContainersMap
+          .entrySet()) {
+
+        ApplicationId appId = entrySet.getKey();
+        List<Container> remoteContainerList = entrySet.getValue();
+
+        if (!containersMapForScheduler.containsKey(appId)) {
+          containersMapForScheduler.put(appId, new ArrayList<Container>(
+              remoteContainerList.size()));
+        }
+        List<Container> entryForThisApp = containersMapForScheduler
+            .get(appId);
+
+        for (Container remoteContainer : remoteContainerList) {
+
+          // Process running containers
+          ContainerId containerId = remoteContainer.getId();
+          if (remoteContainer.getState() == ContainerState.RUNNING) {
+            if (!rmNode.justLaunchedContainers.containsKey(containerId)) {
+              // Just launched container. RM learns about it for the first time.
+              rmNode.justLaunchedContainers.put(containerId, remoteContainer);
+              entryForThisApp.add(remoteContainer);
+            }
+          } else {
+            // A finished container
+            rmNode.justLaunchedContainers.remove(containerId);
+            entryForThisApp.add(remoteContainer);
+          }
+        }
+      }
+
       rmNode.context.getDispatcher().getEventHandler().handle(
-          new NodeUpdateSchedulerEvent(rmNode, statusEvent
-              .getContainersCollection()));
+          new NodeUpdateSchedulerEvent(rmNode, containersMapForScheduler));
 
       return RMNodeState.RUNNING;
     }

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java Wed Aug  3 11:48:09 2011
@@ -41,8 +41,8 @@ public class SchedulerApp {
 
   private Map<ContainerId, RMContainer> liveContainers
   = new HashMap<ContainerId, RMContainer>();
-  private List<Container> newlyAllocatedContainers = 
-      new ArrayList<Container>();
+  private List<RMContainer> newlyAllocatedContainers = 
+      new ArrayList<RMContainer>();
 
   public SchedulerApp(AppSchedulingInfo application, Queue queue) {
     this.appSchedulingInfo = application;
@@ -78,10 +78,6 @@ public class SchedulerApp {
   public List<Container> getCurrentContainers() {
     return this.appSchedulingInfo.getCurrentContainers();
   }
-
-  public synchronized Collection<RMContainer> getLiveContainers() {
-    return liveContainers.values();
-  }
   
   public Collection<Priority> getPriorities() {
     return this.appSchedulingInfo.getPriorities();
@@ -107,18 +103,16 @@ public class SchedulerApp {
     return this.queue;
   }
 
+  public synchronized Collection<RMContainer> getLiveContainers() {
+    return this.liveContainers.values();
+  }
+
   public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
-    // Kill all 'live' containers
-    for (RMContainer container : getLiveContainers()) {
-      completedContainer(container.getContainer(), 
-          RMContainerEventType.KILL); 
-    }
-    
     // Cleanup all scheduling information
     this.appSchedulingInfo.stop(rmAppAttemptFinalState);
   }
 
-  synchronized public void launchContainer(ContainerId containerId) {
+  synchronized public void containerLaunchedOnNode(ContainerId containerId) {
     // Inform the container
     RMContainer rmContainer = 
         getRMContainer(containerId);
@@ -131,7 +125,7 @@ public class SchedulerApp {
       SchedulerApp application) {
   }
 
-  synchronized public void completedContainer(Container cont,
+  synchronized public void containerCompleted(Container cont,
       RMContainerEventType event) {
     ContainerId containerId = cont.getId();
     // Inform the container
@@ -143,7 +137,7 @@ public class SchedulerApp {
     } else {
       container.handle(new RMContainerEvent(containerId, event));
     }
-    LOG.info("Completed container: " + container + 
+    LOG.info("Completed container: " + container.getContainerId() + 
         " in state: " + container.getState());
     
     // Remove from the list of containers
@@ -174,7 +168,7 @@ public class SchedulerApp {
           + c.getNodeId().toString());
       
       // Add it to allContainers list.
-      newlyAllocatedContainers.add(c);
+      newlyAllocatedContainers.add(container);
       liveContainers.put(c.getId(), container);
     }
     
@@ -183,9 +177,15 @@ public class SchedulerApp {
   }
   
   synchronized public List<Container> pullNewlyAllocatedContainers() {
-    List<Container> allocatedContainers = newlyAllocatedContainers;
-    newlyAllocatedContainers = new ArrayList<Container>();
-    return allocatedContainers;
+    List<Container> returnContainerList = new ArrayList<Container>(
+        newlyAllocatedContainers.size());
+    for (RMContainer rmContainer : newlyAllocatedContainers) {
+      rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
+          RMContainerEventType.ACQUIRED));
+      returnContainerList.add(rmContainer.getContainer());
+    }
+    newlyAllocatedContainers.clear();
+    return returnContainerList;
   }
 
   public Resource getCurrentConsumption() {

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Wed Aug  3 11:48:09 2011
@@ -859,7 +859,7 @@ public class LeafQueue implements Queue 
          BuilderUtils.newContainer(this.recordFactory,
                     application.getApplicationAttemptId(),
                     application.getNewContainerId(),
-                    node.getNodeID(), node.getNodeAddress(),
+                    node.getNodeID(),
                     node.getHttpAddress(), capability);
       
       // If security is enabled, send the container-tokens too.

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Wed Aug  3 11:48:09 2011
@@ -224,28 +224,30 @@ public class FifoScheduler implements Re
 
     // Release containers
     for (Container releasedContainer : release) {
-      completedContainer(releasedContainer, RMContainerEventType.RELEASED);
+      containerCompleted(releasedContainer, RMContainerEventType.RELEASED);
     }
 
     synchronized (application) {
 
-      LOG.debug("allocate: pre-update" +
-          " applicationId=" + applicationAttemptId + 
-          " application=" + application);
-      application.showRequests();
-
-      // Update application requests
-      application.updateResourceRequests(ask);
-
-      LOG.debug("allocate: post-update" +
-          " applicationId=" + applicationAttemptId + 
-          " application=" + application);
-      application.showRequests();
+      if (!ask.isEmpty()) {
+        LOG.debug("allocate: pre-update" +
+            " applicationId=" + applicationAttemptId + 
+            " application=" + application);
+        application.showRequests();
+  
+        // Update application requests
+        application.updateResourceRequests(ask);
+  
+        LOG.debug("allocate: post-update" +
+            " applicationId=" + applicationAttemptId + 
+            " application=" + application);
+        application.showRequests();
+  
+        LOG.debug("allocate:" +
+            " applicationId=" + applicationAttemptId + 
+            " #ask=" + ask.size());
+      }
 
-      LOG.debug("allocate:" +
-          " applicationId=" + applicationAttemptId + 
-          " #ask=" + ask.size());
-      
       return new Allocation(
           application.pullNewlyAllocatedContainers(), 
           application.getHeadroom());
@@ -300,6 +302,11 @@ public class FifoScheduler implements Re
       " has completed!");
     }
 
+    // Kill all 'live' containers
+    for (RMContainer container : application.getLiveContainers()) {
+      containerCompleted(container.getContainer(), RMContainerEventType.KILL);
+    }
+
     // Clean up pending requests, metrics etc.
     application.stop(rmAppAttemptFinalState);
 
@@ -488,12 +495,12 @@ public class FifoScheduler implements Re
             BuilderUtils.newContainer(recordFactory,
                 application.getApplicationAttemptId(),
                 application.getNewContainerId(),
-                node.getRMNode().getNodeID(), node.getRMNode().getNodeAddress(),
+                node.getRMNode().getNodeID(),
                 node.getRMNode().getHttpAddress(), capability);
-        RMContainer rmContainer = 
-            new RMContainerImpl(container, 
-                application.getApplicationAttemptId(), node.getNodeID(), 
-                null, this.rmContext.getContainerAllocationExpirer());
+        RMContainer rmContainer = new RMContainerImpl(container, application
+            .getApplicationAttemptId(), node.getNodeID(), this.rmContext
+            .getDispatcher().getEventHandler(), this.rmContext
+            .getContainerAllocationExpirer());
         
         // If security is enabled, send the container-tokens too.
         if (UserGroupInformation.isSecurityEnabled()) {
@@ -528,17 +535,15 @@ public class FifoScheduler implements Re
   }
 
   private synchronized void nodeUpdate(RMNode rmNode,
-      Map<ApplicationId, List<Container>> containers) {
+      Map<ApplicationId, List<Container>> remoteContainers) {
     SchedulerNode node = getNode(rmNode.getNodeID());
     
-    // Process completed containers
-    for (List<Container> appContainers : containers.values()) {
+    for (List<Container> appContainers : remoteContainers.values()) {
       for (Container container : appContainers) {
-        if (container.getContainerStatus().getState() == ContainerState.RUNNING
-            || container.getContainerStatus().getState() == ContainerState.INITIALIZING) {
-          launchContainer(container, node);
+        if (container.getState() == ContainerState.RUNNING) {
+          containerLaunchedOnNode(container, node);
         } else { // has to COMPLETE
-          completedContainer(container, RMContainerEventType.FINISHED);
+          containerCompleted(container, RMContainerEventType.FINISHED);
         }
       }
     }
@@ -603,7 +608,7 @@ public class FifoScheduler implements Re
     {
       ContainerExpiredSchedulerEvent containerExpiredEvent = 
           (ContainerExpiredSchedulerEvent) event;
-      completedContainer(containerExpiredEvent.getContainer(), 
+      containerCompleted(containerExpiredEvent.getContainer(), 
           RMContainerEventType.EXPIRE);
     }
     break;
@@ -612,7 +617,7 @@ public class FifoScheduler implements Re
     }
   }
 
-  private void launchContainer(Container container, SchedulerNode node) {
+  private void containerLaunchedOnNode(Container container, SchedulerNode node) {
     // Get the application for the finished container
     ApplicationAttemptId applicationAttemptId = container.getId().getAppAttemptId();
     SchedulerApp application = getApplication(applicationAttemptId);
@@ -623,11 +628,11 @@ public class FifoScheduler implements Re
       return;
     }
     
-    application.launchContainer(container.getId());
+    application.containerLaunchedOnNode(container.getId());
   }
 
   @Lock(FifoScheduler.class)
-  private synchronized void completedContainer(Container container, RMContainerEventType event) {
+  private synchronized void containerCompleted(Container container, RMContainerEventType event) {
     // Get the application for the finished container
     ApplicationAttemptId applicationAttemptId = container.getId().getAppAttemptId();
     SchedulerApp application = getApplication(applicationAttemptId);
@@ -644,7 +649,7 @@ public class FifoScheduler implements Re
     }
     
     // Inform the application
-    application.completedContainer(container, event);
+    application.containerCompleted(container, event);
     
     // Inform the node
     node.releaseContainer(container);
@@ -663,7 +668,7 @@ public class FifoScheduler implements Re
     SchedulerNode node = getNode(nodeInfo.getNodeID());
     // Kill running containers
     for(Container container : node.getRunningContainers()) {
-      completedContainer(container, RMContainerEventType.KILL);
+      containerCompleted(container, RMContainerEventType.KILL);
     }
     
     //Remove the node

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java Wed Aug  3 11:48:09 2011
@@ -266,14 +266,16 @@ public class Application {
     
     // Get resources from the ResourceManager
     resourceManager.getResourceScheduler().allocate(applicationAttemptId,
-        new ArrayList<ResourceRequest>(ask));
+        new ArrayList<ResourceRequest>(ask), new ArrayList<Container>());
     System.out.println("-=======" + applicationAttemptId);
     System.out.println("----------" + resourceManager.getRMContext().getRMApps()
         .get(applicationId).getRMAppAttempt(applicationAttemptId));
     
-     List<Container> containers = resourceManager.getRMContext().getRMApps()
-        .get(applicationId).getRMAppAttempt(applicationAttemptId)
-        .pullNewlyAllocatedContainers();
+     List<Container> containers = null;
+     // TODO: Fix
+//       resourceManager.getRMContext().getRMApps()
+//        .get(applicationId).getRMAppAttempt(applicationAttemptId)
+//        .pullNewlyAllocatedContainers();
 
     // Clear state for next interaction with ResourceManager
     ask.clear();

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java Wed Aug  3 11:48:09 2011
@@ -72,7 +72,10 @@ public class MockAM {
   }
 
   public AMResponse schedule() throws Exception {
-    return allocate(releases, requests);
+    AMResponse response = allocate(requests, releases);
+    requests.clear();
+    releases.clear();
+    return response;
   }
 
   public AMResponse allocate( 
@@ -85,7 +88,7 @@ public class MockAM {
       cont.setId(id);
       //TOOD: set all fields
     }
-    return allocate(toRelease, reqs);
+    return allocate(reqs, toRelease);
   }
 
   public List<ResourceRequest> createReq(String[] hosts, int memory, int priority, 
@@ -122,7 +125,7 @@ public class MockAM {
   }
 
   public AMResponse allocate(
-      List<Container> releases, List<ResourceRequest> resourceRequest) 
+      List<ResourceRequest> resourceRequest, List<Container> releases) 
       throws Exception {
     AllocateRequest req = Records.newRecord(AllocateRequest.class);
     req.setResponseId(++responseId);

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java Wed Aug  3 11:48:09 2011
@@ -93,7 +93,6 @@ public class MockNodes {
         recordFactory.newRecordInstance(NodeHealthStatus.class);
     final Resource used = newUsedResource(perNode);
     final Resource avail = newAvailResource(perNode, used);
-    final int containers = (int)(Math.random() * 8);
     return new RMNode() {
       @Override
       public NodeId getNodeID() {
@@ -126,22 +125,11 @@ public class MockNodes {
       }
 
       @Override
-      public int getNumContainers() {
-        return containers;
-      }
-
-      @Override
       public NodeHealthStatus getNodeHealthStatus() {
         return nodeHealthStatus;
       }
 
       @Override
-      public List<Container> getRunningContainers() {
-        // TODO Auto-generated method stub
-        return null;
-      }
-
-      @Override
       public int getCommandPort() {
         return nid;
       }

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java Wed Aug  3 11:48:09 2011
@@ -170,7 +170,7 @@ public class NodeManager implements Cont
 
     Container container =
         BuilderUtils.newContainer(containerLaunchContext.getContainerId(),
-            this.nodeId, containerManagerAddress, nodeHttpAddress,
+            this.nodeId, nodeHttpAddress,
             containerLaunchContext.getResource());
 
     applicationContainers.add(container);

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java Wed Aug  3 11:48:09 2011
@@ -47,12 +47,12 @@ public class TestApplicationCleanup {
     
     //kick the scheduler
     nm1.nodeHeartbeat(true);
-    List<Container> conts = am.allocate(new ArrayList<Container>(),
-        new ArrayList<ResourceRequest>()).getNewContainerList();
+    List<Container> conts = am.allocate(new ArrayList<ResourceRequest>(),
+        new ArrayList<Container>()).getNewContainerList();
     int contReceived = conts.size();
     while (contReceived < request) {
-      conts = am.allocate(new ArrayList<Container>(),
-          new ArrayList<ResourceRequest>()).getNewContainerList();
+      conts = am.allocate(new ArrayList<ResourceRequest>(),
+          new ArrayList<Container>()).getNewContainerList();
       contReceived += conts.size();
       Log.info("Got " + contReceived + " containers. Waiting to get " + request);
       Thread.sleep(2000);
@@ -73,7 +73,8 @@ public class TestApplicationCleanup {
       HeartbeatResponse resp = nm1.nodeHeartbeat(true);
       contsToClean = resp.getContainersToCleanupList();
       apps = resp.getApplicationsToCleanupList();
-      Log.info("Waiting to get cleanup events.." + cleanedConts);
+      Log.info("Waiting to get cleanup events.. cleanedConts: "
+          + cleanedConts + " cleanedApps: " + cleanedApps);
       cleanedConts += contsToClean.size();
       cleanedApps += apps.size();
       Thread.sleep(1000);

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Wed Aug  3 11:48:09 2011
@@ -66,7 +66,7 @@ public class TestFifoScheduler {
     MockNM nm2 = rm.registerNode("h2:5678", 4 * GB);
 
     RMApp app1 = rm.submitApp(2048);
-    // kick the scheduling, 2 GB given to AM1, remaining 4GB
+    // kick the scheduling, 2 GB given to AM1, remaining 4GB on nm1
     nm1.nodeHeartbeat(true);
     RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
     MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
@@ -75,7 +75,7 @@ public class TestFifoScheduler {
         nm1.getNodeId()).getMemory());
 
     RMApp app2 = rm.submitApp(2048);
-    // kick the scheduling, 2GB given to AM, remaining 2 GB
+    // kick the scheduling, 2GB given to AM, remaining 2 GB on nm2
     nm2.nodeHeartbeat(true);
     RMAppAttempt attempt2 = app2.getCurrentAppAttempt();
     MockAM am2 = rm.sendAMLaunched(attempt2.getAppAttemptId());
@@ -85,32 +85,32 @@ public class TestFifoScheduler {
 
     // add request for containers
     am1.addRequests(new String[] { "h1", "h2" }, GB, 1, 1);
-    am1.schedule(); // send the request
+    AMResponse am1Response = am1.schedule(); // send the request
     // add request for containers
     am2.addRequests(new String[] { "h1", "h2" }, 3 * GB, 0, 1);
-    am2.schedule(); // send the request
+    AMResponse am2Response = am2.schedule(); // send the request
 
     // kick the scheduler, 1 GB and 3 GB given to AM1 and AM2, remaining 0
     nm1.nodeHeartbeat(true);
-    while (attempt1.getNewlyAllocatedContainers().size() < 1) {
+    while (am1Response.getNewContainerCount() < 1) {
       LOG.info("Waiting for containers to be created for app 1...");
       Thread.sleep(1000);
+      am1Response = am1.schedule();
     }
-    while (attempt2.getNewlyAllocatedContainers().size() < 1) {
+    while (am2Response.getNewContainerCount() < 1) {
       LOG.info("Waiting for containers to be created for app 2...");
       Thread.sleep(1000);
+      am2Response = am2.schedule();
     }
     // kick the scheduler, nothing given remaining 2 GB.
     nm2.nodeHeartbeat(true);
 
-    AMResponse resp1 = am1.schedule(); // get allocations
-    List<Container> allocated1 = resp1.getNewContainerList();
+    List<Container> allocated1 = am1Response.getNewContainerList();
     Assert.assertEquals(1, allocated1.size());
     Assert.assertEquals(1 * GB, allocated1.get(0).getResource().getMemory());
     Assert.assertEquals(nm1.getNodeId(), allocated1.get(0).getNodeId());
 
-    AMResponse resp2 = am2.schedule(); // get allocations
-    List<Container> allocated2 = resp2.getNewContainerList();
+    List<Container> allocated2 = am2Response.getNewContainerList();
     Assert.assertEquals(1, allocated2.size());
     Assert.assertEquals(3 * GB, allocated2.get(0).getResource().getMemory());
     Assert.assertEquals(nm1.getNodeId(), allocated2.get(0).getNodeId());

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java Wed Aug  3 11:48:09 2011
@@ -28,7 +28,7 @@ public class TestRM {
     rootLogger.setLevel(Level.DEBUG);
     MockRM rm = new MockRM();
     rm.start();
-    MockNM nm1 = rm.registerNode("h1:1234", 5000);
+    MockNM nm1 = rm.registerNode("h1:1234", 5120);
     
     RMApp app = rm.submitApp(2000);
 
@@ -49,8 +49,8 @@ public class TestRM {
     rootLogger.setLevel(Level.DEBUG);
     MockRM rm = new MockRM();
     rm.start();
-    MockNM nm1 = rm.registerNode("h1:1234", 5000);
-    MockNM nm2 = rm.registerNode("h2:5678", 10000);
+    MockNM nm1 = rm.registerNode("h1:1234", 5120);
+    MockNM nm2 = rm.registerNode("h2:5678", 10240);
     
     RMApp app = rm.submitApp(2000);
 
@@ -67,13 +67,13 @@ public class TestRM {
     
     //kick the scheduler
     nm1.nodeHeartbeat(true);
-    List<Container> conts = am.allocate(new ArrayList<Container>(),
-        new ArrayList<ResourceRequest>()).getNewContainerList();
+    List<Container> conts = am.allocate(new ArrayList<ResourceRequest>(),
+        new ArrayList<Container>()).getNewContainerList();
     int contReceived = conts.size();
     while (contReceived < 3) {//only 3 containers are available on node1
-      conts = am.allocate(new ArrayList<Container>(),
-          new ArrayList<ResourceRequest>()).getNewContainerList();
-      contReceived += conts.size();
+      conts.addAll(am.allocate(new ArrayList<ResourceRequest>(),
+          new ArrayList<Container>()).getNewContainerList());
+      contReceived = conts.size();
       LOG.info("Got " + contReceived + " containers. Waiting to get " + 3);
       Thread.sleep(2000);
     }
@@ -81,13 +81,13 @@ public class TestRM {
 
     //send node2 heartbeat
     nm2.nodeHeartbeat(true);
-    conts = am.allocate(new ArrayList<Container>(),
-        new ArrayList<ResourceRequest>()).getNewContainerList();
+    conts = am.allocate(new ArrayList<ResourceRequest>(),
+        new ArrayList<Container>()).getNewContainerList();
     contReceived = conts.size();
     while (contReceived < 10) {
-      conts = am.allocate(new ArrayList<Container>(),
-          new ArrayList<ResourceRequest>()).getNewContainerList();
-      contReceived += conts.size();
+      conts.addAll(am.allocate(new ArrayList<ResourceRequest>(),
+          new ArrayList<Container>()).getNewContainerList());
+      contReceived = conts.size();
       LOG.info("Got " + contReceived + " containers. Waiting to get " + 10);
       Thread.sleep(2000);
     }



Mime
View raw message