hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject svn commit: r1619012 [14/26] - in /hadoop/common/branches/HADOOP-10388/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api...
Date Tue, 19 Aug 2014 23:51:01 GMT
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Tue Aug 19 23:49:39 2014
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,10 +39,10 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.Groups;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -49,19 +50,13 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
-import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.*;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -69,16 +64,15 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping.MappingType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -96,17 +90,20 @@ import org.apache.hadoop.yarn.util.resou
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 
 @LimitedPrivate("yarn")
 @Evolving
 @SuppressWarnings("unchecked")
-public class CapacityScheduler extends AbstractYarnScheduler
-  implements PreemptableResourceScheduler, CapacitySchedulerContext,
-             Configurable {
+public class CapacityScheduler extends
+    AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> implements
+    PreemptableResourceScheduler, CapacitySchedulerContext, Configurable {
 
   private static final Log LOG = LogFactory.getLog(CapacityScheduler.class);
 
   private CSQueue root;
+  // timeout to join when we stop this service
+  protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
 
   static final Comparator<CSQueue> queueComparator = new Comparator<CSQueue>() {
     @Override
@@ -182,17 +179,7 @@ public class CapacityScheduler extends A
 
   private Map<String, CSQueue> queues = new ConcurrentHashMap<String, CSQueue>();
 
-  private Map<NodeId, FiCaSchedulerNode> nodes = 
-      new ConcurrentHashMap<NodeId, FiCaSchedulerNode>();
-
-  private Resource clusterResource = 
-    RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
-  private int numNodeManagers = 0;
-
-  private Resource minimumAllocation;
-  private Resource maximumAllocation;
-
-  private boolean initialized = false;
+  private AtomicInteger numNodeManagers = new AtomicInteger(0);
 
   private ResourceCalculator calculator;
   private boolean usePortForNodeName;
@@ -209,7 +196,19 @@ public class CapacityScheduler extends A
           + ".scheduling-interval-ms";
   private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
   
-  public CapacityScheduler() {}
+  private boolean overrideWithQueueMappings = false;
+  private List<QueueMapping> mappings = null;
+  private Groups groups;
+
+  @VisibleForTesting
+  public synchronized String getMappedQueueForTest(String user)
+      throws IOException {
+    return getMappedQueue(user);
+  }
+
+  public CapacityScheduler() {
+    super(CapacityScheduler.class.getName());
+  }
 
   @Override
   public QueueMetrics getRootQueueMetrics() {
@@ -231,16 +230,6 @@ public class CapacityScheduler extends A
   }
 
   @Override
-  public Resource getMinimumResourceCapability() {
-    return minimumAllocation;
-  }
-
-  @Override
-  public Resource getMaximumResourceCapability() {
-    return maximumAllocation;
-  }
-
-  @Override
   public Comparator<FiCaSchedulerApp> getApplicationComparator() {
     return applicationComparator;
   }
@@ -256,66 +245,95 @@ public class CapacityScheduler extends A
   }
 
   @Override
-  public synchronized int getNumClusterNodes() {
-    return numNodeManagers;
+  public int getNumClusterNodes() {
+    return numNodeManagers.get();
   }
 
   @Override
-  public RMContext getRMContext() {
+  public synchronized RMContext getRMContext() {
     return this.rmContext;
   }
 
   @Override
-  public Resource getClusterResources() {
-    return clusterResource;
+  public synchronized void setRMContext(RMContext rmContext) {
+    this.rmContext = rmContext;
   }
-  
+
+  private synchronized void initScheduler(Configuration configuration) throws
+      IOException {
+    this.conf = loadCapacitySchedulerConfiguration(configuration);
+    validateConf(this.conf);
+    this.minimumAllocation = this.conf.getMinimumAllocation();
+    this.maximumAllocation = this.conf.getMaximumAllocation();
+    this.calculator = this.conf.getResourceCalculator();
+    this.usePortForNodeName = this.conf.getUsePortForNodeName();
+    this.applications =
+        new ConcurrentHashMap<ApplicationId,
+            SchedulerApplication<FiCaSchedulerApp>>();
+    initializeQueues(this.conf);
+
+    scheduleAsynchronously = this.conf.getScheduleAynschronously();
+    asyncScheduleInterval =
+        this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
+            DEFAULT_ASYNC_SCHEDULER_INTERVAL);
+    if (scheduleAsynchronously) {
+      asyncSchedulerThread = new AsyncScheduleThread(this);
+    }
+
+    LOG.info("Initialized CapacityScheduler with " +
+        "calculator=" + getResourceCalculator().getClass() + ", " +
+        "minimumAllocation=<" + getMinimumResourceCapability() + ">, " +
+        "maximumAllocation=<" + getMaximumResourceCapability() + ">, " +
+        "asynchronousScheduling=" + scheduleAsynchronously + ", " +
+        "asyncScheduleInterval=" + asyncScheduleInterval + "ms");
+  }
+
+  private synchronized void startSchedulerThreads() {
+    if (scheduleAsynchronously) {
+      Preconditions.checkNotNull(asyncSchedulerThread,
+          "asyncSchedulerThread is null");
+      asyncSchedulerThread.start();
+    }
+  }
+
   @Override
-  public synchronized void
-      reinitialize(Configuration conf, RMContext rmContext) throws IOException {
+  public void serviceInit(Configuration conf) throws Exception {
     Configuration configuration = new Configuration(conf);
-    if (!initialized) {
-      this.rmContext = rmContext;
-      this.conf = loadCapacitySchedulerConfiguration(configuration);
-      validateConf(this.conf);
-      this.minimumAllocation = this.conf.getMinimumAllocation();
-      this.maximumAllocation = this.conf.getMaximumAllocation();
-      this.calculator = this.conf.getResourceCalculator();
-      this.usePortForNodeName = this.conf.getUsePortForNodeName();
-      this.applications =
-          new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
+    initScheduler(configuration);
+    super.serviceInit(conf);
+  }
 
-      initializeQueues(this.conf);
-      
-      scheduleAsynchronously = this.conf.getScheduleAynschronously();
-      asyncScheduleInterval = 
-          this.conf.getLong(ASYNC_SCHEDULER_INTERVAL, 
-              DEFAULT_ASYNC_SCHEDULER_INTERVAL);
-      if (scheduleAsynchronously) {
-        asyncSchedulerThread = new AsyncScheduleThread(this);
-        asyncSchedulerThread.start();
-      }
-      
-      initialized = true;
-      LOG.info("Initialized CapacityScheduler with " +
-          "calculator=" + getResourceCalculator().getClass() + ", " +
-          "minimumAllocation=<" + getMinimumResourceCapability() + ">, " +
-          "maximumAllocation=<" + getMaximumResourceCapability() + ">, " +
-          "asynchronousScheduling=" + scheduleAsynchronously + ", " +
-          "asyncScheduleInterval=" + asyncScheduleInterval + "ms");
-      
-    } else {
-      CapacitySchedulerConfiguration oldConf = this.conf; 
-      this.conf = loadCapacitySchedulerConfiguration(configuration);
-      validateConf(this.conf);
-      try {
-        LOG.info("Re-initializing queues...");
-        reinitializeQueues(this.conf);
-      } catch (Throwable t) {
-        this.conf = oldConf;
-        throw new IOException("Failed to re-init queues", t);
+  @Override
+  public void serviceStart() throws Exception {
+    startSchedulerThreads();
+    super.serviceStart();
+  }
+
+  @Override
+  public void serviceStop() throws Exception {
+    synchronized (this) {
+      if (scheduleAsynchronously && asyncSchedulerThread != null) {
+        asyncSchedulerThread.interrupt();
+        asyncSchedulerThread.join(THREAD_JOIN_TIMEOUT_MS);
       }
     }
+    super.serviceStop();
+  }
+
+  @Override
+  public synchronized void
+  reinitialize(Configuration conf, RMContext rmContext) throws IOException {
+    Configuration configuration = new Configuration(conf);
+    CapacitySchedulerConfiguration oldConf = this.conf;
+    this.conf = loadCapacitySchedulerConfiguration(configuration);
+    validateConf(this.conf);
+    try {
+      LOG.info("Re-initializing queues...");
+      reinitializeQueues(this.conf);
+    } catch (Throwable t) {
+      this.conf = oldConf;
+      throw new IOException("Failed to re-init queues", t);
+    }
   }
   
   long getAsyncScheduleInterval() {
@@ -390,7 +408,32 @@ public class CapacityScheduler extends A
     }
   }
   private static final QueueHook noop = new QueueHook();
-  
+
+  private void initializeQueueMappings() throws IOException {
+    overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
+    LOG.info("Initialized queue mappings, override: "
+        + overrideWithQueueMappings);
+    // Get new user/group mappings
+    List<QueueMapping> newMappings = conf.getQueueMappings();
+    //check if mappings refer to valid queues
+    for (QueueMapping mapping : newMappings) {
+      if (!mapping.queue.equals(CURRENT_USER_MAPPING) &&
+          !mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
+        CSQueue queue = queues.get(mapping.queue);
+        if (queue == null || !(queue instanceof LeafQueue)) {
+          throw new IOException(
+              "mapping contains invalid or non-leaf queue " + mapping.queue);
+        }
+      }
+    }
+    //apply the new mappings since they are valid
+    mappings = newMappings;
+    // initialize groups if mappings are present
+    if (mappings.size() > 0) {
+      groups = new Groups(conf);
+    }
+  }
+
   @Lock(CapacityScheduler.class)
   private void initializeQueues(CapacitySchedulerConfiguration conf)
     throws IOException {
@@ -398,7 +441,9 @@ public class CapacityScheduler extends A
     root = 
         parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT, 
             queues, queues, noop);
+
     LOG.info("Initialized root queue " + root);
+    initializeQueueMappings();
   }
 
   @Lock(CapacityScheduler.class)
@@ -418,6 +463,7 @@ public class CapacityScheduler extends A
     
     // Re-configure queues
     root.reinitialize(newRoot, clusterResource);
+    initializeQueueMappings();
   }
 
   /**
@@ -505,12 +551,73 @@ public class CapacityScheduler extends A
   }
 
   synchronized CSQueue getQueue(String queueName) {
+    if (queueName == null) {
+      return null;
+    }
     return queues.get(queueName);
   }
 
+  private static final String CURRENT_USER_MAPPING = "%user";
+
+  private static final String PRIMARY_GROUP_MAPPING = "%primary_group";
+
+  private String getMappedQueue(String user) throws IOException {
+    for (QueueMapping mapping : mappings) {
+      if (mapping.type == MappingType.USER) {
+        if (mapping.source.equals(CURRENT_USER_MAPPING)) {
+          if (mapping.queue.equals(CURRENT_USER_MAPPING)) {
+            return user;
+          }
+          else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
+            return groups.getGroups(user).get(0);
+          }
+          else {
+            return mapping.queue;
+          }
+        }
+        if (user.equals(mapping.source)) {
+          return mapping.queue;
+        }
+      }
+      if (mapping.type == MappingType.GROUP) {
+        for (String userGroups : groups.getGroups(user)) {
+          if (userGroups.equals(mapping.source)) {
+            return mapping.queue;
+          }
+        }
+      }
+    }
+    return null;
+  }
+
   private synchronized void addApplication(ApplicationId applicationId,
-      String queueName, String user) {
-    // santiy checks.
+    String queueName, String user, boolean isAppRecovering) {
+
+    if (mappings != null && mappings.size() > 0) {
+      try {
+        String mappedQueue = getMappedQueue(user);
+        if (mappedQueue != null) {
+          // We have a mapping, should we use it?
+          if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)
+              || overrideWithQueueMappings) {
+            LOG.info("Application " + applicationId + " user " + user
+                + " mapping [" + queueName + "] to [" + mappedQueue
+                + "] override " + overrideWithQueueMappings);
+            queueName = mappedQueue;
+            RMApp rmApp = rmContext.getRMApps().get(applicationId);
+            rmApp.setQueue(queueName);
+          }
+        }
+      } catch (IOException ioex) {
+        String message = "Failed to submit application " + applicationId +
+            " submitted by user " + user + " reason: " + ioex.getMessage();
+        this.rmContext.getDispatcher().getEventHandler()
+            .handle(new RMAppRejectedEvent(applicationId, message));
+        return;
+      }
+    }
+
+    // sanity checks.
     CSQueue queue = getQueue(queueName);
     if (queue == null) {
       String message = "Application " + applicationId + 
@@ -536,19 +643,28 @@ public class CapacityScheduler extends A
           .handle(new RMAppRejectedEvent(applicationId, ace.toString()));
       return;
     }
-    SchedulerApplication application =
-        new SchedulerApplication(queue, user);
+    // update the metrics
+    queue.getMetrics().submitApp(user);
+    SchedulerApplication<FiCaSchedulerApp> application =
+        new SchedulerApplication<FiCaSchedulerApp>(queue, user);
     applications.put(applicationId, application);
     LOG.info("Accepted application " + applicationId + " from user: " + user
         + ", in queue: " + queueName);
-    rmContext.getDispatcher().getEventHandler()
+    if (isAppRecovering) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
+      }
+    } else {
+      rmContext.getDispatcher().getEventHandler()
         .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+    }
   }
 
   private synchronized void addApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
-      boolean transferStateFromPreviousAttempt) {
-    SchedulerApplication application =
+      boolean transferStateFromPreviousAttempt,
+      boolean isAttemptRecovering) {
+    SchedulerApplication<FiCaSchedulerApp> application =
         applications.get(applicationAttemptId.getApplicationId());
     CSQueue queue = (CSQueue) application.getQueue();
 
@@ -565,14 +681,22 @@ public class CapacityScheduler extends A
     LOG.info("Added Application Attempt " + applicationAttemptId
         + " to scheduler from user " + application.getUser() + " in queue "
         + queue.getQueueName());
-    rmContext.getDispatcher().getEventHandler() .handle(
+    if (isAttemptRecovering) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(applicationAttemptId
+            + " is recovering. Skipping notifying ATTEMPT_ADDED");
+      }
+    } else {
+      rmContext.getDispatcher().getEventHandler().handle(
         new RMAppAttemptEvent(applicationAttemptId,
-          RMAppAttemptEventType.ATTEMPT_ADDED));
+            RMAppAttemptEventType.ATTEMPT_ADDED));
+    }
   }
 
   private synchronized void doneApplication(ApplicationId applicationId,
       RMAppState finalState) {
-    SchedulerApplication application = applications.get(applicationId);
+    SchedulerApplication<FiCaSchedulerApp> application =
+        applications.get(applicationId);
     if (application == null){
       // The AppRemovedSchedulerEvent maybe sent on recovery for completed apps,
       // ignore it.
@@ -597,7 +721,7 @@ public class CapacityScheduler extends A
     		" finalState=" + rmAppAttemptFinalState);
     
     FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId);
-    SchedulerApplication application =
+    SchedulerApplication<FiCaSchedulerApp> application =
         applications.get(applicationAttemptId.getApplicationId());
 
     if (application == null || attempt == null) {
@@ -659,25 +783,11 @@ public class CapacityScheduler extends A
     
     // Sanity check
     SchedulerUtils.normalizeRequests(
-        ask, getResourceCalculator(), getClusterResources(),
+        ask, getResourceCalculator(), getClusterResource(),
         getMinimumResourceCapability(), maximumAllocation);
 
     // Release containers
-    for (ContainerId releasedContainerId : release) {
-      RMContainer rmContainer = getRMContainer(releasedContainerId);
-      if (rmContainer == null) {
-         RMAuditLogger.logFailure(application.getUser(),
-             AuditConstants.RELEASE_CONTAINER, 
-             "Unauthorized access or invalid container", "CapacityScheduler",
-             "Trying to release container not owned by app or with invalid id",
-             application.getApplicationId(), releasedContainerId);
-      }
-      completedContainer(rmContainer,
-          SchedulerUtils.createAbnormalContainerStatus(
-              releasedContainerId, 
-              SchedulerUtils.RELEASED_CONTAINER),
-          RMContainerEventType.RELEASED);
-    }
+    releaseContainers(release, application);
 
     synchronized (application) {
 
@@ -757,7 +867,10 @@ public class CapacityScheduler extends A
     FiCaSchedulerNode node = getNode(nm.getNodeID());
     
     // Update resource if any change
-    SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource, LOG);
+    if (SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource,
+        LOG)) {
+      root.updateClusterResource(clusterResource);
+    }
     
     List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
     List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
@@ -822,7 +935,7 @@ public class CapacityScheduler extends A
 
     // Try to schedule more if there are no reservations to fulfill
     if (node.getReservedContainer() == null) {
-      if (Resources.greaterThanOrEqual(calculator, getClusterResources(),
+      if (Resources.greaterThanOrEqual(calculator, getClusterResource(),
           node.getAvailableResource(), minimumAllocation)) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Trying to schedule on node: " + node.getNodeName() +
@@ -839,21 +952,6 @@ public class CapacityScheduler extends A
   
   }
 
-  private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
-    // Get the application for the finished container
-    FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
-    if (application == null) {
-      LOG.info("Unknown application "
-          + containerId.getApplicationAttemptId().getApplicationId()
-          + " launched container " + containerId + " on node: " + node);
-      this.rmContext.getDispatcher().getEventHandler()
-        .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
-      return;
-    }
-    
-    application.containerLaunchedOnNode(containerId, node.getNodeID());
-  }
-
   @Override
   public void handle(SchedulerEvent event) {
     switch(event.getType()) {
@@ -861,6 +959,8 @@ public class CapacityScheduler extends A
     {
       NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
       addNode(nodeAddedEvent.getAddedRMNode());
+      recoverContainersOnNode(nodeAddedEvent.getContainerReports(),
+        nodeAddedEvent.getAddedRMNode());
     }
     break;
     case NODE_REMOVED:
@@ -883,7 +983,8 @@ public class CapacityScheduler extends A
     {
       AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
       addApplication(appAddedEvent.getApplicationId(),
-        appAddedEvent.getQueue(), appAddedEvent.getUser());
+        appAddedEvent.getQueue(),
+        appAddedEvent.getUser(), appAddedEvent.getIsAppRecovering());
     }
     break;
     case APP_REMOVED:
@@ -898,7 +999,8 @@ public class CapacityScheduler extends A
       AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
           (AppAttemptAddedSchedulerEvent) event;
       addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
-        appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
+        appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
+        appAttemptAddedEvent.getIsAttemptRecovering());
     }
     break;
     case APP_ATTEMPT_REMOVED:
@@ -932,25 +1034,25 @@ public class CapacityScheduler extends A
         usePortForNodeName));
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
     root.updateClusterResource(clusterResource);
-    ++numNodeManagers;
+    int numNodes = numNodeManagers.incrementAndGet();
     LOG.info("Added node " + nodeManager.getNodeAddress() + 
         " clusterResource: " + clusterResource);
 
-    if (scheduleAsynchronously && numNodeManagers == 1) {
+    if (scheduleAsynchronously && numNodes == 1) {
       asyncSchedulerThread.beginSchedule();
     }
   }
 
   private synchronized void removeNode(RMNode nodeInfo) {
-    FiCaSchedulerNode node = this.nodes.get(nodeInfo.getNodeID());
+    FiCaSchedulerNode node = nodes.get(nodeInfo.getNodeID());
     if (node == null) {
       return;
     }
     Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability());
     root.updateClusterResource(clusterResource);
-    --numNodeManagers;
+    int numNodes = numNodeManagers.decrementAndGet();
 
-    if (scheduleAsynchronously && numNodeManagers == 0) {
+    if (scheduleAsynchronously && numNodes == 0) {
       asyncSchedulerThread.suspendSchedule();
     }
     
@@ -980,7 +1082,8 @@ public class CapacityScheduler extends A
   }
   
   @Lock(CapacityScheduler.class)
-  private synchronized void completedContainer(RMContainer rmContainer,
+  @Override
+  protected synchronized void completedContainer(RMContainer rmContainer,
       ContainerStatus containerStatus, RMContainerEventType event) {
     if (rmContainer == null) {
       LOG.info("Null container completed...");
@@ -1015,28 +1118,10 @@ public class CapacityScheduler extends A
 
   @Lock(Lock.NoLock.class)
   @VisibleForTesting
-  public FiCaSchedulerApp getApplicationAttempt(
-      ApplicationAttemptId applicationAttemptId) {
-    SchedulerApplication app =
-        applications.get(applicationAttemptId.getApplicationId());
-    if (app != null) {
-      return (FiCaSchedulerApp) app.getCurrentAppAttempt();
-    }
-    return null;
-  }
-
-  @Override
-  public SchedulerAppReport getSchedulerAppInfo(
-      ApplicationAttemptId applicationAttemptId) {
-    FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
-    return app == null ? null : new SchedulerAppReport(app);
-  }
-  
   @Override
-  public ApplicationResourceUsageReport getAppResourceUsageReport(
+  public FiCaSchedulerApp getApplicationAttempt(
       ApplicationAttemptId applicationAttemptId) {
-    FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
-    return app == null ? null : app.getResourceUsageReport();
+    return super.getApplicationAttempt(applicationAttemptId);
   }
   
   @Lock(Lock.NoLock.class)
@@ -1048,24 +1133,6 @@ public class CapacityScheduler extends A
   Map<NodeId, FiCaSchedulerNode> getAllNodes() {
     return nodes;
   }
-  
-  @Override
-  public RMContainer getRMContainer(ContainerId containerId) {
-    FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
-    return (attempt == null) ? null : attempt.getRMContainer(containerId);
-  }
-
-  @VisibleForTesting
-  public FiCaSchedulerApp getCurrentAttemptForContainer(
-      ContainerId containerId) {
-    SchedulerApplication app =
-        applications.get(containerId.getApplicationAttemptId()
-          .getApplicationId());
-    if (app != null) {
-      return (FiCaSchedulerApp) app.getCurrentAppAttempt();
-    }
-    return null;
-  }
 
   @Override
   @Lock(Lock.NoLock.class)
@@ -1074,12 +1141,6 @@ public class CapacityScheduler extends A
   }
 
   @Override
-  public SchedulerNodeReport getNodeReport(NodeId nodeId) {
-    FiCaSchedulerNode node = getNode(nodeId);
-    return node == null ? null : new SchedulerNodeReport(node);
-  }
-
-  @Override
   public void dropContainerReservation(RMContainer container) {
     if(LOG.isDebugEnabled()){
       LOG.debug("DROP_RESERVATION:" + container.toString());
@@ -1105,14 +1166,13 @@ public class CapacityScheduler extends A
 
   @Override
   public void killContainer(RMContainer cont) {
-    if(LOG.isDebugEnabled()){
+    if (LOG.isDebugEnabled()) {
       LOG.debug("KILL_CONTAINER: container" + cont.toString());
     }
-    completedContainer(cont,
-        SchedulerUtils.createPreemptedContainerStatus(
-            cont.getContainerId(),"Container being forcibly preempted:"
-        + cont.getContainerId()),
-        RMContainerEventType.KILL);
+    recoverResourceRequestForContainer(cont);
+    completedContainer(cont, SchedulerUtils.createPreemptedContainerStatus(
+      cont.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER),
+      RMContainerEventType.KILL);
   }
 
   @Override
@@ -1156,4 +1216,59 @@ public class CapacityScheduler extends A
       throw new IOException(e);
     }
   }
+
+  @Override
+  public synchronized String moveApplication(ApplicationId appId,
+      String targetQueueName) throws YarnException {
+    FiCaSchedulerApp app =
+        getApplicationAttempt(ApplicationAttemptId.newInstance(appId, 0));
+    String sourceQueueName = app.getQueue().getQueueName();
+    LeafQueue source = getAndCheckLeafQueue(sourceQueueName);
+    LeafQueue dest = getAndCheckLeafQueue(targetQueueName);
+    // Validation check - ACLs, submission limits for user & queue
+    String user = app.getUser();
+    try {
+      dest.submitApplication(appId, user, targetQueueName);
+    } catch (AccessControlException e) {
+      throw new YarnException(e);
+    }
+    // Move all live containers
+    for (RMContainer rmContainer : app.getLiveContainers()) {
+      source.detachContainer(clusterResource, app, rmContainer);
+      // attach the Container to another queue
+      dest.attachContainer(clusterResource, app, rmContainer);
+    }
+    // Detach the application..
+    source.finishApplicationAttempt(app, sourceQueueName);
+    source.getParent().finishApplication(appId, app.getUser());
+    // Finish app & update metrics
+    app.move(dest);
+    // Submit to a new queue
+    dest.submitApplicationAttempt(app, user);
+    applications.get(appId).setQueue(dest);
+    LOG.info("App: " + app.getApplicationId() + " successfully moved from "
+        + sourceQueueName + " to: " + targetQueueName);
+    return targetQueueName;
+  }
+
+  /**
+   * Check that the String provided in input is the name of an existing
+   * LeafQueue; if successful, returns the queue.
+   *
+   * @param queue
+   * @return the LeafQueue
+   * @throws YarnException
+   */
+  private LeafQueue getAndCheckLeafQueue(String queue) throws YarnException {
+    CSQueue ret = this.getQueue(queue);
+    if (ret == null) {
+      throw new YarnException("The specified Queue: " + queue
+          + " doesn't exist");
+    }
+    if (!(ret instanceof LeafQueue)) {
+      throw new YarnException("The specified Queue: " + queue
+          + " is not a Leaf Queue. Move is supported only for Leaf Queues.");
+    }
+    return (LeafQueue) ret;
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java Tue Aug 19 23:49:39 2014
@@ -18,8 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -145,6 +144,44 @@ public class CapacitySchedulerConfigurat
 
   @Private
   public static final boolean DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE = false;
+
+  @Private
+  public static final String QUEUE_MAPPING = PREFIX + "queue-mappings";
+
+  @Private
+  public static final String ENABLE_QUEUE_MAPPING_OVERRIDE = QUEUE_MAPPING + "-override.enable";
+
+  @Private
+  public static final boolean DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE = false;
+
+  @Private
+  public static class QueueMapping {
+
+    public enum MappingType {
+
+      USER("u"),
+      GROUP("g");
+      private final String type;
+      private MappingType(String type) {
+        this.type = type;
+      }
+
+      public String toString() {
+        return type;
+      }
+
+    };
+
+    MappingType type;
+    String source;
+    String queue;
+
+    public QueueMapping(MappingType type, String source, String queue) {
+      this.type = type;
+      this.source = source;
+      this.queue = queue;
+    }
+  }
   
   public CapacitySchedulerConfiguration() {
     this(new Configuration());
@@ -378,4 +415,82 @@ public class CapacitySchedulerConfigurat
     setBoolean(SCHEDULE_ASYNCHRONOUSLY_ENABLE, async);
   }
 
+  public boolean getOverrideWithQueueMappings() {
+    return getBoolean(ENABLE_QUEUE_MAPPING_OVERRIDE,
+        DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE);
+  }
+
+  /**
+   * Returns a collection of strings, trimming leading and trailing whitespace
+   * on each value
+   *
+   * @param str
+   *          String to parse
+   * @param delim
+   *          delimiter to separate the values
+   * @return Collection of parsed elements.
+   */
+  private static Collection<String> getTrimmedStringCollection(String str,
+      String delim) {
+    List<String> values = new ArrayList<String>();
+    if (str == null)
+      return values;
+    StringTokenizer tokenizer = new StringTokenizer(str, delim);
+    while (tokenizer.hasMoreTokens()) {
+      String next = tokenizer.nextToken();
+      if (next == null || next.trim().isEmpty()) {
+        continue;
+      }
+      values.add(next.trim());
+    }
+    return values;
+  }
+
+  /**
+   * Get user/group mappings to queues.
+   *
+   * @return user/group mappings (illegal configs throw IllegalArgumentException)
+   */
+  public List<QueueMapping> getQueueMappings() {
+    List<QueueMapping> mappings =
+        new ArrayList<CapacitySchedulerConfiguration.QueueMapping>();
+    Collection<String> mappingsString =
+        getTrimmedStringCollection(QUEUE_MAPPING);
+    for (String mappingValue : mappingsString) {
+      String[] mapping =
+          getTrimmedStringCollection(mappingValue, ":")
+              .toArray(new String[] {});
+      if (mapping.length != 3 || mapping[1].length() == 0
+          || mapping[2].length() == 0) {
+        throw new IllegalArgumentException(
+            "Illegal queue mapping " + mappingValue);
+      }
+
+      QueueMapping m;
+      try {
+        QueueMapping.MappingType mappingType;
+        if (mapping[0].equals("u")) {
+          mappingType = QueueMapping.MappingType.USER;
+        } else if (mapping[0].equals("g")) {
+          mappingType = QueueMapping.MappingType.GROUP;
+        } else {
+          throw new IllegalArgumentException(
+              "unknown mapping prefix " + mapping[0]);
+        }
+        m = new QueueMapping(
+                mappingType,
+                mapping[1],
+                mapping[2]);
+      } catch (Throwable t) {
+        throw new IllegalArgumentException(
+            "Illegal queue mapping " + mappingValue);
+      }
+
+      if (m != null) {
+        mappings.add(m);
+      }
+    }
+
+    return mappings;
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java Tue Aug 19 23:49:39 2014
@@ -43,7 +43,7 @@ public interface CapacitySchedulerContex
 
   RMContext getRMContext();
   
-  Resource getClusterResources();
+  Resource getClusterResource();
 
   /**
    * Get the yarn configuration.

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Tue Aug 19 23:49:39 2014
@@ -59,15 +59,17 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import com.google.common.annotations.VisibleForTesting;
+
 @Private
 @Unstable
 public class LeafQueue implements CSQueue {
@@ -174,12 +176,12 @@ public class LeafQueue implements CSQueu
     int maxActiveApplications = 
         CSQueueUtils.computeMaxActiveApplications(
             resourceCalculator,
-            cs.getClusterResources(), this.minimumAllocation,
+            cs.getClusterResource(), this.minimumAllocation,
             maxAMResourcePerQueuePercent, absoluteMaxCapacity);
     this.maxActiveAppsUsingAbsCap = 
             CSQueueUtils.computeMaxActiveApplications(
                 resourceCalculator,
-                cs.getClusterResources(), this.minimumAllocation,
+                cs.getClusterResource(), this.minimumAllocation,
                 maxAMResourcePerQueuePercent, absoluteCapacity);
     int maxActiveApplicationsPerUser = 
         CSQueueUtils.computeMaxActiveApplicationsPerUser(maxActiveAppsUsingAbsCap, userLimit, 
@@ -195,7 +197,7 @@ public class LeafQueue implements CSQueu
       cs.getConfiguration().getAcls(getQueuePath());
 
     setupQueueConfigs(
-        cs.getClusterResources(),
+        cs.getClusterResource(),
         capacity, absoluteCapacity, 
         maximumCapacity, absoluteMaxCapacity, 
         userLimit, userLimitFactor, 
@@ -564,7 +566,8 @@ public class LeafQueue implements CSQueu
         "numContainers=" + getNumContainers();  
   }
 
-  private synchronized User getUser(String userName) {
+  @VisibleForTesting
+  public synchronized User getUser(String userName) {
     User user = users.get(userName);
     if (user == null) {
       user = new User();
@@ -640,7 +643,10 @@ public class LeafQueue implements CSQueu
       addApplicationAttempt(application, user);
     }
 
-    metrics.submitAppAttempt(userName);
+    // We don't want to update metrics for move app
+    if (application.isPending()) {
+      metrics.submitAppAttempt(userName);
+    }
     getParent().submitApplicationAttempt(application, userName);
   }
 
@@ -698,7 +704,6 @@ public class LeafQueue implements CSQueu
       throw ace;
     }
 
-    metrics.submitApp(userName);
   }
 
   private synchronized void activateApplications() {
@@ -973,13 +978,18 @@ public class LeafQueue implements CSQueu
 
     Resource userLimit =                          // User limit
         computeUserLimit(application, clusterResource, required);
-    
+
+    //Max avail capacity needs to take into account usage by ancestor-siblings
+    //which are greater than their base capacity, so we are interested in "max avail"
+    //capacity
+    float absoluteMaxAvailCapacity = CSQueueUtils.getAbsoluteMaxAvailCapacity(
+      resourceCalculator, clusterResource, this);
 
     Resource queueMaxCap =                        // Queue Max-Capacity
         Resources.multiplyAndNormalizeDown(
             resourceCalculator, 
             clusterResource, 
-            absoluteMaxCapacity, 
+            absoluteMaxAvailCapacity,
             minimumAllocation);
     
     Resource userConsumed = getUser(user).getConsumedResources(); 
@@ -1346,8 +1356,7 @@ public class LeafQueue implements CSQueu
       }
 
       // Inform the node
-      node.allocateContainer(application.getApplicationId(), 
-          allocatedContainer);
+      node.allocateContainer(allocatedContainer);
 
       LOG.info("assignedContainer" +
           " application attempt=" + application.getApplicationAttemptId() +
@@ -1446,7 +1455,7 @@ public class LeafQueue implements CSQueu
   }
 
   synchronized void allocateResource(Resource clusterResource, 
-      FiCaSchedulerApp application, Resource resource) {
+      SchedulerApplicationAttempt application, Resource resource) {
     // Update queue metrics
     Resources.addTo(usedResources, resource);
     CSQueueUtils.updateQueueStatistics(
@@ -1530,7 +1539,8 @@ public class LeafQueue implements CSQueu
     return metrics;
   }
 
-  static class User {
+  @VisibleForTesting
+  public static class User {
     Resource consumed = Resources.createResource(0, 0);
     int pendingApplications = 0;
     int activeApplications = 0;
@@ -1580,13 +1590,16 @@ public class LeafQueue implements CSQueu
 
   @Override
   public void recoverContainer(Resource clusterResource,
-      FiCaSchedulerApp application, Container container) {
+      SchedulerApplicationAttempt attempt, RMContainer rmContainer) {
+    if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+      return;
+    }
     // Careful! Locking order is important! 
     synchronized (this) {
-      allocateResource(clusterResource, application, container.getResource());
+      allocateResource(clusterResource, attempt, rmContainer.getContainer()
+        .getResource());
     }
-    getParent().recoverContainer(clusterResource, application, container);
-
+    getParent().recoverContainer(clusterResource, attempt, rmContainer);
   }
 
   /**
@@ -1609,9 +1622,43 @@ public class LeafQueue implements CSQueu
   @Override
   public void collectSchedulerApplications(
       Collection<ApplicationAttemptId> apps) {
+    for (FiCaSchedulerApp pendingApp : pendingApplications) {
+      apps.add(pendingApp.getApplicationAttemptId());
+    }
     for (FiCaSchedulerApp app : activeApplications) {
       apps.add(app.getApplicationAttemptId());
     }
   }
 
+  @Override
+  public void attachContainer(Resource clusterResource,
+      FiCaSchedulerApp application, RMContainer rmContainer) {
+    if (application != null) {
+      allocateResource(clusterResource, application, rmContainer.getContainer()
+          .getResource());
+      LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
+          + " resource=" + rmContainer.getContainer().getResource()
+          + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity()
+          + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used="
+          + usedResources + " cluster=" + clusterResource);
+      // Inform the parent queue
+      getParent().attachContainer(clusterResource, application, rmContainer);
+    }
+  }
+
+  @Override
+  public void detachContainer(Resource clusterResource,
+      FiCaSchedulerApp application, RMContainer rmContainer) {
+    if (application != null) {
+      releaseResource(clusterResource, application, rmContainer.getContainer()
+          .getResource());
+      LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
+          + " resource=" + rmContainer.getContainer().getResource()
+          + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity()
+          + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used="
+          + usedResources + " cluster=" + clusterResource);
+      // Inform the parent queue
+      getParent().detachContainer(clusterResource, application, rmContainer);
+    }
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Tue Aug 19 23:49:39 2014
@@ -38,7 +38,6 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -49,9 +48,11 @@ import org.apache.hadoop.yarn.factories.
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -143,7 +144,7 @@ public class ParentQueue implements CSQu
     this.queueInfo.setQueueName(queueName);
     this.queueInfo.setChildQueues(new ArrayList<QueueInfo>());
 
-    setupQueueConfigs(cs.getClusterResources(),
+    setupQueueConfigs(cs.getClusterResource(),
         capacity, absoluteCapacity, 
         maximumCapacity, absoluteMaxCapacity, state, acls);
     
@@ -770,13 +771,16 @@ public class ParentQueue implements CSQu
   
   @Override
   public void recoverContainer(Resource clusterResource,
-      FiCaSchedulerApp application, Container container) {
+      SchedulerApplicationAttempt attempt, RMContainer rmContainer) {
+    if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+      return;
+    }
     // Careful! Locking order is important! 
     synchronized (this) {
-      allocateResource(clusterResource, container.getResource());
+      allocateResource(clusterResource,rmContainer.getContainer().getResource());
     }
     if (parent != null) {
-      parent.recoverContainer(clusterResource, application, container);
+      parent.recoverContainer(clusterResource, attempt, rmContainer);
     }
   }
 
@@ -787,4 +791,37 @@ public class ParentQueue implements CSQu
       queue.collectSchedulerApplications(apps);
     }
   }
+
+  @Override
+  public void attachContainer(Resource clusterResource,
+      FiCaSchedulerApp application, RMContainer rmContainer) {
+    if (application != null) {
+      allocateResource(clusterResource, rmContainer.getContainer()
+          .getResource());
+      LOG.info("movedContainer" + " queueMoveIn=" + getQueueName()
+          + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
+          + getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster="
+          + clusterResource);
+      // Inform the parent
+      if (parent != null) {
+        parent.attachContainer(clusterResource, application, rmContainer);
+      }
+    }
+  }
+
+  @Override
+  public void detachContainer(Resource clusterResource,
+      FiCaSchedulerApp application, RMContainer rmContainer) {
+    if (application != null) {
+      releaseResource(clusterResource, rmContainer.getContainer().getResource());
+      LOG.info("movedContainer" + " queueMoveOut=" + getQueueName()
+          + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
+          + getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster="
+          + clusterResource);
+      // Inform the parent
+      if (parent != null) {
+        parent.detachContainer(clusterResource, application, rmContainer);
+      }
+    }
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java Tue Aug 19 23:49:39 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
 
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -77,6 +78,9 @@ public class FiCaSchedulerApp extends Sc
     if (null == liveContainers.remove(rmContainer.getContainerId())) {
       return false;
     }
+    
+    // Remove from the list of newly allocated containers if found
+    newlyAllocatedContainers.remove(rmContainer);
 
     Container container = rmContainer.getContainer();
     ContainerId containerId = container.getId();
@@ -129,8 +133,12 @@ public class FiCaSchedulerApp extends Sc
     liveContainers.put(container.getId(), rmContainer);    
 
     // Update consumption and track allocations
-    appSchedulingInfo.allocate(type, node, priority, request, container);
+    List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
+        type, node, priority, request, container);
     Resources.addTo(currentConsumption, container.getResource());
+    
+    // Update resource requests related to "request" and store in RMContainer 
+    ((RMContainerImpl)rmContainer).setResourceRequests(resourceRequestList);
 
     // Inform the container
     rmContainer.handle(

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java Tue Aug 19 23:49:39 2014
@@ -18,248 +18,85 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.util.resource.Resources;
 
 public class FiCaSchedulerNode extends SchedulerNode {
 
   private static final Log LOG = LogFactory.getLog(FiCaSchedulerNode.class);
 
-  private static final RecordFactory recordFactory = RecordFactoryProvider
-      .getRecordFactory(null);
-
-  private Resource availableResource = recordFactory.newRecordInstance(Resource.class);
-  private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
-  private Resource totalResourceCapability;
-
-  private volatile int numContainers;
-
-  private RMContainer reservedContainer;
-  
-  /* set of containers that are allocated containers */
-  private final Map<ContainerId, RMContainer> launchedContainers = 
-    new HashMap<ContainerId, RMContainer>();
-  
-  private final RMNode rmNode;
-  private final String nodeName;
-
   public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName) {
-    this.rmNode = node;
-    this.availableResource.setMemory(node.getTotalCapability().getMemory());
-    this.availableResource.setVirtualCores(node.getTotalCapability().getVirtualCores());
-    totalResourceCapability =
-        Resource.newInstance(node.getTotalCapability().getMemory(), node
-            .getTotalCapability().getVirtualCores());
-    if (usePortForNodeName) {
-      nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort();
-    } else {
-      nodeName = rmNode.getHostName();
-    }
-  }
-
-  public RMNode getRMNode() {
-    return this.rmNode;
-  }
-
-  public NodeId getNodeID() {
-    return this.rmNode.getNodeID();
-  }
-
-  public String getHttpAddress() {
-    return this.rmNode.getHttpAddress();
-  }
-
-  @Override
-  public String getNodeName() {
-    return nodeName;
-  }
-
-  @Override
-  public String getRackName() {
-    return this.rmNode.getRackName();
-  }
-
-  /**
-   * The Scheduler has allocated containers on this node to the 
-   * given application.
-   * 
-   * @param applicationId application
-   * @param rmContainer allocated container
-   */
-  public synchronized void allocateContainer(ApplicationId applicationId, 
-      RMContainer rmContainer) {
-    Container container = rmContainer.getContainer();
-    deductAvailableResource(container.getResource());
-    ++numContainers;
-    
-    launchedContainers.put(container.getId(), rmContainer);
-
-    LOG.info("Assigned container " + container.getId() + 
-        " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() + 
-        ", which currently has " + numContainers + " containers, " + 
-        getUsedResource() + " used and " + 
-        getAvailableResource() + " available");
-  }
-
-  @Override
-  public synchronized Resource getAvailableResource() {
-    return this.availableResource;
-  }
-
-  @Override
-  public synchronized Resource getUsedResource() {
-    return this.usedResource;
-  }
-
-  @Override
-  public Resource getTotalResource() {
-    return this.totalResourceCapability;
-  }
-
-  private synchronized boolean isValidContainer(Container c) {    
-    if (launchedContainers.containsKey(c.getId()))
-      return true;
-    return false;
-  }
-
-  private synchronized void updateResource(Container container) {
-    addAvailableResource(container.getResource());
-    --numContainers;
-  }
-  
-  /**
-   * Release an allocated container on this node.
-   * @param container container to be released
-   */
-  public synchronized void releaseContainer(Container container) {
-    if (!isValidContainer(container)) {
-      LOG.error("Invalid container released " + container);
-      return;
-    }
-
-    /* remove the containers from the nodemanger */
-    if (null != launchedContainers.remove(container.getId())) {
-      updateResource(container);
-    }
-
-    LOG.info("Released container " + container.getId() + 
-        " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() + 
-        ", which currently has " + numContainers + " containers, " + 
-        getUsedResource() + " used and " + getAvailableResource()
-        + " available" + ", release resources=" + true);
-  }
-
-
-  private synchronized void addAvailableResource(Resource resource) {
-    if (resource == null) {
-      LOG.error("Invalid resource addition of null resource for "
-          + rmNode.getNodeAddress());
-      return;
-    }
-    Resources.addTo(availableResource, resource);
-    Resources.subtractFrom(usedResource, resource);
-  }
-
-  private synchronized void deductAvailableResource(Resource resource) {
-    if (resource == null) {
-      LOG.error("Invalid deduction of null resource for "
-          + rmNode.getNodeAddress());
-      return;
-    }
-    Resources.subtractFrom(availableResource, resource);
-    Resources.addTo(usedResource, resource);
+    super(node, usePortForNodeName);
   }
 
   @Override
-  public String toString() {
-    return "host: " + rmNode.getNodeAddress() + " #containers=" + getNumContainers() +  
-      " available=" + getAvailableResource().getMemory() + 
-      " used=" + getUsedResource().getMemory();
-  }
-
-  @Override
-  public int getNumContainers() {
-    return numContainers;
-  }
-
-  public synchronized List<RMContainer> getRunningContainers() {
-    return new ArrayList<RMContainer>(launchedContainers.values());
-  }
-
   public synchronized void reserveResource(
-      SchedulerApplicationAttempt application, Priority priority, 
-      RMContainer reservedContainer) {
+      SchedulerApplicationAttempt application, Priority priority,
+      RMContainer container) {
     // Check if it's already reserved
-    if (this.reservedContainer != null) {
+    RMContainer reservedContainer = getReservedContainer();
+    if (reservedContainer != null) {
       // Sanity check
-      if (!reservedContainer.getContainer().getNodeId().equals(getNodeID())) {
+      if (!container.getContainer().getNodeId().equals(getNodeID())) {
         throw new IllegalStateException("Trying to reserve" +
-            " container " + reservedContainer +
-            " on node " + reservedContainer.getReservedNode() + 
-            " when currently" + " reserved resource " + this.reservedContainer +
-            " on node " + this.reservedContainer.getReservedNode());
+            " container " + container +
+            " on node " + container.getReservedNode() + 
+            " when currently" + " reserved resource " + reservedContainer +
+            " on node " + reservedContainer.getReservedNode());
       }
       
       // Cannot reserve more than one application attempt on a given node!
       // Reservation is still against attempt.
-      if (!this.reservedContainer.getContainer().getId().getApplicationAttemptId().equals(
-          reservedContainer.getContainer().getId().getApplicationAttemptId())) {
+      if (!reservedContainer.getContainer().getId().getApplicationAttemptId()
+          .equals(container.getContainer().getId().getApplicationAttemptId())) {
         throw new IllegalStateException("Trying to reserve" +
-        		" container " + reservedContainer + 
+            " container " + container + 
             " for application " + application.getApplicationAttemptId() + 
             " when currently" +
-            " reserved container " + this.reservedContainer +
+            " reserved container " + reservedContainer +
             " on node " + this);
       }
 
       if (LOG.isDebugEnabled()) {
         LOG.debug("Updated reserved container "
-            + reservedContainer.getContainer().getId() + " on node " + this
+            + container.getContainer().getId() + " on node " + this
             + " for application attempt "
             + application.getApplicationAttemptId());
       }
     } else {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Reserved container "
-            + reservedContainer.getContainer().getId() + " on node " + this
+            + container.getContainer().getId() + " on node " + this
             + " for application attempt "
             + application.getApplicationAttemptId());
       }
     }
-    this.reservedContainer = reservedContainer;
+    setReservedContainer(container);
   }
 
+  @Override
   public synchronized void unreserveResource(
       SchedulerApplicationAttempt application) {
-    
+
     // adding NP checks as this can now be called for preemption
-    if (reservedContainer != null
-        && reservedContainer.getContainer() != null
-        && reservedContainer.getContainer().getId() != null
-        && reservedContainer.getContainer().getId().getApplicationAttemptId() != null) {
+    if (getReservedContainer() != null
+        && getReservedContainer().getContainer() != null
+        && getReservedContainer().getContainer().getId() != null
+        && getReservedContainer().getContainer().getId()
+          .getApplicationAttemptId() != null) {
 
       // Cannot unreserve for wrong application...
       ApplicationAttemptId reservedApplication =
-          reservedContainer.getContainer().getId().getApplicationAttemptId();
+          getReservedContainer().getContainer().getId()
+            .getApplicationAttemptId();
       if (!reservedApplication.equals(
           application.getApplicationAttemptId())) {
         throw new IllegalStateException("Trying to unreserve " +
@@ -269,17 +106,6 @@ public class FiCaSchedulerNode extends S
             " on node " + this);
       }
     }
-    reservedContainer = null;
-  }
-
-  public synchronized RMContainer getReservedContainer() {
-    return reservedContainer;
+    setReservedContainer(null);
   }
-
-  @Override
-  public synchronized void applyDeltaOnAvailableResource(Resource deltaResource) {
-    // we can only adjust available resource if total resource is changed.
-    Resources.addTo(this.availableResource, deltaResource);
-  }
-
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java Tue Aug 19 23:49:39 2014
@@ -25,13 +25,20 @@ public class AppAddedSchedulerEvent exte
   private final ApplicationId applicationId;
   private final String queue;
   private final String user;
+  private final boolean isAppRecovering;
 
   public AppAddedSchedulerEvent(
       ApplicationId applicationId, String queue, String user) {
+    this(applicationId, queue, user, false);
+  }
+
+  public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
+      String user, boolean isAppRecovering) {
     super(SchedulerEventType.APP_ADDED);
     this.applicationId = applicationId;
     this.queue = queue;
     this.user = user;
+    this.isAppRecovering = isAppRecovering;
   }
 
   public ApplicationId getApplicationId() {
@@ -46,4 +53,7 @@ public class AppAddedSchedulerEvent exte
     return user;
   }
 
+  public boolean getIsAppRecovering() {
+    return isAppRecovering;
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java Tue Aug 19 23:49:39 2014
@@ -24,13 +24,22 @@ public class AppAttemptAddedSchedulerEve
 
   private final ApplicationAttemptId applicationAttemptId;
   private final boolean transferStateFromPreviousAttempt;
+  private final boolean isAttemptRecovering;
 
   public AppAttemptAddedSchedulerEvent(
       ApplicationAttemptId applicationAttemptId,
       boolean transferStateFromPreviousAttempt) {
+    this(applicationAttemptId, transferStateFromPreviousAttempt, false);
+  }
+
+  public AppAttemptAddedSchedulerEvent(
+      ApplicationAttemptId applicationAttemptId,
+      boolean transferStateFromPreviousAttempt,
+      boolean isAttemptRecovering) {
     super(SchedulerEventType.APP_ATTEMPT_ADDED);
     this.applicationAttemptId = applicationAttemptId;
     this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt;
+    this.isAttemptRecovering = isAttemptRecovering;
   }
 
   public ApplicationAttemptId getApplicationAttemptId() {
@@ -40,4 +49,8 @@ public class AppAttemptAddedSchedulerEve
   public boolean getTransferStateFromPreviousAttempt() {
     return transferStateFromPreviousAttempt;
   }
+
+  public boolean getIsAttemptRecovering() {
+    return isAttemptRecovering;
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java Tue Aug 19 23:49:39 2014
@@ -18,19 +18,34 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
 
+import java.util.List;
+
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 
 public class NodeAddedSchedulerEvent extends SchedulerEvent {
 
   private final RMNode rmNode;
+  private final List<NMContainerStatus> containerReports;
 
   public NodeAddedSchedulerEvent(RMNode rmNode) {
     super(SchedulerEventType.NODE_ADDED);
     this.rmNode = rmNode;
+    this.containerReports = null;
+  }
+
+  public NodeAddedSchedulerEvent(RMNode rmNode,
+      List<NMContainerStatus> containerReports) {
+    super(SchedulerEventType.NODE_ADDED);
+    this.rmNode = rmNode;
+    this.containerReports = containerReports;
   }
 
   public RMNode getAddedRMNode() {
     return rmNode;
   }
 
+  public List<NMContainerStatus> getContainerReports() {
+    return containerReports;
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java Tue Aug 19 23:49:39 2014
@@ -53,6 +53,10 @@ public class AllocationConfiguration {
   private final int userMaxAppsDefault;
   private final int queueMaxAppsDefault;
 
+  // Maximum resource share for each leaf queue that can be used to run AMs
+  final Map<String, Float> queueMaxAMShares;
+  private final float queueMaxAMShareDefault;
+
   // ACL's for each queue. Only specifies non-default ACL's from configuration.
   private final Map<String, Map<QueueACL, AccessControlList>> queueAcls;
 
@@ -77,26 +81,32 @@ public class AllocationConfiguration {
   @VisibleForTesting
   QueuePlacementPolicy placementPolicy;
   
+  // Queues configured in the allocation XML file, keyed by queue type
   @VisibleForTesting
-  Set<String> queueNames;
+  Map<FSQueueType, Set<String>> configuredQueues;
   
-  public AllocationConfiguration(Map<String, Resource> minQueueResources, 
-      Map<String, Resource> maxQueueResources, 
+  public AllocationConfiguration(Map<String, Resource> minQueueResources,
+      Map<String, Resource> maxQueueResources,
       Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
-      Map<String, ResourceWeights> queueWeights, int userMaxAppsDefault,
-      int queueMaxAppsDefault, Map<String, SchedulingPolicy> schedulingPolicies,
+      Map<String, ResourceWeights> queueWeights,
+      Map<String, Float> queueMaxAMShares, int userMaxAppsDefault,
+      int queueMaxAppsDefault, float queueMaxAMShareDefault,
+      Map<String, SchedulingPolicy> schedulingPolicies,
       SchedulingPolicy defaultSchedulingPolicy,
-      Map<String, Long> minSharePreemptionTimeouts, 
+      Map<String, Long> minSharePreemptionTimeouts,
       Map<String, Map<QueueACL, AccessControlList>> queueAcls,
       long fairSharePreemptionTimeout, long defaultMinSharePreemptionTimeout,
-      QueuePlacementPolicy placementPolicy, Set<String> queueNames) {
+      QueuePlacementPolicy placementPolicy,
+      Map<FSQueueType, Set<String>> configuredQueues) {
     this.minQueueResources = minQueueResources;
     this.maxQueueResources = maxQueueResources;
     this.queueMaxApps = queueMaxApps;
     this.userMaxApps = userMaxApps;
+    this.queueMaxAMShares = queueMaxAMShares;
     this.queueWeights = queueWeights;
     this.userMaxAppsDefault = userMaxAppsDefault;
     this.queueMaxAppsDefault = queueMaxAppsDefault;
+    this.queueMaxAMShareDefault = queueMaxAMShareDefault;
     this.defaultSchedulingPolicy = defaultSchedulingPolicy;
     this.schedulingPolicies = schedulingPolicies;
     this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
@@ -104,7 +114,7 @@ public class AllocationConfiguration {
     this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
     this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout;
     this.placementPolicy = placementPolicy;
-    this.queueNames = queueNames;
+    this.configuredQueues = configuredQueues;
   }
   
   public AllocationConfiguration(Configuration conf) {
@@ -113,17 +123,22 @@ public class AllocationConfiguration {
     queueWeights = new HashMap<String, ResourceWeights>();
     queueMaxApps = new HashMap<String, Integer>();
     userMaxApps = new HashMap<String, Integer>();
+    queueMaxAMShares = new HashMap<String, Float>();
     userMaxAppsDefault = Integer.MAX_VALUE;
     queueMaxAppsDefault = Integer.MAX_VALUE;
+    queueMaxAMShareDefault = -1.0f;
     queueAcls = new HashMap<String, Map<QueueACL, AccessControlList>>();
     minSharePreemptionTimeouts = new HashMap<String, Long>();
     defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
     fairSharePreemptionTimeout = Long.MAX_VALUE;
     schedulingPolicies = new HashMap<String, SchedulingPolicy>();
     defaultSchedulingPolicy = SchedulingPolicy.DEFAULT_POLICY;
+    configuredQueues = new HashMap<FSQueueType, Set<String>>();
+    for (FSQueueType queueType : FSQueueType.values()) {
+      configuredQueues.put(queueType, new HashSet<String>());
+    }
     placementPolicy = QueuePlacementPolicy.fromConfiguration(conf,
-        new HashSet<String>());
-    queueNames = new HashSet<String>();
+        configuredQueues);
   }
   
   /**
@@ -178,6 +193,11 @@ public class AllocationConfiguration {
     return (maxApps == null) ? queueMaxAppsDefault : maxApps;
   }
   
+  public float getQueueMaxAMShare(String queue) {
+    Float maxAMShare = queueMaxAMShares.get(queue);
+    return (maxAMShare == null) ? queueMaxAMShareDefault : maxAMShare;
+  }
+
   /**
    * Get the minimum resource allocation for the given queue.
    * @return the cap set on this queue, or 0 if not set.
@@ -221,8 +241,8 @@ public class AllocationConfiguration {
     return defaultSchedulingPolicy;
   }
   
-  public Set<String> getQueueNames() {
-    return queueNames;
+  public Map<FSQueueType, Set<String>> getConfiguredQueues() {
+    return configuredQueues;
   }
   
   public QueuePlacementPolicy getPlacementPolicy() {



Mime
View raw message