hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1537330 [7/12] - in /hadoop/common/branches/YARN-321/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ hadoop-yarn/hadoop-yarn-api/src/main/java...
Date Wed, 30 Oct 2013 22:22:36 GMT
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java Wed Oct 30 22:21:59 2013
@@ -17,44 +17,385 @@
 */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
 
 /**
- * Represents an Application from the viewpoint of the scheduler.
- * Each running Application in the RM corresponds to one instance
+ * Represents an application attempt from the viewpoint of the scheduler.
+ * Each running app attempt in the RM corresponds to one instance
  * of this class.
  */
 @Private
 @Unstable
 public abstract class SchedulerApplication {
+  
+  private static final Log LOG = LogFactory.getLog(SchedulerApplication.class);
+
+  protected final AppSchedulingInfo appSchedulingInfo;
+  
+  protected final Map<ContainerId, RMContainer> liveContainers =
+      new HashMap<ContainerId, RMContainer>();
+  protected final Map<Priority, Map<NodeId, RMContainer>> reservedContainers = 
+      new HashMap<Priority, Map<NodeId, RMContainer>>();
+
+  private final Multiset<Priority> reReservations = HashMultiset.create();
+  
+  protected final Resource currentReservation = Resource.newInstance(0, 0);
+  private Resource resourceLimit = Resource.newInstance(0, 0);
+  protected final Resource currentConsumption = Resource.newInstance(0, 0);
+
+  protected List<RMContainer> newlyAllocatedContainers = 
+      new ArrayList<RMContainer>();
 
   /**
-   * Get {@link ApplicationAttemptId} of the application master.
-   * @return <code>ApplicationAttemptId</code> of the application master
+   * Count how many times the application has been given an opportunity
+   * to schedule a task at each priority. Each time the scheduler
+   * asks the application for a task at this priority, it is incremented,
+   * and each time the application successfully schedules a task, it
+   * is reset to 0.
    */
-  public abstract ApplicationAttemptId getApplicationAttemptId();
+  Multiset<Priority> schedulingOpportunities = HashMultiset.create();
+  
+  // Time of the last container scheduled at the current allowed level
+  protected Map<Priority, Long> lastScheduledContainer =
+      new HashMap<Priority, Long>();
+
+  protected final Queue queue;
+  protected boolean isStopped = false;
+  
+  protected final RMContext rmContext;
+  
+  public SchedulerApplication(ApplicationAttemptId applicationAttemptId, 
+      String user, Queue queue, ActiveUsersManager activeUsersManager,
+      RMContext rmContext) {
+    this.rmContext = rmContext;
+    this.appSchedulingInfo = 
+        new AppSchedulingInfo(applicationAttemptId, user, queue,  
+            activeUsersManager);
+    this.queue = queue;
+  }
   
   /**
    * Get the live containers of the application.
    * @return live containers of the application
    */
-  public abstract Collection<RMContainer> getLiveContainers();
+  public synchronized Collection<RMContainer> getLiveContainers() {
+    return new ArrayList<RMContainer>(liveContainers.values());
+  }
   
   /**
-   * Get the reserved containers of the application.
-   * @return the reserved containers of the application
+   * Is this application pending?
+   * @return true if it is else false.
    */
-  public abstract Collection<RMContainer> getReservedContainers();
+  public boolean isPending() {
+    return appSchedulingInfo.isPending();
+  }
   
   /**
-   * Is this application pending?
-   * @return true if it is else false.
+   * Get {@link ApplicationAttemptId} of the application master.
+   * @return <code>ApplicationAttemptId</code> of the application master
+   */
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return appSchedulingInfo.getApplicationAttemptId();
+  }
+  
+  public ApplicationId getApplicationId() {
+    return appSchedulingInfo.getApplicationId();
+  }
+  
+  public String getUser() {
+    return appSchedulingInfo.getUser();
+  }
+
+  public Map<String, ResourceRequest> getResourceRequests(Priority priority) {
+    return appSchedulingInfo.getResourceRequests(priority);
+  }
+
+  public int getNewContainerId() {
+    return appSchedulingInfo.getNewContainerId();
+  }
+
+  public Collection<Priority> getPriorities() {
+    return appSchedulingInfo.getPriorities();
+  }
+  
+  public ResourceRequest getResourceRequest(Priority priority, String resourceName) {
+    return this.appSchedulingInfo.getResourceRequest(priority, resourceName);
+  }
+
+  public synchronized int getTotalRequiredResources(Priority priority) {
+    return getResourceRequest(priority, ResourceRequest.ANY).getNumContainers();
+  }
+
+  public Resource getResource(Priority priority) {
+    return appSchedulingInfo.getResource(priority);
+  }
+
+  public String getQueueName() {
+    return appSchedulingInfo.getQueueName();
+  }
+  
+  public synchronized RMContainer getRMContainer(ContainerId id) {
+    return liveContainers.get(id);
+  }
+
+  protected synchronized void resetReReservations(Priority priority) {
+    reReservations.setCount(priority, 0);
+  }
+
+  protected synchronized void addReReservation(Priority priority) {
+    reReservations.add(priority);
+  }
+
+  public synchronized int getReReservations(Priority priority) {
+    return reReservations.count(priority);
+  }
+
+  /**
+   * Get total current reservations.
+   * Used only by unit tests
+   * @return total current reservations
+   */
+  @Stable
+  @Private
+  public synchronized Resource getCurrentReservation() {
+    return currentReservation;
+  }
+  
+  public Queue getQueue() {
+    return queue;
+  }
+  
+  public synchronized void updateResourceRequests(
+      List<ResourceRequest> requests) {
+    if (!isStopped) {
+      appSchedulingInfo.updateResourceRequests(requests);
+    }
+  }
+  
+  public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
+    // Cleanup all scheduling information
+    isStopped = true;
+    appSchedulingInfo.stop(rmAppAttemptFinalState);
+  }
+
+  public synchronized boolean isStopped() {
+    return isStopped;
+  }
+
+  /**
+   * Get the list of reserved containers
+   * @return All of the reserved containers.
+   */
+  public synchronized List<RMContainer> getReservedContainers() {
+    List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
+    for (Map.Entry<Priority, Map<NodeId, RMContainer>> e : 
+      this.reservedContainers.entrySet()) {
+      reservedContainers.addAll(e.getValue().values());
+    }
+    return reservedContainers;
+  }
+  
+  public synchronized RMContainer reserve(SchedulerNode node, Priority priority,
+      RMContainer rmContainer, Container container) {
+    // Create RMContainer if necessary
+    if (rmContainer == null) {
+      rmContainer = 
+          new RMContainerImpl(container, getApplicationAttemptId(), 
+              node.getNodeID(), rmContext.getDispatcher().getEventHandler(), 
+              rmContext.getContainerAllocationExpirer());
+        
+      Resources.addTo(currentReservation, container.getResource());
+      
+      // Reset the re-reservation count
+      resetReReservations(priority);
+    } else {
+      // Note down the re-reservation
+      addReReservation(priority);
+    }
+    rmContainer.handle(new RMContainerReservedEvent(container.getId(), 
+        container.getResource(), node.getNodeID(), priority));
+    
+    Map<NodeId, RMContainer> reservedContainers = 
+        this.reservedContainers.get(priority);
+    if (reservedContainers == null) {
+      reservedContainers = new HashMap<NodeId, RMContainer>();
+      this.reservedContainers.put(priority, reservedContainers);
+    }
+    reservedContainers.put(node.getNodeID(), rmContainer);
+    
+    LOG.info("Application " + getApplicationId() 
+        + " reserved container " + rmContainer
+        + " on node " + node + ", currently has " + reservedContainers.size()
+        + " at priority " + priority 
+        + "; currentReservation " + currentReservation.getMemory());
+    
+    return rmContainer;
+  }
+  
+  /**
+   * Has the application reserved the given <code>node</code> at the
+   * given <code>priority</code>?
+   * @param node node to be checked
+   * @param priority priority of reserved container
+   * @return true is reserved, false if not
+   */
+  public synchronized boolean isReserved(SchedulerNode node, Priority priority) {
+    Map<NodeId, RMContainer> reservedContainers = 
+        this.reservedContainers.get(priority);
+    if (reservedContainers != null) {
+      return reservedContainers.containsKey(node.getNodeID());
+    }
+    return false;
+  }
+  
+  public synchronized void setHeadroom(Resource globalLimit) {
+    this.resourceLimit = globalLimit; 
+  }
+
+  /**
+   * Get available headroom in terms of resources for the application's user.
+   * @return available resource headroom
+   */
+  public synchronized Resource getHeadroom() {
+    // Corner case to deal with applications being slightly over-limit
+    if (resourceLimit.getMemory() < 0) {
+      resourceLimit.setMemory(0);
+    }
+    
+    return resourceLimit;
+  }
+  
+  public synchronized int getNumReservedContainers(Priority priority) {
+    Map<NodeId, RMContainer> reservedContainers = 
+        this.reservedContainers.get(priority);
+    return (reservedContainers == null) ? 0 : reservedContainers.size();
+  }
+  
+  @SuppressWarnings("unchecked")
+  public synchronized void containerLaunchedOnNode(ContainerId containerId,
+      NodeId nodeId) {
+    // Inform the container
+    RMContainer rmContainer = getRMContainer(containerId);
+    if (rmContainer == null) {
+      // Some unknown container sneaked into the system. Kill it.
+      rmContext.getDispatcher().getEventHandler()
+        .handle(new RMNodeCleanContainerEvent(nodeId, containerId));
+      return;
+    }
+
+    rmContainer.handle(new RMContainerEvent(containerId,
+        RMContainerEventType.LAUNCHED));
+  }
+  
+  public synchronized void showRequests() {
+    if (LOG.isDebugEnabled()) {
+      for (Priority priority : getPriorities()) {
+        Map<String, ResourceRequest> requests = getResourceRequests(priority);
+        if (requests != null) {
+          LOG.debug("showRequests:" + " application=" + getApplicationId() + 
+              " headRoom=" + getHeadroom() + 
+              " currentConsumption=" + currentConsumption.getMemory());
+          for (ResourceRequest request : requests.values()) {
+            LOG.debug("showRequests:" + " application=" + getApplicationId()
+                + " request=" + request);
+          }
+        }
+      }
+    }
+  }
+  
+  public Resource getCurrentConsumption() {
+    return currentConsumption;
+  }
+
+  public synchronized List<Container> pullNewlyAllocatedContainers() {
+    List<Container> returnContainerList = new ArrayList<Container>(
+        newlyAllocatedContainers.size());
+    for (RMContainer rmContainer : newlyAllocatedContainers) {
+      rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
+          RMContainerEventType.ACQUIRED));
+      returnContainerList.add(rmContainer.getContainer());
+    }
+    newlyAllocatedContainers.clear();
+    return returnContainerList;
+  }
+
+  public synchronized void updateBlacklist(
+      List<String> blacklistAdditions, List<String> blacklistRemovals) {
+    if (!isStopped) {
+      this.appSchedulingInfo.updateBlacklist(
+          blacklistAdditions, blacklistRemovals);
+    }
+  }
+  
+  public boolean isBlacklisted(String resourceName) {
+    return this.appSchedulingInfo.isBlacklisted(resourceName);
+  }
+
+  public synchronized void addSchedulingOpportunity(Priority priority) {
+    schedulingOpportunities.setCount(priority,
+        schedulingOpportunities.count(priority) + 1);
+  }
+  
+  public synchronized void subtractSchedulingOpportunity(Priority priority) {
+    int count = schedulingOpportunities.count(priority) - 1;
+    this.schedulingOpportunities.setCount(priority, Math.max(count,  0));
+  }
+
+  /**
+   * Return the number of times the application has been given an opportunity
+   * to schedule a task at the given priority since the last time it
+   * successfully did so.
+   */
+  public synchronized int getSchedulingOpportunities(Priority priority) {
+    return schedulingOpportunities.count(priority);
+  }
+  
+  /**
+   * Should be called when an application has successfully scheduled a container,
+   * or when the scheduling locality threshold is relaxed.
+   * Reset various internal counters which affect delay scheduling
+   *
+   * @param priority The priority of the container scheduled.
    */
-  public abstract boolean isPending();
+  public synchronized void resetSchedulingOpportunities(Priority priority) {
+    resetSchedulingOpportunities(priority, System.currentTimeMillis());
+  }
+  // used for continuous scheduling
+  public synchronized void resetSchedulingOpportunities(Priority priority,
+      long currentTimeMs) {
+    lastScheduledContainer.put(priority, currentTimeMs);
+    schedulingOpportunities.setCount(priority, 0);
+  }
 
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java Wed Oct 30 22:21:59 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
@@ -67,4 +68,16 @@ public abstract class SchedulerNode {
    */
   public abstract int getNumContainers();
 
+  /**
+   * Get total resources on the node.
+   * @return total resources on the node.
+   */
+  public abstract Resource getTotalResource();
+  
+  /**
+   * Get the ID of the node which contains both its hostname and port.
+   * @return the ID of the node
+   */
+  public abstract NodeId getNodeID();
+
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java Wed Oct 30 22:21:59 2013
@@ -63,11 +63,11 @@ public class SchedulerUtils {
   
   public static final String UNRESERVED_CONTAINER =
       "Container reservation no longer required.";
-  
+
   /**
    * Utility to create a {@link ContainerStatus} during exceptional
    * circumstances.
-   * 
+   *
    * @param containerId {@link ContainerId} of returned/released/lost container.
    * @param diagnostics diagnostic message
    * @return <code>ContainerStatus</code> for an returned/released/lost 
@@ -75,12 +75,41 @@ public class SchedulerUtils {
    */
   public static ContainerStatus createAbnormalContainerStatus(
       ContainerId containerId, String diagnostics) {
+    return createAbnormalContainerStatus(containerId, 
+        ContainerExitStatus.ABORTED, diagnostics);
+  }
+
+  /**
+   * Utility to create a {@link ContainerStatus} during exceptional
+   * circumstances.
+   *
+   * @param containerId {@link ContainerId} of returned/released/lost container.
+   * @param diagnostics diagnostic message
+   * @return <code>ContainerStatus</code> for an returned/released/lost
+   *         container
+   */
+  public static ContainerStatus createPreemptedContainerStatus(
+      ContainerId containerId, String diagnostics) {
+    return createAbnormalContainerStatus(containerId, 
+        ContainerExitStatus.PREEMPTED, diagnostics);
+  }
+
+  /**
+   * Utility to create a {@link ContainerStatus} during exceptional
+   * circumstances.
+   * 
+   * @param containerId {@link ContainerId} of returned/released/lost container.
+   * @param diagnostics diagnostic message
+   * @return <code>ContainerStatus</code> for an returned/released/lost 
+   *         container
+   */
+  private static ContainerStatus createAbnormalContainerStatus(
+      ContainerId containerId, int exitStatus, String diagnostics) {
     ContainerStatus containerStatus = 
         recordFactory.newRecordInstance(ContainerStatus.class);
     containerStatus.setContainerId(containerId);
     containerStatus.setDiagnostics(diagnostics);
-    containerStatus.setExitStatus(
-        ContainerExitStatus.ABORTED);
+    containerStatus.setExitStatus(exitStatus);
     containerStatus.setState(ContainerState.COMPLETE);
     return containerStatus;
   }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java Wed Oct 30 22:21:59 2013
@@ -25,9 +25,11 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -134,4 +136,17 @@ public interface YarnScheduler extends E
   @LimitedPrivate("yarn")
   @Evolving
   QueueMetrics getRootQueueMetrics();
+
+  /**
+   * Check if the user has permission to perform the operation.
+   * If the user has {@link QueueACL#ADMINISTER_QUEUE} permission,
+   * this user can view/modify the applications in this queue
+   * @param callerUGI
+   * @param acl
+   * @param queueName
+   * @return <code>true</code> if the user has the permission,
+   *         <code>false</code> otherwise
+   */
+  boolean checkAccess(UserGroupInformation callerUGI,
+      QueueACL acl, String queueName);
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java Wed Oct 30 22:21:59 2013
@@ -99,15 +99,11 @@ class CSQueueUtils {
           Resources.divide(calculator, clusterResource, 
               usedResources, queueLimit);
     }
-    
+
     childQueue.setUsedCapacity(usedCapacity);
     childQueue.setAbsoluteUsedCapacity(absoluteUsedCapacity);
     
-    Resource available = 
-        Resources.roundUp(
-            calculator, 
-            Resources.subtract(queueLimit, usedResources), 
-            minimumAllocation);
+    Resource available = Resources.subtract(queueLimit, usedResources);
     childQueue.getMetrics().setAvailableResourcesToQueue(
         Resources.max(
             calculator, 

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Wed Oct 30 22:21:59 2013
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -80,6 +81,8 @@ import org.apache.hadoop.yarn.server.uti
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import com.google.common.annotations.VisibleForTesting;
+
 @LimitedPrivate("yarn")
 @Evolving
 @SuppressWarnings("unchecked")
@@ -179,7 +182,8 @@ public class CapacityScheduler
   private Resource minimumAllocation;
   private Resource maximumAllocation;
 
-  private Map<ApplicationAttemptId, FiCaSchedulerApp> applications = 
+  @VisibleForTesting
+  protected Map<ApplicationAttemptId, FiCaSchedulerApp> applications = 
       new ConcurrentHashMap<ApplicationAttemptId, FiCaSchedulerApp>();
 
   private boolean initialized = false;
@@ -568,8 +572,7 @@ public class CapacityScheduler
         application.showRequests();
   
         // Update application requests
-        application.updateResourceRequests(ask, 
-            blacklistAdditions, blacklistRemovals);
+        application.updateResourceRequests(ask);
   
         LOG.debug("allocate: post-update");
         application.showRequests();
@@ -581,6 +584,8 @@ public class CapacityScheduler
           " #ask=" + ask.size());
       }
 
+      application.updateBlacklist(blacklistAdditions, blacklistRemovals);
+
       return application.getAllocation(getResourceCalculator(),
                    clusterResource, getMinimumResourceCapability());
     }
@@ -904,10 +909,24 @@ public class CapacityScheduler
       LOG.debug("KILL_CONTAINER: container" + cont.toString());
     }
     completedContainer(cont,
-        SchedulerUtils.createAbnormalContainerStatus(
+        SchedulerUtils.createPreemptedContainerStatus(
             cont.getContainerId(),"Container being forcibly preempted:"
         + cont.getContainerId()),
         RMContainerEventType.KILL);
   }
 
+  @Override
+  public synchronized boolean checkAccess(UserGroupInformation callerUGI,
+      QueueACL acl, String queueName) {
+    CSQueue queue = getQueue(queueName);
+    if (queue == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("ACL not found for queue access-type " + acl
+            + " for queue " + queueName);
+      }
+      return false;
+    }
+    return queue.hasAccess(acl, callerUGI);
+  }
+
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Wed Oct 30 22:21:59 2013
@@ -57,9 +57,9 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.Lock;
@@ -527,11 +527,6 @@ public class LeafQueue implements CSQueu
   }
 
   @Override
-  public synchronized Map<QueueACL, AccessControlList> getQueueAcls() {
-    return new HashMap<QueueACL, AccessControlList>(acls);
-  }
-
-  @Override
   public synchronized QueueInfo getQueueInfo(
       boolean includeChildQueues, boolean recursive) {
     queueInfo.setCurrentCapacity(usedCapacity);
@@ -644,7 +639,8 @@ public class LeafQueue implements CSQueu
 
     // Check queue ACLs
     UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(userName);
-    if (!hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi)) {
+    if (!hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi)
+        && !hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
       throw new AccessControlException("User " + userName + " cannot submit" +
           " applications to queue " + getQueuePath());
     }
@@ -827,7 +823,7 @@ public class LeafQueue implements CSQueu
 
       synchronized (application) {
         // Check if this resource is on the blacklist
-        if (FiCaSchedulerUtils.isBlacklisted(application, node, LOG)) {
+        if (SchedulerAppUtils.isBlacklisted(application, node, LOG)) {
           continue;
         }
         
@@ -1308,9 +1304,15 @@ public class LeafQueue implements CSQueu
         + " request=" + request + " type=" + type);
     }
     Resource capability = request.getCapability();
-
     Resource available = node.getAvailableResource();
+    Resource totalResource = node.getTotalResource();
 
+    if (!Resources.fitsIn(capability, totalResource)) {
+      LOG.warn("Node : " + node.getNodeID()
+          + " does not have sufficient resource for request : " + request
+          + " node total capability : " + node.getTotalResource());
+      return Resources.none();
+    }
     assert Resources.greaterThan(
         resourceCalculator, clusterResource, available, Resources.none());
 
@@ -1603,9 +1605,12 @@ public class LeafQueue implements CSQueu
 
   }
 
-  // need to access the list of apps from the preemption monitor
+  /**
+   * Obtain (read-only) collection of active applications.
+   */
   public Set<FiCaSchedulerApp> getApplications() {
-    return Collections.unmodifiableSet(activeApplications);
+    // need to access the list of apps from the preemption monitor
+    return activeApplications;
   }
 
   // return a single Resource capturing the overal amount of pending resources

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Wed Oct 30 22:21:59 2013
@@ -300,11 +300,6 @@ public class ParentQueue implements CSQu
   }
 
   @Override
-  public synchronized Map<QueueACL, AccessControlList> getQueueAcls() {
-    return new HashMap<QueueACL, AccessControlList>(acls);
-  }
-
-  @Override
   public synchronized QueueInfo getQueueInfo( 
       boolean includeChildQueues, boolean recursive) {
     queueInfo.setCurrentCapacity(usedCapacity);

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java Wed Oct 30 22:21:59 2013
@@ -18,22 +18,16 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
 
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Stable;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -41,188 +35,39 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 
-import com.google.common.collect.HashMultiset;
-import com.google.common.collect.Multiset;
-
 /**
- * Represents an Application from the viewpoint of the scheduler.
- * Each running Application in the RM corresponds to one instance
- * of this class.
+ * Represents an application attempt from the viewpoint of the FIFO or Capacity
+ * scheduler.
  */
-@SuppressWarnings("unchecked")
 @Private
 @Unstable
 public class FiCaSchedulerApp extends SchedulerApplication {
 
   private static final Log LOG = LogFactory.getLog(FiCaSchedulerApp.class);
 
-  private final RecordFactory recordFactory = RecordFactoryProvider
-      .getRecordFactory(null);
-
-  private final AppSchedulingInfo appSchedulingInfo;
-  private final Queue queue;
-
-  private final Resource currentConsumption = recordFactory
-      .newRecordInstance(Resource.class);
-  private Resource resourceLimit = recordFactory
-      .newRecordInstance(Resource.class);
-
-  private Map<ContainerId, RMContainer> liveContainers =
-    new HashMap<ContainerId, RMContainer>();
-  private List<RMContainer> newlyAllocatedContainers =
-    new ArrayList<RMContainer>();
-
-  final Map<Priority, Map<NodeId, RMContainer>> reservedContainers = 
-      new HashMap<Priority, Map<NodeId, RMContainer>>();
-
-  private boolean isStopped = false;
-
   private final Set<ContainerId> containersToPreempt =
     new HashSet<ContainerId>();
 
-  /**
-   * Count how many times the application has been given an opportunity
-   * to schedule a task at each priority. Each time the scheduler
-   * asks the application for a task at this priority, it is incremented,
-   * and each time the application successfully schedules a task, it
-   * is reset to 0.
-   */
-  Multiset<Priority> schedulingOpportunities = HashMultiset.create();
-  
-  Multiset<Priority> reReservations = HashMultiset.create();
-
-  Resource currentReservation = recordFactory
-      .newRecordInstance(Resource.class);
-
-  private final RMContext rmContext;
   public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, 
       String user, Queue queue, ActiveUsersManager activeUsersManager,
       RMContext rmContext) {
-    this.rmContext = rmContext;
-    this.appSchedulingInfo = 
-        new AppSchedulingInfo(applicationAttemptId, user, queue,  
-            activeUsersManager);
-    this.queue = queue;
-  }
-
-  public ApplicationId getApplicationId() {
-    return this.appSchedulingInfo.getApplicationId();
-  }
-
-  @Override
-  public ApplicationAttemptId getApplicationAttemptId() {
-    return this.appSchedulingInfo.getApplicationAttemptId();
-  }
-
-  public String getUser() {
-    return this.appSchedulingInfo.getUser();
-  }
-
-  public synchronized void updateResourceRequests(
-      List<ResourceRequest> requests, 
-      List<String> blacklistAdditions, List<String> blacklistRemovals) {
-    if (!isStopped) {
-      this.appSchedulingInfo.updateResourceRequests(requests, 
-          blacklistAdditions, blacklistRemovals);
-    }
-  }
-
-  public Map<String, ResourceRequest> getResourceRequests(Priority priority) {
-    return this.appSchedulingInfo.getResourceRequests(priority);
-  }
-
-  public int getNewContainerId() {
-    return this.appSchedulingInfo.getNewContainerId();
-  }
-  
-  public Collection<Priority> getPriorities() {
-    return this.appSchedulingInfo.getPriorities();
-  }
-
-  public ResourceRequest getResourceRequest(Priority priority, String resourceName) {
-    return this.appSchedulingInfo.getResourceRequest(priority, resourceName);
-  }
-
-  public synchronized int getTotalRequiredResources(Priority priority) {
-    return getResourceRequest(priority, ResourceRequest.ANY).getNumContainers();
-  }
-  
-  public Resource getResource(Priority priority) {
-    return this.appSchedulingInfo.getResource(priority);
-  }
-  
-  public boolean isBlacklisted(String resourceName) {
-    return this.appSchedulingInfo.isBlacklisted(resourceName);
-  }
-
-  /**
-   * Is this application pending?
-   * @return true if it is else false.
-   */
-  @Override
-  public boolean isPending() {
-    return this.appSchedulingInfo.isPending();
-  }
-
-  public synchronized boolean isStopped() {
-    return this.isStopped;
-  }
-
-  public String getQueueName() {
-    return this.appSchedulingInfo.getQueueName();
-  }
-
-  /**
-   * Get the list of live containers
-   * @return All of the live containers
-   */
-  @Override
-  public synchronized Collection<RMContainer> getLiveContainers() {
-    return new ArrayList<RMContainer>(liveContainers.values());
-  }
-
-  public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
-    // Cleanup all scheduling information
-    this.isStopped = true;
-    this.appSchedulingInfo.stop(rmAppAttemptFinalState);
-  }
-
-  public synchronized void containerLaunchedOnNode(ContainerId containerId,
-      NodeId nodeId) {
-    // Inform the container
-    RMContainer rmContainer = 
-        getRMContainer(containerId);
-    if (rmContainer == null) {
-      // Some unknown container sneaked into the system. Kill it.
-      this.rmContext.getDispatcher().getEventHandler()
-        .handle(new RMNodeCleanContainerEvent(nodeId, containerId));
-      return;
-    }
-
-    rmContainer.handle(new RMContainerEvent(containerId,
-      RMContainerEventType.LAUNCHED));
+    super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
   }
 
   synchronized public boolean containerCompleted(RMContainer rmContainer,
@@ -304,133 +149,6 @@ public class FiCaSchedulerApp extends Sc
     
     return rmContainer;
   }
-  
-  synchronized public List<Container> pullNewlyAllocatedContainers() {
-    List<Container> returnContainerList = new ArrayList<Container>(
-        newlyAllocatedContainers.size());
-    for (RMContainer rmContainer : newlyAllocatedContainers) {
-      rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
-          RMContainerEventType.ACQUIRED));
-      returnContainerList.add(rmContainer.getContainer());
-    }
-    newlyAllocatedContainers.clear();
-    return returnContainerList;
-  }
-
-  public Resource getCurrentConsumption() {
-    return this.currentConsumption;
-  }
-
-  synchronized public void showRequests() {
-    if (LOG.isDebugEnabled()) {
-      for (Priority priority : getPriorities()) {
-        Map<String, ResourceRequest> requests = getResourceRequests(priority);
-        if (requests != null) {
-          LOG.debug("showRequests:" + " application=" + getApplicationId() + 
-              " headRoom=" + getHeadroom() + 
-              " currentConsumption=" + currentConsumption.getMemory());
-          for (ResourceRequest request : requests.values()) {
-            LOG.debug("showRequests:" + " application=" + getApplicationId()
-                + " request=" + request);
-          }
-        }
-      }
-    }
-  }
-
-  public synchronized RMContainer getRMContainer(ContainerId id) {
-    return liveContainers.get(id);
-  }
-
-  synchronized public void resetSchedulingOpportunities(Priority priority) {
-    this.schedulingOpportunities.setCount(priority, 0);
-  }
-
-  synchronized public void addSchedulingOpportunity(Priority priority) {
-    this.schedulingOpportunities.setCount(priority,
-        schedulingOpportunities.count(priority) + 1);
-  }
-
-  synchronized public void subtractSchedulingOpportunity(Priority priority) {
-    int count = schedulingOpportunities.count(priority) - 1;
-    this.schedulingOpportunities.setCount(priority, Math.max(count,  0));
-  }
-  
-  /**
-   * @param priority Target priority
-   * @return the number of times the application has been given an opportunity
-   * to schedule a task at the given priority since the last time it
-   * successfully did so.
-   */
-  synchronized public int getSchedulingOpportunities(Priority priority) {
-    return this.schedulingOpportunities.count(priority);
-  }
-
-  synchronized void resetReReservations(Priority priority) {
-    this.reReservations.setCount(priority, 0);
-  }
-
-  synchronized void addReReservation(Priority priority) {
-    this.reReservations.add(priority);
-  }
-
-  synchronized public int getReReservations(Priority priority) {
-    return this.reReservations.count(priority);
-  }
-
-  public synchronized int getNumReservedContainers(Priority priority) {
-    Map<NodeId, RMContainer> reservedContainers = 
-        this.reservedContainers.get(priority);
-    return (reservedContainers == null) ? 0 : reservedContainers.size();
-  }
-  
-  /**
-   * Get total current reservations.
-   * Used only by unit tests
-   * @return total current reservations
-   */
-  @Stable
-  @Private
-  public synchronized Resource getCurrentReservation() {
-    return currentReservation;
-  }
-
-  public synchronized RMContainer reserve(FiCaSchedulerNode node, Priority priority,
-      RMContainer rmContainer, Container container) {
-    // Create RMContainer if necessary
-    if (rmContainer == null) {
-      rmContainer = 
-          new RMContainerImpl(container, getApplicationAttemptId(), 
-              node.getNodeID(), rmContext.getDispatcher().getEventHandler(), 
-              rmContext.getContainerAllocationExpirer());
-        
-      Resources.addTo(currentReservation, container.getResource());
-      
-      // Reset the re-reservation count
-      resetReReservations(priority);
-    } else {
-      // Note down the re-reservation
-      addReReservation(priority);
-    }
-    rmContainer.handle(new RMContainerReservedEvent(container.getId(), 
-        container.getResource(), node.getNodeID(), priority));
-    
-    Map<NodeId, RMContainer> reservedContainers = 
-        this.reservedContainers.get(priority);
-    if (reservedContainers == null) {
-      reservedContainers = new HashMap<NodeId, RMContainer>();
-      this.reservedContainers.put(priority, reservedContainers);
-    }
-    reservedContainers.put(node.getNodeID(), rmContainer);
-    
-    LOG.info("Application " + getApplicationId() 
-        + " reserved container " + rmContainer
-        + " on node " + node + ", currently has " + reservedContainers.size()
-        + " at priority " + priority 
-        + "; currentReservation " + currentReservation.getMemory());
-    
-    return rmContainer;
-  }
 
   public synchronized boolean unreserve(FiCaSchedulerNode node, Priority priority) {
     Map<NodeId, RMContainer> reservedContainers =
@@ -464,22 +182,6 @@ public class FiCaSchedulerApp extends Sc
     return false;
   }
 
-  /**
-   * Has the application reserved the given <code>node</code> at the
-   * given <code>priority</code>?
-   * @param node node to be checked
-   * @param priority priority of reserved container
-   * @return true is reserved, false if not
-   */
-  public synchronized boolean isReserved(FiCaSchedulerNode node, Priority priority) {
-    Map<NodeId, RMContainer> reservedContainers = 
-        this.reservedContainers.get(priority);
-    if (reservedContainers != null) {
-      return reservedContainers.containsKey(node.getNodeID());
-    }
-    return false;
-  }
-
   public synchronized float getLocalityWaitFactor(
       Priority priority, int clusterNodes) {
     // Estimate: Required unique resources (i.e. hosts + racks)
@@ -491,41 +193,6 @@ public class FiCaSchedulerApp extends Sc
     return Math.min(((float)requiredResources / clusterNodes), 1.0f);
   }
 
-  /**
-   * Get the list of reserved containers
-   * @return All of the reserved containers.
-   */
-  @Override
-  public synchronized List<RMContainer> getReservedContainers() {
-    List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
-    for (Map.Entry<Priority, Map<NodeId, RMContainer>> e : 
-      this.reservedContainers.entrySet()) {
-      reservedContainers.addAll(e.getValue().values());
-    }
-    return reservedContainers;
-  }
-  
-  public synchronized void setHeadroom(Resource globalLimit) {
-    this.resourceLimit = globalLimit; 
-  }
-
-  /**
-   * Get available headroom in terms of resources for the application's user.
-   * @return available resource headroom
-   */
-  public synchronized Resource getHeadroom() {
-    // Corner case to deal with applications being slightly over-limit
-    if (resourceLimit.getMemory() < 0) {
-      resourceLimit.setMemory(0);
-    }
-    
-    return resourceLimit;
-  }
-
-  public Queue getQueue() {
-    return queue;
-  }
-
   public Resource getTotalPendingRequests() {
     Resource ret = Resource.newInstance(0, 0);
     for (ResourceRequest rr : appSchedulingInfo.getAllResourceRequests()) {

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java Wed Oct 30 22:21:59 2013
@@ -49,6 +49,7 @@ public class FiCaSchedulerNode extends S
 
   private Resource availableResource = recordFactory.newRecordInstance(Resource.class);
   private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
+  private Resource totalResourceCapability;
 
   private volatile int numContainers;
 
@@ -65,6 +66,9 @@ public class FiCaSchedulerNode extends S
     this.rmNode = node;
     this.availableResource.setMemory(node.getTotalCapability().getMemory());
     this.availableResource.setVirtualCores(node.getTotalCapability().getVirtualCores());
+    totalResourceCapability =
+        Resource.newInstance(node.getTotalCapability().getMemory(), node
+            .getTotalCapability().getVirtualCores());
     if (usePortForNodeName) {
       nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort();
     } else {
@@ -126,6 +130,11 @@ public class FiCaSchedulerNode extends S
     return this.usedResource;
   }
 
+  @Override
+  public Resource getTotalResource() {
+    return this.totalResourceCapability;
+  }
+
   private synchronized boolean isValidContainer(Container c) {    
     if (launchedContainers.containsKey(c.getId()))
       return true;

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java Wed Oct 30 22:21:59 2013
@@ -192,10 +192,6 @@ public class AppSchedulable extends Sche
       RMContainer rmContainer = app.reserve(node, priority, null,
           container);
       node.reserveResource(app, priority, rmContainer);
-      getMetrics().reserveResource(app.getUser(),
-          container.getResource());
-      scheduler.getRootQueueMetrics().reserveResource(app.getUser(),
-          container.getResource());
     }
 
     else {
@@ -216,8 +212,6 @@ public class AppSchedulable extends Sche
     node.unreserveResource(app);
     getMetrics().unreserveResource(
         app.getUser(), rmContainer.getContainer().getResource());
-    scheduler.getRootQueueMetrics().unreserveResource(
-        app.getUser(), rmContainer.getContainer().getResource());
   }
 
   /**
@@ -274,7 +268,9 @@ public class AppSchedulable extends Sche
   }
 
   private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
-    LOG.info("Node offered to app: " + getName() + " reserved: " + reserved);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Node offered to app: " + getName() + " reserved: " + reserved);
+    }
 
     if (reserved) {
       RMContainer rmContainer = node.getReservedContainer();
@@ -316,10 +312,19 @@ public class AppSchedulable extends Sche
               + localRequest);
         }
         
-        NodeType allowedLocality = app.getAllowedLocalityLevel(priority,
-            scheduler.getNumClusterNodes(), scheduler.getNodeLocalityThreshold(),
-            scheduler.getRackLocalityThreshold());
-        
+        NodeType allowedLocality;
+        if (scheduler.isContinuousSchedulingEnabled()) {
+          allowedLocality = app.getAllowedLocalityLevelByTime(priority,
+                  scheduler.getNodeLocalityDelayMs(),
+                  scheduler.getRackLocalityDelayMs(),
+                  scheduler.getClock().getTime());
+        } else {
+          allowedLocality = app.getAllowedLocalityLevel(priority,
+                  scheduler.getNumClusterNodes(),
+                  scheduler.getNodeLocalityThreshold(),
+                  scheduler.getRackLocalityThreshold());
+        }
+
         if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
             && localRequest != null && localRequest.getNumContainers() != 0) {
           return assignContainer(node, priority,

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java Wed Oct 30 22:21:59 2013
@@ -24,17 +24,16 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 @Private
@@ -146,7 +145,7 @@ public class FSLeafQueue extends FSQueue
   public Resource assignContainer(FSSchedulerNode node) {
     Resource assigned = Resources.none();
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Node offered to queue: " + getName());
+      LOG.debug("Node " + node.getNodeName() + " offered to queue: " + getName());
     }
 
     if (!assignContainerPreCheck(node)) {
@@ -157,6 +156,10 @@ public class FSLeafQueue extends FSQueue
     Collections.sort(appScheds, comparator);
     for (AppSchedulable sched : appScheds) {
       if (sched.getRunnable()) {
+        if (SchedulerAppUtils.isBlacklisted(sched.getApp(), node, LOG)) {
+          continue;
+        }
+
         assigned = sched.assignContainer(node);
         if (!assigned.equals(Resources.none())) {
           break;
@@ -177,8 +180,7 @@ public class FSLeafQueue extends FSQueue
       recordFactory.newRecordInstance(QueueUserACLInfo.class);
     List<QueueACL> operations = new ArrayList<QueueACL>();
     for (QueueACL operation : QueueACL.values()) {
-      Map<QueueACL, AccessControlList> acls = queueMgr.getQueueAcls(getName());
-      if (acls.get(operation).isUserAllowed(user)) {
+      if (hasAccess(operation, user)) {
         operations.add(operation);
       }
     }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java Wed Oct 30 22:21:59 2013
@@ -20,13 +20,10 @@ package org.apache.hadoop.yarn.server.re
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -136,12 +133,6 @@ public abstract class FSQueue extends Sc
   }
   
   @Override
-  public Map<QueueACL, AccessControlList> getQueueAcls() {
-    Map<QueueACL, AccessControlList> acls = queueMgr.getQueueAcls(getName());
-    return new HashMap<QueueACL, AccessControlList>(acls);
-  }
-  
-  @Override
   public FSQueueMetrics getMetrics() {
     return metrics;
   }
@@ -154,7 +145,7 @@ public abstract class FSQueue extends Sc
   
   public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
     // Check if the leaf-queue allows access
-    if (queueMgr.getQueueAcls(getName()).get(acl).isUserAllowed(user)) {
+    if (queueMgr.getQueueAcl(getName(), acl).isUserAllowed(user)) {
       return true;
     }
 

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java Wed Oct 30 22:21:59 2013
@@ -21,12 +21,14 @@ package org.apache.hadoop.yarn.server.re
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 
+@Metrics(context="yarn")
 public class FSQueueMetrics extends QueueMetrics {
 
   @Metric("Fair share of memory in MB") MutableGaugeInt fairShareMB;

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java Wed Oct 30 22:21:59 2013
@@ -18,10 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -30,7 +27,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -38,92 +34,39 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.HashMultiset;
-import com.google.common.collect.Multiset;
-
+/**
+ * Represents an application attempt from the viewpoint of the Fair Scheduler.
+ */
 @Private
 @Unstable
 public class FSSchedulerApp extends SchedulerApplication {
 
   private static final Log LOG = LogFactory.getLog(FSSchedulerApp.class);
 
-  private final RecordFactory recordFactory = RecordFactoryProvider
-      .getRecordFactory(null);
-
-  private final AppSchedulingInfo appSchedulingInfo;
   private AppSchedulable appSchedulable;
-  private final Queue queue;
-
-  private final Resource currentConsumption = recordFactory
-      .newRecordInstance(Resource.class);
-  private Resource resourceLimit = recordFactory
-      .newRecordInstance(Resource.class);
-
-  private Map<ContainerId, RMContainer> liveContainers
-  = new HashMap<ContainerId, RMContainer>();
-  private List<RMContainer> newlyAllocatedContainers = 
-      new ArrayList<RMContainer>();
-
-  final Map<Priority, Map<NodeId, RMContainer>> reservedContainers = 
-      new HashMap<Priority, Map<NodeId, RMContainer>>();
 
   final Map<RMContainer, Long> preemptionMap = new HashMap<RMContainer, Long>();
-
-  /**
-   * Count how many times the application has been given an opportunity
-   * to schedule a task at each priority. Each time the scheduler
-   * asks the application for a task at this priority, it is incremented,
-   * and each time the application successfully schedules a task, it
-   * is reset to 0.
-   */
-  Multiset<Priority> schedulingOpportunities = HashMultiset.create();
   
-  Multiset<Priority> reReservations = HashMultiset.create();
-
-  Resource currentReservation = recordFactory
-      .newRecordInstance(Resource.class);
-
-  private final RMContext rmContext;
   public FSSchedulerApp(ApplicationAttemptId applicationAttemptId, 
       String user, Queue queue, ActiveUsersManager activeUsersManager,
       RMContext rmContext) {
-    this.rmContext = rmContext;
-    this.appSchedulingInfo = 
-        new AppSchedulingInfo(applicationAttemptId, user, queue,  
-            activeUsersManager);
-    this.queue = queue;
+    super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
   }
 
-  public ApplicationId getApplicationId() {
-    return appSchedulingInfo.getApplicationId();
-  }
-
-  @Override
-  public ApplicationAttemptId getApplicationAttemptId() {
-    return appSchedulingInfo.getApplicationAttemptId();
-  }
-  
   public void setAppSchedulable(AppSchedulable appSchedulable) {
     this.appSchedulable = appSchedulable;
   }
@@ -132,83 +75,6 @@ public class FSSchedulerApp extends Sche
     return appSchedulable;
   }
 
-  public String getUser() {
-    return appSchedulingInfo.getUser();
-  }
-
-  public synchronized void updateResourceRequests(
-      List<ResourceRequest> requests) {
-    this.appSchedulingInfo.updateResourceRequests(requests, null, null);
-  }
-
-  public Map<String, ResourceRequest> getResourceRequests(Priority priority) {
-    return appSchedulingInfo.getResourceRequests(priority);
-  }
-
-  public int getNewContainerId() {
-    return appSchedulingInfo.getNewContainerId();
-  }
-  
-  public Collection<Priority> getPriorities() {
-    return appSchedulingInfo.getPriorities();
-  }
-
-  public ResourceRequest getResourceRequest(Priority priority, String nodeAddress) {
-    return appSchedulingInfo.getResourceRequest(priority, nodeAddress);
-  }
-
-  public synchronized int getTotalRequiredResources(Priority priority) {
-    return getResourceRequest(priority, ResourceRequest.ANY).getNumContainers();
-  }
-  
-  public Resource getResource(Priority priority) {
-    return appSchedulingInfo.getResource(priority);
-  }
-
-  /**
-   * Is this application pending?
-   * @return true if it is else false.
-   */
-  @Override
-  public boolean isPending() {
-    return appSchedulingInfo.isPending();
-  }
-
-  public String getQueueName() {
-    return appSchedulingInfo.getQueueName();
-  }
-
-  /**
-   * Get the list of live containers
-   * @return All of the live containers
-   */
-  @Override
-  public synchronized Collection<RMContainer> getLiveContainers() {
-    return new ArrayList<RMContainer>(liveContainers.values());
-  }
-
-  public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
-    // Cleanup all scheduling information
-    appSchedulingInfo.stop(rmAppAttemptFinalState);
-  }
-
-  @SuppressWarnings("unchecked")
-  public synchronized void containerLaunchedOnNode(ContainerId containerId,
-      NodeId nodeId) {
-    // Inform the container
-    RMContainer rmContainer = 
-        getRMContainer(containerId);
-    if (rmContainer == null) {
-      // Some unknown container sneaked into the system. Kill it.
-      rmContext.getDispatcher().getEventHandler()
-        .handle(new RMNodeCleanContainerEvent(nodeId, containerId));
-      return;
-    }
-
-    rmContainer.handle(new RMContainerEvent(containerId,
-      RMContainerEventType.LAUNCHED));
-  }
-
   synchronized public void containerCompleted(RMContainer rmContainer,
       ContainerStatus containerStatus, RMContainerEventType event) {
     
@@ -241,122 +107,6 @@ public class FSSchedulerApp extends Sche
     preemptionMap.remove(rmContainer);
   }
 
-  synchronized public List<Container> pullNewlyAllocatedContainers() {
-    List<Container> returnContainerList = new ArrayList<Container>(
-        newlyAllocatedContainers.size());
-    for (RMContainer rmContainer : newlyAllocatedContainers) {
-      rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
-          RMContainerEventType.ACQUIRED));
-      returnContainerList.add(rmContainer.getContainer());
-    }
-    newlyAllocatedContainers.clear();
-    return returnContainerList;
-  }
-
-  public Resource getCurrentConsumption() {
-    return this.currentConsumption;
-  }
-
-  synchronized public void showRequests() {
-    if (LOG.isDebugEnabled()) {
-      for (Priority priority : getPriorities()) {
-        Map<String, ResourceRequest> requests = getResourceRequests(priority);
-        if (requests != null) {
-          LOG.debug("showRequests:" + " application=" + getApplicationId() + 
-              " headRoom=" + getHeadroom() + 
-              " currentConsumption=" + currentConsumption.getMemory());
-          for (ResourceRequest request : requests.values()) {
-            LOG.debug("showRequests:" + " application=" + getApplicationId()
-                + " request=" + request);
-          }
-        }
-      }
-    }
-  }
-
-  public synchronized RMContainer getRMContainer(ContainerId id) {
-    return liveContainers.get(id);
-  }
-
-  synchronized public void addSchedulingOpportunity(Priority priority) {
-    schedulingOpportunities.setCount(priority,
-        schedulingOpportunities.count(priority) + 1);
-  }
-
-  /**
-   * Return the number of times the application has been given an opportunity
-   * to schedule a task at the given priority since the last time it
-   * successfully did so.
-   */
-  synchronized public int getSchedulingOpportunities(Priority priority) {
-    return schedulingOpportunities.count(priority);
-  }
-
-  synchronized void resetReReservations(Priority priority) {
-    reReservations.setCount(priority, 0);
-  }
-
-  synchronized void addReReservation(Priority priority) {
-    reReservations.add(priority);
-  }
-
-  synchronized public int getReReservations(Priority priority) {
-    return reReservations.count(priority);
-  }
-
-  public synchronized int getNumReservedContainers(Priority priority) {
-    Map<NodeId, RMContainer> reservedContainers = 
-        this.reservedContainers.get(priority);
-    return (reservedContainers == null) ? 0 : reservedContainers.size();
-  }
-  
-  /**
-   * Get total current reservations.
-   * Used only by unit tests
-   * @return total current reservations
-   */
-  @VisibleForTesting
-  public synchronized Resource getCurrentReservation() {
-    return currentReservation;
-  }
-
-  public synchronized RMContainer reserve(FSSchedulerNode node, Priority priority,
-      RMContainer rmContainer, Container container) {
-    // Create RMContainer if necessary
-    if (rmContainer == null) {
-      rmContainer = 
-          new RMContainerImpl(container, getApplicationAttemptId(), 
-              node.getNodeID(), rmContext.getDispatcher().getEventHandler(), 
-              rmContext.getContainerAllocationExpirer());
-        
-      Resources.addTo(currentReservation, container.getResource());
-      
-      // Reset the re-reservation count
-      resetReReservations(priority);
-    } else {
-      // Note down the re-reservation
-      addReReservation(priority);
-    }
-    rmContainer.handle(new RMContainerReservedEvent(container.getId(), 
-        container.getResource(), node.getNodeID(), priority));
-    
-    Map<NodeId, RMContainer> reservedContainers = 
-        this.reservedContainers.get(priority);
-    if (reservedContainers == null) {
-      reservedContainers = new HashMap<NodeId, RMContainer>();
-      this.reservedContainers.put(priority, reservedContainers);
-    }
-    reservedContainers.put(node.getNodeID(), rmContainer);
-    
-    LOG.info("Application " + getApplicationId() 
-        + " reserved container " + rmContainer
-        + " on node " + node + ", currently has " + reservedContainers.size()
-        + " at priority " + priority 
-        + "; currentReservation " + currentReservation.getMemory());
-    
-    return rmContainer;
-  }
-
   public synchronized void unreserve(FSSchedulerNode node, Priority priority) {
     Map<NodeId, RMContainer> reservedContainers = 
         this.reservedContainers.get(priority);
@@ -376,22 +126,6 @@ public class FSSchedulerApp extends Sche
         + priority + "; currentReservation " + currentReservation);
   }
 
-  /**
-   * Has the application reserved the given <code>node</code> at the
-   * given <code>priority</code>?
-   * @param node node to be checked
-   * @param priority priority of reserved container
-   * @return true is reserved, false if not
-   */
-  public synchronized boolean isReserved(FSSchedulerNode node, Priority priority) {
-    Map<NodeId, RMContainer> reservedContainers = 
-        this.reservedContainers.get(priority);
-    if (reservedContainers != null) {
-      return reservedContainers.containsKey(node.getNodeID());
-    }
-    return false;
-  }
-
   public synchronized float getLocalityWaitFactor(
       Priority priority, int clusterNodes) {
     // Estimate: Required unique resources (i.e. hosts + racks)
@@ -402,42 +136,7 @@ public class FSSchedulerApp extends Sche
     // i.e. no point skipping more than clustersize opportunities
     return Math.min(((float)requiredResources / clusterNodes), 1.0f);
   }
-
-  /**
-   * Get the list of reserved containers
-   * @return All of the reserved containers.
-   */
-  @Override
-  public synchronized List<RMContainer> getReservedContainers() {
-    List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
-    for (Map.Entry<Priority, Map<NodeId, RMContainer>> e : 
-      this.reservedContainers.entrySet()) {
-      reservedContainers.addAll(e.getValue().values());
-    }
-    return reservedContainers;
-  }
   
-  public synchronized void setHeadroom(Resource globalLimit) {
-    this.resourceLimit = globalLimit; 
-  }
-
-  /**
-   * Get available headroom in terms of resources for the application's user.
-   * @return available resource headroom
-   */
-  public synchronized Resource getHeadroom() {
-    // Corner case to deal with applications being slightly over-limit
-    if (resourceLimit.getMemory() < 0) {
-      resourceLimit.setMemory(0);
-    }
-    
-    return resourceLimit;
-  }
-
-  public Queue getQueue() {
-    return queue;
-  }
-
   /**
    * Delay scheduling: We often want to prioritize scheduling of node-local
    * containers over rack-local or off-switch containers. To acheive this
@@ -453,21 +152,6 @@ public class FSSchedulerApp extends Sche
   final Map<Priority, NodeType> allowedLocalityLevel = new HashMap<
       Priority, NodeType>();
 
-  // Time of the last container scheduled at the current allowed level
-  Map<Priority, Long> lastScheduledContainer = new HashMap<Priority, Long>();
-
-  /**
-   * Should be called when an application has successfully scheduled a container,
-   * or when the scheduling locality threshold is relaxed.
-   * Reset various internal counters which affect delay scheduling
-   *
-   * @param priority The priority of the container scheduled.
-   */
-  synchronized public void resetSchedulingOpportunities(Priority priority) {
-    lastScheduledContainer.put(priority, System.currentTimeMillis());
-    schedulingOpportunities.setCount(priority, 0);
-  }
-
   /**
    * Return the level at which we are allowed to schedule containers, given the
    * current size of the cluster and thresholds indicating how many nodes to
@@ -513,6 +197,55 @@ public class FSSchedulerApp extends Sche
     return allowedLocalityLevel.get(priority);
   }
 
+  /**
+   * Return the level at which we are allowed to schedule containers.
+   * Given the thresholds indicating how much time passed before relaxing
+   * scheduling constraints.
+   */
+  public synchronized NodeType getAllowedLocalityLevelByTime(Priority priority,
+          long nodeLocalityDelayMs, long rackLocalityDelayMs,
+          long currentTimeMs) {
+
+    // if not being used, can schedule anywhere
+    if (nodeLocalityDelayMs < 0 || rackLocalityDelayMs < 0) {
+      return NodeType.OFF_SWITCH;
+    }
+
+    // default level is NODE_LOCAL
+    if (! allowedLocalityLevel.containsKey(priority)) {
+      allowedLocalityLevel.put(priority, NodeType.NODE_LOCAL);
+      return NodeType.NODE_LOCAL;
+    }
+
+    NodeType allowed = allowedLocalityLevel.get(priority);
+
+    // if level is already most liberal, we're done
+    if (allowed.equals(NodeType.OFF_SWITCH)) {
+      return NodeType.OFF_SWITCH;
+    }
+
+    // check waiting time
+    long waitTime = currentTimeMs;
+    if (lastScheduledContainer.containsKey(priority)) {
+      waitTime -= lastScheduledContainer.get(priority);
+    } else {
+      waitTime -= appSchedulable.getStartTime();
+    }
+
+    long thresholdTime = allowed.equals(NodeType.NODE_LOCAL) ?
+            nodeLocalityDelayMs : rackLocalityDelayMs;
+
+    if (waitTime > thresholdTime) {
+      if (allowed.equals(NodeType.NODE_LOCAL)) {
+        allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL);
+        resetSchedulingOpportunities(priority, currentTimeMs);
+      } else if (allowed.equals(NodeType.RACK_LOCAL)) {
+        allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH);
+        resetSchedulingOpportunities(priority, currentTimeMs);
+      }
+    }
+    return allowedLocalityLevel.get(priority);
+  }
 
   synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node,
       Priority priority, ResourceRequest request,

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java Wed Oct 30 22:21:59 2013
@@ -52,6 +52,7 @@ public class FSSchedulerNode extends Sch
 
   private Resource availableResource;
   private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
+  private Resource totalResourceCapability;
 
   private volatile int numContainers;
 
@@ -68,6 +69,9 @@ public class FSSchedulerNode extends Sch
   public FSSchedulerNode(RMNode node, boolean usePortForNodeName) {
     this.rmNode = node;
     this.availableResource = Resources.clone(node.getTotalCapability());
+    totalResourceCapability =
+        Resource.newInstance(node.getTotalCapability().getMemory(), node
+            .getTotalCapability().getVirtualCores());
     if (usePortForNodeName) {
       nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort();
     } else {
@@ -173,6 +177,11 @@ public class FSSchedulerNode extends Sch
     Resources.subtractFrom(usedResource, resource);
   }
 
+  @Override
+  public Resource getTotalResource() {
+    return this.totalResourceCapability;
+  }
+
   private synchronized void deductAvailableResource(Resource resource) {
     if (resource == null) {
       LOG.error("Invalid deduction of null resource for "



Mime
View raw message