hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r1432796 [2/4] - in /hadoop/common/branches/HDFS-2802/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-api/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yar...
Date Mon, 14 Jan 2013 03:44:42 GMT
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java Mon Jan 14 03:44:35 2013
@@ -124,7 +124,7 @@ public class DefaultContainerExecutor ex
       Path appCacheDir = new Path(userdir, ContainerLocalizer.APPCACHE);
       Path appDir = new Path(appCacheDir, appIdStr);
       Path containerDir = new Path(appDir, containerIdStr);
-      createDir(containerDir, dirPerm, false);
+      createDir(containerDir, dirPerm, true);
     }
 
     // Create the container log-dirs on all disks

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Mon Jan 14 03:44:35 2013
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -44,6 +45,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
@@ -54,11 +56,10 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.service.Service;
-import org.apache.hadoop.yarn.service.ServiceStateChangeListener;
 import org.apache.hadoop.yarn.util.Records;
 
-public class NodeManager extends CompositeService implements
-    ServiceStateChangeListener {
+public class NodeManager extends CompositeService 
+    implements EventHandler<NodeManagerEvent> {
 
   /**
    * Priority of the NodeManager shutdown hook.
@@ -82,6 +83,8 @@ public class NodeManager extends Composi
   
   private long waitForContainersOnShutdownMillis;
   
+  private AtomicBoolean isStopping = new AtomicBoolean(false);
+  
   public NodeManager() {
     super(NodeManager.class.getName());
   }
@@ -152,7 +155,6 @@ public class NodeManager extends Composi
 
     NodeStatusUpdater nodeStatusUpdater =
         createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
-    nodeStatusUpdater.register(this);
 
     NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
     addService(nodeResourceMonitor);
@@ -167,6 +169,7 @@ public class NodeManager extends Composi
     addService(webServer);
 
     dispatcher.register(ContainerManagerEventType.class, containerManager);
+    dispatcher.register(NodeManagerEventType.class, this);
     addService(dispatcher);
     
     DefaultMetricsSystem.initialize("NodeManager");
@@ -198,13 +201,17 @@ public class NodeManager extends Composi
 
   @Override
   public void stop() {
+    if (isStopping.getAndSet(true)) {
+      return;
+    }
+    
     cleanupContainers();
     super.stop();
     DefaultMetricsSystem.shutdown();
   }
   
   @SuppressWarnings("unchecked")
-  private void cleanupContainers() {
+  protected void cleanupContainers() {
     Map<ContainerId, Container> containers = context.getContainers();
     if (containers.isEmpty()) {
       return;
@@ -293,24 +300,10 @@ public class NodeManager extends Composi
     return nodeHealthChecker;
   }
 
-  @Override
-  public void stateChanged(Service service) {
-    if (NodeStatusUpdaterImpl.class.getName().equals(service.getName())
-        && STATE.STOPPED.equals(service.getServiceState())) {
-
-      boolean hasToReboot = ((NodeStatusUpdaterImpl) service).hasToRebootNode();
-
-      // Shutdown the Nodemanager when the NodeStatusUpdater is stopped.      
-      stop();
-
-      // Reboot the whole node-manager if NodeStatusUpdater got a reboot command
-      // from the RM.
-      if (hasToReboot) {
-        LOG.info("Rebooting the node manager.");
-        NodeManager nodeManager = createNewNodeManager();
-        nodeManager.initAndStartNodeManager(this.getConfig(), hasToReboot);
-      }
-    }
+  private void reboot() {
+    LOG.info("Rebooting the node manager.");
+    NodeManager nodeManager = createNewNodeManager();
+    nodeManager.initAndStartNodeManager(this.getConfig(), true);
   }
   
   private void initAndStartNodeManager(Configuration conf, boolean hasToReboot) {
@@ -333,6 +326,21 @@ public class NodeManager extends Composi
     }
   }
 
+  @Override
+  public void handle(NodeManagerEvent event) {
+    switch (event.getType()) {
+    case SHUTDOWN:
+      stop();
+      break;
+    case REBOOT:
+      stop();
+      reboot();
+      break;
+    default:
+      LOG.warn("Invalid shutdown event " + event.getType() + ". Ignoring.");
+    }
+  }
+  
   // For testing
   NodeManager createNewNodeManager() {
     return new NodeManager();

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Mon Jan 14 03:44:35 2013
@@ -88,8 +88,6 @@ public class NodeStatusUpdaterImpl exten
   private final NodeHealthCheckerService healthChecker;
   private final NodeManagerMetrics metrics;
 
-  private boolean hasToRebootNode;
-  
   public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
       NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
     super(NodeStatusUpdaterImpl.class.getName());
@@ -108,14 +106,37 @@ public class NodeStatusUpdaterImpl exten
     this.heartBeatInterval =
         conf.getLong(YarnConfiguration.NM_TO_RM_HEARTBEAT_INTERVAL_MS,
             YarnConfiguration.DEFAULT_NM_TO_RM_HEARTBEAT_INTERVAL_MS);
-    int memoryMb = conf.getInt(YarnConfiguration.NM_PMEM_MB, YarnConfiguration.DEFAULT_NM_PMEM_MB);
+    int memoryMb = 
+        conf.getInt(
+            YarnConfiguration.NM_PMEM_MB, YarnConfiguration.DEFAULT_NM_PMEM_MB);
+    float vMemToPMem =             
+        conf.getFloat(
+            YarnConfiguration.NM_VMEM_PMEM_RATIO, 
+            YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO); 
+    int virtualMemoryMb = (int)Math.ceil(memoryMb * vMemToPMem);
+    
+    int cpuCores =
+        conf.getInt(
+            YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);
+    float vCoresToPCores =             
+        conf.getFloat(
+            YarnConfiguration.NM_VCORES_PCORES_RATIO, 
+            YarnConfiguration.DEFAULT_NM_VCORES_PCORES_RATIO); 
+    int virtualCores = (int)Math.ceil(cpuCores * vCoresToPCores); 
+
     this.totalResource = recordFactory.newRecordInstance(Resource.class);
     this.totalResource.setMemory(memoryMb);
+    this.totalResource.setVirtualCores(virtualCores);
     metrics.addResource(totalResource);
     this.tokenKeepAliveEnabled = isTokenKeepAliveEnabled(conf);
     this.tokenRemovalDelayMs =
         conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
             YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
+    
+    LOG.info("Initialized nodemanager for " + nodeId + ":" +
+    		" physical-memory=" + memoryMb + " virtual-memory=" + virtualMemoryMb +
+    		" physical-cores=" + cpuCores + " virtual-cores=" + virtualCores);
+    
     super.init(conf);
   }
 
@@ -149,18 +170,6 @@ public class NodeStatusUpdaterImpl exten
     super.stop();
   }
   
-  private synchronized void reboot() {
-    this.hasToRebootNode = true;
-    // Stop the status-updater. This will trigger a sub-service state change in
-    // the NodeManager which will then decide to reboot or not based on
-    // isRebooted.
-    this.stop();
-  }
-
-  synchronized boolean hasToRebootNode() {
-    return this.hasToRebootNode;
-  }
-
   private boolean isSecurityEnabled() {
     return UserGroupInformation.isSecurityEnabled();
   }
@@ -348,13 +357,15 @@ public class NodeStatusUpdaterImpl exten
               LOG
                   .info("Recieved SHUTDOWN signal from Resourcemanager as part of heartbeat," +
                   		" hence shutting down.");
-              NodeStatusUpdaterImpl.this.stop();
+              dispatcher.getEventHandler().handle(
+                  new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
               break;
             }
             if (response.getNodeAction() == NodeAction.REBOOT) {
               LOG.info("Node is out of sync with ResourceManager,"
                   + " hence rebooting.");
-              NodeStatusUpdaterImpl.this.reboot();
+              dispatcher.getEventHandler().handle(
+                  new NodeManagerEvent(NodeManagerEventType.REBOOT));
               break;
             }
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Mon Jan 14 03:44:35 2013
@@ -17,20 +17,27 @@
 */
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
 
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
+import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
+
 import java.io.DataOutputStream;
 import java.io.File;
-
+import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
-
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletionService;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
@@ -41,36 +48,25 @@ import java.util.concurrent.ScheduledThr
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-
-import static org.apache.hadoop.fs.CreateFlag.CREATE;
-import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.hadoop.ipc.Server;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -94,8 +90,8 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
@@ -372,7 +368,9 @@ public class ResourceLocalizationService
         tracker.handle(new ResourceReleaseEvent(req, c.getContainerID()));
       }
     }
-
+    String locId = ConverterUtils.toString(c.getContainerID());
+    localizerTracker.cleanupPrivLocalizers(locId);
+    
     // Delete the container directories
     String userName = c.getUser();
     String containerIDStr = c.toString();
@@ -520,9 +518,8 @@ public class ResourceLocalizationService
           synchronized (privLocalizers) {
             LocalizerRunner localizer = privLocalizers.get(locId);
             if (null == localizer) {
-              LOG.info("Created localizer for " + req.getLocalizerId());
-              localizer = new LocalizerRunner(req.getContext(),
-                  req.getLocalizerId());
+              LOG.info("Created localizer for " + locId);
+              localizer = new LocalizerRunner(req.getContext(), locId);
               privLocalizers.put(locId, localizer);
               localizer.start();
             }
@@ -532,21 +529,21 @@ public class ResourceLocalizationService
           break;
         }
         break;
-      case ABORT_LOCALIZATION:
-        // 0) find running localizer, interrupt and remove
-        synchronized (privLocalizers) {
-          LocalizerRunner localizer = privLocalizers.get(locId);
-          if (null == localizer) {
-            return; // ignore; already gone
-          }
-          privLocalizers.remove(locId);
-          localizer.interrupt();
-        }
-        break;
       }
     }
 
+    public void cleanupPrivLocalizers(String locId) {
+      synchronized (privLocalizers) {
+        LocalizerRunner localizer = privLocalizers.get(locId);
+        if (null == localizer) {
+          return; // ignore; already gone
+        }
+        privLocalizers.remove(locId);
+        localizer.interrupt();
+      }
+    }
   }
+  
 
   private static ExecutorService createLocalizerExecutor(Configuration conf) {
     int nThreads = conf.getInt(

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizerEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizerEventType.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizerEventType.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizerEventType.java Mon Jan 14 03:44:35 2013
@@ -19,6 +19,5 @@ package org.apache.hadoop.yarn.server.no
 
 public enum LocalizerEventType {
   /** See {@link LocalizerResourceRequestEvent} */
-  REQUEST_RESOURCE_LOCALIZATION,
-  ABORT_LOCALIZATION
+  REQUEST_RESOURCE_LOCALIZATION
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Mon Jan 14 03:44:35 2013
@@ -29,6 +29,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -97,7 +98,7 @@ public class TestNodeStatusUpdater {
   public void tearDown() {
     this.registeredNodes.clear();
     heartBeatID = 0;
-    if (nm != null) {
+    if (nm != null && nm.getServiceState() == STATE.STARTED) {
       nm.stop();
     }
     DefaultMetricsSystem.shutdown();
@@ -447,6 +448,52 @@ public class TestNodeStatusUpdater {
   }
   
   @Test
+  public void testStopReentrant() throws Exception {
+    final AtomicInteger numCleanups = new AtomicInteger(0);
+    nm = new NodeManager() {
+      @Override
+      protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+          Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+        MyNodeStatusUpdater myNodeStatusUpdater = new MyNodeStatusUpdater(
+            context, dispatcher, healthChecker, metrics);
+        MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2();
+        myResourceTracker2.heartBeatNodeAction = NodeAction.SHUTDOWN;
+        myNodeStatusUpdater.resourceTracker = myResourceTracker2;
+        return myNodeStatusUpdater;
+      }
+      
+      @Override
+      protected void cleanupContainers() {
+        super.cleanupContainers();
+        numCleanups.incrementAndGet();
+      }
+    };
+
+    YarnConfiguration conf = createNMConfig();
+    nm.init(conf);
+    nm.start();
+    
+    int waitCount = 0;
+    while (heartBeatID < 1 && waitCount++ != 20) {
+      Thread.sleep(500);
+    }
+    Assert.assertFalse(heartBeatID < 1);
+
+    // Meanwhile call stop directly as the shutdown hook would
+    nm.stop();
+    
+    // NM takes a while to reach the STOPPED state.
+    waitCount = 0;
+    while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
+      LOG.info("Waiting for NM to stop..");
+      Thread.sleep(1000);
+    }
+
+    Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
+    Assert.assertEquals(numCleanups.get(), 1);
+  }
+  
+  @Test
   public void testNodeDecommision() throws Exception {
     nm = getNodeManager(NodeAction.SHUTDOWN);
     YarnConfiguration conf = createNMConfig();

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java Mon Jan 14 03:44:35 2013
@@ -574,7 +574,7 @@ public class TestContainer {
       when(ctxt.getUser()).thenReturn(this.user);
       when(ctxt.getContainerId()).thenReturn(cId);
 
-      Resource resource = BuilderUtils.newResource(1024);
+      Resource resource = BuilderUtils.newResource(1024, 1);
       when(ctxt.getResource()).thenReturn(resource);
 
       if (withLocalRes) {

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java Mon Jan 14 03:44:35 2013
@@ -328,6 +328,8 @@ public class TestResourceLocalizationSer
       
       //Send Cleanup Event
       spyService.handle(new ContainerLocalizationCleanupEvent(c, req));
+      verify(mockLocallilzerTracker)
+        .cleanupPrivLocalizers("container_314159265358979_0003_01_000042");
       req2.remove(LocalResourceVisibility.PRIVATE);
       spyService.handle(new ContainerLocalizationCleanupEvent(c, req2));
       dispatcher.await();

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml Mon Jan 14 03:44:35 2013
@@ -19,6 +19,18 @@
   </property>
 
   <property>
+    <name>yarn.scheduler.capacity.resource-calculator</name>
+    <value>org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator</value>
+    <description>
+      The ResourceCalculator implementation to be used to compare 
+      Resources in the scheduler.
+      The default i.e. DefaultResourceCalculator only uses Memory while
+      DominantResourceCalculator uses dominant-resource to compare 
+      multi-dimensional resources such as Memory, CPU etc.
+    </description>
+  </property>
+
+  <property>
     <name>yarn.scheduler.capacity.root.queues</name>
     <value>default</value>
     <description>

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Mon Jan 14 03:44:35 2013
@@ -297,7 +297,7 @@ public class ApplicationMasterService ex
         for(RMNode rmNode: updatedNodes) {
           SchedulerNodeReport schedulerNodeReport =  
               rScheduler.getNodeReport(rmNode.getNodeID());
-          Resource used = BuilderUtils.newResource(0);
+          Resource used = BuilderUtils.newResource(0, 0);
           int numContainers = 0;
           if (schedulerNodeReport != null) {
             used = schedulerNodeReport.getUsedResource();

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Mon Jan 14 03:44:35 2013
@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
 import java.security.AccessControlException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -39,6 +40,8 @@ import org.apache.hadoop.security.author
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@@ -57,12 +60,15 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.DelegationToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -87,6 +93,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
 
 
 /**
@@ -105,7 +112,7 @@ public class ClientRMService extends Abs
   private final RMAppManager rmAppManager;
 
   private Server server;
-  private RMDelegationTokenSecretManager rmDTSecretManager;
+  protected RMDelegationTokenSecretManager rmDTSecretManager;
 
   private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   InetSocketAddress clientBindAddress;
@@ -122,16 +129,13 @@ public class ClientRMService extends Abs
     this.applicationsACLsManager = applicationACLsManager;
     this.rmDTSecretManager = rmDTSecretManager;
   }
-  
+
   @Override
   public void init(Configuration conf) {
-    clientBindAddress = conf.getSocketAddr(
-        YarnConfiguration.RM_ADDRESS,
-        YarnConfiguration.DEFAULT_RM_ADDRESS,
-        YarnConfiguration.DEFAULT_RM_PORT);
+    clientBindAddress = getBindAddress(conf);
     super.init(conf);
   }
-  
+
   @Override
   public void start() {
     Configuration conf = getConfig();
@@ -156,6 +160,20 @@ public class ClientRMService extends Abs
     super.start();
   }
 
+  @Override
+  public void stop() {
+    if (this.server != null) {
+        this.server.stop();
+    }
+    super.stop();
+  }
+
+  InetSocketAddress getBindAddress(Configuration conf) {
+    return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
+            YarnConfiguration.DEFAULT_RM_ADDRESS,
+            YarnConfiguration.DEFAULT_RM_PORT);
+  }
+
   @Private
   public InetSocketAddress getBindAddress() {
     return clientBindAddress;
@@ -423,7 +441,7 @@ public class ClientRMService extends Abs
   private NodeReport createNodeReports(RMNode rmNode) {    
     SchedulerNodeReport schedulerNodeReport = 
         scheduler.getNodeReport(rmNode.getNodeID());
-    Resource used = BuilderUtils.newResource(0);
+    Resource used = BuilderUtils.newResource(0, 0);
     int numContainers = 0;
     if (schedulerNodeReport != null) {
       used = schedulerNodeReport.getUsedResource();
@@ -455,10 +473,7 @@ public class ClientRMService extends Abs
     try {
 
       // Verify that the connection is kerberos authenticated
-      AuthenticationMethod authMethod = UserGroupInformation
-        .getRealAuthenticationMethod(UserGroupInformation.getCurrentUser());
-      if (UserGroupInformation.isSecurityEnabled()
-          && (authMethod != AuthenticationMethod.KERBEROS)) {
+      if (!isAllowedDelegationTokenOp()) {
         throw new IOException(
           "Delegation Token can be issued only with kerberos authentication");
       }
@@ -490,17 +505,76 @@ public class ClientRMService extends Abs
     }
   }
 
+  @Override
+  public RenewDelegationTokenResponse renewDelegationToken(
+      RenewDelegationTokenRequest request) throws YarnRemoteException {
+    try {
+      if (!isAllowedDelegationTokenOp()) {
+        throw new IOException(
+            "Delegation Token can be renewed only with kerberos authentication");
+      }
+      
+      DelegationToken protoToken = request.getDelegationToken();
+      Token<RMDelegationTokenIdentifier> token = new Token<RMDelegationTokenIdentifier>(
+          protoToken.getIdentifier().array(), protoToken.getPassword().array(),
+          new Text(protoToken.getKind()), new Text(protoToken.getService()));
+
+      String user = getRenewerForToken(token);
+      long nextExpTime = rmDTSecretManager.renewToken(token, user);
+      RenewDelegationTokenResponse renewResponse = Records
+          .newRecord(RenewDelegationTokenResponse.class);
+      renewResponse.setNextExpirationTime(nextExpTime);
+      return renewResponse;
+    } catch (IOException e) {
+      throw RPCUtil.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public CancelDelegationTokenResponse cancelDelegationToken(
+      CancelDelegationTokenRequest request) throws YarnRemoteException {
+    try {
+      if (!isAllowedDelegationTokenOp()) {
+        throw new IOException(
+            "Delegation Token can be cancelled only with kerberos authentication");
+      }
+      DelegationToken protoToken = request.getDelegationToken();
+      Token<RMDelegationTokenIdentifier> token = new Token<RMDelegationTokenIdentifier>(
+          protoToken.getIdentifier().array(), protoToken.getPassword().array(),
+          new Text(protoToken.getKind()), new Text(protoToken.getService()));
+
+      String user = getRenewerForToken(token);
+      rmDTSecretManager.cancelToken(token, user);
+      return Records.newRecord(CancelDelegationTokenResponse.class);
+    } catch (IOException e) {
+      throw RPCUtil.getRemoteException(e);
+    }
+  }
+
+  private String getRenewerForToken(Token<RMDelegationTokenIdentifier> token)
+      throws IOException {
+    UserGroupInformation user = UserGroupInformation.getCurrentUser();
+    UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+    // we can always renew our own tokens
+    return loginUser.getUserName().equals(user.getUserName())
+        ? token.decodeIdentifier().getRenewer().toString()
+        : user.getShortUserName();
+  }
+
   void refreshServiceAcls(Configuration configuration, 
       PolicyProvider policyProvider) {
     this.server.refreshServiceAcl(configuration, policyProvider);
   }
-  
-  @Override
-  public void stop() {
-    if (this.server != null) {
-        this.server.stop();
+
+  private boolean isAllowedDelegationTokenOp() throws IOException {
+    if (UserGroupInformation.isSecurityEnabled()) {
+      return EnumSet.of(AuthenticationMethod.KERBEROS,
+                        AuthenticationMethod.KERBEROS_SSL,
+                        AuthenticationMethod.CERTIFICATE)
+          .contains(UserGroupInformation.getCurrentUser()
+                  .getRealAuthenticationMethod());
+    } else {
+      return true;
     }
-    super.stop();
   }
-  
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Mon Jan 14 03:44:35 2013
@@ -47,10 +47,10 @@ import org.apache.hadoop.yarn.server.RMD
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -66,8 +66,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
-import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Mon Jan 14 03:44:35 2013
@@ -193,7 +193,7 @@ public class ResourceTrackerService exte
 
     LOG.info("NodeManager from node " + host + "(cmPort: " + cmPort
         + " httpPort: " + httpPort + ") " + "registered with capability: "
-        + capability.getMemory() + ", assigned nodeId " + nodeId);
+        + capability + ", assigned nodeId " + nodeId);
 
     regResponse.setNodeAction(NodeAction.NORMAL);
     response.setRegistrationResponse(regResponse);

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java Mon Jan 14 03:44:35 2013
@@ -41,15 +41,34 @@ public class Resources {
     }
 
     @Override
+    public int getVirtualCores() {
+      return 0;
+    }
+
+    @Override
+    public void setVirtualCores(int cores) {
+      throw new RuntimeException("NONE cannot be modified!");
+    }
+
+    @Override
     public int compareTo(Resource o) {
-      return (0 - o.getMemory());
+      int diff = 0 - o.getMemory();
+      if (diff == 0) {
+        diff = 0 - o.getVirtualCores();
+      }
+      return diff;
     }
     
   };
 
   public static Resource createResource(int memory) {
+    return createResource(memory, (memory > 0) ? 1 : 0);
+  }
+
+  public static Resource createResource(int memory, int cores) {
     Resource resource = Records.newRecord(Resource.class);
     resource.setMemory(memory);
+    resource.setVirtualCores(cores);
     return resource;
   }
 
@@ -58,11 +77,12 @@ public class Resources {
   }
 
   public static Resource clone(Resource res) {
-    return createResource(res.getMemory());
+    return createResource(res.getMemory(), res.getVirtualCores());
   }
 
   public static Resource addTo(Resource lhs, Resource rhs) {
     lhs.setMemory(lhs.getMemory() + rhs.getMemory());
+    lhs.setVirtualCores(lhs.getVirtualCores() + rhs.getVirtualCores());
     return lhs;
   }
 
@@ -72,6 +92,7 @@ public class Resources {
 
   public static Resource subtractFrom(Resource lhs, Resource rhs) {
     lhs.setMemory(lhs.getMemory() - rhs.getMemory());
+    lhs.setVirtualCores(lhs.getVirtualCores() - rhs.getVirtualCores());
     return lhs;
   }
 
@@ -83,50 +104,107 @@ public class Resources {
     return subtract(NONE, resource);
   }
 
-  public static Resource multiplyTo(Resource lhs, int by) {
-    lhs.setMemory(lhs.getMemory() * by);
+  public static Resource multiplyTo(Resource lhs, double by) {
+    lhs.setMemory((int)(lhs.getMemory() * by));
+    lhs.setVirtualCores((int)(lhs.getVirtualCores() * by));
     return lhs;
   }
 
-  public static Resource multiply(Resource lhs, int by) {
+  public static Resource multiply(Resource lhs, double by) {
     return multiplyTo(clone(lhs), by);
   }
   
-  /**
-   * Mutliply a resource by a {@code double}. Note that integral 
-   * resource quantites are subject to rounding during cast.
-   */
-  public static Resource multiply(Resource lhs, double by) {
+  public static Resource multiplyAndNormalizeUp(
+      ResourceCalculator calculator,Resource lhs, double by, Resource factor) {
+    return calculator.multiplyAndNormalizeUp(lhs, by, factor);
+  }
+  
+  public static Resource multiplyAndNormalizeDown(
+      ResourceCalculator calculator,Resource lhs, double by, Resource factor) {
+    return calculator.multiplyAndNormalizeDown(lhs, by, factor);
+  }
+  
+  public static Resource multiplyAndRoundDown(Resource lhs, double by) {
     Resource out = clone(lhs);
-    out.setMemory((int) (lhs.getMemory() * by));
+    out.setMemory((int)(lhs.getMemory() * by));
+    out.setVirtualCores((int)(lhs.getVirtualCores() * by));
     return out;
   }
-
+  
+  public static Resource normalize(
+      ResourceCalculator calculator, Resource lhs, Resource factor) {
+    return calculator.normalize(lhs, factor);
+  }
+  
+  public static Resource roundUp(
+      ResourceCalculator calculator, Resource lhs, Resource factor) {
+    return calculator.roundUp(lhs, factor);
+  }
+  
+  public static Resource roundDown(
+      ResourceCalculator calculator, Resource lhs, Resource factor) {
+    return calculator.roundDown(lhs, factor);
+  }
+  
+  public static float ratio(
+      ResourceCalculator resourceCalculator, Resource lhs, Resource rhs) {
+    return resourceCalculator.ratio(lhs, rhs);
+  }
+  
+  public static float divide(
+      ResourceCalculator resourceCalculator,
+      Resource clusterResource, Resource lhs, Resource rhs) {
+    return resourceCalculator.divide(clusterResource, lhs, rhs);
+  }
+  
+  public static Resource divideAndCeil(
+      ResourceCalculator resourceCalculator, Resource lhs, int rhs) {
+    return resourceCalculator.divideAndCeil(lhs, rhs);
+  }
+  
   public static boolean equals(Resource lhs, Resource rhs) {
-    return lhs.getMemory() == rhs.getMemory();
+    return lhs.equals(rhs);
   }
 
-  public static boolean lessThan(Resource lhs, Resource rhs) {
-    return lhs.getMemory() < rhs.getMemory();
+  public static boolean lessThan(
+      ResourceCalculator resourceCalculator, 
+      Resource clusterResource,
+      Resource lhs, Resource rhs) {
+    return (resourceCalculator.compare(clusterResource, lhs, rhs) < 0);
   }
 
-  public static boolean lessThanOrEqual(Resource lhs, Resource rhs) {
-    return lhs.getMemory() <= rhs.getMemory();
+  public static boolean lessThanOrEqual(
+      ResourceCalculator resourceCalculator, 
+      Resource clusterResource,
+      Resource lhs, Resource rhs) {
+    return (resourceCalculator.compare(clusterResource, lhs, rhs) <= 0);
   }
 
-  public static boolean greaterThan(Resource lhs, Resource rhs) {
-    return lhs.getMemory() > rhs.getMemory();
+  public static boolean greaterThan(
+      ResourceCalculator resourceCalculator,
+      Resource clusterResource,
+      Resource lhs, Resource rhs) {
+    return resourceCalculator.compare(clusterResource, lhs, rhs) > 0;
   }
 
-  public static boolean greaterThanOrEqual(Resource lhs, Resource rhs) {
-    return lhs.getMemory() >= rhs.getMemory();
+  public static boolean greaterThanOrEqual(
+      ResourceCalculator resourceCalculator, 
+      Resource clusterResource,
+      Resource lhs, Resource rhs) {
+    return resourceCalculator.compare(clusterResource, lhs, rhs) >= 0;
   }
   
-  public static Resource min(Resource lhs, Resource rhs) {
-    return (lhs.getMemory() < rhs.getMemory()) ? lhs : rhs;
+  public static Resource min(
+      ResourceCalculator resourceCalculator, 
+      Resource clusterResource,
+      Resource lhs, Resource rhs) {
+    return resourceCalculator.compare(clusterResource, lhs, rhs) <= 0 ? lhs : rhs;
   }
 
-  public static Resource max(Resource lhs, Resource rhs) {
-    return (lhs.getMemory() > rhs.getMemory()) ? lhs : rhs;
+  public static Resource max(
+      ResourceCalculator resourceCalculator, 
+      Resource clusterResource,
+      Resource lhs, Resource rhs) {
+    return resourceCalculator.compare(clusterResource, lhs, rhs) >= 0 ? lhs : rhs;
   }
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Mon Jan 14 03:44:35 2013
@@ -207,8 +207,8 @@ public class RMAppImpl implements RMApp,
   private static final ApplicationResourceUsageReport
     DUMMY_APPLICATION_RESOURCE_USAGE_REPORT =
       BuilderUtils.newApplicationResourceUsageReport(-1, -1,
-          Resources.createResource(-1), Resources.createResource(-1),
-          Resources.createResource(-1));
+          Resources.createResource(-1, -1), Resources.createResource(-1, -1),
+          Resources.createResource(-1, -1));
   private static final int DUMMY_APPLICATION_ATTEMPT_NUMBER = -1;
   
   public RMAppImpl(ApplicationId applicationId, RMContext rmContext,

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Mon Jan 14 03:44:35 2013
@@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -591,8 +592,9 @@ public class RMAppAttemptImpl implements
     try {
       int numUsedContainers = 0;
       int numReservedContainers = 0;
-      int reservedResources = 0;
-      int currentConsumption = 0;
+      Resource currentConsumption = Resources.createResource(0, 0);
+      Resource reservedResources = Resources.createResource(0, 0);
+      
       SchedulerAppReport schedApp = 
           scheduler.getSchedulerAppInfo(this.getAppAttemptId());
       Collection<RMContainer> liveContainers;
@@ -603,22 +605,21 @@ public class RMAppAttemptImpl implements
         if (liveContainers != null) {
           numUsedContainers = liveContainers.size();
           for (RMContainer lc : liveContainers) {
-            currentConsumption += lc.getContainer().getResource().getMemory();
+            Resources.addTo(currentConsumption, lc.getContainer().getResource());
           }
         }
         if (reservedContainers != null) {
           numReservedContainers = reservedContainers.size();
           for (RMContainer rc : reservedContainers) {
-            reservedResources += rc.getContainer().getResource().getMemory();
+            Resources.addTo(reservedResources, rc.getContainer().getResource());
           }
         }
       }
 
       return BuilderUtils.newApplicationResourceUsageReport(
           numUsedContainers, numReservedContainers,
-          Resources.createResource(currentConsumption),
-          Resources.createResource(reservedResources),
-          Resources.createResource(currentConsumption + reservedResources));
+          currentConsumption, reservedResources,
+          Resources.add(currentConsumption, reservedResources));
     } finally {
       this.readLock.unlock();
     }

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java Mon Jan 14 03:44:35 2013
@@ -19,7 +19,6 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import static org.apache.hadoop.metrics2.lib.Interns.info;
-import static org.apache.hadoop.yarn.server.resourcemanager.resource.Resources.multiply;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -42,6 +41,7 @@ import org.apache.hadoop.metrics2.lib.Mu
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -325,7 +325,7 @@ public class QueueMetrics implements Met
     allocatedContainers.incr(containers);
     aggregateContainersAllocated.incr(containers);
     allocatedMB.incr(res.getMemory() * containers);
-    _decrPendingResources(containers, multiply(res, containers));
+    _decrPendingResources(containers, Resources.multiply(res, containers));
     QueueMetrics userMetrics = getUserMetrics(user);
     if (userMetrics != null) {
       userMetrics.allocateResources(user, containers, res);

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java Mon Jan 14 03:44:35 2013
@@ -24,10 +24,13 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 
 /**
  * Utilities shared by schedulers. 
@@ -81,32 +84,31 @@ public class SchedulerUtils {
   /**
    * Utility method to normalize a list of resource requests, by insuring that
    * the memory for each request is a multiple of minMemory and is not zero.
-   *
-   * @param asks
-   *          a list of resource requests.
-   * @param minMemory
-   *          the configured minimum memory allocation.
    */
-  public static void normalizeRequests(List<ResourceRequest> asks,
-      int minMemory) {
+  public static void normalizeRequests(
+      List<ResourceRequest> asks,
+      ResourceCalculator resourceCalculator, 
+      Resource clusterResource,
+      Resource minimumResource) {
     for (ResourceRequest ask : asks) {
-      normalizeRequest(ask, minMemory);
+      normalizeRequest(
+          ask, resourceCalculator, clusterResource, minimumResource);
     }
   }
 
   /**
    * Utility method to normalize a resource request, by insuring that the
    * requested memory is a multiple of minMemory and is not zero.
-   *
-   * @param ask
-   *          the resource request.
-   * @param minMemory
-   *          the configured minimum memory allocation.
    */
-  public static void normalizeRequest(ResourceRequest ask, int minMemory) {
-    int memory = Math.max(ask.getCapability().getMemory(), minMemory);
-    ask.getCapability().setMemory(
-        minMemory * ((memory / minMemory) + (memory % minMemory > 0 ? 1 : 0)));
+  public static void normalizeRequest(
+      ResourceRequest ask, 
+      ResourceCalculator resourceCalculator, 
+      Resource clusterResource,
+      Resource minimumResource) {
+    Resource normalized = 
+        Resources.normalize(
+            resourceCalculator, ask.getCapability(), minimumResource);
+    ask.setCapability(normalized);
   }
 
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java Mon Jan 14 03:44:35 2013
@@ -20,19 +20,33 @@ package org.apache.hadoop.yarn.server.re
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 
 @Private
 @Unstable
 public class CSAssignment {
   final private Resource resource;
   private NodeType type;
+  private final RMContainer excessReservation;
+  private final FiCaSchedulerApp application;
   
   public CSAssignment(Resource resource, NodeType type) {
     this.resource = resource;
     this.type = type;
+    this.application = null;
+    this.excessReservation = null;
+  }
+  
+  public CSAssignment(FiCaSchedulerApp application, RMContainer excessReservation) {
+    this.resource = excessReservation.getContainer().getResource();
+    this.type = NodeType.NODE_LOCAL;
+    this.application = application;
+    this.excessReservation = excessReservation;
   }
 
+
   public Resource getResource() {
     return resource;
   }
@@ -45,6 +59,14 @@ public class CSAssignment {
     this.type = type;
   }
   
+  public FiCaSchedulerApp getApplication() {
+    return application;
+  }
+
+  public RMContainer getExcessReservation() {
+    return excessReservation;
+  }
+
   @Override
   public String toString() {
     return resource.getMemory() + ":" + type;

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java Mon Jan 14 03:44:35 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.re
 
 import org.apache.hadoop.yarn.Lock;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 
 class CSQueueUtils {
@@ -51,15 +52,19 @@ class CSQueueUtils {
     return (parentAbsMaxCapacity * maximumCapacity);
   }
 
-  public static int computeMaxActiveApplications(Resource clusterResource,
-      Resource minimumAllocation, float maxAMResourcePercent, 
-      float absoluteMaxCapacity) {
-    return 
+  public static int computeMaxActiveApplications(
+      ResourceCalculator calculator,
+      Resource clusterResource, Resource minimumAllocation, 
+      float maxAMResourcePercent, float absoluteMaxCapacity) {
+    return
         Math.max(
             (int)Math.ceil(
-                     ((float)clusterResource.getMemory() / 
-                         minimumAllocation.getMemory()) * 
-                     maxAMResourcePercent * absoluteMaxCapacity), 
+                Resources.ratio(
+                    calculator, 
+                    clusterResource, 
+                    minimumAllocation) * 
+                    maxAMResourcePercent * absoluteMaxCapacity
+                ), 
             1);
   }
 
@@ -73,36 +78,43 @@ class CSQueueUtils {
   
   @Lock(CSQueue.class)
   public static void updateQueueStatistics(
+      final ResourceCalculator calculator,
       final CSQueue childQueue, final CSQueue parentQueue, 
       final Resource clusterResource, final Resource minimumAllocation) {
-    final int clusterMemory = clusterResource.getMemory();
-    final int usedMemory = childQueue.getUsedResources().getMemory();
+    Resource queueLimit = Resources.none();
+    Resource usedResources = childQueue.getUsedResources();
     
-    float queueLimit = 0.0f;
     float absoluteUsedCapacity = 0.0f;
     float usedCapacity = 0.0f;
-    if (clusterMemory > 0) {
-      queueLimit = clusterMemory * childQueue.getAbsoluteCapacity();
-      absoluteUsedCapacity = ((float)usedMemory / (float)clusterMemory);
-      usedCapacity = (queueLimit == 0) ? 0 : (usedMemory / queueLimit);
+
+    if (Resources.greaterThan(
+        calculator, clusterResource, clusterResource, Resources.none())) {
+      queueLimit = 
+          Resources.multiply(clusterResource, childQueue.getAbsoluteCapacity());
+      absoluteUsedCapacity = 
+          Resources.divide(calculator, clusterResource, 
+              usedResources, clusterResource);
+      usedCapacity = 
+          Resources.equals(queueLimit, Resources.none()) ? 0 :
+          Resources.divide(calculator, clusterResource, 
+              usedResources, queueLimit);
     }
     
     childQueue.setUsedCapacity(usedCapacity);
     childQueue.setAbsoluteUsedCapacity(absoluteUsedCapacity);
     
-    int available = 
-        Math.max((roundUp(minimumAllocation, (int)queueLimit) - usedMemory), 0); 
+    Resource available = 
+        Resources.roundUp(
+            calculator, 
+            Resources.subtract(queueLimit, usedResources), 
+            minimumAllocation);
     childQueue.getMetrics().setAvailableResourcesToQueue(
-        Resources.createResource(available));
-  }
-
-  public static int roundUp(Resource minimumAllocation, int memory) {
-    int minMemory = minimumAllocation.getMemory();
-    return LeafQueue.divideAndCeil(memory, minMemory) * minMemory; 
-  }
-
-  public static int roundDown(Resource minimumAllocation, int memory) {
-    int minMemory = minimumAllocation.getMemory();
-    return (memory / minMemory) * minMemory;
-  }
+        Resources.max(
+            calculator, 
+            clusterResource, 
+            available, 
+            Resources.none()
+            )
+        );
+   }
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Mon Jan 14 03:44:35 2013
@@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
@@ -141,6 +142,8 @@ implements ResourceScheduler, CapacitySc
 
   private boolean initialized = false;
 
+  private ResourceCalculator calculator;
+  
   public CapacityScheduler() {}
 
   @Override
@@ -173,6 +176,21 @@ implements ResourceScheduler, CapacitySc
   }
 
   @Override
+  public Comparator<FiCaSchedulerApp> getApplicationComparator() {
+    return applicationComparator;
+  }
+
+  @Override
+  public ResourceCalculator getResourceCalculator() {
+    return calculator;
+  }
+
+  @Override
+  public Comparator<CSQueue> getQueueComparator() {
+    return queueComparator;
+  }
+
+  @Override
   public synchronized int getNumClusterNodes() {
     return numNodeManagers;
   }
@@ -192,11 +210,20 @@ implements ResourceScheduler, CapacitySc
       reinitialize(Configuration conf, RMContext rmContext) throws IOException {
     if (!initialized) {
       this.conf = new CapacitySchedulerConfiguration(conf);
+      
       this.minimumAllocation = this.conf.getMinimumAllocation();
       this.maximumAllocation = this.conf.getMaximumAllocation();
+      this.calculator = this.conf.getResourceCalculator();
+
       this.rmContext = rmContext;
+      
       initializeQueues(this.conf);
+      
       initialized = true;
+      LOG.info("Initialized CapacityScheduler with " +
+          "calculator=" + getResourceCalculator().getClass() + ", " +
+          "minimumAllocation=<" + getMinimumResourceCapability() + ">, " +
+          "maximumAllocation=<" + getMaximumResourceCapability() + ">");
     } else {
 
       CapacitySchedulerConfiguration oldConf = this.conf; 
@@ -226,8 +253,8 @@ implements ResourceScheduler, CapacitySc
   private void initializeQueues(CapacitySchedulerConfiguration conf)
     throws IOException {
     root = 
-        parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, 
-            queueComparator, applicationComparator, noop);
+        parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT, 
+            queues, queues, noop);
     LOG.info("Initialized root queue " + root);
   }
 
@@ -237,8 +264,8 @@ implements ResourceScheduler, CapacitySc
     // Parse new queues
     Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
     CSQueue newRoot = 
-        parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT, newQueues, queues, 
-            queueComparator, applicationComparator, noop);
+        parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT, 
+            newQueues, queues, noop); 
     
     // Ensure all existing queues are still present
     validateExistingQueues(queues, newQueues);
@@ -291,8 +318,6 @@ implements ResourceScheduler, CapacitySc
       CapacitySchedulerConfiguration conf, 
       CSQueue parent, String queueName, Map<String, CSQueue> queues,
       Map<String, CSQueue> oldQueues, 
-      Comparator<CSQueue> queueComparator,
-      Comparator<FiCaSchedulerApp> applicationComparator,
       QueueHook hook) throws IOException {
     CSQueue queue;
     String[] childQueueNames = 
@@ -303,15 +328,14 @@ implements ResourceScheduler, CapacitySc
         throw new IllegalStateException(
             "Queue configuration missing child queue names for " + queueName);
       }
-      queue = new LeafQueue(csContext, queueName, parent, applicationComparator,
-                            oldQueues.get(queueName));
+      queue = 
+          new LeafQueue(csContext, queueName, parent,oldQueues.get(queueName));
       
       // Used only for unit tests
       queue = hook.hook(queue);
     } else {
       ParentQueue parentQueue = 
-        new ParentQueue(csContext, queueName, queueComparator, parent,
-                        oldQueues.get(queueName));
+        new ParentQueue(csContext, queueName, parent,oldQueues.get(queueName));
 
       // Used only for unit tests
       queue = hook.hook(parentQueue);
@@ -320,7 +344,7 @@ implements ResourceScheduler, CapacitySc
       for (String childQueueName : childQueueNames) {
         CSQueue childQueue = 
           parseQueue(csContext, conf, queue, childQueueName, 
-              queues, oldQueues, queueComparator, applicationComparator, hook);
+              queues, oldQueues, hook);
         childQueues.add(childQueue);
       }
       parentQueue.setChildQueues(childQueues);
@@ -442,7 +466,7 @@ implements ResourceScheduler, CapacitySc
   }
 
   private static final Allocation EMPTY_ALLOCATION = 
-      new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0));
+      new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0, 0));
 
   @Override
   @Lock(Lock.NoLock.class)
@@ -457,7 +481,8 @@ implements ResourceScheduler, CapacitySc
     }
     
     // Sanity check
-    SchedulerUtils.normalizeRequests(ask, minimumAllocation.getMemory());
+    SchedulerUtils.normalizeRequests(
+        ask, calculator, getClusterResources(), minimumAllocation);
 
     // Release containers
     for (ContainerId releasedContainerId : release) {
@@ -579,7 +604,20 @@ implements ResourceScheduler, CapacitySc
           reservedApplication.getApplicationId() + " on node: " + nm);
       
       LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
-      queue.assignContainers(clusterResource, node);
+      CSAssignment assignment = queue.assignContainers(clusterResource, node);
+      
+      RMContainer excessReservation = assignment.getExcessReservation();
+      if (excessReservation != null) {
+      Container container = excessReservation.getContainer();
+      queue.completedContainer(
+          clusterResource, assignment.getApplication(), node, 
+          excessReservation, 
+          SchedulerUtils.createAbnormalContainerStatus(
+              container.getId(), 
+              SchedulerUtils.UNRESERVED_CONTAINER), 
+          RMContainerEventType.RELEASED);
+      }
+
     }
 
     // Try to schedule more if there are no reservations to fulfill

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java Mon Jan 14 03:44:35 2013
@@ -26,11 +26,14 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 
 public class CapacitySchedulerConfiguration extends Configuration {
@@ -112,6 +115,13 @@ public class CapacitySchedulerConfigurat
       PREFIX +"user-metrics.enable";
   @Private public static final boolean DEFAULT_ENABLE_USER_METRICS = false;
 
+  /** ResourceComparator for scheduling. */
+  @Private public static final String RESOURCE_CALCULATOR_CLASS =
+      PREFIX + "resource-calculator";
+
+  @Private public static final Class<? extends ResourceCalculator> 
+  DEFAULT_RESOURCE_CALCULATOR_CLASS = DefaultResourceCalculator.class;
+  
   @Private
   public static final String ROOT = "root";
 
@@ -289,14 +299,20 @@ public class CapacitySchedulerConfigurat
     int minimumMemory = getInt(
         YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
         YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
-    return Resources.createResource(minimumMemory);
+    int minimumCores = getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
+    return Resources.createResource(minimumMemory, minimumCores);
   }
 
   public Resource getMaximumAllocation() {
     int maximumMemory = getInt(
         YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
         YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
-    return Resources.createResource(maximumMemory);
+    int maximumCores = getInt(
+        YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_CORES,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_CORES);
+    return Resources.createResource(maximumMemory, maximumCores);
   }
 
   public boolean getEnableUserMetrics() {
@@ -307,4 +323,21 @@ public class CapacitySchedulerConfigurat
     int delay = getInt(NODE_LOCALITY_DELAY, DEFAULT_NODE_LOCALITY_DELAY);
     return (delay == DEFAULT_NODE_LOCALITY_DELAY) ? 0 : delay;
   }
+  
+  public ResourceCalculator getResourceCalculator() {
+    return ReflectionUtils.newInstance(
+        getClass(
+            RESOURCE_CALCULATOR_CLASS, 
+            DEFAULT_RESOURCE_CALCULATOR_CLASS, 
+            ResourceCalculator.class), 
+        this);
+  }
+
+  public void setResourceComparator(
+      Class<? extends ResourceCalculator> resourceCalculatorClass) {
+    setClass(
+        RESOURCE_CALCULATOR_CLASS, 
+        resourceCalculatorClass, 
+        ResourceCalculator.class);
+  }
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java Mon Jan 14 03:44:35 2013
@@ -18,10 +18,14 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
+import java.util.Comparator;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 
 /**
  * Read-only interface to {@link CapacityScheduler} context.
@@ -45,4 +49,10 @@ public interface CapacitySchedulerContex
    * Get the yarn configuration.
    */
   Configuration getConf();
+
+  Comparator<FiCaSchedulerApp> getApplicationComparator();
+
+  ResourceCalculator getResourceCalculator();
+
+  Comparator<CSQueue> getQueueComparator();
 }



Mime
View raw message