hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jia...@apache.org
Subject svn commit: r1618106 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-serve...
Date Fri, 15 Aug 2014 06:00:32 GMT
Author: jianhe
Date: Fri Aug 15 06:00:31 2014
New Revision: 1618106

URL: http://svn.apache.org/r1618106
Log:
YARN-2378. Added support for moving applications across queues in CapacityScheduler. Contributed by Subramaniam Venkatraman Krishnan

Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1618106&r1=1618105&r2=1618106&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Fri Aug 15 06:00:31 2014
@@ -50,6 +50,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2277. Added cross-origin support for the timeline server web services.
     (Jonathan Eagles via zjshen)
 
+    YARN-2378. Added support for moving applications across queues in
+    CapacityScheduler. (Subramaniam Venkatraman Krishnan via jianhe)
+
   IMPROVEMENTS
 
     YARN-2242. Improve exception information on AM launch crashes. (Li Lu 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1618106&r1=1618105&r2=1618106&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Fri Aug 15 06:00:31 2014
@@ -166,6 +166,8 @@ public class RMAppImpl implements RMApp,
         RMAppEventType.APP_REJECTED,
           new FinalSavingTransition(new AppRejectedTransition(),
             RMAppState.FAILED))
+    .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
+        RMAppEventType.MOVE, new RMAppMoveTransition())
 
      // Transitions from SUBMITTED state
     .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
@@ -243,7 +245,7 @@ public class RMAppImpl implements RMApp,
     // ignorable transitions
     .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
         EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL,
-          RMAppEventType.APP_NEW_SAVED))
+          RMAppEventType.APP_NEW_SAVED, RMAppEventType.MOVE))
 
      // Transitions from FINISHING state
     .addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
@@ -254,9 +256,9 @@ public class RMAppImpl implements RMApp,
     // ignorable transitions
     .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
       EnumSet.of(RMAppEventType.NODE_UPDATE,
-        // ignore Kill as we have already saved the final Finished state in
-        // state store.
-        RMAppEventType.KILL))
+        // ignore Kill/Move as we have already saved the final Finished state
+        // in state store.
+        RMAppEventType.KILL, RMAppEventType.MOVE))
 
      // Transitions from KILLING state
     .addTransition(RMAppState.KILLING, RMAppState.KILLING, 
@@ -274,7 +276,7 @@ public class RMAppImpl implements RMApp,
             RMAppEventType.ATTEMPT_FINISHED,
             RMAppEventType.ATTEMPT_FAILED,
             RMAppEventType.APP_UPDATE_SAVED,
-            RMAppEventType.KILL))
+            RMAppEventType.KILL, RMAppEventType.MOVE))
 
      // Transitions from FINISHED state
      // ignorable transitions
@@ -286,7 +288,7 @@ public class RMAppImpl implements RMApp,
             RMAppEventType.NODE_UPDATE,
             RMAppEventType.ATTEMPT_UNREGISTERED,
             RMAppEventType.ATTEMPT_FINISHED,
-            RMAppEventType.KILL))
+            RMAppEventType.KILL, RMAppEventType.MOVE))
 
      // Transitions from FAILED state
      // ignorable transitions
@@ -294,7 +296,8 @@ public class RMAppImpl implements RMApp,
         RMAppEventType.APP_RUNNING_ON_NODE,
         new AppRunningOnNodeTransition())
     .addTransition(RMAppState.FAILED, RMAppState.FAILED,
-        EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE))
+        EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE,
+            RMAppEventType.MOVE))
 
      // Transitions from KILLED state
      // ignorable transitions
@@ -307,7 +310,7 @@ public class RMAppImpl implements RMApp,
         EnumSet.of(RMAppEventType.APP_ACCEPTED,
             RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
             RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
-            RMAppEventType.NODE_UPDATE))
+            RMAppEventType.NODE_UPDATE, RMAppEventType.MOVE))
 
      .installTopology();
 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java?rev=1618106&r1=1618105&r2=1618106&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java Fri Aug 15 06:00:31 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
@@ -48,6 +50,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import com.google.common.util.concurrent.SettableFuture;
+
 @SuppressWarnings("unchecked")
 public abstract class AbstractYarnScheduler
     <T extends SchedulerApplicationAttempt, N extends SchedulerNode>
@@ -317,4 +321,31 @@ public abstract class AbstractYarnSchedu
   public SchedulerNode getSchedulerNode(NodeId nodeId) {
     return nodes.get(nodeId);
   }
+
+  @Override
+  public synchronized void moveAllApps(String sourceQueue, String destQueue)
+      throws YarnException {
+    // check if destination queue is a valid leaf queue
+    try {
+      getQueueInfo(destQueue, false, false);
+    } catch (IOException e) {
+      LOG.warn(e);
+      throw new YarnException(e);
+    }
+    // check if source queue is valid, i.e. that it exists
+    List<ApplicationAttemptId> apps = getAppsInQueue(sourceQueue);
+    if (apps == null) {
+      String errMsg = "The specified Queue: " + sourceQueue + " doesn't exist";
+      LOG.warn(errMsg);
+      throw new YarnException(errMsg);
+    }
+    // generate move events for each pending/running app
+    for (ApplicationAttemptId app : apps) {
+      SettableFuture<Object> future = SettableFuture.create();
+      this.rmContext
+          .getDispatcher()
+          .getEventHandler()
+          .handle(new RMAppMoveEvent(app.getApplicationId(), destQueue, future));
+    }
+  }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1618106&r1=1618105&r2=1618106&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Fri Aug 15 06:00:31 2014
@@ -54,7 +54,7 @@ public class AppSchedulingInfo {
   private static final Log LOG = LogFactory.getLog(AppSchedulingInfo.class);
   private final ApplicationAttemptId applicationAttemptId;
   final ApplicationId applicationId;
-  private final String queueName;
+  private String queueName;
   Queue queue;
   final String user;
   // TODO making containerIdCounter long
@@ -410,6 +410,7 @@ public class AppSchedulingInfo {
     activeUsersManager = newQueue.getActiveUsersManager();
     activeUsersManager.activateApplication(user, applicationId);
     this.queue = newQueue;
+    this.queueName = newQueue.getQueueName();
   }
 
   synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) {

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java?rev=1618106&r1=1618105&r2=1618106&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java Fri Aug 15 06:00:31 2014
@@ -202,4 +202,14 @@ public interface YarnScheduler extends E
   @Evolving
   public String moveApplication(ApplicationId appId, String newQueue)
       throws YarnException;
+
+  /**
+   * Completely drain sourceQueue of applications, by moving all of them to
+   * destQueue.
+   *
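+   * @param sourceQueue name of the queue to drain of applications
+   * @param destQueue name of the queue the applications are moved to
+   * @throws YarnException if the applications cannot be moved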
+   */
+  void moveAllApps(String sourceQueue, String destQueue) throws YarnException;
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java?rev=1618106&r1=1618105&r2=1618106&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java Fri Aug 15 06:00:31 2014
@@ -238,4 +238,22 @@ extends org.apache.hadoop.yarn.server.re
    * @param apps the collection to add the applications to
    */
   public void collectSchedulerApplications(Collection<ApplicationAttemptId> apps);
+
+  /**
+   * Detach a container from this queue
+   * @param clusterResource the current cluster resource
+   * @param application application to which the container was assigned
+   * @param container the container to detach
+   */
+  public void detachContainer(Resource clusterResource,
+               FiCaSchedulerApp application, RMContainer container);
+
+  /**
+   * Attach a container to this queue
+   * @param clusterResource the current cluster resource
+   * @param application application to which the container was assigned
+   * @param container the container to attach
+   */
+  public void attachContainer(Resource clusterResource,
+               FiCaSchedulerApp application, RMContainer container);
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1618106&r1=1618105&r2=1618106&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Fri Aug 15 06:00:31 2014
@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
@@ -547,6 +548,8 @@ public class CapacityScheduler extends
           .handle(new RMAppRejectedEvent(applicationId, ace.toString()));
       return;
     }
+    // update the metrics
+    queue.getMetrics().submitApp(user);
     SchedulerApplication<FiCaSchedulerApp> application =
         new SchedulerApplication<FiCaSchedulerApp>(queue, user);
     applications.put(applicationId, application);
@@ -1131,4 +1134,59 @@ public class CapacityScheduler extends
       throw new IOException(e);
     }
   }
+
+  @Override
+  public synchronized String moveApplication(ApplicationId appId,
+      String targetQueueName) throws YarnException {
+    FiCaSchedulerApp app =
+        getApplicationAttempt(ApplicationAttemptId.newInstance(appId, 0));
+    String sourceQueueName = app.getQueue().getQueueName();
+    LeafQueue source = getAndCheckLeafQueue(sourceQueueName);
+    LeafQueue dest = getAndCheckLeafQueue(targetQueueName);
+    // Validation check - ACLs, submission limits for user & queue
+    String user = app.getUser();
+    try {
+      dest.submitApplication(appId, user, targetQueueName);
+    } catch (AccessControlException e) {
+      throw new YarnException(e);
+    }
+    // Move all live containers
+    for (RMContainer rmContainer : app.getLiveContainers()) {
+      source.detachContainer(clusterResource, app, rmContainer);
+      // attach the Container to another queue
+      dest.attachContainer(clusterResource, app, rmContainer);
+    }
+    // Detach the application..
+    source.finishApplicationAttempt(app, sourceQueueName);
+    source.getParent().finishApplication(appId, app.getUser());
+    // Finish app & update metrics
+    app.move(dest);
+    // Submit to a new queue
+    dest.submitApplicationAttempt(app, user);
+    applications.get(appId).setQueue(dest);
+    LOG.info("App: " + app.getApplicationId() + " successfully moved from "
+        + sourceQueueName + " to: " + targetQueueName);
+    return targetQueueName;
+  }
+
+  /**
+   * Check that the String provided as input is the name of an existing
+   * LeafQueue; if so, return that queue.
+   *
+   * @param queue the name of the queue to look up
+   * @return the LeafQueue
+   * @throws YarnException if the queue doesn't exist or is not a LeafQueue
+   */
+  private LeafQueue getAndCheckLeafQueue(String queue) throws YarnException {
+    CSQueue ret = this.getQueue(queue);
+    if (ret == null) {
+      throw new YarnException("The specified Queue: " + queue
+          + " doesn't exist");
+    }
+    if (!(ret instanceof LeafQueue)) {
+      throw new YarnException("The specified Queue: " + queue
+          + " is not a Leaf Queue. Move is supported only for Leaf Queues.");
+    }
+    return (LeafQueue) ret;
+  }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1618106&r1=1618105&r2=1618106&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Fri Aug 15 06:00:31 2014
@@ -643,7 +643,10 @@ public class LeafQueue implements CSQueu
       addApplicationAttempt(application, user);
     }
 
-    metrics.submitAppAttempt(userName);
+    // We don't want to update metrics for move app
+    if (application.isPending()) {
+      metrics.submitAppAttempt(userName);
+    }
     getParent().submitApplicationAttempt(application, userName);
   }
 
@@ -701,7 +704,6 @@ public class LeafQueue implements CSQueu
       throw ace;
     }
 
-    metrics.submitApp(userName);
   }
 
   private synchronized void activateApplications() {
@@ -1620,8 +1622,43 @@ public class LeafQueue implements CSQueu
   @Override
   public void collectSchedulerApplications(
       Collection<ApplicationAttemptId> apps) {
+    for (FiCaSchedulerApp pendingApp : pendingApplications) {
+      apps.add(pendingApp.getApplicationAttemptId());
+    }
     for (FiCaSchedulerApp app : activeApplications) {
       apps.add(app.getApplicationAttemptId());
     }
   }
+
+  @Override
+  public void attachContainer(Resource clusterResource,
+      FiCaSchedulerApp application, RMContainer rmContainer) {
+    if (application != null) {
+      allocateResource(clusterResource, application, rmContainer.getContainer()
+          .getResource());
+      LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
+          + " resource=" + rmContainer.getContainer().getResource()
+          + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity()
+          + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used="
+          + usedResources + " cluster=" + clusterResource);
+      // Inform the parent queue
+      getParent().attachContainer(clusterResource, application, rmContainer);
+    }
+  }
+
+  @Override
+  public void detachContainer(Resource clusterResource,
+      FiCaSchedulerApp application, RMContainer rmContainer) {
+    if (application != null) {
+      releaseResource(clusterResource, application, rmContainer.getContainer()
+          .getResource());
+      LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
+          + " resource=" + rmContainer.getContainer().getResource()
+          + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity()
+          + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used="
+          + usedResources + " cluster=" + clusterResource);
+      // Inform the parent queue
+      getParent().detachContainer(clusterResource, application, rmContainer);
+    }
+  }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1618106&r1=1618105&r2=1618106&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Fri Aug 15 06:00:31 2014
@@ -791,4 +791,37 @@ public class ParentQueue implements CSQu
       queue.collectSchedulerApplications(apps);
     }
   }
+
+  @Override
+  public void attachContainer(Resource clusterResource,
+      FiCaSchedulerApp application, RMContainer rmContainer) {
+    if (application != null) {
+      allocateResource(clusterResource, rmContainer.getContainer()
+          .getResource());
+      LOG.info("movedContainer" + " queueMoveIn=" + getQueueName()
+          + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
+          + getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster="
+          + clusterResource);
+      // Inform the parent
+      if (parent != null) {
+        parent.attachContainer(clusterResource, application, rmContainer);
+      }
+    }
+  }
+
+  @Override
+  public void detachContainer(Resource clusterResource,
+      FiCaSchedulerApp application, RMContainer rmContainer) {
+    if (application != null) {
+      releaseResource(clusterResource, rmContainer.getContainer().getResource());
+      LOG.info("movedContainer" + " queueMoveOut=" + getQueueName()
+          + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
+          + getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster="
+          + clusterResource);
+      // Inform the parent
+      if (parent != null) {
+        parent.detachContainer(clusterResource, application, rmContainer);
+      }
+    }
+  }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1618106&r1=1618105&r2=1618106&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Fri Aug 15 06:00:31 2014
@@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -68,6 +69,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
@@ -100,6 +102,10 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerLeafQueueInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfoList;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
@@ -1014,4 +1020,782 @@ public class TestCapacityScheduler {
     // Now with updated ResourceRequest, a container is allocated for AM.
     Assert.assertTrue(containers.size() == 1);
   }
+
+  private MockRM setUpMove() { // returns an already started MockRM
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(conf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+      ResourceScheduler.class); // run the RM with the CapacityScheduler
+    MockRM rm = new MockRM(conf);
+    rm.start(); // callers in this file stop it themselves via rm.stop()
+    return rm;
+  }
+
+  @Test
+  public void testMoveAppBasic() throws Exception {
+    MockRM rm = setUpMove();
+    AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rm.getResourceScheduler();
+
+    // submit an app to queue a1
+    RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1");
+    ApplicationAttemptId appAttemptId =
+        rm.getApplicationReport(app.getApplicationId())
+            .getCurrentApplicationAttemptId();
+
+    // check preconditions: visible in a1, a and root; b1 and b are empty
+    List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
+    assertEquals(1, appsInA1.size());
+    String queue =
+        scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue()
+            .getQueueName();
+    Assert.assertTrue(queue.equals("a1"));
+
+    List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
+    assertTrue(appsInA.contains(appAttemptId));
+    assertEquals(1, appsInA.size());
+
+    List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
+    assertTrue(appsInRoot.contains(appAttemptId));
+    assertEquals(1, appsInRoot.size());
+
+    List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1");
+    assertTrue(appsInB1.isEmpty());
+
+    List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b");
+    assertTrue(appsInB.isEmpty());
+
+    // now move the app from a1 to b1
+    scheduler.moveApplication(app.getApplicationId(), "b1");
+
+    // check postconditions: visible in b1, b and root; a1 and a are empty
+    appsInB1 = scheduler.getAppsInQueue("b1");
+    assertEquals(1, appsInB1.size());
+    queue =
+        scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue()
+            .getQueueName();
+    Assert.assertTrue(queue.equals("b1"));
+
+    // assert on the freshly fetched list for b, not the stale pre-move one
+    appsInB = scheduler.getAppsInQueue("b");
+    assertTrue(appsInB.contains(appAttemptId));
+    assertEquals(1, appsInB.size());
+
+    appsInRoot = scheduler.getAppsInQueue("root");
+    assertTrue(appsInRoot.contains(appAttemptId));
+    assertEquals(1, appsInRoot.size());
+
+    appsInA1 = scheduler.getAppsInQueue("a1");
+    assertTrue(appsInA1.isEmpty());
+
+    appsInA = scheduler.getAppsInQueue("a");
+    assertTrue(appsInA.isEmpty());
+
+    rm.stop();
+  }
+
+  @Test
+  public void testMoveAppSameParent() throws Exception {
+    MockRM rm = setUpMove();
+    AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rm.getResourceScheduler();
+
+    // submit an app to queue a1
+    RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1");
+    ApplicationAttemptId appAttemptId =
+        rm.getApplicationReport(app.getApplicationId())
+            .getCurrentApplicationAttemptId();
+
+    // check preconditions: visible in a1, a and root; a2 is empty
+    List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
+    assertEquals(1, appsInA1.size());
+    String queue =
+        scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue()
+            .getQueueName();
+    Assert.assertTrue(queue.equals("a1"));
+
+    List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
+    assertTrue(appsInA.contains(appAttemptId));
+    assertEquals(1, appsInA.size());
+
+    List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
+    assertTrue(appsInRoot.contains(appAttemptId));
+    assertEquals(1, appsInRoot.size());
+
+    List<ApplicationAttemptId> appsInA2 = scheduler.getAppsInQueue("a2");
+    assertTrue(appsInA2.isEmpty());
+
+    // now move the app from a1 to a2
+    scheduler.moveApplication(app.getApplicationId(), "a2");
+
+    // check postconditions: a2 holds the app, a1 is empty
+    appsInA2 = scheduler.getAppsInQueue("a2");
+    assertEquals(1, appsInA2.size());
+    queue =
+        scheduler.getApplicationAttempt(appsInA2.get(0)).getQueue()
+            .getQueueName();
+    Assert.assertTrue(queue.equals("a2"));
+
+    appsInA1 = scheduler.getAppsInQueue("a1");
+    assertTrue(appsInA1.isEmpty());
+
+    appsInA = scheduler.getAppsInQueue("a");
+    assertEquals(1, appsInA.size()); // queue a still reports exactly one app
+
+    appsInRoot = scheduler.getAppsInQueue("root");
+    assertTrue(appsInRoot.contains(appAttemptId));
+    assertEquals(1, appsInRoot.size());
+
+    rm.stop();
+  }
+
+  @Test
+  public void testMoveAppForMoveToQueueWithFreeCap() throws Exception {
+
+    ResourceScheduler scheduler = resourceManager.getResourceScheduler();
+    // Register node1
+    String host_0 = "host_0";
+    NodeManager nm_0 =
+        registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+            Resources.createResource(4 * GB, 1));
+
+    // Register node2
+    String host_1 = "host_1";
+    NodeManager nm_1 =
+        registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+            Resources.createResource(2 * GB, 1));
+
+    // ResourceRequest priorities
+    Priority priority_0 =
+        org.apache.hadoop.yarn.server.resourcemanager.resource.Priority
+            .create(0);
+    Priority priority_1 =
+        org.apache.hadoop.yarn.server.resourcemanager.resource.Priority
+            .create(1);
+
+    // Submit application_0 to queue a1
+    Application application_0 =
+        new Application("user_0", "a1", resourceManager);
+    application_0.submit(); // app + app attempt event sent to scheduler
+
+    application_0.addNodeManager(host_0, 1234, nm_0);
+    application_0.addNodeManager(host_1, 1234, nm_1);
+
+    Resource capability_0_0 = Resources.createResource(1 * GB, 1);
+    application_0.addResourceRequestSpec(priority_1, capability_0_0);
+
+    Resource capability_0_1 = Resources.createResource(2 * GB, 1);
+    application_0.addResourceRequestSpec(priority_0, capability_0_1);
+
+    Task task_0_0 =
+        new Task(application_0, priority_1, new String[] { host_0, host_1 });
+    application_0.addTask(task_0_0);
+
+    // Submit application_1 to queue b2
+    Application application_1 =
+        new Application("user_1", "b2", resourceManager);
+    application_1.submit(); // app + app attempt event sent to scheduler
+
+    application_1.addNodeManager(host_0, 1234, nm_0);
+    application_1.addNodeManager(host_1, 1234, nm_1);
+
+    Resource capability_1_0 = Resources.createResource(1 * GB, 1);
+    application_1.addResourceRequestSpec(priority_1, capability_1_0);
+
+    Resource capability_1_1 = Resources.createResource(2 * GB, 1);
+    application_1.addResourceRequestSpec(priority_0, capability_1_1);
+
+    Task task_1_0 =
+        new Task(application_1, priority_1, new String[] { host_0, host_1 });
+    application_1.addTask(task_1_0);
+
+    // Send resource requests to the scheduler
+    application_0.schedule(); // allocate
+    application_1.schedule(); // allocate
+
+    // task_0_0 task_1_0 allocated, used=2G
+    nodeUpdate(nm_0);
+
+    // nothing allocated
+    nodeUpdate(nm_1);
+
+    // Get allocations from the scheduler
+    application_0.schedule(); // task_0_0
+    checkApplicationResourceUsage(1 * GB, application_0);
+
+    application_1.schedule(); // task_1_0
+    checkApplicationResourceUsage(1 * GB, application_1);
+
+    checkNodeResourceUsage(2 * GB, nm_0); // task_0_0 (1G) and task_1_0 (1G) 2G
+                                          // available
+    checkNodeResourceUsage(0 * GB, nm_1); // no tasks, 2G available
+
+    // move app from a1 (30% cap of total 10.5% cap) to b1 (79.2% cap of 89.5%
+    // total cap)
+    scheduler.moveApplication(application_0.getApplicationId(), "b1");
+
+    // 2GB 1C
+    Task task_1_1 =
+        new Task(application_1, priority_0,
+            new String[] { ResourceRequest.ANY });
+    application_1.addTask(task_1_1);
+
+    application_1.schedule();
+
+    // 2GB 1C
+    Task task_0_1 =
+        new Task(application_0, priority_0, new String[] { host_0, host_1 });
+    application_0.addTask(task_0_1);
+
+    application_0.schedule();
+
+    // prev 2G used free 2G
+    nodeUpdate(nm_0);
+
+    // prev 0G used free 2G
+    nodeUpdate(nm_1);
+
+    // Get allocations from the scheduler
+    application_1.schedule();
+    checkApplicationResourceUsage(3 * GB, application_1);
+
+    // Get allocations from the scheduler
+    application_0.schedule();
+    checkApplicationResourceUsage(3 * GB, application_0);
+
+    checkNodeResourceUsage(4 * GB, nm_0);
+    checkNodeResourceUsage(2 * GB, nm_1);
+
+  }
+
+  @Test
+  public void testMoveAppSuccess() throws Exception {
+
+    ResourceScheduler scheduler = resourceManager.getResourceScheduler();
+
+    // Register node1
+    String host_0 = "host_0";
+    NodeManager nm_0 =
+        registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+            Resources.createResource(5 * GB, 1));
+
+    // Register node2
+    String host_1 = "host_1";
+    NodeManager nm_1 =
+        registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+            Resources.createResource(5 * GB, 1));
+
+    // ResourceRequest priorities
+    Priority priority_0 =
+        org.apache.hadoop.yarn.server.resourcemanager.resource.Priority
+            .create(0);
+    Priority priority_1 =
+        org.apache.hadoop.yarn.server.resourcemanager.resource.Priority
+            .create(1);
+
+    // Submit application_0 to queue a1
+    Application application_0 =
+        new Application("user_0", "a1", resourceManager);
+    application_0.submit(); // app + app attempt event sent to scheduler
+
+    application_0.addNodeManager(host_0, 1234, nm_0);
+    application_0.addNodeManager(host_1, 1234, nm_1);
+
+    Resource capability_0_0 = Resources.createResource(3 * GB, 1);
+    application_0.addResourceRequestSpec(priority_1, capability_0_0);
+
+    Resource capability_0_1 = Resources.createResource(2 * GB, 1);
+    application_0.addResourceRequestSpec(priority_0, capability_0_1);
+
+    Task task_0_0 =
+        new Task(application_0, priority_1, new String[] { host_0, host_1 });
+    application_0.addTask(task_0_0);
+
+    // Submit application_1 to queue b2
+    Application application_1 =
+        new Application("user_1", "b2", resourceManager);
+    application_1.submit(); // app + app attempt event sent to scheduler
+
+    application_1.addNodeManager(host_0, 1234, nm_0);
+    application_1.addNodeManager(host_1, 1234, nm_1);
+
+    Resource capability_1_0 = Resources.createResource(1 * GB, 1);
+    application_1.addResourceRequestSpec(priority_1, capability_1_0);
+
+    Resource capability_1_1 = Resources.createResource(2 * GB, 1);
+    application_1.addResourceRequestSpec(priority_0, capability_1_1);
+
+    Task task_1_0 =
+        new Task(application_1, priority_1, new String[] { host_0, host_1 });
+    application_1.addTask(task_1_0);
+
+    // Send resource requests to the scheduler
+    application_0.schedule(); // allocate
+    application_1.schedule(); // allocate
+
+    // b2 can only run 1 app at a time
+    scheduler.moveApplication(application_0.getApplicationId(), "b2");
+
+    nodeUpdate(nm_0);
+
+    nodeUpdate(nm_1);
+
+    // Get allocations from the scheduler
+    application_0.schedule(); // task_0_0 is expected to still be unallocated
+    checkApplicationResourceUsage(0 * GB, application_0);
+
+    application_1.schedule(); // task_1_0
+    checkApplicationResourceUsage(1 * GB, application_1);
+
+    // task_1_0 (1G) application_0 moved to b2 with max running app 1 so it is
+    // not scheduled
+    checkNodeResourceUsage(1 * GB, nm_0);
+    checkNodeResourceUsage(0 * GB, nm_1);
+
+    // let's move application_0 to a queue where it can run
+    scheduler.moveApplication(application_0.getApplicationId(), "a2");
+    application_0.schedule();
+
+    nodeUpdate(nm_1);
+
+    // Get allocations from the scheduler
+    application_0.schedule(); // task_0_0
+    checkApplicationResourceUsage(3 * GB, application_0);
+
+    checkNodeResourceUsage(1 * GB, nm_0);
+    checkNodeResourceUsage(3 * GB, nm_1);
+
+  }
+
+  @Test(expected = YarnException.class)
+  public void testMoveAppViolateQueueState() throws Exception {
+
+    resourceManager = new ResourceManager();
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(csConf);
+    StringBuilder qState = new StringBuilder(); // state property key for B
+    qState.append(CapacitySchedulerConfiguration.PREFIX).append(B)
+        .append(CapacitySchedulerConfiguration.DOT)
+        .append(CapacitySchedulerConfiguration.STATE);
+    csConf.set(qState.toString(), QueueState.STOPPED.name());
+    YarnConfiguration conf = new YarnConfiguration(csConf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    resourceManager.init(conf);
+    resourceManager.getRMContext().getContainerTokenSecretManager()
+        .rollMasterKey();
+    resourceManager.getRMContext().getNMTokenSecretManager().rollMasterKey();
+    ((AsyncDispatcher) resourceManager.getRMContext().getDispatcher()).start();
+    mockContext = mock(RMContext.class);
+    when(mockContext.getConfigurationProvider()).thenReturn(
+        new LocalConfigurationProvider());
+
+    ResourceScheduler scheduler = resourceManager.getResourceScheduler();
+
+    // Register node1
+    String host_0 = "host_0";
+    NodeManager nm_0 =
+        registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+            Resources.createResource(6 * GB, 1));
+
+    // ResourceRequest priorities
+    Priority priority_0 =
+        org.apache.hadoop.yarn.server.resourcemanager.resource.Priority
+            .create(0);
+    Priority priority_1 =
+        org.apache.hadoop.yarn.server.resourcemanager.resource.Priority
+            .create(1);
+
+    // Submit application_0 to queue a1
+    Application application_0 =
+        new Application("user_0", "a1", resourceManager);
+    application_0.submit(); // app + app attempt event sent to scheduler
+
+    application_0.addNodeManager(host_0, 1234, nm_0);
+
+    Resource capability_0_0 = Resources.createResource(3 * GB, 1);
+    application_0.addResourceRequestSpec(priority_1, capability_0_0);
+
+    Resource capability_0_1 = Resources.createResource(2 * GB, 1);
+    application_0.addResourceRequestSpec(priority_0, capability_0_1);
+
+    Task task_0_0 =
+        new Task(application_0, priority_1, new String[] { host_0 });
+    application_0.addTask(task_0_0);
+
+    // Send resource requests to the scheduler
+    application_0.schedule(); // allocate
+
+    // task_0_0 allocated
+    nodeUpdate(nm_0);
+
+    // Get allocations from the scheduler
+    application_0.schedule(); // task_0_0
+    checkApplicationResourceUsage(3 * GB, application_0);
+
+    checkNodeResourceUsage(3 * GB, nm_0);
+    // the B queue was configured as STOPPED above, so this move to b1 is
+    // the call expected to throw the YarnException declared on @Test
+    scheduler.moveApplication(application_0.getApplicationId(), "b1");
+
+  }
+
+  @Test
+  public void testMoveAppQueueMetricsCheck() throws Exception {
+    ResourceScheduler scheduler = resourceManager.getResourceScheduler();
+
+    // Register node1
+    String host_0 = "host_0";
+    NodeManager nm_0 =
+        registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+            Resources.createResource(5 * GB, 1));
+
+    // Register node2
+    String host_1 = "host_1";
+    NodeManager nm_1 =
+        registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+            Resources.createResource(5 * GB, 1));
+
+    // ResourceRequest priorities
+    Priority priority_0 =
+        org.apache.hadoop.yarn.server.resourcemanager.resource.Priority
+            .create(0);
+    Priority priority_1 =
+        org.apache.hadoop.yarn.server.resourcemanager.resource.Priority
+            .create(1);
+
+    // Submit application_0 to queue a1
+    Application application_0 =
+        new Application("user_0", "a1", resourceManager);
+    application_0.submit(); // app + app attempt event sent to scheduler
+
+    application_0.addNodeManager(host_0, 1234, nm_0);
+    application_0.addNodeManager(host_1, 1234, nm_1);
+
+    Resource capability_0_0 = Resources.createResource(3 * GB, 1);
+    application_0.addResourceRequestSpec(priority_1, capability_0_0);
+
+    Resource capability_0_1 = Resources.createResource(2 * GB, 1);
+    application_0.addResourceRequestSpec(priority_0, capability_0_1);
+
+    Task task_0_0 =
+        new Task(application_0, priority_1, new String[] { host_0, host_1 });
+    application_0.addTask(task_0_0);
+
+    // Submit application_1 to queue b2
+    Application application_1 =
+        new Application("user_1", "b2", resourceManager);
+    application_1.submit(); // app + app attempt event sent to scheduler
+
+    application_1.addNodeManager(host_0, 1234, nm_0);
+    application_1.addNodeManager(host_1, 1234, nm_1);
+
+    Resource capability_1_0 = Resources.createResource(1 * GB, 1);
+    application_1.addResourceRequestSpec(priority_1, capability_1_0);
+
+    Resource capability_1_1 = Resources.createResource(2 * GB, 1);
+    application_1.addResourceRequestSpec(priority_0, capability_1_1);
+
+    Task task_1_0 =
+        new Task(application_1, priority_1, new String[] { host_0, host_1 });
+    application_1.addTask(task_1_0);
+
+    // Send resource requests to the scheduler
+    application_0.schedule(); // allocate
+    application_1.schedule(); // allocate
+
+    nodeUpdate(nm_0);
+
+    nodeUpdate(nm_1);
+
+    // snapshot the queue state before the move
+    CapacityScheduler cs =
+        (CapacityScheduler) resourceManager.getResourceScheduler();
+    CSQueue origRootQ = cs.getRootQueue();
+    CapacitySchedulerInfo oldInfo = new CapacitySchedulerInfo(origRootQ);
+    int origNumAppsA = getNumAppsInQueue("a", origRootQ.getChildQueues());
+    int origNumAppsRoot = origRootQ.getNumApplications();
+
+    scheduler.moveApplication(application_0.getApplicationId(), "a2");
+
+    // snapshot the queue state again after the move
+    CSQueue newRootQ = cs.getRootQueue();
+    int newNumAppsA = getNumAppsInQueue("a", newRootQ.getChildQueues());
+    int newNumAppsRoot = newRootQ.getNumApplications();
+    CapacitySchedulerInfo newInfo = new CapacitySchedulerInfo(newRootQ);
+    CapacitySchedulerLeafQueueInfo origOldA1 =
+        (CapacitySchedulerLeafQueueInfo) getQueueInfo("a1", oldInfo.getQueues());
+    CapacitySchedulerLeafQueueInfo origNewA1 =
+        (CapacitySchedulerLeafQueueInfo) getQueueInfo("a1", newInfo.getQueues());
+    CapacitySchedulerLeafQueueInfo targetOldA2 =
+        (CapacitySchedulerLeafQueueInfo) getQueueInfo("a2", oldInfo.getQueues());
+    CapacitySchedulerLeafQueueInfo targetNewA2 =
+        (CapacitySchedulerLeafQueueInfo) getQueueInfo("a2", newInfo.getQueues());
+    // originally submitted here
+    assertEquals(1, origOldA1.getNumApplications());
+    assertEquals(1, origNumAppsA);
+    assertEquals(2, origNumAppsRoot);
+    // after the move
+    assertEquals(0, origNewA1.getNumApplications());
+    assertEquals(1, newNumAppsA);
+    assertEquals(2, newNumAppsRoot);
+    // original consumption on a1
+    assertEquals(3 * GB, origOldA1.getResourcesUsed().getMemory());
+    assertEquals(1, origOldA1.getResourcesUsed().getvCores());
+    assertEquals(0, origNewA1.getResourcesUsed().getMemory()); // after the move
+    assertEquals(0, origNewA1.getResourcesUsed().getvCores()); // after the move
+    // app moved here with live containers
+    assertEquals(3 * GB, targetNewA2.getResourcesUsed().getMemory());
+    assertEquals(1, targetNewA2.getResourcesUsed().getvCores());
+    // it was empty before the move
+    assertEquals(0, targetOldA2.getNumApplications());
+    assertEquals(0, targetOldA2.getResourcesUsed().getMemory());
+    assertEquals(0, targetOldA2.getResourcesUsed().getvCores());
+    // after the app moved here
+    assertEquals(1, targetNewA2.getNumApplications());
+    // 1 container on original queue before move
+    assertEquals(1, origOldA1.getNumContainers());
+    // after the move the resource released
+    assertEquals(0, origNewA1.getNumContainers());
+    // and moved to the new queue
+    assertEquals(1, targetNewA2.getNumContainers());
+    // which originally didn't have any
+    assertEquals(0, targetOldA2.getNumContainers());
+    // 1 user with 3GB
+    assertEquals(3 * GB, origOldA1.getUsers().getUsersList().get(0)
+        .getResourcesUsed().getMemory());
+    // 1 user with 1 core
+    assertEquals(1, origOldA1.getUsers().getUsersList().get(0)
+        .getResourcesUsed().getvCores());
+    // user has no more running apps in the orig queue
+    assertEquals(0, origNewA1.getUsers().getUsersList().size());
+    // 1 user with 3GB
+    assertEquals(3 * GB, targetNewA2.getUsers().getUsersList().get(0)
+        .getResourcesUsed().getMemory());
+    // 1 user with 1 core
+    assertEquals(1, targetNewA2.getUsers().getUsersList().get(0)
+        .getResourcesUsed().getvCores());
+
+    // Get allocations from the scheduler
+    application_0.schedule(); // task_0_0
+    checkApplicationResourceUsage(3 * GB, application_0);
+
+    application_1.schedule(); // task_1_0
+    checkApplicationResourceUsage(1 * GB, application_1);
+
+    // task_0_0 (3G) and task_1_0 (1G) are both accounted on nm_0 (4G in
+    // total); nm_1 carries no usage
+    checkNodeResourceUsage(4 * GB, nm_0);
+    checkNodeResourceUsage(0 * GB, nm_1);
+
+  }
+
+  private int getNumAppsInQueue(String name, List<CSQueue> queues) {
+    for (CSQueue queue : queues) { // only this list is scanned, no recursion
+      if (queue.getQueueName().equals(name)) {
+        return queue.getNumApplications(); // first queue with that name wins
+      }
+    }
+    return -1; // no queue in the list has the requested name
+  }
+
+  private CapacitySchedulerQueueInfo getQueueInfo(String name,
+      CapacitySchedulerQueueInfoList info) {
+    if (info != null) { // a null list is treated like "nothing to search"
+      for (CapacitySchedulerQueueInfo queueInfo : info.getQueueInfoList()) {
+        if (queueInfo.getQueueName().equals(name)) {
+          return queueInfo;
+        } else {
+          CapacitySchedulerQueueInfo result = // recurse into this queue's own
+              getQueueInfo(name, queueInfo.getQueues()); // queue list
+          if (result == null) {
+            continue; // not found below this queue, try the next one
+          }
+          return result;
+        }
+      }
+    }
+    return null; // no queue with that name found in or below the list
+  }
+
+  @Test
+  public void testMoveAllApps() throws Exception {
+    MockRM rm = setUpMove();
+    AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rm.getResourceScheduler();
+
+    // submit an app to queue a1
+    RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1");
+    ApplicationAttemptId appAttemptId =
+        rm.getApplicationReport(app.getApplicationId())
+            .getCurrentApplicationAttemptId();
+
+    // check preconditions: visible in a1, a and root; b1 and b are empty
+    List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
+    assertEquals(1, appsInA1.size());
+
+    List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
+    assertTrue(appsInA.contains(appAttemptId));
+    assertEquals(1, appsInA.size());
+    String queue =
+        scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue()
+            .getQueueName();
+    Assert.assertTrue(queue.equals("a1"));
+
+    List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
+    assertTrue(appsInRoot.contains(appAttemptId));
+    assertEquals(1, appsInRoot.size());
+
+    List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1");
+    assertTrue(appsInB1.isEmpty());
+
+    List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b");
+    assertTrue(appsInB.isEmpty());
+
+    // now move every app in a1 to b1
+    scheduler.moveAllApps("a1", "b1");
+
+    // check postconditions: visible in b1, b and root; a1 and a are empty
+    Thread.sleep(1000); // NOTE(review): presumably the move is async; confirm
+    appsInB1 = scheduler.getAppsInQueue("b1");
+    assertEquals(1, appsInB1.size());
+    queue =
+        scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue()
+            .getQueueName();
+    Assert.assertTrue(queue.equals("b1"));
+
+    appsInB = scheduler.getAppsInQueue("b");
+    assertTrue(appsInB.contains(appAttemptId)); // fresh list, not stale appsInA
+    assertEquals(1, appsInB.size());
+
+    appsInRoot = scheduler.getAppsInQueue("root");
+    assertTrue(appsInRoot.contains(appAttemptId));
+    assertEquals(1, appsInRoot.size());
+
+    appsInA1 = scheduler.getAppsInQueue("a1");
+    assertTrue(appsInA1.isEmpty());
+
+    appsInA = scheduler.getAppsInQueue("a");
+    assertTrue(appsInA.isEmpty());
+
+    rm.stop();
+  }
+
+  @Test
+  public void testMoveAllAppsInvalidDestination() throws Exception {
+    MockRM rm = setUpMove();
+    AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rm.getResourceScheduler();
+
+    // submit an app to queue a1
+    RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1");
+    ApplicationAttemptId appAttemptId =
+        rm.getApplicationReport(app.getApplicationId())
+            .getCurrentApplicationAttemptId();
+
+    // check preconditions: visible in a1, a and root; b1 and b are empty
+    List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
+    assertEquals(1, appsInA1.size());
+
+    List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
+    assertTrue(appsInA.contains(appAttemptId));
+    assertEquals(1, appsInA.size());
+
+    List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
+    assertTrue(appsInRoot.contains(appAttemptId));
+    assertEquals(1, appsInRoot.size());
+
+    List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1");
+    assertTrue(appsInB1.isEmpty());
+
+    List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b");
+    assertTrue(appsInB.isEmpty());
+
+    // a move to a destination named DOES_NOT_EXIST must throw YarnException
+    try {
+      scheduler.moveAllApps("a1", "DOES_NOT_EXIST");
+      Assert.fail();
+    } catch (YarnException e) {
+      // expected
+    }
+
+    // check postconditions, app should still be in a1
+    appsInA1 = scheduler.getAppsInQueue("a1");
+    assertEquals(1, appsInA1.size());
+
+    appsInA = scheduler.getAppsInQueue("a");
+    assertTrue(appsInA.contains(appAttemptId));
+    assertEquals(1, appsInA.size());
+
+    appsInRoot = scheduler.getAppsInQueue("root");
+    assertTrue(appsInRoot.contains(appAttemptId));
+    assertEquals(1, appsInRoot.size());
+
+    appsInB1 = scheduler.getAppsInQueue("b1");
+    assertTrue(appsInB1.isEmpty());
+
+    appsInB = scheduler.getAppsInQueue("b");
+    assertTrue(appsInB.isEmpty());
+
+    rm.stop();
+  }
+
+  @Test
+  public void testMoveAllAppsInvalidSource() throws Exception {
+    MockRM rm = setUpMove();
+    AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rm.getResourceScheduler();
+
+    // submit an app to queue a1
+    RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1");
+    ApplicationAttemptId appAttemptId =
+        rm.getApplicationReport(app.getApplicationId())
+            .getCurrentApplicationAttemptId();
+
+    // check preconditions: visible in a1, a and root; b1 and b are empty
+    List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
+    assertEquals(1, appsInA1.size());
+
+    List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
+    assertTrue(appsInA.contains(appAttemptId));
+    assertEquals(1, appsInA.size());
+
+    List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
+    assertTrue(appsInRoot.contains(appAttemptId));
+    assertEquals(1, appsInRoot.size());
+
+    List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1");
+    assertTrue(appsInB1.isEmpty());
+
+    List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b");
+    assertTrue(appsInB.isEmpty());
+
+    // a move from a source named DOES_NOT_EXIST must throw YarnException
+    try {
+      scheduler.moveAllApps("DOES_NOT_EXIST", "b1");
+      Assert.fail();
+    } catch (YarnException e) {
+      // expected
+    }
+
+    // check postconditions, app should still be in a1
+    appsInA1 = scheduler.getAppsInQueue("a1");
+    assertEquals(1, appsInA1.size());
+
+    appsInA = scheduler.getAppsInQueue("a");
+    assertTrue(appsInA.contains(appAttemptId));
+    assertEquals(1, appsInA.size());
+
+    appsInRoot = scheduler.getAppsInQueue("root");
+    assertTrue(appsInRoot.contains(appAttemptId));
+    assertEquals(1, appsInRoot.size());
+
+    appsInB1 = scheduler.getAppsInQueue("b1");
+    assertTrue(appsInB1.isEmpty());
+
+    appsInB = scheduler.getAppsInQueue("b");
+    assertTrue(appsInB.isEmpty());
+
+    rm.stop();
+  }
+
 }



Mime
View raw message