hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1596754 [2/2] - in /hadoop/common/branches/branch-2/hadoop-yarn-project: ./ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capa...
Date Thu, 22 May 2014 05:33:01 GMT
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1596754&r1=1596753&r2=1596754&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Thu May 22 05:33:00 2014
@@ -39,7 +39,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -53,7 +52,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -76,10 +74,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@@ -122,11 +118,11 @@ import com.google.common.annotations.Vis
 @LimitedPrivate("yarn")
 @Unstable
 @SuppressWarnings("unchecked")
-public class FairScheduler extends AbstractYarnScheduler {
+public class FairScheduler extends
+    AbstractYarnScheduler<FSSchedulerApp, FSSchedulerNode> {
   private boolean initialized;
   private FairSchedulerConfiguration conf;
-  private Resource minimumAllocation;
-  private Resource maximumAllocation;
+
   private Resource incrAllocation;
   private QueueManager queueMgr;
   private Clock clock;
@@ -152,14 +148,6 @@ public class FairScheduler extends Abstr
   // Time we last ran preemptTasksIfNecessary
   private long lastPreemptCheckTime;
 
-  // Nodes in the cluster, indexed by NodeId
-  private Map<NodeId, FSSchedulerNode> nodes = 
-      new ConcurrentHashMap<NodeId, FSSchedulerNode>();
-
-  // Aggregate capacity of the cluster
-  private Resource clusterCapacity = 
-      RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
-
   // How often tasks are preempted 
   protected long preemptionInterval; 
   
@@ -246,23 +234,6 @@ public class FairScheduler extends Abstr
     return queueMgr;
   }
 
-  @Override
-  public RMContainer getRMContainer(ContainerId containerId) {
-    FSSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
-    return (attempt == null) ? null : attempt.getRMContainer(containerId);
-  }
-
-  private FSSchedulerApp getCurrentAttemptForContainer(
-      ContainerId containerId) {
-    SchedulerApplication app =
-        applications.get(containerId.getApplicationAttemptId()
-          .getApplicationId());
-    if (app != null) {
-      return (FSSchedulerApp) app.getCurrentAppAttempt();
-    }
-    return null;
-  }
-
   /**
    * A runnable which calls {@link FairScheduler#update()} every
    * <code>UPDATE_INTERVAL</code> milliseconds.
@@ -294,7 +265,7 @@ public class FairScheduler extends Abstr
     // Recursively update demands for all queues
     rootQueue.updateDemand();
 
-    rootQueue.setFairShare(clusterCapacity);
+    rootQueue.setFairShare(clusterResource);
     // Recursively compute fair shares for all queues
     // and update metrics
     rootQueue.recomputeShares();
@@ -322,9 +293,9 @@ public class FairScheduler extends Abstr
    * Is a queue below its min share for the given task type?
    */
   boolean isStarvedForMinShare(FSLeafQueue sched) {
-    Resource desiredShare = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
+    Resource desiredShare = Resources.min(RESOURCE_CALCULATOR, clusterResource,
       sched.getMinShare(), sched.getDemand());
-    return Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity,
+    return Resources.lessThan(RESOURCE_CALCULATOR, clusterResource,
         sched.getResourceUsage(), desiredShare);
   }
 
@@ -333,9 +304,9 @@ public class FairScheduler extends Abstr
    * defined as being below half its fair share.
    */
   boolean isStarvedForFairShare(FSLeafQueue sched) {
-    Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
+    Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR, clusterResource,
         Resources.multiply(sched.getFairShare(), .5), sched.getDemand());
-    return Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity,
+    return Resources.lessThan(RESOURCE_CALCULATOR, clusterResource,
         sched.getResourceUsage(), desiredFairShare);
   }
 
@@ -362,7 +333,7 @@ public class FairScheduler extends Abstr
     for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
       resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime));
     }
-    if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, resToPreempt,
+    if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, resToPreempt,
         Resources.none())) {
       preemptResources(queueMgr.getLeafQueues(), resToPreempt);
     }
@@ -389,7 +360,7 @@ public class FairScheduler extends Abstr
     // Collect running containers from over-scheduled queues
     List<RMContainer> runningContainers = new ArrayList<RMContainer>();
     for (FSLeafQueue sched : scheds) {
-      if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+      if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
           sched.getResourceUsage(), sched.getFairShare())) {
         for (AppSchedulable as : sched.getRunnableAppSchedulables()) {
           for (RMContainer c : as.getApp().getLiveContainers()) {
@@ -421,7 +392,7 @@ public class FairScheduler extends Abstr
     while (warnedIter.hasNext()) {
       RMContainer container = warnedIter.next();
       if (container.getState() == RMContainerState.RUNNING &&
-          Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+          Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
               toPreempt, Resources.none())) {
         warnOrKillContainer(container, apps.get(container), queues.get(container));
         preemptedThisRound.add(container);
@@ -435,12 +406,12 @@ public class FairScheduler extends Abstr
     // sure we don't preempt too many from any queue
     Iterator<RMContainer> runningIter = runningContainers.iterator();
     while (runningIter.hasNext() &&
-        Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+        Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
             toPreempt, Resources.none())) {
       RMContainer container = runningIter.next();
       FSLeafQueue sched = queues.get(container);
       if (!preemptedThisRound.contains(container) &&
-          Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+          Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
               sched.getResourceUsage(), sched.getFairShare())) {
         warnOrKillContainer(container, apps.get(container), sched);
         
@@ -496,20 +467,20 @@ public class FairScheduler extends Abstr
     Resource resDueToMinShare = Resources.none();
     Resource resDueToFairShare = Resources.none();
     if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
-      Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
+      Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource,
           sched.getMinShare(), sched.getDemand());
-      resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
+      resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterResource,
           Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
     }
     if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
-      Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
+      Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource,
           sched.getFairShare(), sched.getDemand());
-      resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
+      resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterResource,
           Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
     }
-    Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
+    Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterResource,
         resDueToMinShare, resDueToFairShare);
-    if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+    if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
         resToPreempt, Resources.none())) {
       String message = "Should preempt " + resToPreempt + " res for queue "
           + sched.getName() + ": resDueToMinShare = " + resDueToMinShare
@@ -540,18 +511,12 @@ public class FairScheduler extends Abstr
     return resourceWeights;
   }
 
-  @Override
-  public Resource getMinimumResourceCapability() {
-    return minimumAllocation;
-  }
-
   public Resource getIncrementResourceCapability() {
     return incrAllocation;
   }
 
-  @Override
-  public Resource getMaximumResourceCapability() {
-    return maximumAllocation;
+  private FSSchedulerNode getFSSchedulerNode(NodeId nodeId) {
+    return nodes.get(nodeId);
   }
 
   public double getNodeLocalityThreshold() {
@@ -578,10 +543,6 @@ public class FairScheduler extends Abstr
     return continuousSchedulingSleepMs;
   }
 
-  public Resource getClusterCapacity() {
-    return clusterCapacity;
-  }
-
   public synchronized Clock getClock() {
     return clock;
   }
@@ -629,8 +590,8 @@ public class FairScheduler extends Abstr
       return;
     }
   
-    SchedulerApplication application =
-        new SchedulerApplication(queue, user);
+    SchedulerApplication<FSSchedulerApp> application =
+        new SchedulerApplication<FSSchedulerApp>(queue, user);
     applications.put(applicationId, application);
     queue.getMetrics().submitApp(user);
 
@@ -647,7 +608,7 @@ public class FairScheduler extends Abstr
   protected synchronized void addApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
       boolean transferStateFromPreviousAttempt) {
-    SchedulerApplication application =
+    SchedulerApplication<FSSchedulerApp> application =
         applications.get(applicationAttemptId.getApplicationId());
     String user = application.getUser();
     FSLeafQueue queue = (FSLeafQueue) application.getQueue();
@@ -720,7 +681,8 @@ public class FairScheduler extends Abstr
 
   private synchronized void removeApplication(ApplicationId applicationId,
       RMAppState finalState) {
-    SchedulerApplication application = applications.get(applicationId);
+    SchedulerApplication<FSSchedulerApp> application =
+        applications.get(applicationId);
     if (application == null){
       LOG.warn("Couldn't find application " + applicationId);
       return;
@@ -734,7 +696,7 @@ public class FairScheduler extends Abstr
       RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
     LOG.info("Application " + applicationAttemptId + " is done." +
         " finalState=" + rmAppAttemptFinalState);
-    SchedulerApplication application =
+    SchedulerApplication<FSSchedulerApp> application =
         applications.get(applicationAttemptId.getApplicationId());
     FSSchedulerApp attempt = getSchedulerApp(applicationAttemptId);
 
@@ -809,7 +771,7 @@ public class FairScheduler extends Abstr
     }
 
     // Get the node on which the container was allocated
-    FSSchedulerNode node = nodes.get(container.getNodeId());
+    FSSchedulerNode node = getFSSchedulerNode(container.getNodeId());
 
     if (rmContainer.getState() == RMContainerState.RESERVED) {
       application.unreserve(node, rmContainer.getReservedPriority());
@@ -827,20 +789,20 @@ public class FairScheduler extends Abstr
 
   private synchronized void addNode(RMNode node) {
     nodes.put(node.getNodeID(), new FSSchedulerNode(node, usePortForNodeName));
-    Resources.addTo(clusterCapacity, node.getTotalCapability());
+    Resources.addTo(clusterResource, node.getTotalCapability());
     updateRootQueueMetrics();
 
     LOG.info("Added node " + node.getNodeAddress() +
-        " cluster capacity: " + clusterCapacity);
+        " cluster capacity: " + clusterResource);
   }
 
   private synchronized void removeNode(RMNode rmNode) {
-    FSSchedulerNode node = nodes.get(rmNode.getNodeID());
+    FSSchedulerNode node = getFSSchedulerNode(rmNode.getNodeID());
     // This can occur when an UNHEALTHY node reconnects
     if (node == null) {
       return;
     }
-    Resources.subtractFrom(clusterCapacity, rmNode.getTotalCapability());
+    Resources.subtractFrom(clusterResource, rmNode.getTotalCapability());
     updateRootQueueMetrics();
 
     // Remove running containers
@@ -865,7 +827,7 @@ public class FairScheduler extends Abstr
 
     nodes.remove(rmNode.getNodeID());
     LOG.info("Removed node " + rmNode.getNodeAddress() +
-        " cluster capacity: " + clusterCapacity);
+        " cluster capacity: " + clusterResource);
   }
 
   @Override
@@ -882,7 +844,7 @@ public class FairScheduler extends Abstr
 
     // Sanity check
     SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(),
-        clusterCapacity, minimumAllocation, maximumAllocation, incrAllocation);
+        clusterResource, minimumAllocation, maximumAllocation, incrAllocation);
 
     // Release containers
     for (ContainerId releasedContainerId : release) {
@@ -961,13 +923,13 @@ public class FairScheduler extends Abstr
    */
   private synchronized void nodeUpdate(RMNode nm) {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterCapacity);
+      LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterResource);
     }
     eventLog.log("HEARTBEAT", nm.getHostName());
-    FSSchedulerNode node = nodes.get(nm.getNodeID());
+    FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID());
 
     // Update resource if any change
-    SchedulerUtils.updateResourceIfChanged(node, nm, clusterCapacity, LOG);
+    SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource, LOG);
     
     List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
     List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
@@ -1012,7 +974,7 @@ public class FairScheduler extends Abstr
       // iterate all nodes
       for (NodeId nodeId : nodeIdList) {
         if (nodes.containsKey(nodeId)) {
-          FSSchedulerNode node = nodes.get(nodeId);
+          FSSchedulerNode node = getFSSchedulerNode(nodeId);
           try {
             if (Resources.fitsIn(minimumAllocation,
                     node.getAvailableResource())) {
@@ -1038,7 +1000,7 @@ public class FairScheduler extends Abstr
 
     @Override
     public int compare(NodeId n1, NodeId n2) {
-      return RESOURCE_CALCULATOR.compare(clusterCapacity,
+      return RESOURCE_CALCULATOR.compare(clusterResource,
               nodes.get(n2).getAvailableResource(),
               nodes.get(n1).getAvailableResource());
     }
@@ -1075,7 +1037,7 @@ public class FairScheduler extends Abstr
       int assignedContainers = 0;
       while (node.getReservedContainer() == null) {
         boolean assignedContainer = false;
-        if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+        if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
               queueMgr.getRootQueue().assignContainer(node),
               Resources.none())) {
           assignedContainers++;
@@ -1089,45 +1051,8 @@ public class FairScheduler extends Abstr
     updateRootQueueMetrics();
   }
 
-  @Override
-  public SchedulerNodeReport getNodeReport(NodeId nodeId) {
-    FSSchedulerNode node = nodes.get(nodeId);
-    return node == null ? null : new SchedulerNodeReport(node);
-  }
-  
   public FSSchedulerApp getSchedulerApp(ApplicationAttemptId appAttemptId) {
-    SchedulerApplication app =
-        applications.get(appAttemptId.getApplicationId());
-    if (app != null) {
-      return (FSSchedulerApp) app.getCurrentAppAttempt();
-    }
-    return null;
-  }
-  
-  @Override
-  public SchedulerAppReport getSchedulerAppInfo(
-      ApplicationAttemptId appAttemptId) {
-    FSSchedulerApp attempt = getSchedulerApp(appAttemptId);
-    if (attempt == null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Request for appInfo of unknown attempt " + appAttemptId);
-      }
-      return null;
-    }
-    return new SchedulerAppReport(attempt);
-  }
-  
-  @Override
-  public ApplicationResourceUsageReport getAppResourceUsageReport(
-      ApplicationAttemptId appAttemptId) {
-    FSSchedulerApp attempt = getSchedulerApp(appAttemptId);
-    if (attempt == null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Request for appInfo of unknown attempt " + appAttemptId);
-      }
-      return null;
-    }
-    return attempt.getResourceUsageReport();
+    return (FSSchedulerApp) super.getApplicationAttempt(appAttemptId);
   }
   
   /**
@@ -1139,7 +1064,7 @@ public class FairScheduler extends Abstr
   private void updateRootQueueMetrics() {
     rootMetrics.setAvailableResourcesToQueue(
         Resources.subtract(
-            clusterCapacity, rootMetrics.getAllocatedResources()));
+            clusterResource, rootMetrics.getAllocatedResources()));
   }
 
   @Override
@@ -1258,7 +1183,7 @@ public class FairScheduler extends Abstr
       this.rmContext = rmContext;
       // This stores per-application scheduling information
       this.applications =
-          new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
+          new ConcurrentHashMap<ApplicationId, SchedulerApplication<FSSchedulerApp>>();
       this.eventLog = new FairSchedulerEventLog();
       eventLog.init(this.conf);
 
@@ -1365,7 +1290,7 @@ public class FairScheduler extends Abstr
       // if it does not already exist, so it can be displayed on the web UI.
       synchronized (FairScheduler.this) {
         allocConf = queueInfo;
-        allocConf.getDefaultSchedulingPolicy().initialize(clusterCapacity);
+        allocConf.getDefaultSchedulingPolicy().initialize(clusterResource);
         queueMgr.updateAllocationConfiguration(allocConf);
       }
     }
@@ -1385,7 +1310,7 @@ public class FairScheduler extends Abstr
   @Override
   public synchronized String moveApplication(ApplicationId appId,
       String queueName) throws YarnException {
-    SchedulerApplication app = applications.get(appId);
+    SchedulerApplication<FSSchedulerApp> app = applications.get(appId);
     if (app == null) {
       throw new YarnException("App to be moved " + appId + " not found.");
     }
@@ -1449,8 +1374,8 @@ public class FairScheduler extends Abstr
    * Helper for moveApplication, which has appropriate synchronization, so all
    * operations will be atomic.
    */
-  private void executeMove(SchedulerApplication app, FSSchedulerApp attempt,
-      FSLeafQueue oldQueue, FSLeafQueue newQueue) {
+  private void executeMove(SchedulerApplication<FSSchedulerApp> app,
+      FSSchedulerApp attempt, FSLeafQueue oldQueue, FSLeafQueue newQueue) {
     boolean wasRunnable = oldQueue.removeApp(attempt);
     // if app was not runnable before, it may be runnable now
     boolean nowRunnable = maxRunningEnforcer.canAppBeRunnable(newQueue,

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1596754&r1=1596753&r2=1596754&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Thu May 22 05:33:00 2014
@@ -369,7 +369,7 @@ public class QueueManager {
       // Set scheduling policies
       try {
         SchedulingPolicy policy = queueConf.getSchedulingPolicy(queue.getName());
-        policy.initialize(scheduler.getClusterCapacity());
+        policy.initialize(scheduler.getClusterResource());
         queue.setPolicy(policy);
       } catch (AllocationConfigurationException ex) {
         LOG.warn("Cannot apply configured scheduling policy to queue "

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1596754&r1=1596753&r2=1596754&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Thu May 22 05:33:00 2014
@@ -25,7 +25,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 
 import org.apache.commons.logging.Log;
@@ -38,7 +37,6 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -76,11 +74,9 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -104,7 +100,8 @@ import com.google.common.annotations.Vis
 @LimitedPrivate("yarn")
 @Evolving
 @SuppressWarnings("unchecked")
-public class FifoScheduler extends AbstractYarnScheduler implements
+public class FifoScheduler extends
+    AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> implements
     Configurable {
 
   private static final Log LOG = LogFactory.getLog(FifoScheduler.class);
@@ -114,11 +111,7 @@ public class FifoScheduler extends Abstr
 
   Configuration conf;
 
-  protected Map<NodeId, FiCaSchedulerNode> nodes = new ConcurrentHashMap<NodeId, FiCaSchedulerNode>();
-
   private boolean initialized;
-  private Resource minimumAllocation;
-  private Resource maximumAllocation;
   private boolean usePortForNodeName;
 
   private ActiveUsersManager activeUsersManager;
@@ -218,19 +211,9 @@ public class FifoScheduler extends Abstr
   }
 
   @Override
-  public Resource getMinimumResourceCapability() {
-    return minimumAllocation;
-  }
-
-  @Override
   public int getNumClusterNodes() {
     return nodes.size();
   }
-  
-  @Override
-  public Resource getMaximumResourceCapability() {
-    return maximumAllocation;
-  }
 
   @Override
   public synchronized void
@@ -242,7 +225,7 @@ public class FifoScheduler extends Abstr
       this.rmContext = rmContext;
       //Use ConcurrentSkipListMap because applications need to be ordered
       this.applications =
-          new ConcurrentSkipListMap<ApplicationId, SchedulerApplication>();
+          new ConcurrentSkipListMap<ApplicationId, SchedulerApplication<FiCaSchedulerApp>>();
       this.minimumAllocation = 
         Resources.createResource(conf.getInt(
             YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
@@ -332,30 +315,6 @@ public class FifoScheduler extends Abstr
     }
   }
 
-  @VisibleForTesting
-  FiCaSchedulerApp getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
-    SchedulerApplication app =
-        applications.get(applicationAttemptId.getApplicationId());
-    if (app != null) {
-      return (FiCaSchedulerApp) app.getCurrentAppAttempt();
-    }
-    return null;
-  }
-
-  @Override
-  public SchedulerAppReport getSchedulerAppInfo(
-      ApplicationAttemptId applicationAttemptId) {
-    FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
-    return app == null ? null : new SchedulerAppReport(app);
-  }
-  
-  @Override
-  public ApplicationResourceUsageReport getAppResourceUsageReport(
-      ApplicationAttemptId applicationAttemptId) {
-    FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
-    return app == null ? null : app.getResourceUsageReport();
-  }
-  
   private FiCaSchedulerNode getNode(NodeId nodeId) {
     return nodes.get(nodeId);
   }
@@ -363,8 +322,8 @@ public class FifoScheduler extends Abstr
   @VisibleForTesting
   public synchronized void addApplication(ApplicationId applicationId,
       String queue, String user) {
-    SchedulerApplication application =
-        new SchedulerApplication(DEFAULT_QUEUE, user);
+    SchedulerApplication<FiCaSchedulerApp> application =
+        new SchedulerApplication<FiCaSchedulerApp>(DEFAULT_QUEUE, user);
     applications.put(applicationId, application);
     metrics.submitApp(user);
     LOG.info("Accepted application " + applicationId + " from user: " + user
@@ -377,7 +336,7 @@ public class FifoScheduler extends Abstr
   public synchronized void
       addApplicationAttempt(ApplicationAttemptId appAttemptId,
           boolean transferStateFromPreviousAttempt) {
-    SchedulerApplication application =
+    SchedulerApplication<FiCaSchedulerApp> application =
         applications.get(appAttemptId.getApplicationId());
     String user = application.getUser();
     // TODO: Fix store
@@ -401,7 +360,8 @@ public class FifoScheduler extends Abstr
 
   private synchronized void doneApplication(ApplicationId applicationId,
       RMAppState finalState) {
-    SchedulerApplication application = applications.get(applicationId);
+    SchedulerApplication<FiCaSchedulerApp> application =
+        applications.get(applicationId);
     if (application == null){
       LOG.warn("Couldn't find application " + applicationId);
       return;
@@ -419,7 +379,7 @@ public class FifoScheduler extends Abstr
       RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers)
       throws IOException {
     FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId);
-    SchedulerApplication application =
+    SchedulerApplication<FiCaSchedulerApp> application =
         applications.get(applicationAttemptId.getApplicationId());
     if (application == null || attempt == null) {
       throw new IOException("Unknown application " + applicationAttemptId + 
@@ -456,13 +416,13 @@ public class FifoScheduler extends Abstr
         " #applications=" + applications.size());
 
     // Try to assign containers to applications in fifo order
-    for (Map.Entry<ApplicationId, SchedulerApplication> e : applications
+    for (Map.Entry<ApplicationId, SchedulerApplication<FiCaSchedulerApp>> e : applications
         .entrySet()) {
-      FiCaSchedulerApp application =
-          (FiCaSchedulerApp) e.getValue().getCurrentAppAttempt();
+      FiCaSchedulerApp application = e.getValue().getCurrentAppAttempt();
       if (application == null) {
         continue;
       }
+
       LOG.debug("pre-assignContainers");
       application.showRequests();
       synchronized (application) {
@@ -499,7 +459,7 @@ public class FifoScheduler extends Abstr
 
     // Update the applications' headroom to correctly take into
     // account the containers assigned in this update.
-    for (SchedulerApplication application : applications.values()) {
+    for (SchedulerApplication<FiCaSchedulerApp> application : applications.values()) {
       FiCaSchedulerApp attempt =
           (FiCaSchedulerApp) application.getCurrentAppAttempt();
       if (attempt == null) {
@@ -864,7 +824,6 @@ public class FifoScheduler extends Abstr
      
   }
   
-  private Resource clusterResource = recordFactory.newRecordInstance(Resource.class);
   private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
 
   private synchronized void removeNode(RMNode nodeInfo) {
@@ -911,28 +870,11 @@ public class FifoScheduler extends Abstr
   }
 
   @Override
-  public synchronized SchedulerNodeReport getNodeReport(NodeId nodeId) {
-    FiCaSchedulerNode node = getNode(nodeId);
-    return node == null ? null : new SchedulerNodeReport(node);
-  }
-
-  @Override
   public RMContainer getRMContainer(ContainerId containerId) {
     FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
     return (attempt == null) ? null : attempt.getRMContainer(containerId);
   }
 
-  private FiCaSchedulerApp getCurrentAttemptForContainer(
-      ContainerId containerId) {
-    SchedulerApplication app =
-        applications.get(containerId.getApplicationAttemptId()
-          .getApplicationId());
-    if (app != null) {
-      return (FiCaSchedulerApp) app.getCurrentAppAttempt();
-    }
-    return null;
-  }
-
   @Override
   public QueueMetrics getRootQueueMetrics() {
     return DEFAULT_QUEUE.getMetrics();
@@ -943,13 +885,14 @@ public class FifoScheduler extends Abstr
       QueueACL acl, String queueName) {
     return DEFAULT_QUEUE.hasAccess(acl, callerUGI);
   }
-  
+
   @Override
-  public synchronized List<ApplicationAttemptId> getAppsInQueue(String queueName) {
+  public synchronized List<ApplicationAttemptId>
+      getAppsInQueue(String queueName) {
     if (queueName.equals(DEFAULT_QUEUE.getQueueName())) {
-      List<ApplicationAttemptId> attempts = new ArrayList<ApplicationAttemptId>(
-          applications.size());
-      for (SchedulerApplication app : applications.values()) {
+      List<ApplicationAttemptId> attempts =
+          new ArrayList<ApplicationAttemptId>(applications.size());
+      for (SchedulerApplication<FiCaSchedulerApp> app : applications.values()) {
         attempts.add(app.getCurrentAppAttempt().getApplicationAttemptId());
       }
       return attempts;
@@ -957,5 +900,4 @@ public class FifoScheduler extends Abstr
       return null;
     }
   }
-
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java?rev=1596754&r1=1596753&r2=1596754&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java Thu May 22 05:33:00 2014
@@ -70,7 +70,7 @@ public class FairSchedulerQueueInfo {  
     queueName = queue.getName();
     schedulingPolicy = queue.getPolicy().getName();
     
-    clusterResources = new ResourceInfo(scheduler.getClusterCapacity());
+    clusterResources = new ResourceInfo(scheduler.getClusterResource());
     
     usedResources = new ResourceInfo(queue.getResourceUsage());
     fractionMemUsed = (float)usedResources.getMemory() /
@@ -81,7 +81,7 @@ public class FairSchedulerQueueInfo {  
     maxResources = new ResourceInfo(queue.getMaxShare());
     maxResources = new ResourceInfo(
         Resources.componentwiseMin(queue.getMaxShare(),
-            scheduler.getClusterCapacity()));
+            scheduler.getClusterResource()));
     
     fractionMemFairShare = (float)fairResources.getMemory() / clusterResources.getMemory();
     fractionMemMinShare = (float)minResources.getMemory() / clusterResources.getMemory();

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java?rev=1596754&r1=1596753&r2=1596754&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java Thu May 22 05:33:00 2014
@@ -456,7 +456,7 @@ public class TestProportionalCapacityPre
 
     Resource clusterResources =
       Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0);
-    when(mCS.getClusterResources()).thenReturn(clusterResources);
+    when(mCS.getClusterResource()).thenReturn(clusterResources);
     return policy;
   }
 

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java?rev=1596754&r1=1596753&r2=1596754&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java Thu May 22 05:33:00 2014
@@ -384,15 +384,18 @@ public class TestSchedulerUtils {
     Assert.assertEquals(ContainerExitStatus.PREEMPTED, cd.getExitStatus());
   }
 
-  public static <T> SchedulerApplication verifyAppAddedAndRemovedFromScheduler(
-      final Map<ApplicationId, SchedulerApplication> applications,
-      EventHandler<SchedulerEvent> handler, String queueName) throws Exception {
+  public static SchedulerApplication<SchedulerApplicationAttempt>
+      verifyAppAddedAndRemovedFromScheduler(
+          Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>> applications,
+          EventHandler<SchedulerEvent> handler, String queueName)
+          throws Exception {
     ApplicationId appId =
         ApplicationId.newInstance(System.currentTimeMillis(), 1);
     AppAddedSchedulerEvent appAddedEvent =
         new AppAddedSchedulerEvent(appId, queueName, "user");
     handler.handle(appAddedEvent);
-    SchedulerApplication app = applications.get(appId);
+    SchedulerApplication<SchedulerApplicationAttempt> app =
+        applications.get(appId);
     // verify application is added.
     Assert.assertNotNull(app);
     Assert.assertEquals("user", app.getUser());

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java?rev=1596754&r1=1596753&r2=1596754&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java Thu May 22 05:33:00 2014
@@ -81,7 +81,7 @@ public class TestApplicationLimits {
         thenReturn(Resources.createResource(GB, 1));
     when(csContext.getMaximumResourceCapability()).
         thenReturn(Resources.createResource(16*GB, 32));
-    when(csContext.getClusterResources()).
+    when(csContext.getClusterResource()).
         thenReturn(Resources.createResource(10 * 16 * GB, 10 * 32));
     when(csContext.getApplicationComparator()).
         thenReturn(CapacityScheduler.applicationComparator);
@@ -165,7 +165,7 @@ public class TestApplicationLimits {
     
     // Say cluster has 100 nodes of 16G each
     Resource clusterResource = Resources.createResource(100 * 16 * GB, 100 * 16);
-    when(csContext.getClusterResources()).thenReturn(clusterResource);
+    when(csContext.getClusterResource()).thenReturn(clusterResource);
     
     Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
     CSQueue root = 
@@ -478,7 +478,7 @@ public class TestApplicationLimits {
     
     // Say cluster has 100 nodes of 16G each
     Resource clusterResource = Resources.createResource(100 * 16 * GB);
-    when(csContext.getClusterResources()).thenReturn(clusterResource);
+    when(csContext.getClusterResource()).thenReturn(clusterResource);
     
     Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
     CapacityScheduler.parseQueue(csContext, csConf, null, "root", 

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1596754&r1=1596753&r2=1596754&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Thu May 22 05:33:00 2014
@@ -29,8 +29,6 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 
-import org.junit.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -56,8 +54,11 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.Task;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -72,6 +73,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -465,14 +467,14 @@ public class TestCapacityScheduler {
     cs.handle(new NodeAddedSchedulerEvent(n1));
     cs.handle(new NodeAddedSchedulerEvent(n2));
 
-    Assert.assertEquals(6 * GB, cs.getClusterResources().getMemory());
+    Assert.assertEquals(6 * GB, cs.getClusterResource().getMemory());
 
     // reconnect n1 with downgraded memory
     n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 1);
     cs.handle(new NodeRemovedSchedulerEvent(n1));
     cs.handle(new NodeAddedSchedulerEvent(n1));
 
-    Assert.assertEquals(4 * GB, cs.getClusterResources().getMemory());
+    Assert.assertEquals(4 * GB, cs.getClusterResource().getMemory());
   }
 
   @Test
@@ -627,17 +629,17 @@ public class TestCapacityScheduler {
 
   @Test
   public void testAddAndRemoveAppFromCapacityScheduler() throws Exception {
-
-    AsyncDispatcher rmDispatcher = new AsyncDispatcher();
-    CapacityScheduler cs = new CapacityScheduler();
     CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
     setupQueueConfiguration(conf);
-    cs.reinitialize(conf, new RMContextImpl(rmDispatcher, null, null, null,
-      null, null, new RMContainerTokenSecretManager(conf),
-      new NMTokenSecretManagerInRM(conf),
-      new ClientToAMTokenSecretManagerInRM(), null));
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+      ResourceScheduler.class);
+    MockRM rm = new MockRM(conf);
+    @SuppressWarnings("unchecked")
+    AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode> cs =
+        (AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>) rm
+          .getResourceScheduler();
 
-    SchedulerApplication app =
+    SchedulerApplication<SchedulerApplicationAttempt> app =
         TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler(
           cs.getSchedulerApplications(), cs, "a1");
     Assert.assertEquals("a1", app.getQueue().getQueueName());

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java?rev=1596754&r1=1596753&r2=1596754&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java Thu May 22 05:33:00 2014
@@ -89,7 +89,7 @@ public class TestChildQueueOrder {
         Resources.createResource(GB, 1));
     when(csContext.getMaximumResourceCapability()).thenReturn(
         Resources.createResource(16*GB, 32));
-    when(csContext.getClusterResources()).
+    when(csContext.getClusterResource()).
     thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
     when(csContext.getApplicationComparator()).
     thenReturn(CapacityScheduler.applicationComparator);

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java?rev=1596754&r1=1596753&r2=1596754&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java Thu May 22 05:33:00 2014
@@ -122,7 +122,7 @@ public class TestLeafQueue {
         thenReturn(Resources.createResource(GB, 1));
     when(csContext.getMaximumResourceCapability()).
         thenReturn(Resources.createResource(16*GB, 32));
-    when(csContext.getClusterResources()).
+    when(csContext.getClusterResource()).
         thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
     when(csContext.getApplicationComparator()).
     thenReturn(CapacityScheduler.applicationComparator);
@@ -1651,7 +1651,7 @@ public class TestLeafQueue {
             newQueues, queues,
             TestUtils.spyHook);
     queues = newQueues;
-    root.reinitialize(newRoot, cs.getClusterResources());
+    root.reinitialize(newRoot, cs.getClusterResource());
 
     // after reinitialization
     assertEquals(3, e.activeApplications.size());
@@ -1676,7 +1676,7 @@ public class TestLeafQueue {
             newQueues, queues,
             TestUtils.spyHook);
     queues = newQueues;
-    root.reinitialize(newRoot, cs.getClusterResources());
+    root.reinitialize(newRoot, cs.getClusterResource());
 
     // after reinitialization
     assertEquals(60, e.getNodeLocalityDelay());
@@ -2070,7 +2070,7 @@ public class TestLeafQueue {
     when(csContext.getConfiguration()).thenReturn(csConf);
     when(csContext.getConf()).thenReturn(new YarnConfiguration());
     when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
-    when(csContext.getClusterResources()).thenReturn(clusterResource);
+    when(csContext.getClusterResource()).thenReturn(clusterResource);
     when(csContext.getMinimumResourceCapability()).thenReturn(
         Resources.createResource(GB, 1));
     when(csContext.getMaximumResourceCapability()).thenReturn(

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java?rev=1596754&r1=1596753&r2=1596754&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java Thu May 22 05:33:00 2014
@@ -86,7 +86,7 @@ public class TestParentQueue {
         Resources.createResource(GB, 1));
     when(csContext.getMaximumResourceCapability()).thenReturn(
         Resources.createResource(16*GB, 32));
-    when(csContext.getClusterResources()).
+    when(csContext.getClusterResource()).
         thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
     when(csContext.getApplicationComparator()).
     thenReturn(CapacityScheduler.applicationComparator);

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1596754&r1=1596753&r2=1596754&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Thu May 22 05:33:00 2014
@@ -43,7 +43,6 @@ import java.util.Set;
 
 import javax.xml.parsers.ParserConfigurationException;
 
-import org.junit.Assert;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -80,8 +79,11 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@@ -95,12 +97,14 @@ import org.apache.hadoop.yarn.server.uti
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.xml.sax.SAXException;
 
 import com.google.common.collect.Sets;
 
+@SuppressWarnings("unchecked")
 public class TestFairScheduler {
 
   static class MockClock implements Clock {
@@ -377,19 +381,19 @@ public class TestFairScheduler {
             .newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
     scheduler.handle(nodeEvent1);
-    assertEquals(1024, scheduler.getClusterCapacity().getMemory());
+    assertEquals(1024, scheduler.getClusterResource().getMemory());
 
     // Add another node
     RMNode node2 =
         MockNodes.newNodeInfo(1, Resources.createResource(512), 2, "127.0.0.2");
     NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
     scheduler.handle(nodeEvent2);
-    assertEquals(1536, scheduler.getClusterCapacity().getMemory());
+    assertEquals(1536, scheduler.getClusterResource().getMemory());
 
     // Remove the first node
     NodeRemovedSchedulerEvent nodeEvent3 = new NodeRemovedSchedulerEvent(node1);
     scheduler.handle(nodeEvent3);
-    assertEquals(512, scheduler.getClusterCapacity().getMemory());
+    assertEquals(512, scheduler.getClusterResource().getMemory());
   }
 
   @Test
@@ -2123,7 +2127,7 @@ public class TestFairScheduler {
     FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2);
 
     DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
-    drfPolicy.initialize(scheduler.getClusterCapacity());
+    drfPolicy.initialize(scheduler.getClusterResource());
     scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
     scheduler.update();
 
@@ -2167,7 +2171,7 @@ public class TestFairScheduler {
     FSSchedulerApp app3 = scheduler.getSchedulerApp(appAttId3);
     
     DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
-    drfPolicy.initialize(scheduler.getClusterCapacity());
+    drfPolicy.initialize(scheduler.getClusterResource());
     scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy);
     scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
     scheduler.update();
@@ -2210,7 +2214,7 @@ public class TestFairScheduler {
     FSSchedulerApp app4 = scheduler.getSchedulerApp(appAttId4);
     
     DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
-    drfPolicy.initialize(scheduler.getClusterCapacity());
+    drfPolicy.initialize(scheduler.getClusterResource());
     scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy);
     scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
     scheduler.getQueueManager().getQueue("queue1.subqueue1").setPolicy(drfPolicy);
@@ -2466,8 +2470,8 @@ public class TestFairScheduler {
     fs.handle(nodeEvent2);
 
     // available resource
-    Assert.assertEquals(fs.getClusterCapacity().getMemory(), 16 * 1024);
-    Assert.assertEquals(fs.getClusterCapacity().getVirtualCores(), 16);
+    Assert.assertEquals(fs.getClusterResource().getMemory(), 16 * 1024);
+    Assert.assertEquals(fs.getClusterResource().getVirtualCores(), 16);
 
     // send application request
     ApplicationAttemptId appAttemptId =
@@ -2647,8 +2651,9 @@ public class TestFairScheduler {
 
   @Test
   public void testAddAndRemoveAppFromFairScheduler() throws Exception {
-    FairScheduler scheduler =
-        (FairScheduler) resourceManager.getResourceScheduler();
+    AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode> scheduler =
+        (AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>) resourceManager
+          .getResourceScheduler();
     TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler(
       scheduler.getSchedulerApplications(), scheduler, "default");
   }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java?rev=1596754&r1=1596753&r2=1596754&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java Thu May 22 05:33:00 2014
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.re
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-
 import static org.mockito.Mockito.mock;
 
 import java.io.IOException;
@@ -30,8 +29,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-import org.junit.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -60,13 +57,13 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.Task;
 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@@ -78,6 +75,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -594,9 +592,12 @@ public class TestFifoScheduler {
   public void testAddAndRemoveAppFromFiFoScheduler() throws Exception {
     Configuration conf = new Configuration();
     conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class,
-        ResourceScheduler.class);
+      ResourceScheduler.class);
     MockRM rm = new MockRM(conf);
-    FifoScheduler fs = (FifoScheduler)rm.getResourceScheduler();
+    @SuppressWarnings("unchecked")
+    AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode> fs =
+        (AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>) rm
+          .getResourceScheduler();
     TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler(
       fs.getSchedulerApplications(), fs, "queue");
   }



Mime
View raw message