hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gkesa...@apache.org
Subject svn commit: r1369164 [5/7] - in /hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project: ./ conf/ dev-support/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduc...
Date Fri, 03 Aug 2012 19:00:51 GMT
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java Fri Aug  3 19:00:15 2012
@@ -20,10 +20,8 @@ package org.apache.hadoop.yarn.server.re
 
 public enum RMContainerEventType {
 
-  // Source: scheduler
-  START,
-
   // Source: SchedulerApp
+  START,
   ACQUIRED,
   KILL, // Also from Node on NodeRemoval
   RESERVED,

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java Fri Aug  3 19:00:15 2012
@@ -56,7 +56,7 @@ public class ActiveUsersManager {
    * @param user application user 
    * @param applicationId activated application
    */
-  @Lock({Queue.class, SchedulerApp.class})
+  @Lock({Queue.class, SchedulerApplication.class})
   synchronized public void activateApplication(
       String user, ApplicationId applicationId) {
     Set<ApplicationId> userApps = usersApplications.get(user);
@@ -79,7 +79,7 @@ public class ActiveUsersManager {
    * @param user application user 
    * @param applicationId deactivated application
    */
-  @Lock({Queue.class, SchedulerApp.class})
+  @Lock({Queue.class, SchedulerApplication.class})
   synchronized public void deactivateApplication(
       String user, ApplicationId applicationId) {
     Set<ApplicationId> userApps = usersApplications.get(user);
@@ -102,7 +102,7 @@ public class ActiveUsersManager {
    * resource requests.
    * @return number of active users
    */
-  @Lock({Queue.class, SchedulerApp.class})
+  @Lock({Queue.class, SchedulerApplication.class})
   synchronized public int getNumActiveUsers() {
     return activeUsers;
   }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Fri Aug  3 19:00:15 2012
@@ -245,7 +245,8 @@ public class AppSchedulingInfo {
    * @param allocatedContainers
    *          resources allocated to the application
    */
-  synchronized private void allocateNodeLocal(SchedulerNode node, Priority priority,
+  synchronized private void allocateNodeLocal( 
+      SchedulerNode node, Priority priority, 
       ResourceRequest nodeLocalRequest, Container container) {
     // Update consumption and track allocations
     allocate(container);
@@ -273,7 +274,8 @@ public class AppSchedulingInfo {
    * @param allocatedContainers
    *          resources allocated to the application
    */
-  synchronized private void allocateRackLocal(SchedulerNode node, Priority priority,
+  synchronized private void allocateRackLocal(
+      SchedulerNode node, Priority priority,
       ResourceRequest rackLocalRequest, Container container) {
 
     // Update consumption and track allocations
@@ -295,7 +297,8 @@ public class AppSchedulingInfo {
    * @param allocatedContainers
    *          resources allocated to the application
    */
-  synchronized private void allocateOffSwitch(SchedulerNode node, Priority priority,
+  synchronized private void allocateOffSwitch(
+      SchedulerNode node, Priority priority,
       ResourceRequest offSwitchRequest, Container container) {
 
     // Update consumption and track allocations

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java Fri Aug  3 19:00:15 2012
@@ -36,7 +36,7 @@ public class SchedulerAppReport {
   private final Collection<RMContainer> reserved;
   private final boolean pending;
   
-  public SchedulerAppReport(SchedulerApp app) {
+  public SchedulerAppReport(SchedulerApplication app) {
     this.live = app.getLiveContainers();
     this.reserved = app.getReservedContainers();
     this.pending = app.isPending();

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java Fri Aug  3 19:00:15 2012
@@ -18,224 +18,45 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 
-public class SchedulerNode {
+/**
+ * Represents a YARN Cluster Node from the viewpoint of the scheduler.
+ */
+@Private
+@Unstable
+public abstract class SchedulerNode {
 
-  private static final Log LOG = LogFactory.getLog(SchedulerNode.class);
-
-  private static final RecordFactory recordFactory = RecordFactoryProvider
-      .getRecordFactory(null);
-
-  private Resource availableResource = recordFactory.newRecordInstance(Resource.class);
-  private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
-
-  private volatile int numContainers;
-
-  private RMContainer reservedContainer;
+  /**
+   * Get hostname.
+   * @return hostname
+   */
+  public abstract String getHostName();
   
-  /* set of containers that are allocated containers */
-  private final Map<ContainerId, RMContainer> launchedContainers = 
-    new HashMap<ContainerId, RMContainer>();
+  /**
+   * Get rackname.
+   * @return rackname
+   */
+  public abstract String getRackName();
   
-  private final RMNode rmNode;
-
-  public static final String ANY = "*";
-
-  public SchedulerNode(RMNode node) {
-    this.rmNode = node;
-    this.availableResource.setMemory(node.getTotalCapability().getMemory());
-  }
-
-  public RMNode getRMNode() {
-    return this.rmNode;
-  }
-
-  public NodeId getNodeID() {
-    return this.rmNode.getNodeID();
-  }
-
-  public String getHttpAddress() {
-    return this.rmNode.getHttpAddress();
-  }
-
-  public String getHostName() {
-    return this.rmNode.getHostName();
-  }
-
-  public String getRackName() {
-    return this.rmNode.getRackName();
-  }
+  /**
+   * Get used resources on the node.
+   * @return used resources on the node
+   */
+  public abstract Resource getUsedResource();
 
   /**
-   * The Scheduler has allocated containers on this node to the 
-   * given application.
-   * 
-   * @param applicationId application
-   * @param rmContainer allocated container
+   * Get available resources on the node.
+   * @return available resources on the node
    */
-  public synchronized void allocateContainer(ApplicationId applicationId, 
-      RMContainer rmContainer) {
-    Container container = rmContainer.getContainer();
-    deductAvailableResource(container.getResource());
-    ++numContainers;
-    
-    launchedContainers.put(container.getId(), rmContainer);
-
-    LOG.info("Assigned container " + container.getId() + 
-        " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() + 
-        ", which currently has " + numContainers + " containers, " + 
-        getUsedResource() + " used and " + 
-        getAvailableResource() + " available");
-  }
-
-  public synchronized Resource getAvailableResource() {
-    return this.availableResource;
-  }
-
-  public synchronized Resource getUsedResource() {
-    return this.usedResource;
-  }
-
-  private synchronized boolean isValidContainer(Container c) {    
-    if (launchedContainers.containsKey(c.getId()))
-      return true;
-    return false;
-  }
-
-  private synchronized void updateResource(Container container) {
-    addAvailableResource(container.getResource());
-    --numContainers;
-  }
-  
+  public abstract Resource getAvailableResource();
+
   /**
-   * Release an allocated container on this node.
-   * @param container container to be released
+   * Get number of active containers on the node.
+   * @return number of active containers on the node
    */
-  public synchronized void releaseContainer(Container container) {
-    if (!isValidContainer(container)) {
-      LOG.error("Invalid container released " + container);
-      return;
-    }
-
-    /* remove the containers from the nodemanger */
-    launchedContainers.remove(container.getId());
-    updateResource(container);
-
-    LOG.info("Released container " + container.getId() + 
-        " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() + 
-        ", which currently has " + numContainers + " containers, " + 
-        getUsedResource() + " used and " + getAvailableResource()
-        + " available" + ", release resources=" + true);
-  }
-
-
-  private synchronized void addAvailableResource(Resource resource) {
-    if (resource == null) {
-      LOG.error("Invalid resource addition of null resource for "
-          + rmNode.getNodeAddress());
-      return;
-    }
-    Resources.addTo(availableResource, resource);
-    Resources.subtractFrom(usedResource, resource);
-  }
-
-  private synchronized void deductAvailableResource(Resource resource) {
-    if (resource == null) {
-      LOG.error("Invalid deduction of null resource for "
-          + rmNode.getNodeAddress());
-      return;
-    }
-    Resources.subtractFrom(availableResource, resource);
-    Resources.addTo(usedResource, resource);
-  }
-
-  @Override
-  public String toString() {
-    return "host: " + rmNode.getNodeAddress() + " #containers=" + getNumContainers() +  
-      " available=" + getAvailableResource().getMemory() + 
-      " used=" + getUsedResource().getMemory();
-  }
-
-  public int getNumContainers() {
-    return numContainers;
-  }
-
-  public synchronized List<RMContainer> getRunningContainers() {
-    return new ArrayList<RMContainer>(launchedContainers.values());
-  }
-
-  public synchronized void reserveResource(
-      SchedulerApp application, Priority priority, 
-      RMContainer reservedContainer) {
-    // Check if it's already reserved
-    if (this.reservedContainer != null) {
-      // Sanity check
-      if (!reservedContainer.getContainer().getNodeId().equals(getNodeID())) {
-        throw new IllegalStateException("Trying to reserve" +
-            " container " + reservedContainer +
-            " on node " + reservedContainer.getReservedNode() + 
-            " when currently" + " reserved resource " + this.reservedContainer +
-            " on node " + this.reservedContainer.getReservedNode());
-      }
-      
-      // Cannot reserve more than one application on a given node!
-      if (!this.reservedContainer.getContainer().getId().getApplicationAttemptId().equals(
-          reservedContainer.getContainer().getId().getApplicationAttemptId())) {
-        throw new IllegalStateException("Trying to reserve" +
-        		" container " + reservedContainer + 
-            " for application " + application.getApplicationId() + 
-            " when currently" +
-            " reserved container " + this.reservedContainer +
-            " on node " + this);
-      }
-
-      LOG.info("Updated reserved container " + 
-          reservedContainer.getContainer().getId() + " on node " + 
-          this + " for application " + application);
-    } else {
-      LOG.info("Reserved container " + reservedContainer.getContainer().getId() + 
-          " on node " + this + " for application " + application);
-    }
-    this.reservedContainer = reservedContainer;
-  }
-
-  public synchronized void unreserveResource(SchedulerApp application) {
-    // Cannot unreserve for wrong application...
-    ApplicationAttemptId reservedApplication = 
-        reservedContainer.getContainer().getId().getApplicationAttemptId(); 
-    if (!reservedApplication.equals(
-        application.getApplicationAttemptId())) {
-      throw new IllegalStateException("Trying to unreserve " +  
-          " for application " + application.getApplicationId() + 
-          " when currently reserved " + 
-          " for application " + reservedApplication.getApplicationId() + 
-          " on node " + this);
-    }
-    
-    reservedContainer = null;
-  }
-
-  public synchronized RMContainer getReservedContainer() {
-    return reservedContainer;
-  }
+  public abstract int getNumContainers();
 
 }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java Fri Aug  3 19:00:15 2012
@@ -45,6 +45,9 @@ public class SchedulerUtils {
   public static final String LOST_CONTAINER = 
       "Container released on a *lost* node";
   
+  public static final String PREEMPTED_CONTAINER = 
+      "Container preempted by scheduler";
+  
   public static final String COMPLETED_APPLICATION = 
       "Container of a completed application";
   

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java Fri Aug  3 19:00:15 2012
@@ -33,8 +33,8 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 
 /**
  * <code>CSQueue</code> represents a node in the tree of 
@@ -150,7 +150,7 @@ extends org.apache.hadoop.yarn.server.re
    * @param user user who submitted the application
    * @param queue queue to which the application is submitted
    */
-  public void submitApplication(SchedulerApp application, String user, 
+  public void submitApplication(FiCaSchedulerApp application, String user, 
       String queue) 
   throws AccessControlException;
   
@@ -159,7 +159,7 @@ extends org.apache.hadoop.yarn.server.re
    * @param application
    * @param queue application queue 
    */
-  public void finishApplication(SchedulerApp application, String queue);
+  public void finishApplication(FiCaSchedulerApp application, String queue);
   
   /**
    * Assign containers to applications in the queue or it's children (if any).
@@ -168,7 +168,7 @@ extends org.apache.hadoop.yarn.server.re
    * @return the assignment
    */
   public CSAssignment assignContainers(
-      Resource clusterResource, SchedulerNode node);
+      Resource clusterResource, FiCaSchedulerNode node);
   
   /**
    * A container assigned to the queue has completed.
@@ -182,7 +182,7 @@ extends org.apache.hadoop.yarn.server.re
    * @param event event to be sent to the container
    */
   public void completedContainer(Resource clusterResource,
-      SchedulerApp application, SchedulerNode node, 
+      FiCaSchedulerApp application, FiCaSchedulerNode node, 
       RMContainer container, ContainerStatus containerStatus, 
       RMContainerEventType event);
 
@@ -219,6 +219,6 @@ extends org.apache.hadoop.yarn.server.re
    * @param application the application for which the container was allocated
    * @param container the container that was recovered.
    */
-  public void recoverContainer(Resource clusterResource, SchedulerApp application, 
+  public void recoverContainer(Resource clusterResource, FiCaSchedulerApp application, 
       Container container);
 }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Fri Aug  3 19:00:15 2012
@@ -63,11 +63,11 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
@@ -103,10 +103,10 @@ implements ResourceScheduler, CapacitySc
     }
   };
 
-  static final Comparator<SchedulerApp> applicationComparator = 
-    new Comparator<SchedulerApp>() {
+  static final Comparator<FiCaSchedulerApp> applicationComparator = 
+    new Comparator<FiCaSchedulerApp>() {
     @Override
-    public int compare(SchedulerApp a1, SchedulerApp a2) {
+    public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
       return a1.getApplicationId().getId() - a2.getApplicationId().getId();
     }
   };
@@ -131,8 +131,8 @@ implements ResourceScheduler, CapacitySc
 
   private Map<String, CSQueue> queues = new ConcurrentHashMap<String, CSQueue>();
 
-  private Map<NodeId, SchedulerNode> nodes = 
-      new ConcurrentHashMap<NodeId, SchedulerNode>();
+  private Map<NodeId, FiCaSchedulerNode> nodes = 
+      new ConcurrentHashMap<NodeId, FiCaSchedulerNode>();
 
   private Resource clusterResource = 
     RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
@@ -141,8 +141,8 @@ implements ResourceScheduler, CapacitySc
   private Resource minimumAllocation;
   private Resource maximumAllocation;
 
-  private Map<ApplicationAttemptId, SchedulerApp> applications = 
-      new ConcurrentHashMap<ApplicationAttemptId, SchedulerApp>();
+  private Map<ApplicationAttemptId, FiCaSchedulerApp> applications = 
+      new ConcurrentHashMap<ApplicationAttemptId, FiCaSchedulerApp>();
 
   private boolean initialized = false;
 
@@ -299,7 +299,7 @@ implements ResourceScheduler, CapacitySc
       CSQueue parent, String queueName, Map<String, CSQueue> queues,
       Map<String, CSQueue> oldQueues, 
       Comparator<CSQueue> queueComparator,
-      Comparator<SchedulerApp> applicationComparator,
+      Comparator<FiCaSchedulerApp> applicationComparator,
       QueueHook hook) throws IOException {
     CSQueue queue;
     String[] childQueueNames = 
@@ -370,8 +370,8 @@ implements ResourceScheduler, CapacitySc
     }
 
     // TODO: Fix store
-    SchedulerApp SchedulerApp = 
-        new SchedulerApp(applicationAttemptId, user, queue, 
+    FiCaSchedulerApp SchedulerApp = 
+        new FiCaSchedulerApp(applicationAttemptId, user, queue, 
             queue.getActiveUsersManager(), rmContext, null);
 
     // Submit to the queue
@@ -404,7 +404,7 @@ implements ResourceScheduler, CapacitySc
     LOG.info("Application " + applicationAttemptId + " is done." +
     		" finalState=" + rmAppAttemptFinalState);
     
-    SchedulerApp application = getApplication(applicationAttemptId);
+    FiCaSchedulerApp application = getApplication(applicationAttemptId);
 
     if (application == null) {
       //      throw new IOException("Unknown application " + applicationId + 
@@ -456,7 +456,7 @@ implements ResourceScheduler, CapacitySc
   public Allocation allocate(ApplicationAttemptId applicationAttemptId,
       List<ResourceRequest> ask, List<ContainerId> release) {
 
-    SchedulerApp application = getApplication(applicationAttemptId);
+    FiCaSchedulerApp application = getApplication(applicationAttemptId);
     if (application == null) {
       LOG.info("Calling allocate on removed " +
           "or non existant application " + applicationAttemptId);
@@ -551,7 +551,7 @@ implements ResourceScheduler, CapacitySc
       LOG.debug("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
     }
                   
-    SchedulerNode node = getNode(nm.getNodeID());
+    FiCaSchedulerNode node = getNode(nm.getNodeID());
 
     // Processing the newly launched containers
     for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
@@ -578,7 +578,7 @@ implements ResourceScheduler, CapacitySc
 
     RMContainer reservedContainer = node.getReservedContainer();
     if (reservedContainer != null) {
-      SchedulerApp reservedApplication = 
+      FiCaSchedulerApp reservedApplication = 
           getApplication(reservedContainer.getApplicationAttemptId());
       
       // Try to fulfill the reservation
@@ -601,10 +601,10 @@ implements ResourceScheduler, CapacitySc
 
   }
 
-  private void containerLaunchedOnNode(ContainerId containerId, SchedulerNode node) {
+  private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
     // Get the application for the finished container
     ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
-    SchedulerApp application = getApplication(applicationAttemptId);
+    FiCaSchedulerApp application = getApplication(applicationAttemptId);
     if (application == null) {
       LOG.info("Unknown application: " + applicationAttemptId + 
           " launched container " + containerId +
@@ -672,7 +672,7 @@ implements ResourceScheduler, CapacitySc
   }
 
   private synchronized void addNode(RMNode nodeManager) {
-    this.nodes.put(nodeManager.getNodeID(), new SchedulerNode(nodeManager));
+    this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager));
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
     root.updateClusterResource(clusterResource);
     ++numNodeManagers;
@@ -681,7 +681,7 @@ implements ResourceScheduler, CapacitySc
   }
 
   private synchronized void removeNode(RMNode nodeInfo) {
-    SchedulerNode node = this.nodes.get(nodeInfo.getNodeID());
+    FiCaSchedulerNode node = this.nodes.get(nodeInfo.getNodeID());
     if (node == null) {
       return;
     }
@@ -726,7 +726,7 @@ implements ResourceScheduler, CapacitySc
     
     // Get the application for the finished container
     ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId();
-    SchedulerApp application = getApplication(applicationAttemptId);
+    FiCaSchedulerApp application = getApplication(applicationAttemptId);
     if (application == null) {
       LOG.info("Container " + container + " of" +
       		" unknown application " + applicationAttemptId + 
@@ -735,7 +735,7 @@ implements ResourceScheduler, CapacitySc
     }
     
     // Get the node on which the container was allocated
-    SchedulerNode node = getNode(container.getNodeId());
+    FiCaSchedulerNode node = getNode(container.getNodeId());
     
     // Inform the queue
     LeafQueue queue = (LeafQueue)application.getQueue();
@@ -749,24 +749,24 @@ implements ResourceScheduler, CapacitySc
   }
 
   @Lock(Lock.NoLock.class)
-  SchedulerApp getApplication(ApplicationAttemptId applicationAttemptId) {
+  FiCaSchedulerApp getApplication(ApplicationAttemptId applicationAttemptId) {
     return applications.get(applicationAttemptId);
   }
 
   @Override
   public SchedulerAppReport getSchedulerAppInfo(
       ApplicationAttemptId applicationAttemptId) {
-    SchedulerApp app = getApplication(applicationAttemptId);
+    FiCaSchedulerApp app = getApplication(applicationAttemptId);
     return app == null ? null : new SchedulerAppReport(app);
   }
   
   @Lock(Lock.NoLock.class)
-  SchedulerNode getNode(NodeId nodeId) {
+  FiCaSchedulerNode getNode(NodeId nodeId) {
     return nodes.get(nodeId);
   }
 
   private RMContainer getRMContainer(ContainerId containerId) {
-    SchedulerApp application = 
+    FiCaSchedulerApp application = 
         getApplication(containerId.getApplicationAttemptId());
     return (application == null) ? null : application.getRMContainer(containerId);
   }
@@ -790,7 +790,7 @@ implements ResourceScheduler, CapacitySc
 
   @Override
   public SchedulerNodeReport getNodeReport(NodeId nodeId) {
-    SchedulerNode node = getNode(nodeId);
+    FiCaSchedulerNode node = getNode(nodeId);
     return node == null ? null : new SchedulerNodeReport(node);
   }
   

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java Fri Aug  3 19:00:15 2012
@@ -47,12 +47,20 @@ public class CapacitySchedulerConfigurat
   public static final String DOT = ".";
   
   @Private
+  public static final String MAXIMUM_APPLICATIONS_SUFFIX =
+    "maximum-applications";
+  
+  @Private
   public static final String MAXIMUM_SYSTEM_APPLICATIONS =
-    PREFIX + "maximum-applications";
+    PREFIX + MAXIMUM_APPLICATIONS_SUFFIX;
+  
+  @Private
+  public static final String MAXIMUM_AM_RESOURCE_SUFFIX =
+    "maximum-am-resource-percent";
   
   @Private
   public static final String MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT =
-    PREFIX + "maximum-am-resource-percent";
+    PREFIX + MAXIMUM_AM_RESOURCE_SUFFIX;
   
   @Private
   public static final String QUEUES = "queues";
@@ -131,6 +139,30 @@ public class CapacitySchedulerConfigurat
     return getFloat(MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 
         DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT);
   }
+
+
+  /**
+   * Get the maximum applications per queue setting.
+   * @param queue name of the queue
+   * @return the per-queue setting if specified, or -1 if not set
+   */
+  public int getMaximumApplicationsPerQueue(String queue) {
+    int maxApplicationsPerQueue = 
+      getInt(getQueuePrefix(queue) + MAXIMUM_APPLICATIONS_SUFFIX, 
+          (int)UNDEFINED);
+    return maxApplicationsPerQueue;
+  }
+
+  /**
+   * Get the maximum AM (ApplicationMaster) resource percent setting for a queue.
+   * @param queue name of the queue
+   * @return the per-queue setting, or the global am-resource-percent
+   *         setting if no per-queue setting is present
+   */
+  public float getMaximumApplicationMasterResourcePerQueuePercent(String queue) {
+    return getFloat(getQueuePrefix(queue) + MAXIMUM_AM_RESOURCE_SUFFIX, 
+    		getMaximumApplicationMasterResourcePercent());
+  }
   
   public float getCapacity(String queue) {
     float capacity = getFloat(getQueuePrefix(queue) + CAPACITY, UNDEFINED);

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Fri Aug  3 19:00:15 2012
@@ -19,7 +19,6 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -54,7 +53,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@@ -63,9 +61,9 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 
@@ -87,7 +85,7 @@ public class LeafQueue implements CSQueu
   private int maxApplications;
   private int maxApplicationsPerUser;
   
-  private float maxAMResourcePercent;
+  private float maxAMResourcePerQueuePercent;
   private int maxActiveApplications; // Based on absolute max capacity
   private int maxActiveAppsUsingAbsCap; // Based on absolute capacity
   private int maxActiveApplicationsPerUser;
@@ -96,11 +94,11 @@ public class LeafQueue implements CSQueu
   private float usedCapacity = 0.0f;
   private volatile int numContainers;
 
-  Set<SchedulerApp> activeApplications;
-  Map<ApplicationAttemptId, SchedulerApp> applicationsMap = 
-      new HashMap<ApplicationAttemptId, SchedulerApp>();
+  Set<FiCaSchedulerApp> activeApplications;
+  Map<ApplicationAttemptId, FiCaSchedulerApp> applicationsMap = 
+      new HashMap<ApplicationAttemptId, FiCaSchedulerApp>();
   
-  Set<SchedulerApp> pendingApplications;
+  Set<FiCaSchedulerApp> pendingApplications;
   
   private final Resource minimumAllocation;
   private final Resource maximumAllocation;
@@ -128,7 +126,7 @@ public class LeafQueue implements CSQueu
   
   public LeafQueue(CapacitySchedulerContext cs, 
       String queueName, CSQueue parent, 
-      Comparator<SchedulerApp> applicationComparator, CSQueue old) {
+      Comparator<FiCaSchedulerApp> applicationComparator, CSQueue old) {
     this.scheduler = cs;
     this.queueName = queueName;
     this.parent = parent;
@@ -158,21 +156,25 @@ public class LeafQueue implements CSQueu
     float userLimitFactor = 
       cs.getConfiguration().getUserLimitFactor(getQueuePath());
 
-    int maxSystemJobs = cs.getConfiguration().getMaximumSystemApplications();
-    int maxApplications = (int)(maxSystemJobs * absoluteCapacity);
-    int maxApplicationsPerUser = 
+    int maxApplications = cs.getConfiguration().getMaximumApplicationsPerQueue(getQueuePath());
+    if (maxApplications < 0) {
+      int maxSystemApps = cs.getConfiguration().getMaximumSystemApplications();
+      maxApplications = (int)(maxSystemApps * absoluteCapacity);
+    }
+    maxApplicationsPerUser = 
       (int)(maxApplications * (userLimit / 100.0f) * userLimitFactor);
 
-    this.maxAMResourcePercent = 
-        cs.getConfiguration().getMaximumApplicationMasterResourcePercent();
+    this.maxAMResourcePerQueuePercent = 
+        cs.getConfiguration().
+            getMaximumApplicationMasterResourcePerQueuePercent(getQueuePath());
     int maxActiveApplications = 
         CSQueueUtils.computeMaxActiveApplications(
             cs.getClusterResources(), this.minimumAllocation,
-            maxAMResourcePercent, absoluteMaxCapacity);
+            maxAMResourcePerQueuePercent, absoluteMaxCapacity);
     this.maxActiveAppsUsingAbsCap = 
             CSQueueUtils.computeMaxActiveApplications(
                 cs.getClusterResources(), this.minimumAllocation,
-                maxAMResourcePercent, absoluteCapacity);
+                maxAMResourcePerQueuePercent, absoluteCapacity);
     int maxActiveApplicationsPerUser = 
         CSQueueUtils.computeMaxActiveApplicationsPerUser(maxActiveAppsUsingAbsCap, userLimit, 
             userLimitFactor);
@@ -201,8 +203,8 @@ public class LeafQueue implements CSQueu
     }
 
     this.pendingApplications = 
-        new TreeSet<SchedulerApp>(applicationComparator);
-    this.activeApplications = new TreeSet<SchedulerApp>(applicationComparator);
+        new TreeSet<FiCaSchedulerApp>(applicationComparator);
+    this.activeApplications = new TreeSet<FiCaSchedulerApp>(applicationComparator);
   }
 
   private synchronized void setupQueueConfigs(
@@ -267,15 +269,16 @@ public class LeafQueue implements CSQueu
         "userLimitFactor = " + userLimitFactor +
         " [= configuredUserLimitFactor ]" + "\n" +
         "maxApplications = " + maxApplications +
-        " [= (int)(configuredMaximumSystemApplications * absoluteCapacity) ]" + 
+        " [= configuredMaximumSystemApplicationsPerQueue or" + 
+        " (int)(configuredMaximumSystemApplications * absoluteCapacity)]" + 
         "\n" +
         "maxApplicationsPerUser = " + maxApplicationsPerUser +
         " [= (int)(maxApplications * (userLimit / 100.0f) * " +
         "userLimitFactor) ]" + "\n" +
         "maxActiveApplications = " + maxActiveApplications +
         " [= max(" + 
-        "(int)ceil((clusterResourceMemory / minimumAllocation) *" + 
-        "maxAMResourcePercent * absoluteMaxCapacity)," + 
+        "(int)ceil((clusterResourceMemory / minimumAllocation) * " + 
+        "maxAMResourcePerQueuePercent * absoluteMaxCapacity)," + 
         "1) ]" + "\n" +
         "maxActiveAppsUsingAbsCap = " + maxActiveAppsUsingAbsCap +
         " [= max(" + 
@@ -292,7 +295,7 @@ public class LeafQueue implements CSQueu
         "(clusterResourceMemory * absoluteCapacity)]" + "\n" +
         "absoluteUsedCapacity = " + absoluteUsedCapacity +
         " [= usedResourcesMemory / clusterResourceMemory]" + "\n" +
-        "maxAMResourcePercent = " + maxAMResourcePercent +
+        "maxAMResourcePerQueuePercent = " + maxAMResourcePerQueuePercent +
         " [= configuredMaximumAMResourcePercent ]" + "\n" +
         "minimumAllocationFactor = " + minimumAllocationFactor +
         " [= (float)(maximumAllocationMemory - minimumAllocationMemory) / " +
@@ -582,7 +585,7 @@ public class LeafQueue implements CSQueu
   }
 
   @Override
-  public void submitApplication(SchedulerApp application, String userName,
+  public void submitApplication(FiCaSchedulerApp application, String userName,
       String queue)  throws AccessControlException {
     // Careful! Locking order is important!
 
@@ -646,9 +649,9 @@ public class LeafQueue implements CSQueu
   }
 
   private synchronized void activateApplications() {
-    for (Iterator<SchedulerApp> i=pendingApplications.iterator(); 
+    for (Iterator<FiCaSchedulerApp> i=pendingApplications.iterator(); 
          i.hasNext(); ) {
-      SchedulerApp application = i.next();
+      FiCaSchedulerApp application = i.next();
       
       // Check queue limit
       if (getNumActiveApplications() >= getMaximumActiveApplications()) {
@@ -668,7 +671,7 @@ public class LeafQueue implements CSQueu
     }
   }
   
-  private synchronized void addApplication(SchedulerApp application, User user) {
+  private synchronized void addApplication(FiCaSchedulerApp application, User user) {
     // Accept 
     user.submitApplication();
     pendingApplications.add(application);
@@ -688,7 +691,7 @@ public class LeafQueue implements CSQueu
   }
 
   @Override
-  public void finishApplication(SchedulerApp application, String queue) {
+  public void finishApplication(FiCaSchedulerApp application, String queue) {
     // Careful! Locking order is important!
     synchronized (this) {
       removeApplication(application, getUser(application.getUser()));
@@ -698,7 +701,7 @@ public class LeafQueue implements CSQueu
     parent.finishApplication(application, queue);
   }
 
-  public synchronized void removeApplication(SchedulerApp application, User user) {
+  public synchronized void removeApplication(FiCaSchedulerApp application, User user) {
     boolean wasActive = activeApplications.remove(application);
     if (!wasActive) {
       pendingApplications.remove(application);
@@ -730,7 +733,7 @@ public class LeafQueue implements CSQueu
         );
   }
   
-  private synchronized SchedulerApp getApplication(
+  private synchronized FiCaSchedulerApp getApplication(
       ApplicationAttemptId applicationAttemptId) {
     return applicationsMap.get(applicationAttemptId);
   }
@@ -740,7 +743,7 @@ public class LeafQueue implements CSQueu
   
   @Override
   public synchronized CSAssignment 
-  assignContainers(Resource clusterResource, SchedulerNode node) {
+  assignContainers(Resource clusterResource, FiCaSchedulerNode node) {
 
     if(LOG.isDebugEnabled()) {
       LOG.debug("assignContainers: node=" + node.getHostName()
@@ -750,7 +753,7 @@ public class LeafQueue implements CSQueu
     // Check for reserved resources
     RMContainer reservedContainer = node.getReservedContainer();
     if (reservedContainer != null) {
-      SchedulerApp application = 
+      FiCaSchedulerApp application = 
           getApplication(reservedContainer.getApplicationAttemptId());
       return new CSAssignment(
           assignReservedContainer(application, node, reservedContainer, 
@@ -760,7 +763,7 @@ public class LeafQueue implements CSQueu
     }
     
     // Try to assign containers to applications in order
-    for (SchedulerApp application : activeApplications) {
+    for (FiCaSchedulerApp application : activeApplications) {
       
       if(LOG.isDebugEnabled()) {
         LOG.debug("pre-assignContainers for application "
@@ -838,8 +841,8 @@ public class LeafQueue implements CSQueu
 
   }
 
-  private synchronized Resource assignReservedContainer(SchedulerApp application, 
-      SchedulerNode node, RMContainer rmContainer, Resource clusterResource) {
+  private synchronized Resource assignReservedContainer(FiCaSchedulerApp application, 
+      FiCaSchedulerNode node, RMContainer rmContainer, Resource clusterResource) {
     // Do we still need this reservation?
     Priority priority = rmContainer.getReservedPriority();
     if (application.getTotalRequiredResources(priority) == 0) {
@@ -882,9 +885,9 @@ public class LeafQueue implements CSQueu
     return true;
   }
 
-  @Lock({LeafQueue.class, SchedulerApp.class})
+  @Lock({LeafQueue.class, FiCaSchedulerApp.class})
   private Resource computeUserLimitAndSetHeadroom(
-      SchedulerApp application, Resource clusterResource, Resource required) {
+      FiCaSchedulerApp application, Resource clusterResource, Resource required) {
     
     String user = application.getUser();
     
@@ -921,7 +924,7 @@ public class LeafQueue implements CSQueu
   }
   
   @Lock(NoLock.class)
-  private Resource computeUserLimit(SchedulerApp application, 
+  private Resource computeUserLimit(FiCaSchedulerApp application, 
       Resource clusterResource, Resource required) {
     // What is our current capacity? 
     // * It is equal to the max(required, queue-capacity) if
@@ -1009,7 +1012,7 @@ public class LeafQueue implements CSQueu
     return (a + (b - 1)) / b;
   }
 
-  boolean needContainers(SchedulerApp application, Priority priority, Resource required) {
+  boolean needContainers(FiCaSchedulerApp application, Priority priority, Resource required) {
     int requiredContainers = application.getTotalRequiredResources(priority);
     int reservedContainers = application.getNumReservedContainers(priority);
     int starvation = 0;
@@ -1038,7 +1041,7 @@ public class LeafQueue implements CSQueu
   }
 
   private CSAssignment assignContainersOnNode(Resource clusterResource, 
-      SchedulerNode node, SchedulerApp application, 
+      FiCaSchedulerNode node, FiCaSchedulerApp application, 
       Priority priority, RMContainer reservedContainer) {
 
     Resource assigned = Resources.none();
@@ -1067,7 +1070,7 @@ public class LeafQueue implements CSQueu
   }
 
   private Resource assignNodeLocalContainers(Resource clusterResource, 
-      SchedulerNode node, SchedulerApp application, 
+      FiCaSchedulerNode node, FiCaSchedulerApp application, 
       Priority priority, RMContainer reservedContainer) {
     ResourceRequest request = 
         application.getResourceRequest(priority, node.getHostName());
@@ -1083,7 +1086,7 @@ public class LeafQueue implements CSQueu
   }
 
   private Resource assignRackLocalContainers(Resource clusterResource,  
-      SchedulerNode node, SchedulerApp application, Priority priority,
+      FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
       RMContainer reservedContainer) {
     ResourceRequest request = 
       application.getResourceRequest(priority, node.getRackName());
@@ -1097,8 +1100,8 @@ public class LeafQueue implements CSQueu
     return Resources.none();
   }
 
-  private Resource assignOffSwitchContainers(Resource clusterResource, SchedulerNode node, 
-      SchedulerApp application, Priority priority, 
+  private Resource assignOffSwitchContainers(Resource clusterResource, FiCaSchedulerNode node, 
+      FiCaSchedulerApp application, Priority priority, 
       RMContainer reservedContainer) {
     ResourceRequest request = 
       application.getResourceRequest(priority, RMNode.ANY);
@@ -1113,8 +1116,8 @@ public class LeafQueue implements CSQueu
     return Resources.none();
   }
 
-  boolean canAssign(SchedulerApp application, Priority priority, 
-      SchedulerNode node, NodeType type, RMContainer reservedContainer) {
+  boolean canAssign(FiCaSchedulerApp application, Priority priority, 
+      FiCaSchedulerNode node, NodeType type, RMContainer reservedContainer) {
 
     // Clearly we need containers for this application...
     if (type == NodeType.OFF_SWITCH) {
@@ -1161,14 +1164,14 @@ public class LeafQueue implements CSQueu
   }
   
   private Container getContainer(RMContainer rmContainer, 
-      SchedulerApp application, SchedulerNode node, 
+      FiCaSchedulerApp application, FiCaSchedulerNode node, 
       Resource capability, Priority priority) {
     return (rmContainer != null) ? rmContainer.getContainer() :
       createContainer(application, node, capability, priority);
   }
   
 
-  public Container createContainer(SchedulerApp application, SchedulerNode node, 
+  public Container createContainer(FiCaSchedulerApp application, FiCaSchedulerNode node, 
       Resource capability, Priority priority) {
 
     NodeId nodeId = node.getRMNode().getNodeID();
@@ -1178,17 +1181,11 @@ public class LeafQueue implements CSQueu
 
     // If security is enabled, send the container-tokens too.
     if (UserGroupInformation.isSecurityEnabled()) {
-      ContainerTokenIdentifier tokenIdentifier = new ContainerTokenIdentifier(
-          containerId, nodeId.toString(), capability);
-      try {
-        containerToken = BuilderUtils.newContainerToken(nodeId, ByteBuffer
-            .wrap(containerTokenSecretManager
-                .createPassword(tokenIdentifier)), tokenIdentifier);
-      } catch (IllegalArgumentException e) {
-         // this could be because DNS is down - in which case we just want
-         // to retry and not bring RM down
-         LOG.error("Error trying to create new container", e);
-         return null;
+      containerToken =
+          containerTokenSecretManager.createContainerToken(containerId, nodeId,
+            capability);
+      if (containerToken == null) {
+        return null; // Try again later.
       }
     }
 
@@ -1200,8 +1197,8 @@ public class LeafQueue implements CSQueu
     return container;
   }
   
-  private Resource assignContainer(Resource clusterResource, SchedulerNode node, 
-      SchedulerApp application, Priority priority, 
+  private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node, 
+      FiCaSchedulerApp application, Priority priority, 
       ResourceRequest request, NodeType type, RMContainer rmContainer) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("assignContainers: node=" + node.getHostName()
@@ -1275,8 +1272,8 @@ public class LeafQueue implements CSQueu
     }
   }
 
-  private void reserve(SchedulerApp application, Priority priority, 
-      SchedulerNode node, RMContainer rmContainer, Container container) {
+  private void reserve(FiCaSchedulerApp application, Priority priority, 
+      FiCaSchedulerNode node, RMContainer rmContainer, Container container) {
     // Update reserved metrics if this is the first reservation
     if (rmContainer == null) {
       getMetrics().reserveResource(
@@ -1290,8 +1287,8 @@ public class LeafQueue implements CSQueu
     node.reserveResource(application, priority, rmContainer);
   }
 
-  private void unreserve(SchedulerApp application, Priority priority, 
-      SchedulerNode node, RMContainer rmContainer) {
+  private void unreserve(FiCaSchedulerApp application, Priority priority, 
+      FiCaSchedulerNode node, RMContainer rmContainer) {
     // Done with the reservation?
     application.unreserve(node, priority);
     node.unreserveResource(application);
@@ -1304,7 +1301,7 @@ public class LeafQueue implements CSQueu
 
   @Override
   public void completedContainer(Resource clusterResource, 
-      SchedulerApp application, SchedulerNode node, RMContainer rmContainer, 
+      FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer, 
       ContainerStatus containerStatus, RMContainerEventType event) {
     if (application != null) {
       // Careful! Locking order is important!
@@ -1346,7 +1343,7 @@ public class LeafQueue implements CSQueu
   }
 
   synchronized void allocateResource(Resource clusterResource, 
-      SchedulerApp application, Resource resource) {
+      FiCaSchedulerApp application, Resource resource) {
     // Update queue metrics
     Resources.addTo(usedResources, resource);
     CSQueueUtils.updateQueueStatistics(
@@ -1371,7 +1368,7 @@ public class LeafQueue implements CSQueu
   }
 
   synchronized void releaseResource(Resource clusterResource, 
-      SchedulerApp application, Resource resource) {
+      FiCaSchedulerApp application, Resource resource) {
     // Update queue metrics
     Resources.subtractFrom(usedResources, resource);
     CSQueueUtils.updateQueueStatistics(
@@ -1395,11 +1392,11 @@ public class LeafQueue implements CSQueu
     maxActiveApplications = 
         CSQueueUtils.computeMaxActiveApplications(
             clusterResource, minimumAllocation, 
-            maxAMResourcePercent, absoluteMaxCapacity);
+            maxAMResourcePerQueuePercent, absoluteMaxCapacity);
     maxActiveAppsUsingAbsCap = 
-            CSQueueUtils.computeMaxActiveApplications(
-                clusterResource, minimumAllocation, 
-                maxAMResourcePercent, absoluteCapacity);
+        CSQueueUtils.computeMaxActiveApplications(
+            clusterResource, minimumAllocation, 
+            maxAMResourcePerQueuePercent, absoluteCapacity);
     maxActiveApplicationsPerUser = 
         CSQueueUtils.computeMaxActiveApplicationsPerUser(
             maxActiveAppsUsingAbsCap, userLimit, userLimitFactor);
@@ -1409,7 +1406,7 @@ public class LeafQueue implements CSQueu
         this, parent, clusterResource, minimumAllocation);
     
     // Update application properties
-    for (SchedulerApp application : activeApplications) {
+    for (FiCaSchedulerApp application : activeApplications) {
       synchronized (application) {
         computeUserLimitAndSetHeadroom(application, clusterResource, 
             Resources.none());
@@ -1472,7 +1469,7 @@ public class LeafQueue implements CSQueu
 
   @Override
   public void recoverContainer(Resource clusterResource,
-      SchedulerApp application, Container container) {
+      FiCaSchedulerApp application, Container container) {
     // Careful! Locking order is important! 
     synchronized (this) {
       allocateResource(clusterResource, application, container.getResource());

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Fri Aug  3 19:00:15 2012
@@ -51,8 +51,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 
 @Private
 @Evolving
@@ -193,7 +193,7 @@ public class ParentQueue implements CSQu
         ", acls=" + aclsString);
   }
 
-  private static float PRECISION = 0.005f; // 0.05% precision
+  private static float PRECISION = 0.0005f; // 0.05% precision
   void setChildQueues(Collection<CSQueue> childQueues) {
     
     // Validate
@@ -421,7 +421,7 @@ public class ParentQueue implements CSQu
   }
 
   @Override
-  public void submitApplication(SchedulerApp application, String user,
+  public void submitApplication(FiCaSchedulerApp application, String user,
       String queue) throws AccessControlException {
     
     synchronized (this) {
@@ -453,7 +453,7 @@ public class ParentQueue implements CSQu
     }
   }
 
-  private synchronized void addApplication(SchedulerApp application, 
+  private synchronized void addApplication(FiCaSchedulerApp application, 
       String user) {
   
     ++numApplications;
@@ -466,7 +466,7 @@ public class ParentQueue implements CSQu
   }
   
   @Override
-  public void finishApplication(SchedulerApp application, String queue) {
+  public void finishApplication(FiCaSchedulerApp application, String queue) {
     
     synchronized (this) {
       removeApplication(application, application.getUser());
@@ -478,7 +478,7 @@ public class ParentQueue implements CSQu
     }
   }
 
-  public synchronized void removeApplication(SchedulerApp application, 
+  public synchronized void removeApplication(FiCaSchedulerApp application, 
       String user) {
     
     --numApplications;
@@ -516,7 +516,7 @@ public class ParentQueue implements CSQu
 
   @Override
   public synchronized CSAssignment assignContainers(
-      Resource clusterResource, SchedulerNode node) {
+      Resource clusterResource, FiCaSchedulerNode node) {
     CSAssignment assignment = 
         new CSAssignment(Resources.createResource(0), NodeType.NODE_LOCAL);
     
@@ -594,14 +594,14 @@ public class ParentQueue implements CSQu
 
   }
   
-  private boolean canAssign(SchedulerNode node) {
+  private boolean canAssign(FiCaSchedulerNode node) {
     return (node.getReservedContainer() == null) && 
         Resources.greaterThanOrEqual(node.getAvailableResource(), 
                                      minimumAllocation);
   }
   
   synchronized CSAssignment assignContainersToChildQueues(Resource cluster, 
-      SchedulerNode node) {
+      FiCaSchedulerNode node) {
     CSAssignment assignment = 
         new CSAssignment(Resources.createResource(0), NodeType.NODE_LOCAL);
     
@@ -654,7 +654,7 @@ public class ParentQueue implements CSQu
   
   @Override
   public void completedContainer(Resource clusterResource,
-      SchedulerApp application, SchedulerNode node, 
+      FiCaSchedulerApp application, FiCaSchedulerNode node, 
       RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) {
     if (application != null) {
       // Careful! Locking order is important!
@@ -715,7 +715,7 @@ public class ParentQueue implements CSQu
   
   @Override
   public void recoverContainer(Resource clusterResource,
-      SchedulerApp application, Container container) {
+      FiCaSchedulerApp application, Container container) {
     // Careful! Locking order is important! 
     synchronized (this) {
       allocateResource(clusterResource, container.getResource());

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Fri Aug  3 19:00:15 2012
@@ -19,7 +19,6 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -32,7 +31,6 @@ import java.util.concurrent.ConcurrentHa
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
@@ -55,7 +53,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -74,11 +71,11 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
@@ -106,14 +103,14 @@ public class FifoScheduler implements Re
   private final static List<Container> EMPTY_CONTAINER_LIST = Arrays.asList(EMPTY_CONTAINER_ARRAY);
   private RMContext rmContext;
 
-  private Map<NodeId, SchedulerNode> nodes = new ConcurrentHashMap<NodeId, SchedulerNode>();
+  private Map<NodeId, FiCaSchedulerNode> nodes = new ConcurrentHashMap<NodeId, FiCaSchedulerNode>();
 
   private boolean initialized;
   private Resource minimumAllocation;
   private Resource maximumAllocation;
 
-  private Map<ApplicationAttemptId, SchedulerApp> applications
-      = new TreeMap<ApplicationAttemptId, SchedulerApp>();
+  private Map<ApplicationAttemptId, FiCaSchedulerApp> applications
+      = new TreeMap<ApplicationAttemptId, FiCaSchedulerApp>();
   
   private ActiveUsersManager activeUsersManager;
 
@@ -226,7 +223,7 @@ public class FifoScheduler implements Re
   public Allocation allocate(
       ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
       List<ContainerId> release) {
-    SchedulerApp application = getApplication(applicationAttemptId);
+    FiCaSchedulerApp application = getApplication(applicationAttemptId);
     if (application == null) {
       LOG.error("Calling allocate on removed " +
           "or non existant application " + applicationAttemptId);
@@ -279,7 +276,7 @@ public class FifoScheduler implements Re
     }
   }
 
-  private SchedulerApp getApplication(
+  private FiCaSchedulerApp getApplication(
       ApplicationAttemptId applicationAttemptId) {
     return applications.get(applicationAttemptId);
   }
@@ -287,19 +284,19 @@ public class FifoScheduler implements Re
   @Override
   public SchedulerAppReport getSchedulerAppInfo(
       ApplicationAttemptId applicationAttemptId) {
-    SchedulerApp app = getApplication(applicationAttemptId);
+    FiCaSchedulerApp app = getApplication(applicationAttemptId);
     return app == null ? null : new SchedulerAppReport(app);
   }
   
-  private SchedulerNode getNode(NodeId nodeId) {
+  private FiCaSchedulerNode getNode(NodeId nodeId) {
     return nodes.get(nodeId);
   }
   
   private synchronized void addApplication(ApplicationAttemptId appAttemptId,
       String user) {
     // TODO: Fix store
-    SchedulerApp schedulerApp = 
-        new SchedulerApp(appAttemptId, user, DEFAULT_QUEUE, activeUsersManager,
+    FiCaSchedulerApp schedulerApp = 
+        new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE, activeUsersManager,
             this.rmContext, null);
     applications.put(appAttemptId, schedulerApp);
     metrics.submitApp(user, appAttemptId.getAttemptId());
@@ -314,7 +311,7 @@ public class FifoScheduler implements Re
       ApplicationAttemptId applicationAttemptId,
       RMAppAttemptState rmAppAttemptFinalState)
       throws IOException {
-    SchedulerApp application = getApplication(applicationAttemptId);
+    FiCaSchedulerApp application = getApplication(applicationAttemptId);
     if (application == null) {
       throw new IOException("Unknown application " + applicationAttemptId + 
       " has completed!");
@@ -347,15 +344,15 @@ public class FifoScheduler implements Re
    * 
    * @param node node on which resources are available to be allocated
    */
-  private void assignContainers(SchedulerNode node) {
+  private void assignContainers(FiCaSchedulerNode node) {
     LOG.debug("assignContainers:" +
         " node=" + node.getRMNode().getNodeAddress() + 
         " #applications=" + applications.size());
 
     // Try to assign containers to applications in fifo order
-    for (Map.Entry<ApplicationAttemptId, SchedulerApp> e : applications
+    for (Map.Entry<ApplicationAttemptId, FiCaSchedulerApp> e : applications
         .entrySet()) {
-      SchedulerApp application = e.getValue();
+      FiCaSchedulerApp application = e.getValue();
       LOG.debug("pre-assignContainers");
       application.showRequests();
       synchronized (application) {
@@ -375,22 +372,26 @@ public class FifoScheduler implements Re
         }
       }
       
-      application.setHeadroom(clusterResource);
-      
       LOG.debug("post-assignContainers");
       application.showRequests();
 
       // Done
       if (Resources.lessThan(node.getAvailableResource(), minimumAllocation)) {
-        return;
+        break;
       }
     }
+
+    // Update the applications' headroom to correctly take into
+    // account the containers assigned in this update.
+    for (FiCaSchedulerApp application : applications.values()) {
+      application.setHeadroom(Resources.subtract(clusterResource, usedResource));
+    }
   }
 
-  private int getMaxAllocatableContainers(SchedulerApp application,
-      Priority priority, SchedulerNode node, NodeType type) {
+  private int getMaxAllocatableContainers(FiCaSchedulerApp application,
+      Priority priority, FiCaSchedulerNode node, NodeType type) {
     ResourceRequest offSwitchRequest = 
-      application.getResourceRequest(priority, SchedulerNode.ANY);
+      application.getResourceRequest(priority, FiCaSchedulerNode.ANY);
     int maxContainers = offSwitchRequest.getNumContainers();
 
     if (type == NodeType.OFF_SWITCH) {
@@ -419,8 +420,8 @@ public class FifoScheduler implements Re
   }
 
 
-  private int assignContainersOnNode(SchedulerNode node, 
-      SchedulerApp application, Priority priority 
+  private int assignContainersOnNode(FiCaSchedulerNode node, 
+      FiCaSchedulerApp application, Priority priority 
   ) {
     // Data-local
     int nodeLocalContainers = 
@@ -446,8 +447,8 @@ public class FifoScheduler implements Re
     return (nodeLocalContainers + rackLocalContainers + offSwitchContainers);
   }
 
-  private int assignNodeLocalContainers(SchedulerNode node, 
-      SchedulerApp application, Priority priority) {
+  private int assignNodeLocalContainers(FiCaSchedulerNode node, 
+      FiCaSchedulerApp application, Priority priority) {
     int assignedContainers = 0;
     ResourceRequest request = 
       application.getResourceRequest(priority, node.getRMNode().getNodeAddress());
@@ -472,15 +473,15 @@ public class FifoScheduler implements Re
     return assignedContainers;
   }
 
-  private int assignRackLocalContainers(SchedulerNode node, 
-      SchedulerApp application, Priority priority) {
+  private int assignRackLocalContainers(FiCaSchedulerNode node, 
+      FiCaSchedulerApp application, Priority priority) {
     int assignedContainers = 0;
     ResourceRequest request = 
       application.getResourceRequest(priority, node.getRMNode().getRackName());
     if (request != null) {
       // Don't allocate on this rack if the application doens't need containers
       ResourceRequest offSwitchRequest =
-          application.getResourceRequest(priority, SchedulerNode.ANY);
+          application.getResourceRequest(priority, FiCaSchedulerNode.ANY);
       if (offSwitchRequest.getNumContainers() <= 0) {
         return 0;
       }
@@ -497,11 +498,11 @@ public class FifoScheduler implements Re
     return assignedContainers;
   }
 
-  private int assignOffSwitchContainers(SchedulerNode node, 
-      SchedulerApp application, Priority priority) {
+  private int assignOffSwitchContainers(FiCaSchedulerNode node, 
+      FiCaSchedulerApp application, Priority priority) {
     int assignedContainers = 0;
     ResourceRequest request = 
-      application.getResourceRequest(priority, SchedulerNode.ANY);
+      application.getResourceRequest(priority, FiCaSchedulerNode.ANY);
     if (request != null) {
       assignedContainers = 
         assignContainer(node, application, priority, 
@@ -510,7 +511,7 @@ public class FifoScheduler implements Re
     return assignedContainers;
   }
 
-  private int assignContainer(SchedulerNode node, SchedulerApp application, 
+  private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application, 
       Priority priority, int assignableContainers, 
       ResourceRequest request, NodeType type) {
     LOG.debug("assignContainers:" +
@@ -541,11 +542,12 @@ public class FifoScheduler implements Re
 
         // If security is enabled, send the container-tokens too.
         if (UserGroupInformation.isSecurityEnabled()) {
-          ContainerTokenIdentifier tokenIdentifier = new ContainerTokenIdentifier(
-              containerId, nodeId.toString(), capability);
-          containerToken = BuilderUtils.newContainerToken(nodeId, ByteBuffer
-              .wrap(containerTokenSecretManager
-                  .createPassword(tokenIdentifier)), tokenIdentifier);
+          containerToken =
+              containerTokenSecretManager.createContainerToken(containerId,
+                nodeId, capability);
+          if (containerToken == null) {
+            return i; // Try again later.
+          }
         }
 
         // Create the container
@@ -562,11 +564,11 @@ public class FifoScheduler implements Re
         // Inform the node
         node.allocateContainer(application.getApplicationId(), 
             rmContainer);
+
+        // Update usage for this container
+        Resources.addTo(usedResource, capability);
       }
-      
-      // Update total usage
-      Resources.addTo(usedResource,
-          Resources.multiply(capability, assignedContainers));
+
     }
     
     return assignedContainers;
@@ -575,7 +577,7 @@ public class FifoScheduler implements Re
   private synchronized void nodeUpdate(RMNode rmNode, 
       List<ContainerStatus> newlyLaunchedContainers,
       List<ContainerStatus> completedContainers) {
-    SchedulerNode node = getNode(rmNode.getNodeID());
+    FiCaSchedulerNode node = getNode(rmNode.getNodeID());
     
     // Processing the newly launched containers
     for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
@@ -665,10 +667,10 @@ public class FifoScheduler implements Re
     }
   }
 
-  private void containerLaunchedOnNode(ContainerId containerId, SchedulerNode node) {
+  private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
     // Get the application for the finished container
     ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
-    SchedulerApp application = getApplication(applicationAttemptId);
+    FiCaSchedulerApp application = getApplication(applicationAttemptId);
     if (application == null) {
       LOG.info("Unknown application: " + applicationAttemptId + 
           " launched container " + containerId +
@@ -694,10 +696,10 @@ public class FifoScheduler implements Re
     // Get the application for the finished container
     Container container = rmContainer.getContainer();
     ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId();
-    SchedulerApp application = getApplication(applicationAttemptId);
+    FiCaSchedulerApp application = getApplication(applicationAttemptId);
     
     // Get the node on which the container was allocated
-    SchedulerNode node = getNode(container.getNodeId());
+    FiCaSchedulerNode node = getNode(container.getNodeId());
     
     if (application == null) {
       LOG.info("Unknown application: " + applicationAttemptId + 
@@ -727,7 +729,7 @@ public class FifoScheduler implements Re
   private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
 
   private synchronized void removeNode(RMNode nodeInfo) {
-    SchedulerNode node = getNode(nodeInfo.getNodeID());
+    FiCaSchedulerNode node = getNode(nodeInfo.getNodeID());
     if (node == null) {
       return;
     }
@@ -759,7 +761,7 @@ public class FifoScheduler implements Re
   }
 
   private synchronized void addNode(RMNode nodeManager) {
-    this.nodes.put(nodeManager.getNodeID(), new SchedulerNode(nodeManager));
+    this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager));
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
   }
 
@@ -776,12 +778,12 @@ public class FifoScheduler implements Re
 
   @Override
   public synchronized SchedulerNodeReport getNodeReport(NodeId nodeId) {
-    SchedulerNode node = getNode(nodeId);
+    FiCaSchedulerNode node = getNode(nodeId);
     return node == null ? null : new SchedulerNodeReport(node);
   }
   
   private RMContainer getRMContainer(ContainerId containerId) {
-    SchedulerApp application = 
+    FiCaSchedulerApp application = 
         getApplication(containerId.getApplicationAttemptId());
     return (application == null) ? null : application.getRMContainer(containerId);
   }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java Fri Aug  3 19:00:15 2012
@@ -18,14 +18,17 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.webapp;
 
-import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp.QUEUE_NAME;
 import static org.apache.hadoop.yarn.util.StringHelper.join;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.QUEUE_NAME;
+
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.util.StringHelper;
 import org.apache.hadoop.yarn.webapp.Controller;
+import org.apache.hadoop.yarn.webapp.WebAppException;
 import org.apache.hadoop.yarn.webapp.YarnWebParams;
 
 import com.google.inject.Inject;
@@ -62,7 +65,8 @@ public class RmController extends Contro
         RMAppState.NEW.toString(),
         RMAppState.SUBMITTED.toString(),
         RMAppState.ACCEPTED.toString(),
-        RMAppState.RUNNING.toString()));
+        RMAppState.RUNNING.toString(),
+        RMAppState.FINISHING.toString()));
 
     ResourceManager rm = getInstance(ResourceManager.class);
     ResourceScheduler rs = rm.getResourceScheduler();
@@ -71,6 +75,12 @@ public class RmController extends Contro
       render(CapacitySchedulerPage.class);
       return;
     }
+    
+    if (rs instanceof FairScheduler) {
+      context().setStatus(404);
+      throw new WebAppException("Fair Scheduler UI not yet supported");
+    }
+    
     setTitle("Default Scheduler");
     render(DefaultSchedulerPage.class);
   }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java Fri Aug  3 19:00:15 2012
@@ -18,13 +18,16 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -33,6 +36,7 @@ import org.apache.hadoop.yarn.server.api
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 
 public class MockNM {
@@ -85,6 +89,20 @@ public class MockNM {
         b, ++responseId);
   }
 
+  public HeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId,
+      int containerId, ContainerState containerState) throws Exception {
+    HashMap<ApplicationId, List<ContainerStatus>> nodeUpdate =
+        new HashMap<ApplicationId, List<ContainerStatus>>(1);
+    ContainerStatus amContainerStatus = BuilderUtils.newContainerStatus(
+        BuilderUtils.newContainerId(attemptId, 1),
+        ContainerState.COMPLETE, "Success", 0);
+    ArrayList<ContainerStatus> containerStatusList =
+        new ArrayList<ContainerStatus>(1);
+    containerStatusList.add(amContainerStatus);
+    nodeUpdate.put(attemptId.getApplicationId(), containerStatusList);
+    return nodeHeartbeat(nodeUpdate, true);
+  }
+
   public HeartbeatResponse nodeHeartbeat(Map<ApplicationId, 
       List<ContainerStatus>> conts, boolean isHealthy) throws Exception {
     return nodeHeartbeat(conts, isHealthy, ++responseId);

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java Fri Aug  3 19:00:15 2012
@@ -55,7 +55,7 @@ import org.apache.hadoop.yarn.server.api
 import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 
 @Private
@@ -72,7 +72,7 @@ public class NodeManager implements Cont
   Resource used = recordFactory.newRecordInstance(Resource.class);
 
   final ResourceTrackerService resourceTrackerService;
-  final SchedulerNode schedulerNode;
+  final FiCaSchedulerNode schedulerNode;
   final Map<ApplicationId, List<Container>> containers = 
     new HashMap<ApplicationId, List<Container>>();
   
@@ -98,7 +98,7 @@ public class NodeManager implements Cont
     request.setNodeId(this.nodeId);
     resourceTrackerService.registerNodeManager(request)
         .getRegistrationResponse();
-    this.schedulerNode = new SchedulerNode(rmContext.getRMNodes().get(
+    this.schedulerNode = new FiCaSchedulerNode(rmContext.getRMNodes().get(
         this.nodeId));
    
     // Sanity check

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java Fri Aug  3 19:00:15 2012
@@ -91,8 +91,11 @@ public class TestAppManager{
         rmDispatcher);
     AMLivelinessMonitor amLivelinessMonitor = new AMLivelinessMonitor(
         rmDispatcher);
+    AMLivelinessMonitor amFinishingMonitor = new AMLivelinessMonitor(
+        rmDispatcher);
     return new RMContextImpl(new MemStore(), rmDispatcher,
-        containerAllocationExpirer, amLivelinessMonitor, null, null) {
+        containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
+        null, null) {
       @Override
       public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
         return map;



Mime
View raw message