hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1098051 - in /hadoop/mapreduce/branches/MR-279: ./ yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/ha...
Date Sat, 30 Apr 2011 07:24:21 GMT
Author: acmurthy
Date: Sat Apr 30 07:24:20 2011
New Revision: 1098051

URL: http://svn.apache.org/viewvc?rev=1098051&view=rev
Log:
MAPREDUCE-2434. Metrics for ResourceManager. Contributed by Luke Lu.

Modified:
    hadoop/mapreduce/branches/MR-279/.gitignore
    hadoop/mapreduce/branches/MR-279/CHANGES.txt
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java

Modified: hadoop/mapreduce/branches/MR-279/.gitignore
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/.gitignore?rev=1098051&r1=1098050&r2=1098051&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/.gitignore (original)
+++ hadoop/mapreduce/branches/MR-279/.gitignore Sat Apr 30 07:24:20 2011
@@ -46,3 +46,4 @@ src/docs/cn/uming.conf
 target
 SecurityAuth.audit
 conf/yarn-site.xml
+.eclipse.templates/

Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1098051&r1=1098050&r2=1098051&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Sat Apr 30 07:24:20 2011
@@ -4,6 +4,8 @@ Trunk (unreleased changes)
 
   MAPREDUCE-279
 
+    MAPREDUCE-2434. Metrics for ResourceManager. (Luke Lu via acmurthy)
+
     Fix container launch w/ inconsistent credential file naming. (cdouglas)
 
     Disable ContainerMonitoring for non-linux systems. (vinodkv)

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java?rev=1098051&r1=1098050&r2=1098051&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java Sat Apr 30 07:24:20 2011
@@ -28,7 +28,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -44,6 +43,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
 
 /**
@@ -75,6 +75,7 @@ public class Application {
   List<Container> allocated = new ArrayList<Container>(); 
   Set<NodeInfo> applicationOnNodes = new HashSet<NodeInfo>();
   ApplicationMaster master;
+  boolean pending = true; // for app metrics
   
   /* Reserved containers */
   private final Comparator<NodeManager> nodeComparator = 
@@ -107,6 +108,14 @@ public class Application {
     return user;
   }
 
+  public ApplicationState getState() {
+    return master.getState();
+  }
+
+  public boolean isPending() {
+    return pending;
+  }
+
   public synchronized Map<Priority, Map<String, ResourceRequest>> getRequests() {
     return requests;
   }
@@ -137,10 +146,8 @@ public class Application {
     List<Container> heartbeatContainers = allocated;
     allocated = new ArrayList<Container>();
 
-    // Metrics
     for (Container container : heartbeatContainers) {
-      org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
-          overallConsumption, container.getResource());
+      Resources.addTo(overallConsumption, container.getResource());
     }
 
     LOG.debug("acquire:" +
@@ -159,13 +166,22 @@ public class Application {
    * application, by asking for more resources and releasing resources 
    * acquired by the application.
    * @param requests resources to be acquired
-   * @param release resources being released
    */
   synchronized public void updateResourceRequests(List<ResourceRequest> requests) {
+    QueueMetrics metrics = queue.getMetrics();
     // Update resource requests
     for (ResourceRequest request : requests) {
       Priority priority = request.getPriority();
       String hostName = request.getHostName();
+      boolean updatePendingResources = false;
+      ResourceRequest lastRequest = null;
+
+      if (hostName.equals(NodeManager.ANY)) {
+        LOG.debug("update:" +
+            " application=" + applicationId +
+            " request=" + request);
+        updatePendingResources = true;
+      }
 
       Map<String, ResourceRequest> asks = this.requests.get(priority);
 
@@ -173,14 +189,24 @@ public class Application {
         asks = new HashMap<String, ResourceRequest>();
         this.requests.put(priority, asks);
         this.priorities.add(priority);
+      } else if (updatePendingResources) {
+        lastRequest = asks.get(hostName);
       }
 
       asks.put(hostName, request);
 
-      if (hostName.equals(NodeManager.ANY)) {
-        LOG.debug("update:" +
-            " application=" + applicationId + 
-            " request=" + request);
+      if (updatePendingResources) {
+        int lastRequestContainers = lastRequest != null ?
+            lastRequest.getNumContainers() : 0;
+        Resource lastRequestCapability = lastRequest != null ?
+            lastRequest.getCapability() : Resources.none();
+        metrics.incrPendingResources(user,
+            request.getNumContainers() - lastRequestContainers,
+            Resources.subtractFrom( // save a clone
+                Resources.multiply(request.getCapability(),
+                                   request.getNumContainers()),
+                Resources.multiply(lastRequestCapability,
+                                   lastRequestContainers)));
       }
     }
   }
@@ -190,8 +216,7 @@ public class Application {
     for (Container container : release) {
       LOG.debug("update: " +
           "application=" + applicationId + " released=" + container);
-      org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(
-          currentConsumption, container.getResource());
+      Resources.subtractFrom(currentConsumption, container.getResource());
       for (Iterator<Container> i=acquired.iterator(); i.hasNext();) {
         Container c = i.next();
         if (c.getId().equals(container.getId())) {
@@ -220,6 +245,7 @@ public class Application {
   synchronized public void completedContainer(Container container) {
     LOG.info("Completed container: " + container);
     completedContainers.add(container);
+    queue.getMetrics().releaseResources(user, 1, container.getResource());
   }
 
   synchronized public void completedContainers(List<Container> containers) {
@@ -245,6 +271,15 @@ public class Application {
     } else {
       allocateOffSwitch(node, priority, request, containers);
     }
+    QueueMetrics metrics = queue.getMetrics();
+    if (pending) {
+      // once an allocation is done we assume the application is
+      // running from scheduler's POV.
+      pending = false;
+      metrics.incrAppsRunning(user);
+    }
+    LOG.debug("allocate: user: "+ user +", memory: "+ request.getCapability());
+    metrics.allocateResources(user, containers.size(), request.getCapability());
   }
 
   /**
@@ -306,8 +341,7 @@ public class Application {
   synchronized private void allocate(List<Container> containers) {
     // Update consumption and track allocations
     for (Container container : containers) {
-      org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
-          currentConsumption, container.getResource());
+      Resources.addTo(currentConsumption, container.getResource());
 
       allocated.add(container);
 
@@ -391,4 +425,17 @@ public class Application {
     }
     return false;
   }
+
+  synchronized public void finish() {
+    // GC pending resources metrics
+    QueueMetrics metrics = queue.getMetrics();
+    for (Map<String, ResourceRequest> asks : requests.values()) {
+      ResourceRequest request = asks.get(NodeManager.ANY);
+      if (request != null) {
+        metrics.decrPendingResources(user, request.getNumContainers(),
+            Resources.multiply(request.getCapability(),
+                               request.getNumContainers()));
+      }
+    }
+  }
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java?rev=1098051&r1=1098050&r2=1098051&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java Sat Apr 30 07:24:20 2011
@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
 
 /**
@@ -93,8 +94,7 @@ public class NodeManagerImpl implements 
     this.totalCapability = capability; 
     this.nodeAddress = nodeAddress;
     this.httpAddress = httpAddress;
-    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
-        availableResource, capability);
+    Resources.addTo(availableResource, capability);
     this.node = node;
   }
 
@@ -296,10 +296,8 @@ public class NodeManagerImpl implements 
           + this.nodeAddress);
       return;
     }
-    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
-        availableResource, resource);
-    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(
-        usedResource, resource);
+    Resources.addTo(availableResource, resource);
+    Resources.subtractFrom(usedResource, resource);
   }
 
   public synchronized void deductAvailableResource(Resource resource) {
@@ -307,10 +305,8 @@ public class NodeManagerImpl implements 
       LOG.error("Invalid deduction of null resource for "
           + this.nodeAddress);
     }
-    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(
-        availableResource, resource);
-    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
-        usedResource, resource);
+    Resources.subtractFrom(availableResource, resource);
+    Resources.addTo(usedResource, resource);
   }
 
   public synchronized void notifyFinishedApplication(ApplicationId applicationId) {  

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java?rev=1098051&r1=1098050&r2=1098051&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java Sat Apr 30 07:24:20 2011
@@ -39,6 +39,12 @@ public interface Queue {
   String getQueueName();
 
   /**
+   * Get the queue metrics
+   * @return the queue metrics
+   */
+  QueueMetrics getMetrics();
+
+  /**
    * Get ACLs for the queue.
    * @return ACLs for the queue
    */

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1098051&r1=1098050&r2=1098051&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Sat Apr 30 07:24:20 2011
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
@@ -153,7 +154,7 @@ implements ResourceScheduler, CapacitySc
     CapacitySchedulerConfiguration.PREFIX + ROOT;
 
   private void initializeQueues(CapacitySchedulerConfiguration conf) {
-    root = parseQueue(conf, null, ROOT, queues);
+    root = parseQueue(conf, null, ROOT, queues, queues);
     LOG.info("Initialized root queue " + root);
   }
 
@@ -161,7 +162,7 @@ implements ResourceScheduler, CapacitySc
   throws IOException {
     // Parse new queues
     Map<String, Queue> newQueues = new HashMap<String, Queue>();
-    Queue newRoot = parseQueue(conf, null, ROOT, newQueues);
+    Queue newRoot = parseQueue(conf, null, ROOT, newQueues, queues);
     
     // Ensure all existing queues are still present
     validateExistingQueues(queues, newQueues);
@@ -186,20 +187,23 @@ implements ResourceScheduler, CapacitySc
   }
 
   private Queue parseQueue(CapacitySchedulerConfiguration conf, 
-      Queue parent, String queueName, Map<String, Queue> queues) {
+      Queue parent, String queueName, Map<String, Queue> queues,
+      Map<String, Queue> oldQueues) {
     Queue queue;
     String[] childQueueNames = 
       conf.getQueues((parent == null) ? 
           queueName : (parent.getQueuePath()+"."+queueName));
     if (childQueueNames == null || childQueueNames.length == 0) {
-      queue = new LeafQueue(this, queueName, parent, applicationComparator);
+      queue = new LeafQueue(this, queueName, parent, applicationComparator,
+                            oldQueues.get(queueName));
     } else {
       ParentQueue parentQueue = 
-        new ParentQueue(this, queueName, queueComparator, parent);
+        new ParentQueue(this, queueName, queueComparator, parent,
+                        oldQueues.get(queueName));
       List<Queue> childQueues = new ArrayList<Queue>();
       for (String childQueueName : childQueueNames) {
         Queue childQueue = 
-          parseQueue(conf, parentQueue, childQueueName, queues);
+          parseQueue(conf, parentQueue, childQueueName, queues, oldQueues);
         childQueues.add(childQueue);
       }
       parentQueue.setChildQueues(childQueues);
@@ -462,8 +466,7 @@ implements ResourceScheduler, CapacitySc
 
   @Override
   public synchronized void removeNode(NodeInfo nodeInfo) {
-    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(
-        clusterResource, nodeInfo.getTotalCapability());
+    Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability());
     //TODO inform the applications that the containers are completed/failed
     nodes.remove(nodeInfo.getNodeAddress());
   }
@@ -476,8 +479,7 @@ implements ResourceScheduler, CapacitySc
   @Override
   public synchronized void addNode(NodeManager nodeManager) {
     nodes.put(nodeManager.getNodeAddress(), nodeManager);
-    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
-        clusterResource, nodeManager.getTotalCapability());
+    Resources.addTo(clusterResource, nodeManager.getTotalCapability());
   }
 
   public synchronized boolean releaseContainer(ApplicationId applicationId, 

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java?rev=1098051&r1=1098050&r2=1098051&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java Sat Apr 30 07:24:20 2011
@@ -30,6 +30,7 @@ import org.apache.hadoop.util.StringUtil
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 
 public class CapacitySchedulerConfiguration extends Configuration {
 
@@ -92,7 +93,11 @@ public class CapacitySchedulerConfigurat
   
   @Private
   public static String DEFAULT_ACL = "*";
-  
+
+  @Private public static final String ENABLE_USER_METRICS =
+      PREFIX +"user-metrics.enable";
+  @Private public static final boolean DEFAULT_ENABLE_USER_METRICS = false;
+
   public CapacitySchedulerConfiguration() {
     this(new Configuration());
   }
@@ -191,8 +196,10 @@ public class CapacitySchedulerConfigurat
   
   public Resource getMinimumAllocation() {
     int minimumMemory = getInt(MINIMUM_ALLOCATION, MINIMUM_MEMORY);
-    return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.
-             createResource(minimumMemory);
+    return Resources.createResource(minimumMemory);
+  }
+
+  public boolean getEnableUserMetrics() {
+    return getBoolean(ENABLE_USER_METRICS, DEFAULT_ENABLE_USER_METRICS);
   }
-  
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1098051&r1=1098050&r2=1098051&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Sat Apr 30 07:24:20 2011
@@ -49,10 +49,12 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 
 @Private
@@ -72,8 +74,7 @@ public class LeafQueue implements Queue 
   private int maxApplications;
   private int maxApplicationsPerUser;
   
-  private Resource usedResources = 
-    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(0);
+  private Resource usedResources = Resources.createResource(0);
   private float utilization = 0.0f;
   private float usedCapacity = 0.0f;
   private volatile int numContainers;
@@ -86,6 +87,8 @@ public class LeafQueue implements Queue 
 
   private Map<String, User> users = new HashMap<String, User>();
   
+  private final QueueMetrics metrics;
+
   private QueueInfo queueInfo; 
   private Map<ApplicationId, org.apache.hadoop.yarn.api.records.Application> 
   applicationInfos;
@@ -100,9 +103,13 @@ public class LeafQueue implements Queue 
 
   public LeafQueue(CapacitySchedulerContext cs, 
       String queueName, Queue parent, 
-      Comparator<Application> applicationComparator) {
+      Comparator<Application> applicationComparator, Queue old) {
     this.queueName = queueName;
     this.parent = parent;
+    // must be after parent and queueName are initialized
+    this.metrics = old != null ? old.getMetrics() :
+        QueueMetrics.forQueue(getQueuePath(), parent,
+        cs.getConfiguration().getEnableUserMetrics());
     
     this.minimumAllocation = cs.getMinimumAllocation();
     this.containerTokenSecretManager = cs.getContainerTokenSecretManager();
@@ -350,7 +357,7 @@ public class LeafQueue implements Queue 
         leafQueue.maxApplications, leafQueue.maxApplicationsPerUser,
         leafQueue.state, leafQueue.acls);
     
-    update(clusterResource);
+    updateResource(clusterResource);
   }
 
   @Override
@@ -416,6 +423,8 @@ public class LeafQueue implements Queue 
       addApplication(application, user);
     }
 
+    metrics.submitApp(userName);
+
     // Inform the parent queue
     try {
       parent.submitApplication(application, userName, queue, priority);
@@ -449,6 +458,10 @@ public class LeafQueue implements Queue 
     synchronized (this) {
       removeApplication(application, getUser(application.getUser()));
     }
+
+    // Update metrics
+    metrics.finishApp(application);
+    application.finish();
     
     // Inform the parent queue
     parent.finishApplication(application, queue);
@@ -508,12 +521,12 @@ public class LeafQueue implements Queue 
             
             // Maximum Capacity of the queue
             if (!assignToQueue(clusterResource, required.getCapability())) {
-              return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
+              return Resources.none();
             }
             
             // User limits
             if (!assignToUser(application.getUser(), clusterResource, required.getCapability())) {
-              return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
+              return Resources.none();
             }
             
           }
@@ -521,9 +534,7 @@ public class LeafQueue implements Queue 
           Resource assigned = 
             assignContainersOnNode(clusterResource, node, application, priority);
   
-          if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThan(
-                assigned, 
-                org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE)) {
+          if (Resources.greaterThan(assigned, Resources.none())) {
             Resource assignedResource = 
               application.getResourceRequest(priority, NodeManager.ANY).getCapability();
             
@@ -544,7 +555,7 @@ public class LeafQueue implements Queue 
       application.showRequests();
     }
   
-    return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
+    return Resources.none();
   }
 
   private synchronized Resource assignReservedContainers(Application application, 
@@ -561,7 +572,7 @@ public class LeafQueue implements Queue 
 
     // Doesn't matter... since it's already charged for at time of reservation
     // "re-reservation" is *free*
-    return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
+    return Resources.none();
   }
 
   private synchronized boolean assignToQueue(Resource clusterResource, 
@@ -616,6 +627,9 @@ public class LeafQueue implements Queue 
           (int)(queueCapacity * userLimitFactor)
           );
 
+    metrics.setAvailableUserMemory(userName,
+        limit - user.getConsumedResources().getMemory());
+
     // Note: We aren't considering the current request since there is a fixed
     // overhead of the AM, so... 
     if ((user.getConsumedResources().getMemory()) > limit) {
@@ -654,22 +668,17 @@ public class LeafQueue implements Queue 
   Resource assignContainersOnNode(Resource clusterResource, NodeManager node, 
       Application application, Priority priority) {
 
-    Resource assigned = 
-      org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
+    Resource assigned = Resources.none();
 
     // Data-local
     assigned = assignNodeLocalContainers(clusterResource, node, application, priority); 
-    if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThan(
-          assigned, 
-          org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE)) {
+    if (Resources.greaterThan(assigned, Resources.none())) {
       return assigned;
     }
 
     // Rack-local
     assigned = assignRackLocalContainers(clusterResource, node, application, priority);
-    if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThan(
-        assigned, 
-        org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE)) {
+    if (Resources.greaterThan(assigned, Resources.none())) {
     return assigned;
   }
     
@@ -688,7 +697,7 @@ public class LeafQueue implements Queue 
       }
     }
     
-    return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
+    return Resources.none();
   }
 
   Resource assignRackLocalContainers(Resource clusterResource, NodeManager node, 
@@ -702,7 +711,7 @@ public class LeafQueue implements Queue 
       }
     }
     
-    return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
+    return Resources.none();
   }
 
   Resource assignOffSwitchContainers(Resource clusterResource, NodeManager node, 
@@ -716,7 +725,7 @@ public class LeafQueue implements Queue 
       }
     }
     
-    return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
+    return Resources.none();
   }
 
   boolean canAssign(Application application, Priority priority, 
@@ -837,7 +846,8 @@ public class LeafQueue implements Queue 
 
       }
     }
-    return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
+
+    return Resources.none();
   }
 
   private void allocate(Application application, NodeType type, 
@@ -893,9 +903,8 @@ public class LeafQueue implements Queue 
 
   private synchronized void allocateResource(Resource clusterResource, 
       String userName, Resource resource) {
-    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.
-      addResource(usedResources, resource);
-    update(clusterResource);
+    Resources.addTo(usedResources, resource);
+    updateResource(clusterResource);
     ++numContainers;
     
     User user = getUser(userName);
@@ -904,23 +913,29 @@ public class LeafQueue implements Queue 
 
   private synchronized void releaseResource(Resource clusterResource, 
       String userName, Resource resource) {
-    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.
-      subtractResource(usedResources, resource);
-    update(clusterResource);
+    Resources.subtractFrom(usedResources, resource);
+    updateResource(clusterResource);
     --numContainers;
     
     User user = getUser(userName);
     user.releaseContainer(resource);
   }
 
-  private synchronized void update(Resource clusterResource) {
-    setUtilization(usedResources.getMemory() / (clusterResource.getMemory() * absoluteCapacity));
+  @Override
+  public synchronized void updateResource(Resource clusterResource) {
+    float memLimit = clusterResource.getMemory() * absoluteCapacity;
+    setUtilization(usedResources.getMemory() / memLimit);
     setUsedCapacity(usedResources.getMemory() / (clusterResource.getMemory() * capacity));
+    metrics.setAvailableQueueMemory((int) memLimit - usedResources.getMemory());
   }
-  
+
+  @Override
+  public QueueMetrics getMetrics() {
+    return metrics;
+  }
+
   static class User {
-    Resource consumed = 
-      org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(0);
+    Resource consumed = Resources.createResource(0);
     int applications = 0;
     
     public Resource getConsumedResources() {
@@ -940,13 +955,11 @@ public class LeafQueue implements Queue 
     }
     
     public synchronized void assignContainer(Resource resource) {
-      org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
-          consumed, resource);
+      Resources.addTo(consumed, resource);
     }
     
     public synchronized void releaseContainer(Resource resource) {
-      org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(
-          consumed, resource);
+      Resources.subtractFrom(consumed, resource);
     }
   }
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1098051&r1=1098050&r2=1098051&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Sat Apr 30 07:24:20 2011
@@ -46,9 +46,11 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 
 @Private
 @Evolving
@@ -71,7 +73,7 @@ public class ParentQueue implements Queu
   private final Comparator<Queue> queueComparator;
   
   private Resource usedResources = 
-    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(0);
+    Resources.createResource(0);
   
   private final boolean rootQueue;
   
@@ -81,7 +83,9 @@ public class ParentQueue implements Queu
   private volatile int numContainers;
 
   private QueueState state;
-  
+
+  private final QueueMetrics metrics;
+
   private QueueInfo queueInfo; 
   private Map<ApplicationId, org.apache.hadoop.yarn.api.records.Application> 
   applicationInfos;
@@ -93,13 +97,18 @@ public class ParentQueue implements Queu
     RecordFactoryProvider.getRecordFactory(null);
 
   public ParentQueue(CapacitySchedulerContext cs, 
-      String queueName, Comparator<Queue> comparator, Queue parent) {
+      String queueName, Comparator<Queue> comparator, Queue parent, Queue old) {
     minimumAllocation = cs.getMinimumAllocation();
     
     this.parent = parent;
     this.queueName = queueName;
     this.rootQueue = (parent == null);
-    
+
+    // must be called after parent and queueName is set
+    this.metrics = old != null ? old.getMetrics() :
+        QueueMetrics.forQueue(getQueuePath(), parent,
+        cs.getConfiguration().getEnableUserMetrics());
+
     float capacity = 
       (float)cs.getConfiguration().getCapacity(getQueuePath()) / 100;
 
@@ -377,7 +386,7 @@ public class ParentQueue implements Queu
         parentQueue.state, parentQueue.acls);
 
     // Update
-    update(clusterResource);
+    updateResource(clusterResource);
   }
 
   Map<String, Queue> getQueues(Set<Queue> queues) {
@@ -496,8 +505,7 @@ public class ParentQueue implements Queu
   @Override
   public synchronized Resource assignContainers(
       Resource clusterResource, NodeManager node) {
-    Resource assigned = 
-      org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(0);
+    Resource assigned = Resources.createResource(0);
 
     while (canAssign(node)) {
       LOG.info("DEBUG --- Trying to assign containers to child-queue of " + 
@@ -515,15 +523,12 @@ public class ParentQueue implements Queu
       Resource assignedToChild = assignContainersToChildQueues(clusterResource, node);
 
       // Done if no child-queue assigned anything
-      if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThan(
-          assignedToChild, 
-          org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE)) {
+      if (Resources.greaterThan(assignedToChild, Resources.none())) {
         // Track resource utilization for the parent-queue
         allocateResource(clusterResource, assignedToChild);
         
         // Track resource utilization in this pass of the scheduler
-        org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
-            assigned, assignedToChild);
+        Resources.addTo(assigned, assignedToChild);
         
         LOG.info("assignedContainer" +
             " queue=" + getQueueName() + 
@@ -555,15 +560,13 @@ public class ParentQueue implements Queu
   
   private boolean canAssign(NodeManager node) {
     return (node.getReservedApplication() == null) && 
-      org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThanOrEqual(
-        node.getAvailableResource(), 
-        minimumAllocation);
+        Resources.greaterThanOrEqual(node.getAvailableResource(), 
+                                     minimumAllocation);
   }
   
   synchronized Resource assignContainersToChildQueues(Resource cluster, 
       NodeManager node) {
-    Resource assigned = 
-      org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(0);
+    Resource assigned = Resources.createResource(0);
     
     printChildQueues();
 
@@ -576,9 +579,7 @@ public class ParentQueue implements Queu
       assigned = childQueue.assignContainers(cluster, node);
 
       // If we do assign, remove the queue and re-insert in-order to re-sort
-      if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThan(
-            assigned, 
-            org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE)) {
+      if (Resources.greaterThan(assigned, Resources.none())) {
         // Remove and re-insert to sort
         iter.remove();
         LOG.info("Re-sorting queues since queue: " + childQueue.getQueuePath() + 
@@ -629,23 +630,28 @@ public class ParentQueue implements Queu
   
   private synchronized void allocateResource(Resource clusterResource, 
       Resource resource) {
-    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.
-      addResource(usedResources, resource);
-    update(clusterResource);
+    Resources.addTo(usedResources, resource);
+    updateResource(clusterResource);
     ++numContainers;
   }
   
   private synchronized void releaseResource(Resource clusterResource, 
       Resource resource) {
-    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.
-      subtractResource(usedResources, resource);
-    update(clusterResource);
+    Resources.subtractFrom(usedResources, resource);
+    updateResource(clusterResource);
     --numContainers;
   }
 
-  private synchronized void update(Resource clusterResource) {
-    setUtilization(usedResources.getMemory() / (clusterResource.getMemory() * absoluteCapacity));
+  @Override
+  public synchronized void updateResource(Resource clusterResource) {
+    float memLimit = clusterResource.getMemory() * absoluteCapacity;
+    setUtilization(usedResources.getMemory() / memLimit);
     setUsedCapacity(usedResources.getMemory() / (clusterResource.getMemory() * capacity));
+    metrics.setAvailableQueueMemory((int) memLimit - usedResources.getMemory());
+  }
+
+  @Override
+  public QueueMetrics getMetrics() {
+    return metrics;
   }
-  
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java?rev=1098051&r1=1098050&r2=1098051&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java Sat Apr 30 07:24:20 2011
@@ -188,4 +188,10 @@ extends org.apache.hadoop.yarn.server.re
    */
   public void reinitialize(Queue queue, Resource clusterResource) 
   throws IOException;
+
+   /**
+   * Update the cluster resource for queues as we add/remove nodes
+   * @param clusterResource the current cluster resource
+   */
+  public void updateResource(Resource clusterResource);
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1098051&r1=1098050&r2=1098051&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Sat Apr 30 07:24:20 2011
@@ -51,12 +51,14 @@ import org.apache.hadoop.yarn.factory.pr
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 
@@ -77,24 +79,29 @@ public class FifoScheduler implements Re
   private final static Container[] EMPTY_CONTAINER_ARRAY = new Container[] {};
   private final static List<Container> EMPTY_CONTAINER_LIST = Arrays.asList(EMPTY_CONTAINER_ARRAY);
   
-  public static final Resource MINIMUM_ALLOCATION = 
-    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(
-        MINIMUM_MEMORY);
+  public static final Resource MINIMUM_ALLOCATION =
+      Resources.createResource(MINIMUM_MEMORY);
     
   Map<ApplicationId, Application> applications = 
     new TreeMap<ApplicationId, Application>(
         new org.apache.hadoop.yarn.util.BuilderUtils.ApplicationIdComparator());
 
+  private static final String DEFAULT_QUEUE_NAME = "default";
+  private final QueueMetrics metrics =
+      QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false);
+
   private final Queue DEFAULT_QUEUE = new Queue() {
-    
-    private static final String DEFAULT_QUEUE_NAME = "default";
-    
     @Override
     public String getQueueName() {
       return DEFAULT_QUEUE_NAME;
     }
 
     @Override
+    public QueueMetrics getMetrics() {
+      return metrics;
+    }
+
+    @Override
     public QueueInfo getQueueInfo(boolean includeApplications, 
         boolean includeChildQueues, boolean recursive) {
       QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
@@ -196,8 +203,10 @@ public class FifoScheduler implements Re
   
   private void normalizeRequest(ResourceRequest ask) {
     int memory = ask.getCapability().getMemory();
-    memory = 
-      MINIMUM_MEMORY * ((memory/MINIMUM_MEMORY) + (memory%MINIMUM_MEMORY)); 
+    // FIXME: TestApplicationCleanup is relying on unnormalized behavior.
+    //ask.capability.memory = MINIMUM_MEMORY *
+    memory = MINIMUM_MEMORY *
+        ((memory/MINIMUM_MEMORY) + (memory%MINIMUM_MEMORY > 0 ? 1 : 0));
   }
   
   private synchronized Application getApplication(ApplicationId applicationId) {
@@ -210,6 +219,7 @@ public class FifoScheduler implements Re
   throws IOException {
     applications.put(applicationId, 
         new Application(applicationId, master, DEFAULT_QUEUE, user));
+    metrics.submitApp(user);
     LOG.info("Application Submission: " + applicationId.getId() + " from " + user + 
         ", currently active: " + applications.size());
   }
@@ -225,7 +235,11 @@ public class FifoScheduler implements Re
     
     // Release current containers
     releaseContainers(application, application.getCurrentContainers());
-    
+
+    // Update metrics
+    metrics.finishApp(application);
+    application.finish();
+
     // Let the cluster know that the applications are done
     finishedApplication(applicationId, 
         application.getAllNodesForApplication());
@@ -269,8 +283,7 @@ public class FifoScheduler implements Re
       application.showRequests();
       
       // Done
-      if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.lessThan(
-          node.getAvailableResource(), MINIMUM_ALLOCATION)) {
+      if (Resources.lessThan(node.getAvailableResource(), MINIMUM_ALLOCATION)) {
         return;
       }
     }
@@ -416,7 +429,7 @@ public class FifoScheduler implements Re
                     node.getHttpAddress(), capability);
         // If security is enabled, send the container-tokens too.
         if (UserGroupInformation.isSecurityEnabled()) {
-          ContainerToken containerToken = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ContainerToken.class);
+          ContainerToken containerToken = recordFactory.newRecordInstance(ContainerToken.class);
           ContainerTokenIdentifier tokenidentifier =
               new ContainerTokenIdentifier(container.getId(),
                   container.getContainerManagerAddress(), container.getResource());
@@ -433,6 +446,8 @@ public class FifoScheduler implements Re
       }
       application.allocate(type, node, priority, request, containers);
       addAllocatedContainers(node, application.getApplicationId(), containers);
+      Resources.addTo(usedResource,
+                      Resources.multiply(capability, assignedContainers));
     }
     return assignedContainers;
   }
@@ -457,10 +472,12 @@ public class FifoScheduler implements Re
     NodeResponse nodeResponse = nodeUpdateInternal(node, containers);
     applicationCompletedContainers(nodeResponse.getCompletedContainers());
     LOG.info("Node heartbeat " + node.getNodeID() + " resource = " + node.getAvailableResource());
-    if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.
-        greaterThanOrEqual(node.getAvailableResource(), MINIMUM_ALLOCATION)) {
+    if (Resources.greaterThanOrEqual(node.getAvailableResource(),
+                                     MINIMUM_ALLOCATION)) {
       assignContainers(node);
     }
+    metrics.setAvailableQueueMemory(
+        clusterResource.getMemory() - usedResource.getMemory());
     LOG.info("Node after allocation " + node.getNodeID() + " resource = "
       + node.getAvailableResource());
 
@@ -495,7 +512,8 @@ public class FifoScheduler implements Re
   }
   
   private Map<String, NodeManager> nodes = new HashMap<String, NodeManager>();
-  private Resource clusterResource = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
+  private Resource clusterResource = recordFactory.newRecordInstance(Resource.class);
+  private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
  
   public synchronized Resource getClusterResource() {
     return clusterResource;
@@ -503,8 +521,7 @@ public class FifoScheduler implements Re
 
   @Override
   public synchronized void removeNode(NodeInfo nodeInfo) {
-    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(
-        clusterResource, nodeInfo.getTotalCapability());
+    Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability());
     //TODO inform the the applications that the containers are completed/failed
     nodes.remove(nodeInfo.getNodeAddress());
   }
@@ -539,8 +556,7 @@ public class FifoScheduler implements Re
   @Override
   public synchronized void addNode(NodeManager nodeManager) {
     nodes.put(nodeManager.getNodeAddress(), nodeManager);
-    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
-        clusterResource, nodeManager.getTotalCapability());
+    Resources.addTo(clusterResource, nodeManager.getTotalCapability());
   }
 
   public synchronized boolean releaseContainer(ApplicationId applicationId, 

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java?rev=1098051&r1=1098050&r2=1098051&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java Sat Apr 30 07:24:20 2011
@@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.Task.State;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 
 @Private
@@ -211,8 +212,7 @@ public class Application {
     stopRequest.setContainerId(containerId);
     nodeManager.stopContainer(stopRequest);
     
-    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(
-        used, requestSpec.get(task.getPriority()));
+    Resources.subtractFrom(used, requestSpec.get(task.getPriority()));
     
     LOG.info("Finished task " + task.getTaskId() + 
         " of application " + applicationId + 
@@ -324,8 +324,7 @@ public class Application {
       Container container = i.next();
       String host = container.getContainerManagerAddress();
       
-      if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.equals(
-          requestSpec.get(priority), container.getResource())) { 
+      if (Resources.equals(requestSpec.get(priority), container.getResource())) { 
         // See which task can use this container
         for (Iterator<Task> t=tasks.get(priority).iterator(); t.hasNext();) {
           Task task = t.next();
@@ -336,8 +335,7 @@ public class Application {
             i.remove();
             
             // Track application resource usage
-            org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
-                used, container.getResource());
+            Resources.addTo(used, container.getResource());
             
             LOG.info("Assigned container (" + container + ") of type " + type +
                 " to task " + task.getTaskId() + " at priority " + priority + 

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java?rev=1098051&r1=1098050&r2=1098051&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java Sat Apr 30 07:24:20 2011
@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.server.api
 import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.RMResourceTrackerImpl;
 
@@ -81,11 +82,8 @@ public class NodeManager implements Cont
     this.nodeHttpAddress = hostName + ":" + httpPort;
     this.rackName = rackName;
     this.resourceTracker = resourceTracker;
-    this.capability = 
-      org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(
-          memory);
-    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
-        available, capability);
+    this.capability = Resources.createResource(memory);
+    Resources.addTo(available, capability);
 
     RegisterNodeManagerRequest request = recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
     request.setHost(hostName);
@@ -174,10 +172,8 @@ public class NodeManager implements Cont
                 containerLaunchContext.getResource());
     applicationContainers.add(container);
     
-    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(
-        available, containerLaunchContext.getResource());
-    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
-        used, containerLaunchContext.getResource());
+    Resources.subtractFrom(available, containerLaunchContext.getResource());
+    Resources.addTo(used, containerLaunchContext.getResource());
     
     LOG.info("DEBUG --- startContainer:" +
         " node=" + containerManagerAddress +
@@ -230,10 +226,8 @@ public class NodeManager implements Cont
           " stopped " + ctr + " times!");
     }
     
-    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
-        available, container.getResource());
-    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(
-        used, container.getResource());
+    Resources.addTo(available, container.getResource());
+    Resources.subtractFrom(used, container.getResource());
 
     LOG.info("DEBUG --- stopContainer:" +
         " node=" + containerManagerAddress +

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java?rev=1098051&r1=1098050&r2=1098051&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java Sat Apr 30 07:24:20 2011
@@ -18,26 +18,22 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
-
 import java.io.IOException;
 
-import junit.framework.TestCase;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
 
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-public class TestResourceManager extends TestCase {
+public class TestResourceManager {
   private static final Log LOG = LogFactory.getLog(TestResourceManager.class);
   
   private ResourceManager resourceManager = null;
@@ -133,9 +129,7 @@ public class TestResourceManager extends
     
     // Application resource requirements
     final int memory1 = 1024;
-    Resource capability1 = 
-      org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(
-          memory1); 
+    Resource capability1 = Resources.createResource(memory1);
     Priority priority1 = 
       org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.create(1);
     application.addResourceRequestSpec(priority1, capability1);
@@ -144,9 +138,7 @@ public class TestResourceManager extends
     application.addTask(t1);
         
     final int memory2 = 2048;
-    Resource capability2 = 
-      org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(
-          memory2); 
+    Resource capability2 = Resources.createResource(memory2);
     Priority priority0 = 
       org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.create(0); // higher
     application.addResourceRequestSpec(priority0, capability2);

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1098051&r1=1098050&r2=1098051&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Sat Apr 30 07:24:20 2011
@@ -40,6 +40,7 @@ import org.junit.Test;
 
 import junit.framework.Assert;
 import junit.framework.TestCase;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 
 public class TestCapacityScheduler extends TestCase {
   private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class);
@@ -103,14 +104,10 @@ public class TestCapacityScheduler exten
     application_0.addNodeManager(host_0, 1234, nm_0);
     application_0.addNodeManager(host_1, 1234, nm_1);
 
-    Resource capability_0_0 = 
-      org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(
-          1 * GB); 
+    Resource capability_0_0 = Resources.createResource(1 * GB);
     application_0.addResourceRequestSpec(priority_1, capability_0_0);
     
-    Resource capability_0_1 = 
-      org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(
-          2 * GB); 
+    Resource capability_0_1 = Resources.createResource(2 * GB);
     application_0.addResourceRequestSpec(priority_0, capability_0_1);
 
     Task task_0_0 = new Task(application_0, priority_1, 
@@ -124,14 +121,10 @@ public class TestCapacityScheduler exten
     application_1.addNodeManager(host_0, 1234, nm_0);
     application_1.addNodeManager(host_1, 1234, nm_1);
     
-    Resource capability_1_0 = 
-      org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(
-          3 * GB); 
+    Resource capability_1_0 = Resources.createResource(3 * GB);
     application_1.addResourceRequestSpec(priority_1, capability_1_0);
     
-    Resource capability_1_1 = 
-      org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(
-          2 * GB); 
+    Resource capability_1_1 = Resources.createResource(2 * GB);
     application_1.addResourceRequestSpec(priority_0, capability_1_1);
 
     Task task_1_0 = new Task(application_1, priority_1, 

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java?rev=1098051&r1=1098050&r2=1098051&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java Sat Apr 30 07:24:20 2011
@@ -36,6 +36,7 @@ import org.junit.Test;
 
 import junit.framework.Assert;
 import junit.framework.TestCase;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 
 public class TestFifoScheduler extends TestCase {
   private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
@@ -94,14 +95,10 @@ public class TestFifoScheduler extends T
     application_0.addNodeManager(host_0, 1234, nm_0);
     application_0.addNodeManager(host_1, 1234, nm_1);
 
-    Resource capability_0_0 = 
-      org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(
-          GB); 
+    Resource capability_0_0 = Resources.createResource(GB);
     application_0.addResourceRequestSpec(priority_1, capability_0_0);
     
-    Resource capability_0_1 = 
-      org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(
-          2 * GB); 
+    Resource capability_0_1 = Resources.createResource(2 * GB);
     application_0.addResourceRequestSpec(priority_0, capability_0_1);
 
     Task task_0_0 = new Task(application_0, priority_1, 
@@ -115,14 +112,10 @@ public class TestFifoScheduler extends T
     application_1.addNodeManager(host_0, 1234, nm_0);
     application_1.addNodeManager(host_1, 1234, nm_1);
     
-    Resource capability_1_0 = 
-      org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(
-          3 * GB); 
+    Resource capability_1_0 = Resources.createResource(3 * GB);
     application_1.addResourceRequestSpec(priority_1, capability_1_0);
     
-    Resource capability_1_1 = 
-      org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(
-          4 * GB); 
+    Resource capability_1_1 = Resources.createResource(4 * GB);
     application_1.addResourceRequestSpec(priority_0, capability_1_1);
 
     Task task_1_0 = new Task(application_1, priority_1, 



Mime
View raw message