hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1565792 [2/5] - in /hadoop/common/branches/branch-2.3/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/record...
Date Fri, 07 Feb 2014 20:33:02 GMT
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Fri Feb  7 20:33:01 2014
@@ -59,10 +59,10 @@ public class AppSchedulingInfo {
 
   final Set<Priority> priorities = new TreeSet<Priority>(
       new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
-  final Map<Priority, Map<String, ResourceRequest>> requests =
+  final Map<Priority, Map<String, ResourceRequest>> requests = 
     new HashMap<Priority, Map<String, ResourceRequest>>();
-  private Set<String> blacklist = new HashSet<String>();
-
+  final Set<String> blacklist = new HashSet<String>();
+  
   //private final ApplicationStore store;
   private final ActiveUsersManager activeUsersManager;
   
@@ -260,7 +260,7 @@ public class AppSchedulingInfo {
       // once an allocation is done we assume the application is
       // running from scheduler's POV.
       pending = false;
-      metrics.runAppAttempt(applicationId, user);
+      metrics.incrAppsRunning(this, user);
     }
     LOG.debug("allocate: user: " + user + ", memory: "
         + request.getCapability());
@@ -390,7 +390,7 @@ public class AppSchedulingInfo {
                 .getNumContainers()));
       }
     }
-    metrics.finishAppAttempt(applicationId, pending, user);
+    metrics.finishApp(this, rmAppAttemptFinalState);
     
     // Clear requests themselves
     clearRequests();
@@ -399,15 +399,4 @@ public class AppSchedulingInfo {
   public synchronized void setQueue(Queue queue) {
     this.queue = queue;
   }
-
-  public synchronized Set<String> getBlackList() {
-    return this.blacklist;
-  }
-
-  public synchronized void transferStateFromPreviousAppSchedulingInfo(
-      AppSchedulingInfo appInfo) {
-    //    this.priorities = appInfo.getPriorities();
-    //    this.requests = appInfo.getRequests();
-    this.blacklist = appInfo.getBlackList();
-  }
 }

Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java Fri Feb  7 20:33:01 2014
@@ -41,7 +41,7 @@ import org.apache.hadoop.metrics2.lib.Mu
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.slf4j.Logger;
@@ -57,7 +57,7 @@ public class QueueMetrics implements Met
   @Metric("# of pending apps") MutableGaugeInt appsPending;
   @Metric("# of apps completed") MutableCounterInt appsCompleted;
   @Metric("# of apps killed") MutableCounterInt appsKilled;
-  @Metric("# of apps failed") MutableCounterInt appsFailed;
+  @Metric("# of apps failed") MutableGaugeInt appsFailed;
 
   @Metric("Allocated memory in MB") MutableGaugeInt allocatedMB;
   @Metric("Allocated CPU in virtual cores") MutableGaugeInt allocatedVCores;
@@ -214,70 +214,54 @@ public class QueueMetrics implements Met
     registry.snapshot(collector.addRecord(registry.info()), all);
   }
 
-  public void submitApp(String user) {
-    appsSubmitted.incr();
-    QueueMetrics userMetrics = getUserMetrics(user);
-    if (userMetrics != null) {
-      userMetrics.submitApp(user);
-    }
-    if (parent != null) {
-      parent.submitApp(user);
+  public void submitApp(String user, int attemptId) {
+    if (attemptId == 1) {
+      appsSubmitted.incr();
+    } else {
+      appsFailed.decr();
     }
-  }
-
-  public void submitAppAttempt(String user) {
     appsPending.incr();
     QueueMetrics userMetrics = getUserMetrics(user);
     if (userMetrics != null) {
-      userMetrics.submitAppAttempt(user);
+      userMetrics.submitApp(user, attemptId);
     }
     if (parent != null) {
-      parent.submitAppAttempt(user);
+      parent.submitApp(user, attemptId);
     }
   }
 
-  public void runAppAttempt(ApplicationId appId, String user) {
-    runBuckets.add(appId, System.currentTimeMillis());
+  public void incrAppsRunning(AppSchedulingInfo app, String user) {
+    runBuckets.add(app.getApplicationId(), System.currentTimeMillis());
     appsRunning.incr();
     appsPending.decr();
     QueueMetrics userMetrics = getUserMetrics(user);
     if (userMetrics != null) {
-      userMetrics.runAppAttempt(appId, user);
+      userMetrics.incrAppsRunning(app, user);
     }
     if (parent != null) {
-      parent.runAppAttempt(appId, user);
+      parent.incrAppsRunning(app, user);
     }
   }
 
-  public void finishAppAttempt(
-      ApplicationId appId, boolean isPending, String user) {
-    runBuckets.remove(appId);
-    if (isPending) {
-      appsPending.decr();
-    } else {
-      appsRunning.decr();
-    }
-    QueueMetrics userMetrics = getUserMetrics(user);
-    if (userMetrics != null) {
-      userMetrics.finishAppAttempt(appId, isPending, user);
-    }
-    if (parent != null) {
-      parent.finishAppAttempt(appId, isPending, user);
-    }
-  }
-
-  public void finishApp(String user, RMAppState rmAppFinalState) {
-    switch (rmAppFinalState) {
+  public void finishApp(AppSchedulingInfo app,
+      RMAppAttemptState rmAppAttemptFinalState) {
+    runBuckets.remove(app.getApplicationId());
+    switch (rmAppAttemptFinalState) {
       case KILLED: appsKilled.incr(); break;
       case FAILED: appsFailed.incr(); break;
       default: appsCompleted.incr();  break;
     }
-    QueueMetrics userMetrics = getUserMetrics(user);
+    if (app.isPending()) {
+      appsPending.decr();
+    } else {
+      appsRunning.decr();
+    }
+    QueueMetrics userMetrics = getUserMetrics(app.getUser());
     if (userMetrics != null) {
-      userMetrics.finishApp(user, rmAppFinalState);
+      userMetrics.finishApp(app, rmAppAttemptFinalState);
     }
     if (parent != null) {
-      parent.finishApp(user, rmAppFinalState);
+      parent.finishApp(app, rmAppAttemptFinalState);
     }
   }
 

Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java Fri Feb  7 20:33:01 2014
@@ -36,7 +36,7 @@ public class SchedulerAppReport {
   private final Collection<RMContainer> reserved;
   private final boolean pending;
   
-  public SchedulerAppReport(SchedulerApplicationAttempt app) {
+  public SchedulerAppReport(SchedulerApplication app) {
     this.live = app.getLiveContainers();
     this.reserved = app.getReservedContainers();
     this.pending = app.isPending();

Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppUtils.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppUtils.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppUtils.java Fri Feb  7 20:33:01 2014
@@ -22,7 +22,7 @@ import org.apache.commons.logging.Log;
 
 public class SchedulerAppUtils {
 
-  public static  boolean isBlacklisted(SchedulerApplicationAttempt application,
+  public static  boolean isBlacklisted(SchedulerApplication application,
       SchedulerNode node, Log LOG) {
     if (application.isBlacklisted(node.getNodeName())) {
       if (LOG.isDebugEnabled()) {

Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java Fri Feb  7 20:33:01 2014
@@ -17,41 +17,393 @@
 */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
 
+/**
+ * Represents an application attempt from the viewpoint of the scheduler.
+ * Each running app attempt in the RM corresponds to one instance
+ * of this class.
+ */
 @Private
 @Unstable
-public class SchedulerApplication {
+public abstract class SchedulerApplication {
+  
+  private static final Log LOG = LogFactory.getLog(SchedulerApplication.class);
+
+  protected final AppSchedulingInfo appSchedulingInfo;
+  
+  protected final Map<ContainerId, RMContainer> liveContainers =
+      new HashMap<ContainerId, RMContainer>();
+  protected final Map<Priority, Map<NodeId, RMContainer>> reservedContainers = 
+      new HashMap<Priority, Map<NodeId, RMContainer>>();
+
+  private final Multiset<Priority> reReservations = HashMultiset.create();
+  
+  protected final Resource currentReservation = Resource.newInstance(0, 0);
+  private Resource resourceLimit = Resource.newInstance(0, 0);
+  protected final Resource currentConsumption = Resource.newInstance(0, 0);
 
-  private final Queue queue;
-  private final String user;
-  private SchedulerApplicationAttempt currentAttempt;
+  protected List<RMContainer> newlyAllocatedContainers = 
+      new ArrayList<RMContainer>();
 
-  public SchedulerApplication(Queue queue, String user) {
+  /**
+   * Count how many times the application has been given an opportunity
+   * to schedule a task at each priority. Each time the scheduler
+   * asks the application for a task at this priority, it is incremented,
+   * and each time the application successfully schedules a task, it
+   * is reset to 0.
+   */
+  Multiset<Priority> schedulingOpportunities = HashMultiset.create();
+  
+  // Time of the last container scheduled at the current allowed level
+  protected Map<Priority, Long> lastScheduledContainer =
+      new HashMap<Priority, Long>();
+
+  protected final Queue queue;
+  protected boolean isStopped = false;
+  
+  protected final RMContext rmContext;
+  
+  public SchedulerApplication(ApplicationAttemptId applicationAttemptId, 
+      String user, Queue queue, ActiveUsersManager activeUsersManager,
+      RMContext rmContext) {
+    this.rmContext = rmContext;
+    this.appSchedulingInfo = 
+        new AppSchedulingInfo(applicationAttemptId, user, queue,  
+            activeUsersManager);
     this.queue = queue;
-    this.user = user;
+  }
+  
+  /**
+   * Get the live containers of the application.
+   * @return live containers of the application
+   */
+  public synchronized Collection<RMContainer> getLiveContainers() {
+    return new ArrayList<RMContainer>(liveContainers.values());
+  }
+  
+  /**
+   * Is this application pending?
+   * @return true if it is else false.
+   */
+  public boolean isPending() {
+    return appSchedulingInfo.isPending();
+  }
+  
+  /**
+   * Get {@link ApplicationAttemptId} of the application master.
+   * @return <code>ApplicationAttemptId</code> of the application master
+   */
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return appSchedulingInfo.getApplicationAttemptId();
+  }
+  
+  public ApplicationId getApplicationId() {
+    return appSchedulingInfo.getApplicationId();
+  }
+  
+  public String getUser() {
+    return appSchedulingInfo.getUser();
+  }
+
+  public Map<String, ResourceRequest> getResourceRequests(Priority priority) {
+    return appSchedulingInfo.getResourceRequests(priority);
+  }
+
+  public int getNewContainerId() {
+    return appSchedulingInfo.getNewContainerId();
+  }
+
+  public Collection<Priority> getPriorities() {
+    return appSchedulingInfo.getPriorities();
+  }
+  
+  public ResourceRequest getResourceRequest(Priority priority, String resourceName) {
+    return this.appSchedulingInfo.getResourceRequest(priority, resourceName);
+  }
+
+  public synchronized int getTotalRequiredResources(Priority priority) {
+    return getResourceRequest(priority, ResourceRequest.ANY).getNumContainers();
+  }
+
+  public Resource getResource(Priority priority) {
+    return appSchedulingInfo.getResource(priority);
+  }
+
+  public String getQueueName() {
+    return appSchedulingInfo.getQueueName();
+  }
+  
+  public synchronized RMContainer getRMContainer(ContainerId id) {
+    return liveContainers.get(id);
+  }
+
+  protected synchronized void resetReReservations(Priority priority) {
+    reReservations.setCount(priority, 0);
   }
 
+  protected synchronized void addReReservation(Priority priority) {
+    reReservations.add(priority);
+  }
+
+  public synchronized int getReReservations(Priority priority) {
+    return reReservations.count(priority);
+  }
+
+  /**
+   * Get total current reservations.
+   * Used only by unit tests
+   * @return total current reservations
+   */
+  @Stable
+  @Private
+  public synchronized Resource getCurrentReservation() {
+    return currentReservation;
+  }
+  
   public Queue getQueue() {
     return queue;
   }
+  
+  public synchronized void updateResourceRequests(
+      List<ResourceRequest> requests) {
+    if (!isStopped) {
+      appSchedulingInfo.updateResourceRequests(requests);
+    }
+  }
+  
+  public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
+    // Cleanup all scheduling information
+    isStopped = true;
+    appSchedulingInfo.stop(rmAppAttemptFinalState);
+  }
 
-  public String getUser() {
-    return user;
+  public synchronized boolean isStopped() {
+    return isStopped;
   }
 
-  public SchedulerApplicationAttempt getCurrentAppAttempt() {
-    return currentAttempt;
+  /**
+   * Get the list of reserved containers
+   * @return All of the reserved containers.
+   */
+  public synchronized List<RMContainer> getReservedContainers() {
+    List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
+    for (Map.Entry<Priority, Map<NodeId, RMContainer>> e : 
+      this.reservedContainers.entrySet()) {
+      reservedContainers.addAll(e.getValue().values());
+    }
+    return reservedContainers;
+  }
+  
+  public synchronized RMContainer reserve(SchedulerNode node, Priority priority,
+      RMContainer rmContainer, Container container) {
+    // Create RMContainer if necessary
+    if (rmContainer == null) {
+      rmContainer = 
+          new RMContainerImpl(container, getApplicationAttemptId(), 
+              node.getNodeID(), rmContext.getDispatcher().getEventHandler(), 
+              rmContext.getContainerAllocationExpirer());
+        
+      Resources.addTo(currentReservation, container.getResource());
+      
+      // Reset the re-reservation count
+      resetReReservations(priority);
+    } else {
+      // Note down the re-reservation
+      addReReservation(priority);
+    }
+    rmContainer.handle(new RMContainerReservedEvent(container.getId(), 
+        container.getResource(), node.getNodeID(), priority));
+    
+    Map<NodeId, RMContainer> reservedContainers = 
+        this.reservedContainers.get(priority);
+    if (reservedContainers == null) {
+      reservedContainers = new HashMap<NodeId, RMContainer>();
+      this.reservedContainers.put(priority, reservedContainers);
+    }
+    reservedContainers.put(node.getNodeID(), rmContainer);
+    
+    LOG.info("Application " + getApplicationId() 
+        + " reserved container " + rmContainer
+        + " on node " + node + ", currently has " + reservedContainers.size()
+        + " at priority " + priority 
+        + "; currentReservation " + currentReservation.getMemory());
+    
+    return rmContainer;
+  }
+  
+  /**
+   * Has the application reserved the given <code>node</code> at the
+   * given <code>priority</code>?
+   * @param node node to be checked
+   * @param priority priority of reserved container
+   * @return true is reserved, false if not
+   */
+  public synchronized boolean isReserved(SchedulerNode node, Priority priority) {
+    Map<NodeId, RMContainer> reservedContainers = 
+        this.reservedContainers.get(priority);
+    if (reservedContainers != null) {
+      return reservedContainers.containsKey(node.getNodeID());
+    }
+    return false;
+  }
+  
+  public synchronized void setHeadroom(Resource globalLimit) {
+    this.resourceLimit = globalLimit; 
   }
 
-  public void setCurrentAppAttempt(SchedulerApplicationAttempt currentAttempt) {
-    this.currentAttempt = currentAttempt;
+  /**
+   * Get available headroom in terms of resources for the application's user.
+   * @return available resource headroom
+   */
+  public synchronized Resource getHeadroom() {
+    // Corner case to deal with applications being slightly over-limit
+    if (resourceLimit.getMemory() < 0) {
+      resourceLimit.setMemory(0);
+    }
+    
+    return resourceLimit;
   }
+  
+  public synchronized int getNumReservedContainers(Priority priority) {
+    Map<NodeId, RMContainer> reservedContainers = 
+        this.reservedContainers.get(priority);
+    return (reservedContainers == null) ? 0 : reservedContainers.size();
+  }
+  
+  @SuppressWarnings("unchecked")
+  public synchronized void containerLaunchedOnNode(ContainerId containerId,
+      NodeId nodeId) {
+    // Inform the container
+    RMContainer rmContainer = getRMContainer(containerId);
+    if (rmContainer == null) {
+      // Some unknown container sneaked into the system. Kill it.
+      rmContext.getDispatcher().getEventHandler()
+        .handle(new RMNodeCleanContainerEvent(nodeId, containerId));
+      return;
+    }
 
-  public void stop(RMAppState rmAppFinalState) {
-    queue.getMetrics().finishApp(user, rmAppFinalState);
+    rmContainer.handle(new RMContainerEvent(containerId,
+        RMContainerEventType.LAUNCHED));
+  }
+  
+  public synchronized void showRequests() {
+    if (LOG.isDebugEnabled()) {
+      for (Priority priority : getPriorities()) {
+        Map<String, ResourceRequest> requests = getResourceRequests(priority);
+        if (requests != null) {
+          LOG.debug("showRequests:" + " application=" + getApplicationId() + 
+              " headRoom=" + getHeadroom() + 
+              " currentConsumption=" + currentConsumption.getMemory());
+          for (ResourceRequest request : requests.values()) {
+            LOG.debug("showRequests:" + " application=" + getApplicationId()
+                + " request=" + request);
+          }
+        }
+      }
+    }
+  }
+  
+  public Resource getCurrentConsumption() {
+    return currentConsumption;
+  }
+
+  public synchronized List<Container> pullNewlyAllocatedContainers() {
+    List<Container> returnContainerList = new ArrayList<Container>(
+        newlyAllocatedContainers.size());
+    for (RMContainer rmContainer : newlyAllocatedContainers) {
+      rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
+          RMContainerEventType.ACQUIRED));
+      returnContainerList.add(rmContainer.getContainer());
+    }
+    newlyAllocatedContainers.clear();
+    return returnContainerList;
+  }
+
+  public synchronized void updateBlacklist(
+      List<String> blacklistAdditions, List<String> blacklistRemovals) {
+    if (!isStopped) {
+      this.appSchedulingInfo.updateBlacklist(
+          blacklistAdditions, blacklistRemovals);
+    }
+  }
+  
+  public boolean isBlacklisted(String resourceName) {
+    return this.appSchedulingInfo.isBlacklisted(resourceName);
+  }
+
+  public synchronized void addSchedulingOpportunity(Priority priority) {
+    schedulingOpportunities.setCount(priority,
+        schedulingOpportunities.count(priority) + 1);
+  }
+  
+  public synchronized void subtractSchedulingOpportunity(Priority priority) {
+    int count = schedulingOpportunities.count(priority) - 1;
+    this.schedulingOpportunities.setCount(priority, Math.max(count,  0));
+  }
+
+  /**
+   * Return the number of times the application has been given an opportunity
+   * to schedule a task at the given priority since the last time it
+   * successfully did so.
+   */
+  public synchronized int getSchedulingOpportunities(Priority priority) {
+    return schedulingOpportunities.count(priority);
+  }
+  
+  /**
+   * Should be called when an application has successfully scheduled a container,
+   * or when the scheduling locality threshold is relaxed.
+   * Reset various internal counters which affect delay scheduling
+   *
+   * @param priority The priority of the container scheduled.
+   */
+  public synchronized void resetSchedulingOpportunities(Priority priority) {
+    resetSchedulingOpportunities(priority, System.currentTimeMillis());
+  }
+  // used for continuous scheduling
+  public synchronized void resetSchedulingOpportunities(Priority priority,
+      long currentTimeMs) {
+    lastScheduledContainer.put(priority, currentTimeMs);
+    schedulingOpportunities.setCount(priority, 0);
+  }
+  
+  public synchronized ApplicationResourceUsageReport getResourceUsageReport() {
+    return ApplicationResourceUsageReport.newInstance(liveContainers.size(),
+        reservedContainers.size(), Resources.clone(currentConsumption),
+        Resources.clone(currentReservation),
+        Resources.add(currentConsumption, currentReservation));
   }
 
 }

Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java Fri Feb  7 20:33:01 2014
@@ -19,13 +19,13 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
@@ -37,7 +37,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 
 /**
@@ -171,13 +170,4 @@ public interface YarnScheduler extends E
   @LimitedPrivate("yarn")
   @Stable
   public List<ApplicationAttemptId> getAppsInQueue(String queueName);
-
-  /**
-   * Get the container for the given containerId.
-   * @param containerId
-   * @return the container for the given containerId.
-   */
-  @LimitedPrivate("yarn")
-  @Unstable
-  public RMContainer getRMContainer(ContainerId containerId);
 }

Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java Fri Feb  7 20:33:01 2014
@@ -27,7 +27,6 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -36,6 +35,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 
@@ -155,32 +155,21 @@ extends org.apache.hadoop.yarn.server.re
   
   /**
    * Submit a new application to the queue.
-   * @param applicationId the applicationId of the application being submitted
+   * @param application application being submitted
    * @param user user who submitted the application
    * @param queue queue to which the application is submitted
    */
-  public void submitApplication(ApplicationId applicationId, String user,
-      String queue) throws AccessControlException;
-
-  /**
-   * Submit an application attempt to the queue.
-   */
-  public void submitApplicationAttempt(FiCaSchedulerApp application,
-      String userName);
-
+  public void submitApplication(FiCaSchedulerApp application, String user, 
+      String queue) 
+  throws AccessControlException;
+  
   /**
    * An application submitted to this queue has finished.
-   * @param applicationId
-   * @param user user who submitted the application
+   * @param application
+   * @param queue application queue 
    */
-  public void finishApplication(ApplicationId applicationId, String user);
-
-  /**
-   * An application attempt submitted to this queue has finished.
-   */
-  public void finishApplicationAttempt(FiCaSchedulerApp application,
-      String queue);
-
+  public void finishApplication(FiCaSchedulerApp application, String queue);
+  
   /**
    * Assign containers to applications in the queue or it's children (if any).
    * @param clusterResource the resource of the cluster.

Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Fri Feb  7 20:33:01 2014
@@ -36,7 +36,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -54,20 +53,15 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@@ -77,10 +71,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
@@ -96,7 +88,7 @@ import com.google.common.annotations.Vis
 @LimitedPrivate("yarn")
 @Evolving
 @SuppressWarnings("unchecked")
-public class CapacityScheduler extends AbstractYarnScheduler
+public class CapacityScheduler
   implements PreemptableResourceScheduler, CapacitySchedulerContext,
              Configurable {
 
@@ -178,6 +170,7 @@ public class CapacityScheduler extends A
 
   private CapacitySchedulerConfiguration conf;
   private Configuration yarnConf;
+  private RMContext rmContext;
 
   private Map<String, CSQueue> queues = new ConcurrentHashMap<String, CSQueue>();
 
@@ -191,6 +184,10 @@ public class CapacityScheduler extends A
   private Resource minimumAllocation;
   private Resource maximumAllocation;
 
+  @VisibleForTesting
+  protected Map<ApplicationAttemptId, FiCaSchedulerApp> applications = 
+      new ConcurrentHashMap<ApplicationAttemptId, FiCaSchedulerApp>();
+
   private boolean initialized = false;
 
   private ResourceCalculator calculator;
@@ -267,10 +264,9 @@ public class CapacityScheduler extends A
       this.maximumAllocation = this.conf.getMaximumAllocation();
       this.calculator = this.conf.getResourceCalculator();
       this.usePortForNodeName = this.conf.getUsePortForNodeName();
-      this.applications =
-          new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
-      this.rmContext = rmContext;
 
+      this.rmContext = rmContext;
+      
       initializeQueues(this.conf);
       
       initialized = true;
@@ -419,141 +415,105 @@ public class CapacityScheduler extends A
   synchronized CSQueue getQueue(String queueName) {
     return queues.get(queueName);
   }
+  
+  private synchronized void
+      addApplicationAttempt(ApplicationAttemptId applicationAttemptId,
+          String queueName, String user) {
 
-  private synchronized void addApplication(ApplicationId applicationId,
-      String queueName, String user) {
-    // santiy checks.
+    // Sanity checks
     CSQueue queue = getQueue(queueName);
     if (queue == null) {
-      String message = "Application " + applicationId + 
+      String message = "Application " + applicationAttemptId + 
       " submitted by user " + user + " to unknown queue: " + queueName;
-      this.rmContext.getDispatcher().getEventHandler()
-          .handle(new RMAppRejectedEvent(applicationId, message));
+      this.rmContext.getDispatcher().getEventHandler().handle(
+          new RMAppAttemptRejectedEvent(applicationAttemptId, message));
       return;
     }
     if (!(queue instanceof LeafQueue)) {
-      String message = "Application " + applicationId + 
+      String message = "Application " + applicationAttemptId + 
           " submitted by user " + user + " to non-leaf queue: " + queueName;
-      this.rmContext.getDispatcher().getEventHandler()
-          .handle(new RMAppRejectedEvent(applicationId, message));
+      this.rmContext.getDispatcher().getEventHandler().handle(
+          new RMAppAttemptRejectedEvent(applicationAttemptId, message));
       return;
     }
+
+    // TODO: Fix store
+    FiCaSchedulerApp SchedulerApp = 
+        new FiCaSchedulerApp(applicationAttemptId, user, queue, 
+            queue.getActiveUsersManager(), rmContext);
+
     // Submit to the queue
     try {
-      queue.submitApplication(applicationId, user, queueName);
+      queue.submitApplication(SchedulerApp, user, queueName);
     } catch (AccessControlException ace) {
-      LOG.info("Failed to submit application " + applicationId + " to queue "
-          + queueName + " from user " + user, ace);
-      this.rmContext.getDispatcher().getEventHandler()
-          .handle(new RMAppRejectedEvent(applicationId, ace.toString()));
+      LOG.info("Failed to submit application " + applicationAttemptId + 
+          " to queue " + queueName + " from user " + user, ace);
+      this.rmContext.getDispatcher().getEventHandler().handle(
+          new RMAppAttemptRejectedEvent(applicationAttemptId, 
+              ace.toString()));
       return;
     }
-    SchedulerApplication application =
-        new SchedulerApplication(queue, user);
-    applications.put(applicationId, application);
-    LOG.info("Accepted application " + applicationId + " from user: " + user
-        + ", in queue: " + queueName);
-    rmContext.getDispatcher().getEventHandler()
-        .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
-  }
 
-  private synchronized void addApplicationAttempt(
-      ApplicationAttemptId applicationAttemptId,
-      boolean transferStateFromPreviousAttempt) {
-    SchedulerApplication application =
-        applications.get(applicationAttemptId.getApplicationId());
-    CSQueue queue = (CSQueue) application.getQueue();
-
-    FiCaSchedulerApp attempt =
-        new FiCaSchedulerApp(applicationAttemptId, application.getUser(),
-          queue, queue.getActiveUsersManager(), rmContext);
-    if (transferStateFromPreviousAttempt) {
-      attempt.transferStateFromPreviousAttempt(application
-        .getCurrentAppAttempt());
-    }
-    application.setCurrentAppAttempt(attempt);
-
-    queue.submitApplicationAttempt(attempt, application.getUser());
-    LOG.info("Added Application Attempt " + applicationAttemptId
-        + " to scheduler from user " + application.getUser() + " in queue "
-        + queue.getQueueName());
-    rmContext.getDispatcher().getEventHandler() .handle(
-        new RMAppAttemptEvent(applicationAttemptId,
-          RMAppAttemptEventType.ATTEMPT_ADDED));
-  }
+    applications.put(applicationAttemptId, SchedulerApp);
 
-  private synchronized void doneApplication(ApplicationId applicationId,
-      RMAppState finalState) {
-    SchedulerApplication application = applications.get(applicationId);
-    if (application == null){
-      // The AppRemovedSchedulerEvent maybe sent on recovery for completed apps,
-      // ignore it.
-      LOG.warn("Couldn't find application " + applicationId);
-      return;
-    }
-    CSQueue queue = (CSQueue) application.getQueue();
-    if (!(queue instanceof LeafQueue)) {
-      LOG.error("Cannot finish application " + "from non-leaf queue: "
-          + queue.getQueueName());
-    } else {
-      queue.finishApplication(applicationId, application.getUser());
-    }
-    application.stop(finalState);
-    applications.remove(applicationId);
+    LOG.info("Application Submission: " + applicationAttemptId + 
+        ", user: " + user +
+        " queue: " + queue +
+        ", currently active: " + applications.size());
+
+    rmContext.getDispatcher().getEventHandler().handle(
+        new RMAppAttemptEvent(applicationAttemptId,
+            RMAppAttemptEventType.APP_ACCEPTED));
   }
 
   private synchronized void doneApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
-      RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
-    LOG.info("Application Attempt " + applicationAttemptId + " is done." +
+      RMAppAttemptState rmAppAttemptFinalState) {
+    LOG.info("Application " + applicationAttemptId + " is done." +
     		" finalState=" + rmAppAttemptFinalState);
     
-    FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId);
-    SchedulerApplication application =
-        applications.get(applicationAttemptId.getApplicationId());
+    FiCaSchedulerApp application = getApplication(applicationAttemptId);
 
-    if (application == null || attempt == null) {
+    if (application == null) {
+      //      throw new IOException("Unknown application " + applicationId + 
+      //          " has completed!");
       LOG.info("Unknown application " + applicationAttemptId + " has completed!");
       return;
     }
-
-    // Release all the allocated, acquired, running containers
-    for (RMContainer rmContainer : attempt.getLiveContainers()) {
-      if (keepContainers
-          && rmContainer.getState().equals(RMContainerState.RUNNING)) {
-        // do not kill the running container in the case of work-preserving AM
-        // restart.
-        LOG.info("Skip killing " + rmContainer.getContainerId());
-        continue;
-      }
-      completedContainer(
-        rmContainer,
-        SchedulerUtils.createAbnormalContainerStatus(
-          rmContainer.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION),
-        RMContainerEventType.KILL);
+    
+    // Release all the running containers 
+    for (RMContainer rmContainer : application.getLiveContainers()) {
+      completedContainer(rmContainer, 
+          SchedulerUtils.createAbnormalContainerStatus(
+              rmContainer.getContainerId(), 
+              SchedulerUtils.COMPLETED_APPLICATION), 
+          RMContainerEventType.KILL);
     }
-
-    // Release all reserved containers
-    for (RMContainer rmContainer : attempt.getReservedContainers()) {
-      completedContainer(
-        rmContainer,
-        SchedulerUtils.createAbnormalContainerStatus(
-          rmContainer.getContainerId(), "Application Complete"),
-        RMContainerEventType.KILL);
+    
+     // Release all reserved containers
+    for (RMContainer rmContainer : application.getReservedContainers()) {
+      completedContainer(rmContainer, 
+          SchedulerUtils.createAbnormalContainerStatus(
+              rmContainer.getContainerId(), 
+              "Application Complete"), 
+          RMContainerEventType.KILL);
     }
-
+    
     // Clean up pending requests, metrics etc.
-    attempt.stop(rmAppAttemptFinalState);
-
+    application.stop(rmAppAttemptFinalState);
+    
     // Inform the queue
-    String queueName = attempt.getQueue().getQueueName();
+    String queueName = application.getQueue().getQueueName();
     CSQueue queue = queues.get(queueName);
     if (!(queue instanceof LeafQueue)) {
       LOG.error("Cannot finish application " + "from non-leaf queue: "
           + queueName);
     } else {
-      queue.finishApplicationAttempt(attempt, queue.getQueueName());
+      queue.finishApplication(application, queue.getQueueName());
     }
+    
+    // Remove from our data-structure
+    applications.remove(applicationAttemptId);
   }
 
   private static final Allocation EMPTY_ALLOCATION = 
@@ -565,7 +525,7 @@ public class CapacityScheduler extends A
       List<ResourceRequest> ask, List<ContainerId> release, 
       List<String> blacklistAdditions, List<String> blacklistRemovals) {
 
-    FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
+    FiCaSchedulerApp application = getApplication(applicationAttemptId);
     if (application == null) {
       LOG.info("Calling allocate on removed " +
           "or non existant application " + applicationAttemptId);
@@ -707,8 +667,8 @@ public class CapacityScheduler extends A
 
     RMContainer reservedContainer = node.getReservedContainer();
     if (reservedContainer != null) {
-      FiCaSchedulerApp reservedApplication =
-          getCurrentAttemptForContainer(reservedContainer.getContainerId());
+      FiCaSchedulerApp reservedApplication = 
+          getApplication(reservedContainer.getApplicationAttemptId());
       
       // Try to fulfill the reservation
       LOG.info("Trying to fulfill reservation for application " + 
@@ -745,11 +705,12 @@ public class CapacityScheduler extends A
 
   private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
     // Get the application for the finished container
-    FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
+    ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
+    FiCaSchedulerApp application = getApplication(applicationAttemptId);
     if (application == null) {
-      LOG.info("Unknown application "
-          + containerId.getApplicationAttemptId().getApplicationId()
-          + " launched container " + containerId + " on node: " + node);
+      LOG.info("Unknown application: " + applicationAttemptId + 
+          " launched container " + containerId +
+          " on node: " + node);
       this.rmContext.getDispatcher().getEventHandler()
         .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
       return;
@@ -779,26 +740,12 @@ public class CapacityScheduler extends A
       nodeUpdate(nodeUpdatedEvent.getRMNode());
     }
     break;
-    case APP_ADDED:
-    {
-      AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
-      addApplication(appAddedEvent.getApplicationId(),
-        appAddedEvent.getQueue(), appAddedEvent.getUser());
-    }
-    break;
-    case APP_REMOVED:
-    {
-      AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
-      doneApplication(appRemovedEvent.getApplicationID(),
-        appRemovedEvent.getFinalState());
-    }
-    break;
     case APP_ATTEMPT_ADDED:
     {
       AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
           (AppAttemptAddedSchedulerEvent) event;
       addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
-        appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
+        appAttemptAddedEvent.getQueue(), appAttemptAddedEvent.getUser());
     }
     break;
     case APP_ATTEMPT_REMOVED:
@@ -806,8 +753,7 @@ public class CapacityScheduler extends A
       AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
           (AppAttemptRemovedSchedulerEvent) event;
       doneApplicationAttempt(appAttemptRemovedEvent.getApplicationAttemptID(),
-        appAttemptRemovedEvent.getFinalAttemptState(),
-        appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts());
+        appAttemptRemovedEvent.getFinalAttemptState());
     }
     break;
     case CONTAINER_EXPIRED:
@@ -882,13 +828,13 @@ public class CapacityScheduler extends A
     Container container = rmContainer.getContainer();
     
     // Get the application for the finished container
-    FiCaSchedulerApp application =
-        getCurrentAttemptForContainer(container.getId());
-    ApplicationId appId =
-        container.getId().getApplicationAttemptId().getApplicationId();
+    ApplicationAttemptId applicationAttemptId =
+      container.getId().getApplicationAttemptId();
+    FiCaSchedulerApp application = getApplication(applicationAttemptId);
     if (application == null) {
-      LOG.info("Container " + container + " of" + " unknown application "
-          + appId + " completed with event " + event);
+      LOG.info("Container " + container + " of" +
+      		" unknown application " + applicationAttemptId + 
+          " completed with event " + event);
       return;
     }
     
@@ -900,33 +846,28 @@ public class CapacityScheduler extends A
     queue.completedContainer(clusterResource, application, node, 
         rmContainer, containerStatus, event, null);
 
-    LOG.info("Application attempt " + application.getApplicationAttemptId()
-        + " released container " + container.getId() + " on node: " + node
-        + " with event: " + event);
+    LOG.info("Application " + applicationAttemptId + 
+        " released container " + container.getId() +
+        " on node: " + node + 
+        " with event: " + event);
   }
 
   @Lock(Lock.NoLock.class)
-  FiCaSchedulerApp getApplicationAttempt(
-      ApplicationAttemptId applicationAttemptId) {
-    SchedulerApplication app =
-        applications.get(applicationAttemptId.getApplicationId());
-    if (app != null) {
-      return (FiCaSchedulerApp) app.getCurrentAppAttempt();
-    }
-    return null;
+  FiCaSchedulerApp getApplication(ApplicationAttemptId applicationAttemptId) {
+    return applications.get(applicationAttemptId);
   }
 
   @Override
   public SchedulerAppReport getSchedulerAppInfo(
       ApplicationAttemptId applicationAttemptId) {
-    FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
+    FiCaSchedulerApp app = getApplication(applicationAttemptId);
     return app == null ? null : new SchedulerAppReport(app);
   }
   
   @Override
   public ApplicationResourceUsageReport getAppResourceUsageReport(
       ApplicationAttemptId applicationAttemptId) {
-    FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
+    FiCaSchedulerApp app = getApplication(applicationAttemptId);
     return app == null ? null : app.getResourceUsageReport();
   }
   
@@ -935,22 +876,10 @@ public class CapacityScheduler extends A
     return nodes.get(nodeId);
   }
 
-  @Override
-  public RMContainer getRMContainer(ContainerId containerId) {
-    FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
-    return (attempt == null) ? null : attempt.getRMContainer(containerId);
-  }
-
-  @VisibleForTesting
-  public FiCaSchedulerApp getCurrentAttemptForContainer(
-      ContainerId containerId) {
-    SchedulerApplication app =
-        applications.get(containerId.getApplicationAttemptId()
-          .getApplicationId());
-    if (app != null) {
-      return (FiCaSchedulerApp) app.getCurrentAppAttempt();
-    }
-    return null;
+  private RMContainer getRMContainer(ContainerId containerId) {
+    FiCaSchedulerApp application = 
+        getApplication(containerId.getApplicationAttemptId());
+    return (application == null) ? null : application.getRMContainer(containerId);
   }
 
   @Override
@@ -983,7 +912,7 @@ public class CapacityScheduler extends A
       LOG.debug("PREEMPT_CONTAINER: application:" + aid.toString() +
           " container: " + cont.toString());
     }
-    FiCaSchedulerApp app = getApplicationAttempt(aid);
+    FiCaSchedulerApp app = applications.get(aid);
     if (app != null) {
       app.addPreemptContainer(cont.getContainerId());
     }

Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Fri Feb  7 20:33:01 2014
@@ -38,7 +38,6 @@ import org.apache.hadoop.security.Access
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -60,6 +59,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -99,7 +99,7 @@ public class LeafQueue implements CSQueu
   private volatile int numContainers;
 
   Set<FiCaSchedulerApp> activeApplications;
-  Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap = 
+  Map<ApplicationAttemptId, FiCaSchedulerApp> applicationsMap = 
       new HashMap<ApplicationAttemptId, FiCaSchedulerApp>();
   
   Set<FiCaSchedulerApp> pendingApplications;
@@ -635,21 +635,7 @@ public class LeafQueue implements CSQueu
   }
 
   @Override
-  public void submitApplicationAttempt(FiCaSchedulerApp application,
-      String userName) {
-    // Careful! Locking order is important!
-    synchronized (this) {
-      User user = getUser(userName);
-      // Add the attempt to our data-structures
-      addApplicationAttempt(application, user);
-    }
-
-    metrics.submitAppAttempt(userName);
-    getParent().submitApplicationAttempt(application, userName);
-  }
-
-  @Override
-  public void submitApplication(ApplicationId applicationId, String userName,
+  public void submitApplication(FiCaSchedulerApp application, String userName,
       String queue)  throws AccessControlException {
     // Careful! Locking order is important!
 
@@ -667,7 +653,8 @@ public class LeafQueue implements CSQueu
       // Check if the queue is accepting jobs
       if (getState() != QueueState.RUNNING) {
         String msg = "Queue " + getQueuePath() +
-        " is STOPPED. Cannot accept submission of application: " + applicationId;
+        " is STOPPED. Cannot accept submission of application: " +
+        application.getApplicationId();
         LOG.info(msg);
         throw new AccessControlException(msg);
       }
@@ -676,7 +663,8 @@ public class LeafQueue implements CSQueu
       if (getNumApplications() >= getMaxApplications()) {
         String msg = "Queue " + getQueuePath() + 
         " already has " + getNumApplications() + " applications," +
-        " cannot accept submission of application: " + applicationId;
+        " cannot accept submission of application: " + 
+        application.getApplicationId();
         LOG.info(msg);
         throw new AccessControlException(msg);
       }
@@ -687,22 +675,28 @@ public class LeafQueue implements CSQueu
         String msg = "Queue " + getQueuePath() + 
         " already has " + user.getTotalApplications() + 
         " applications from user " + userName + 
-        " cannot accept submission of application: " + applicationId;
+        " cannot accept submission of application: " + 
+        application.getApplicationId();
         LOG.info(msg);
         throw new AccessControlException(msg);
       }
+
+      // Add the application to our data-structures
+      addApplication(application, user);
     }
 
+    int attemptId = application.getApplicationAttemptId().getAttemptId();
+    metrics.submitApp(userName, attemptId);
+
     // Inform the parent queue
     try {
-      getParent().submitApplication(applicationId, userName, queue);
+      getParent().submitApplication(application, userName, queue);
     } catch (AccessControlException ace) {
       LOG.info("Failed to submit application to parent-queue: " + 
           getParent().getQueuePath(), ace);
+      removeApplication(application, user);
       throw ace;
     }
-
-    metrics.submitApp(userName);
   }
 
   private synchronized void activateApplications() {
@@ -728,11 +722,11 @@ public class LeafQueue implements CSQueu
     }
   }
   
-  private synchronized void addApplicationAttempt(FiCaSchedulerApp application, User user) {
+  private synchronized void addApplication(FiCaSchedulerApp application, User user) {
     // Accept 
     user.submitApplication();
     pendingApplications.add(application);
-    applicationAttemptMap.put(application.getApplicationAttemptId(), application);
+    applicationsMap.put(application.getApplicationAttemptId(), application);
 
     // Activate applications
     activateApplications();
@@ -748,28 +742,22 @@ public class LeafQueue implements CSQueu
   }
 
   @Override
-  public void finishApplication(ApplicationId application, String user) {
-    // Inform the activeUsersManager
-    activeUsersManager.deactivateApplication(user, application);
-    // Inform the parent queue
-    getParent().finishApplication(application, user);
-  }
-
-  @Override
-  public void finishApplicationAttempt(FiCaSchedulerApp application, String queue) {
+  public void finishApplication(FiCaSchedulerApp application, String queue) {
     // Careful! Locking order is important!
     synchronized (this) {
-      removeApplicationAttempt(application, getUser(application.getUser()));
+      removeApplication(application, getUser(application.getUser()));
     }
-    getParent().finishApplicationAttempt(application, queue);
+
+    // Inform the parent queue
+    getParent().finishApplication(application, queue);
   }
 
-  public synchronized void removeApplicationAttempt(FiCaSchedulerApp application, User user) {
+  public synchronized void removeApplication(FiCaSchedulerApp application, User user) {
     boolean wasActive = activeApplications.remove(application);
     if (!wasActive) {
       pendingApplications.remove(application);
     }
-    applicationAttemptMap.remove(application.getApplicationAttemptId());
+    applicationsMap.remove(application.getApplicationAttemptId());
 
     user.finishApplication(wasActive);
     if (user.getTotalApplications() == 0) {
@@ -778,7 +766,13 @@ public class LeafQueue implements CSQueu
 
     // Check if we can activate more applications
     activateApplications();
-
+    
+    // Inform the activeUsersManager
+    synchronized (application) {
+      activeUsersManager.deactivateApplication(
+          application.getUser(), application.getApplicationId());
+    }
+    
     LOG.info("Application removed -" +
         " appId: " + application.getApplicationId() + 
         " user: " + application.getUser() + 
@@ -789,10 +783,10 @@ public class LeafQueue implements CSQueu
         " #queue-active-applications: " + getNumActiveApplications()
         );
   }
-
+  
   private synchronized FiCaSchedulerApp getApplication(
       ApplicationAttemptId applicationAttemptId) {
-    return applicationAttemptMap.get(applicationAttemptId);
+    return applicationsMap.get(applicationAttemptId);
   }
 
   private static final CSAssignment NULL_ASSIGNMENT =

Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Fri Feb  7 20:33:01 2014
@@ -37,7 +37,6 @@ import org.apache.hadoop.security.Access
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -52,6 +51,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -442,7 +442,7 @@ public class ParentQueue implements CSQu
   }
 
   @Override
-  public void submitApplication(ApplicationId applicationId, String user,
+  public void submitApplication(FiCaSchedulerApp application, String user,
       String queue) throws AccessControlException {
     
     synchronized (this) {
@@ -455,70 +455,57 @@ public class ParentQueue implements CSQu
       if (state != QueueState.RUNNING) {
         throw new AccessControlException("Queue " + getQueuePath() +
             " is STOPPED. Cannot accept submission of application: " +
-            applicationId);
+            application.getApplicationId());
       }
 
-      addApplication(applicationId, user);
+      addApplication(application, user);
     }
     
     // Inform the parent queue
     if (parent != null) {
       try {
-        parent.submitApplication(applicationId, user, queue);
+        parent.submitApplication(application, user, queue);
       } catch (AccessControlException ace) {
         LOG.info("Failed to submit application to parent-queue: " + 
             parent.getQueuePath(), ace);
-        removeApplication(applicationId, user);
+        removeApplication(application, user);
         throw ace;
       }
     }
   }
 
-
-  @Override
-  public void submitApplicationAttempt(FiCaSchedulerApp application,
-      String userName) {
-    // submit attempt logic.
-  }
-
-  @Override
-  public void finishApplicationAttempt(FiCaSchedulerApp application,
-      String queue) {
-    // finish attempt logic.
-  }
-
-  private synchronized void addApplication(ApplicationId applicationId,
+  private synchronized void addApplication(FiCaSchedulerApp application, 
       String user) {
-
+  
     ++numApplications;
 
     LOG.info("Application added -" +
-        " appId: " + applicationId + 
+        " appId: " + application.getApplicationId() + 
         " user: " + user + 
         " leaf-queue of parent: " + getQueueName() + 
         " #applications: " + getNumApplications());
   }
   
   @Override
-  public void finishApplication(ApplicationId application, String user) {
+  public void finishApplication(FiCaSchedulerApp application, String queue) {
     
     synchronized (this) {
-      removeApplication(application, user);
+      removeApplication(application, application.getUser());
     }
     
     // Inform the parent queue
     if (parent != null) {
-      parent.finishApplication(application, user);
+      parent.finishApplication(application, queue);
     }
   }
 
-  public synchronized void removeApplication(ApplicationId applicationId, 
+  public synchronized void removeApplication(FiCaSchedulerApp application, 
       String user) {
     
     --numApplications;
 
     LOG.info("Application removed -" +
-        " appId: " + applicationId + 
+        " appId: " + application.getApplicationId() + 
         " user: " + user + 
         " leaf-queue of parent: " + getQueueName() + 
         " #applications: " + getNumApplications());

Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java Fri Feb  7 20:33:01 2014
@@ -47,7 +47,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 
@@ -57,7 +57,7 @@ import org.apache.hadoop.yarn.util.resou
  */
 @Private
 @Unstable
-public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
+public class FiCaSchedulerApp extends SchedulerApplication {
 
   private static final Log LOG = LogFactory.getLog(FiCaSchedulerApp.class);
 

Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java Fri Feb  7 20:33:01 2014
@@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.factories.
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -206,7 +206,7 @@ public class FiCaSchedulerNode extends S
   }
 
   public synchronized void reserveResource(
-      SchedulerApplicationAttempt application, Priority priority, 
+      SchedulerApplication application, Priority priority, 
       RMContainer reservedContainer) {
     // Check if it's already reserved
     if (this.reservedContainer != null) {
@@ -219,8 +219,7 @@ public class FiCaSchedulerNode extends S
             " on node " + this.reservedContainer.getReservedNode());
       }
       
-      // Cannot reserve more than one application attempt on a given node!
-      // Reservation is still against attempt.
+      // Cannot reserve more than one application on a given node!
       if (!this.reservedContainer.getContainer().getId().getApplicationAttemptId().equals(
           reservedContainer.getContainer().getId().getApplicationAttemptId())) {
         throw new IllegalStateException("Trying to reserve" +
@@ -242,7 +241,7 @@ public class FiCaSchedulerNode extends S
   }
 
   public synchronized void unreserveResource(
-      SchedulerApplicationAttempt application) {
+      SchedulerApplication application) {
     
     // adding NP checks as this can now be called for preemption
     if (reservedContainer != null

Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java Fri Feb  7 20:33:01 2014
@@ -23,21 +23,27 @@ import org.apache.hadoop.yarn.api.record
 public class AppAttemptAddedSchedulerEvent extends SchedulerEvent {
 
   private final ApplicationAttemptId applicationAttemptId;
-  private final boolean transferStateFromPreviousAttempt;
+  private final String queue;
+  private final String user;
 
   public AppAttemptAddedSchedulerEvent(
-      ApplicationAttemptId applicationAttemptId,
-      boolean transferStateFromPreviousAttempt) {
+      ApplicationAttemptId applicationAttemptId, String queue, String user) {
     super(SchedulerEventType.APP_ATTEMPT_ADDED);
     this.applicationAttemptId = applicationAttemptId;
-    this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt;
+    this.queue = queue;
+    this.user = user;
   }
 
   public ApplicationAttemptId getApplicationAttemptId() {
     return applicationAttemptId;
   }
 
-  public boolean getTransferStateFromPreviousAttempt() {
-    return transferStateFromPreviousAttempt;
+  public String getQueue() {
+    return queue;
   }
+
+  public String getUser() {
+    return user;
+  }
+
 }

Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptRemovedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptRemovedSchedulerEvent.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptRemovedSchedulerEvent.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptRemovedSchedulerEvent.java Fri Feb  7 20:33:01 2014
@@ -25,15 +25,13 @@ public class AppAttemptRemovedSchedulerE
 
   private final ApplicationAttemptId applicationAttemptId;
   private final RMAppAttemptState finalAttemptState;
-  private final boolean keepContainersAcrossAppAttempts;
 
   public AppAttemptRemovedSchedulerEvent(
       ApplicationAttemptId applicationAttemptId,
-      RMAppAttemptState finalAttemptState, boolean keepContainers) {
+      RMAppAttemptState finalAttemptState) {
     super(SchedulerEventType.APP_ATTEMPT_REMOVED);
     this.applicationAttemptId = applicationAttemptId;
     this.finalAttemptState = finalAttemptState;
-    this.keepContainersAcrossAppAttempts = keepContainers;
   }
 
   public ApplicationAttemptId getApplicationAttemptID() {
@@ -43,8 +41,4 @@ public class AppAttemptRemovedSchedulerE
   public RMAppAttemptState getFinalAttemptState() {
     return this.finalAttemptState;
   }
-
-  public boolean getKeepContainersAcrossAppAttempts() {
-    return this.keepContainersAcrossAppAttempts;
-  }
 }

Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java Fri Feb  7 20:33:01 2014
@@ -24,11 +24,7 @@ public enum SchedulerEventType {
   NODE_ADDED,
   NODE_REMOVED,
   NODE_UPDATE,
-
-  // Source: RMApp
-  APP_ADDED,
-  APP_REMOVED,
-
+  
   // Source: RMAppAttempt
   APP_ATTEMPT_ADDED,
   APP_ATTEMPT_REMOVED,

Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java Fri Feb  7 20:33:01 2014
@@ -33,7 +33,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 
 @Private
 @Unstable

Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java Fri Feb  7 20:33:01 2014
@@ -44,7 +44,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 /**
@@ -52,7 +52,7 @@ import org.apache.hadoop.yarn.util.resou
  */
 @Private
 @Unstable
-public class FSSchedulerApp extends SchedulerApplicationAttempt {
+public class FSSchedulerApp extends SchedulerApplication {
 
   private static final Log LOG = LogFactory.getLog(FSSchedulerApp.class);
 



Mime
View raw message