hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject hadoop git commit: YARN-2604. Scheduler should consider max-allocation-* in conjunction with the largest node. (Robert Kanter via kasha)
Date Fri, 21 Nov 2014 18:42:34 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk c298a9a84 -> 3114d4731


YARN-2604. Scheduler should consider max-allocation-* in conjunction with the largest node.
(Robert Kanter via kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3114d473
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3114d473
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3114d473

Branch: refs/heads/trunk
Commit: 3114d4731dcca7cb6c16aaa7c7a6550b7dd7dccb
Parents: c298a9a
Author: Karthik Kambatla <kasha@apache.org>
Authored: Fri Nov 21 10:32:28 2014 -0800
Committer: Karthik Kambatla <kasha@apache.org>
Committed: Fri Nov 21 10:32:28 2014 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../scheduler/AbstractYarnScheduler.java        | 104 ++++++++-
 .../scheduler/capacity/CapacityScheduler.java   |  15 +-
 .../scheduler/fair/FairScheduler.java           |  10 +-
 .../scheduler/fifo/FifoScheduler.java           |  16 +-
 .../resourcemanager/TestFifoScheduler.java      |   1 +
 .../scheduler/TestAbstractYarnScheduler.java    | 213 +++++++++++++++++++
 .../capacity/TestContainerAllocation.java       |   4 +-
 .../scheduler/fair/TestFairScheduler.java       |  31 +--
 9 files changed, 348 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3114d473/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 6d86d75..5c68254 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -84,6 +84,9 @@ Release 2.7.0 - UNRELEASED
     YARN-2375. Allow enabling/disabling timeline server per framework.
     (Mit Desai via jeagles)
 
+    YARN-2604. Scheduler should consider max-allocation-* in conjunction
+    with the largest node. (Robert Kanter via kasha)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3114d473/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 8e8d627..bf720ae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -77,7 +78,14 @@ public abstract class AbstractYarnScheduler
   protected Resource clusterResource = Resource.newInstance(0, 0);
 
   protected Resource minimumAllocation;
-  protected Resource maximumAllocation;
+  private Resource maximumAllocation;
+  private Resource configuredMaximumAllocation;
+  private int maxNodeMemory = -1;
+  private int maxNodeVCores = -1;
+  private ReentrantReadWriteLock maximumAllocationLock =
+      new ReentrantReadWriteLock();
+  private boolean useConfiguredMaximumAllocationOnly = true;
+  private long configuredMaximumAllocationWaitTime;
 
   protected RMContext rmContext;
   protected Map<ApplicationId, SchedulerApplication<T>> applications;
@@ -102,6 +110,9 @@ public abstract class AbstractYarnScheduler
     nmExpireInterval =
         conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
           YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
+    configuredMaximumAllocationWaitTime =
+        conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
+          YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);
     createReleaseCache();
     super.serviceInit(conf);
   }
@@ -145,7 +156,37 @@ public abstract class AbstractYarnScheduler
 
   @Override
   public Resource getMaximumResourceCapability() {
-    return maximumAllocation;
+    Resource maxResource;
+    ReentrantReadWriteLock.ReadLock readLock = maximumAllocationLock.readLock();
+    readLock.lock();
+    try {
+      if (useConfiguredMaximumAllocationOnly) {
+        if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp()
+            > configuredMaximumAllocationWaitTime) {
+          useConfiguredMaximumAllocationOnly = false;
+        }
+        maxResource = Resources.clone(configuredMaximumAllocation);
+      } else {
+        maxResource = Resources.clone(maximumAllocation);
+      }
+    } finally {
+      readLock.unlock();
+    }
+    return maxResource;
+  }
+
+  protected void initMaximumResourceCapability(Resource maximumAllocation) {
+    ReentrantReadWriteLock.WriteLock writeLock =
+        maximumAllocationLock.writeLock();
+    writeLock.lock();
+    try {
+      if (this.configuredMaximumAllocation == null) {
+        this.configuredMaximumAllocation = Resources.clone(maximumAllocation);
+        this.maximumAllocation = Resources.clone(maximumAllocation);
+      }
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   protected void containerLaunchedOnNode(ContainerId containerId,
@@ -528,4 +569,63 @@ public abstract class AbstractYarnScheduler
     throw new YarnException(getClass().getSimpleName()
         + " does not support reservations");
   }
+
+  protected void updateMaximumAllocation(SchedulerNode node, boolean add) {
+    ReentrantReadWriteLock.WriteLock writeLock =
+        maximumAllocationLock.writeLock();
+    writeLock.lock();
+    try {
+      if (add) { // added node
+        int nodeMemory = node.getAvailableResource().getMemory();
+        if (nodeMemory > maxNodeMemory) {
+          maxNodeMemory = nodeMemory;
+          maximumAllocation.setMemory(Math.min(
+              configuredMaximumAllocation.getMemory(), maxNodeMemory));
+        }
+        int nodeVCores = node.getAvailableResource().getVirtualCores();
+        if (nodeVCores > maxNodeVCores) {
+          maxNodeVCores = nodeVCores;
+          maximumAllocation.setVirtualCores(Math.min(
+              configuredMaximumAllocation.getVirtualCores(), maxNodeVCores));
+        }
+      } else {  // removed node
+        if (maxNodeMemory == node.getAvailableResource().getMemory()) {
+          maxNodeMemory = -1;
+        }
+        if (maxNodeVCores == node.getAvailableResource().getVirtualCores()) {
+          maxNodeVCores = -1;
+        }
+        // We only have to iterate through the nodes if the current max memory
+        // or vcores was equal to the removed node's
+        if (maxNodeMemory == -1 || maxNodeVCores == -1) {
+          for (Map.Entry<NodeId, N> nodeEntry : nodes.entrySet()) {
+            int nodeMemory =
+                nodeEntry.getValue().getAvailableResource().getMemory();
+            if (nodeMemory > maxNodeMemory) {
+              maxNodeMemory = nodeMemory;
+            }
+            int nodeVCores =
+                nodeEntry.getValue().getAvailableResource().getVirtualCores();
+            if (nodeVCores > maxNodeVCores) {
+              maxNodeVCores = nodeVCores;
+            }
+          }
+          if (maxNodeMemory == -1) {  // no nodes
+            maximumAllocation.setMemory(configuredMaximumAllocation.getMemory());
+          } else {
+            maximumAllocation.setMemory(
+                Math.min(configuredMaximumAllocation.getMemory(), maxNodeMemory));
+          }
+          if (maxNodeVCores == -1) {  // no nodes
+            maximumAllocation.setVirtualCores(configuredMaximumAllocation.getVirtualCores());
+          } else {
+            maximumAllocation.setVirtualCores(
+                Math.min(configuredMaximumAllocation.getVirtualCores(), maxNodeVCores));
+          }
+        }
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3114d473/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index c383e43..28158c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -33,7 +33,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.HashSet;
-import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -284,7 +283,7 @@ public class CapacityScheduler extends
     this.conf = loadCapacitySchedulerConfiguration(configuration);
     validateConf(this.conf);
     this.minimumAllocation = this.conf.getMinimumAllocation();
-    this.maximumAllocation = this.conf.getMaximumAllocation();
+    initMaximumResourceCapability(this.conf.getMaximumAllocation());
     this.calculator = this.conf.getResourceCalculator();
     this.usePortForNodeName = this.conf.getUsePortForNodeName();
     this.applications =
@@ -321,8 +320,8 @@ public class CapacityScheduler extends
   @Override
   public void serviceInit(Configuration conf) throws Exception {
     Configuration configuration = new Configuration(conf);
-    initScheduler(configuration);
     super.serviceInit(conf);
+    initScheduler(configuration);
   }
 
   @Override
@@ -849,7 +848,7 @@ public class CapacityScheduler extends
     // Sanity check
     SchedulerUtils.normalizeRequests(
         ask, getResourceCalculator(), getClusterResource(),
-        getMinimumResourceCapability(), maximumAllocation);
+        getMinimumResourceCapability(), getMaximumResourceCapability());
 
     // Release containers
     releaseContainers(release, application);
@@ -1123,12 +1122,13 @@ public class CapacityScheduler extends
       labelManager.activateNode(nodeManager.getNodeID(),
           nodeManager.getTotalCapability());
     }
-    
-    this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager,
-        usePortForNodeName));
+    FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
+        usePortForNodeName);
+    this.nodes.put(nodeManager.getNodeID(), schedulerNode);
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
     root.updateClusterResource(clusterResource);
     int numNodes = numNodeManagers.incrementAndGet();
+    updateMaximumAllocation(schedulerNode, true);
     
     LOG.info("Added node " + nodeManager.getNodeAddress() + 
         " clusterResource: " + clusterResource);
@@ -1177,6 +1177,7 @@ public class CapacityScheduler extends
     }
 
     this.nodes.remove(nodeInfo.getNodeID());
+    updateMaximumAllocation(node, false);
 
     LOG.info("Removed node " + nodeInfo.getNodeAddress() + 
         " clusterResource: " + clusterResource);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3114d473/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 94fb849..55e4549 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -817,9 +817,11 @@ public class FairScheduler extends
   }
 
   private synchronized void addNode(RMNode node) {
-    nodes.put(node.getNodeID(), new FSSchedulerNode(node, usePortForNodeName));
+    FSSchedulerNode schedulerNode = new FSSchedulerNode(node, usePortForNodeName);
+    nodes.put(node.getNodeID(), schedulerNode);
     Resources.addTo(clusterResource, node.getTotalCapability());
     updateRootQueueMetrics();
+    updateMaximumAllocation(schedulerNode, true);
 
     queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
     queueMgr.getRootQueue().recomputeSteadyShares();
@@ -859,6 +861,7 @@ public class FairScheduler extends
     nodes.remove(rmNode.getNodeID());
     queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
     queueMgr.getRootQueue().recomputeSteadyShares();
+    updateMaximumAllocation(node, false);
     LOG.info("Removed node " + rmNode.getNodeAddress() +
         " cluster capacity: " + clusterResource);
   }
@@ -877,7 +880,8 @@ public class FairScheduler extends
 
     // Sanity check
     SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(),
-        clusterResource, minimumAllocation, maximumAllocation, incrAllocation);
+        clusterResource, minimumAllocation, getMaximumResourceCapability(),
+        incrAllocation);
 
     // Set amResource for this app
     if (!application.getUnmanagedAM() && ask.size() == 1
@@ -1225,7 +1229,7 @@ public class FairScheduler extends
       this.conf = new FairSchedulerConfiguration(conf);
       validateConf(this.conf);
       minimumAllocation = this.conf.getMinimumAllocation();
-      maximumAllocation = this.conf.getMaximumAllocation();
+      initMaximumResourceCapability(this.conf.getMaximumAllocation());
       incrAllocation = this.conf.getIncrementAllocation();
       continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
       continuousSchedulingSleepMs =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3114d473/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index 532edc7..3d4c9dd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -215,10 +215,13 @@ public class FifoScheduler extends
         Resources.createResource(conf.getInt(
             YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
             YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
-    this.maximumAllocation =
+    initMaximumResourceCapability(
         Resources.createResource(conf.getInt(
             YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB),
+          conf.getInt(
+            YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES)));
     this.usePortForNodeName = conf.getBoolean(
         YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
         YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
@@ -303,7 +306,7 @@ public class FifoScheduler extends
 
     // Sanity check
     SchedulerUtils.normalizeRequests(ask, resourceCalculator, 
-        clusterResource, minimumAllocation, maximumAllocation);
+        clusterResource, minimumAllocation, getMaximumResourceCapability());
 
     // Release containers
     releaseContainers(release, application);
@@ -899,6 +902,7 @@ public class FifoScheduler extends
     
     //Remove the node
     this.nodes.remove(nodeInfo.getNodeID());
+    updateMaximumAllocation(node, false);
     
     // Update cluster metrics
     Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability());
@@ -916,9 +920,11 @@ public class FifoScheduler extends
   }
 
   private synchronized void addNode(RMNode nodeManager) {
-    this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager,
-        usePortForNodeName));
+    FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
+        usePortForNodeName);
+    this.nodes.put(nodeManager.getNodeID(), schedulerNode);
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
+    updateMaximumAllocation(schedulerNode, true);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3114d473/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
index 12f7498..225e225 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceReque
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3114d473/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
new file mode 100644
index 0000000..67bdae2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
+
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
+
+  public TestAbstractYarnScheduler(SchedulerType type) {
+    super(type);
+  }
+
+  @Test
+  public void testMaximimumAllocationMemory() throws Exception {
+    final int node1MaxMemory = 15 * 1024;
+    final int node2MaxMemory = 5 * 1024;
+    final int node3MaxMemory = 6 * 1024;
+    final int configuredMaxMemory = 10 * 1024;
+    configureScheduler();
+    YarnConfiguration conf = getConf();
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+        configuredMaxMemory);
+    conf.setLong(
+        YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
+        1000 * 1000);
+    MockRM rm = new MockRM(conf);
+    try {
+      rm.start();
+      testMaximumAllocationMemoryHelper(
+          (AbstractYarnScheduler) rm.getResourceScheduler(),
+          node1MaxMemory, node2MaxMemory, node3MaxMemory,
+          configuredMaxMemory, configuredMaxMemory, configuredMaxMemory,
+          configuredMaxMemory, configuredMaxMemory, configuredMaxMemory);
+    } finally {
+      rm.stop();
+    }
+
+    conf.setLong(
+        YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
+        0);
+    rm = new MockRM(conf);
+    try {
+      rm.start();
+      testMaximumAllocationMemoryHelper(
+          (AbstractYarnScheduler) rm.getResourceScheduler(),
+          node1MaxMemory, node2MaxMemory, node3MaxMemory,
+          configuredMaxMemory, configuredMaxMemory, configuredMaxMemory,
+          node2MaxMemory, node3MaxMemory, node2MaxMemory);
+    } finally {
+      rm.stop();
+    }
+  }
+
+  private void testMaximumAllocationMemoryHelper(
+       AbstractYarnScheduler scheduler,
+       final int node1MaxMemory, final int node2MaxMemory,
+       final int node3MaxMemory, final int... expectedMaxMemory)
+       throws Exception {
+    Assert.assertEquals(6, expectedMaxMemory.length);
+
+    Assert.assertEquals(0, scheduler.getNumClusterNodes());
+    int maxMemory = scheduler.getMaximumResourceCapability().getMemory();
+    Assert.assertEquals(expectedMaxMemory[0], maxMemory);
+
+    RMNode node1 = MockNodes.newNodeInfo(
+        0, Resources.createResource(node1MaxMemory), 1, "127.0.0.2");
+    scheduler.handle(new NodeAddedSchedulerEvent(node1));
+    Assert.assertEquals(1, scheduler.getNumClusterNodes());
+    maxMemory = scheduler.getMaximumResourceCapability().getMemory();
+    Assert.assertEquals(expectedMaxMemory[1], maxMemory);
+
+    scheduler.handle(new NodeRemovedSchedulerEvent(node1));
+    Assert.assertEquals(0, scheduler.getNumClusterNodes());
+    maxMemory = scheduler.getMaximumResourceCapability().getMemory();
+    Assert.assertEquals(expectedMaxMemory[2], maxMemory);
+
+    RMNode node2 = MockNodes.newNodeInfo(
+        0, Resources.createResource(node2MaxMemory), 2, "127.0.0.3");
+    scheduler.handle(new NodeAddedSchedulerEvent(node2));
+    Assert.assertEquals(1, scheduler.getNumClusterNodes());
+    maxMemory = scheduler.getMaximumResourceCapability().getMemory();
+    Assert.assertEquals(expectedMaxMemory[3], maxMemory);
+
+    RMNode node3 = MockNodes.newNodeInfo(
+        0, Resources.createResource(node3MaxMemory), 3, "127.0.0.4");
+    scheduler.handle(new NodeAddedSchedulerEvent(node3));
+    Assert.assertEquals(2, scheduler.getNumClusterNodes());
+    maxMemory = scheduler.getMaximumResourceCapability().getMemory();
+    Assert.assertEquals(expectedMaxMemory[4], maxMemory);
+
+    scheduler.handle(new NodeRemovedSchedulerEvent(node3));
+    Assert.assertEquals(1, scheduler.getNumClusterNodes());
+    maxMemory = scheduler.getMaximumResourceCapability().getMemory();
+    Assert.assertEquals(expectedMaxMemory[5], maxMemory);
+
+    scheduler.handle(new NodeRemovedSchedulerEvent(node2));
+    Assert.assertEquals(0, scheduler.getNumClusterNodes());
+  }
+
+  @Test
+  public void testMaximimumAllocationVCores() throws Exception {
+    final int node1MaxVCores = 15;
+    final int node2MaxVCores = 5;
+    final int node3MaxVCores = 6;
+    final int configuredMaxVCores = 10;
+    configureScheduler();
+    YarnConfiguration conf = getConf();
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+        configuredMaxVCores);
+    conf.setLong(
+        YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
+        1000 * 1000);
+    MockRM rm = new MockRM(conf);
+    try {
+      rm.start();
+      testMaximumAllocationVCoresHelper(
+          (AbstractYarnScheduler) rm.getResourceScheduler(),
+          node1MaxVCores, node2MaxVCores, node3MaxVCores,
+          configuredMaxVCores, configuredMaxVCores, configuredMaxVCores,
+          configuredMaxVCores, configuredMaxVCores, configuredMaxVCores);
+    } finally {
+      rm.stop();
+    }
+
+    conf.setLong(
+        YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
+        0);
+    rm = new MockRM(conf);
+    try {
+      rm.start();
+      testMaximumAllocationVCoresHelper(
+          (AbstractYarnScheduler) rm.getResourceScheduler(),
+          node1MaxVCores, node2MaxVCores, node3MaxVCores,
+          configuredMaxVCores, configuredMaxVCores, configuredMaxVCores,
+          node2MaxVCores, node3MaxVCores, node2MaxVCores);
+    } finally {
+      rm.stop();
+    }
+  }
+
+  private void testMaximumAllocationVCoresHelper(
+      AbstractYarnScheduler scheduler,
+      final int node1MaxVCores, final int node2MaxVCores,
+      final int node3MaxVCores, final int... expectedMaxVCores)
+      throws Exception {
+    Assert.assertEquals(6, expectedMaxVCores.length);
+
+    Assert.assertEquals(0, scheduler.getNumClusterNodes());
+    int maxVCores = scheduler.getMaximumResourceCapability().getVirtualCores();
+    Assert.assertEquals(expectedMaxVCores[0], maxVCores);
+
+    RMNode node1 = MockNodes.newNodeInfo(
+        0, Resources.createResource(1024, node1MaxVCores), 1, "127.0.0.2");
+    scheduler.handle(new NodeAddedSchedulerEvent(node1));
+    Assert.assertEquals(1, scheduler.getNumClusterNodes());
+    maxVCores = scheduler.getMaximumResourceCapability().getVirtualCores();
+    Assert.assertEquals(expectedMaxVCores[1], maxVCores);
+
+    scheduler.handle(new NodeRemovedSchedulerEvent(node1));
+    Assert.assertEquals(0, scheduler.getNumClusterNodes());
+    maxVCores = scheduler.getMaximumResourceCapability().getVirtualCores();
+    Assert.assertEquals(expectedMaxVCores[2], maxVCores);
+
+    RMNode node2 = MockNodes.newNodeInfo(
+        0, Resources.createResource(1024, node2MaxVCores), 2, "127.0.0.3");
+    scheduler.handle(new NodeAddedSchedulerEvent(node2));
+    Assert.assertEquals(1, scheduler.getNumClusterNodes());
+    maxVCores = scheduler.getMaximumResourceCapability().getVirtualCores();
+    Assert.assertEquals(expectedMaxVCores[3], maxVCores);
+
+    RMNode node3 = MockNodes.newNodeInfo(
+        0, Resources.createResource(1024, node3MaxVCores), 3, "127.0.0.4");
+    scheduler.handle(new NodeAddedSchedulerEvent(node3));
+    Assert.assertEquals(2, scheduler.getNumClusterNodes());
+    maxVCores = scheduler.getMaximumResourceCapability().getVirtualCores();
+    Assert.assertEquals(expectedMaxVCores[4], maxVCores);
+
+    scheduler.handle(new NodeRemovedSchedulerEvent(node3));
+    Assert.assertEquals(1, scheduler.getNumClusterNodes());
+    maxVCores = scheduler.getMaximumResourceCapability().getVirtualCores();
+    Assert.assertEquals(expectedMaxVCores[5], maxVCores);
+
+    scheduler.handle(new NodeRemovedSchedulerEvent(node2));
+    Assert.assertEquals(0, scheduler.getNumClusterNodes());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3114d473/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index ad834ac..9a29bff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -116,7 +116,7 @@ public class TestContainerAllocation {
     am1.registerAppAttempt();
 
     LOG.info("sending container requests ");
-    am1.addRequests(new String[] {"*"}, 3 * GB, 1, 1);
+    am1.addRequests(new String[] {"*"}, 2 * GB, 1, 1);
     AllocateResponse alloc1Response = am1.schedule(); // send the request
 
     // kick the scheduler
@@ -291,7 +291,7 @@ public class TestContainerAllocation {
   // This is to test fetching AM container will be retried, if AM container is
   // not fetchable since DNS is unavailable causing container token/NMtoken
   // creation failure.
-  @Test(timeout = 20000)
+  @Test(timeout = 30000)
   public void testAMContainerAllocationWhenDNSUnavailable() throws Exception {
     MockRM rm1 = new MockRM(conf) {
       @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3114d473/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index b5169bf..8344040 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
@@ -2590,37 +2591,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     }
     assertEquals(FinalApplicationStatus.FAILED, application.getFinalApplicationStatus());
   }
-  
-  @Test
-  public void testReservationThatDoesntFit() throws IOException {
-    scheduler.init(conf);
-    scheduler.start();
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
 
-    RMNode node1 =
-        MockNodes
-            .newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
-    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
-    scheduler.handle(nodeEvent1);
-    
-    ApplicationAttemptId attId = createSchedulingRequest(2048, "queue1",
-        "user1", 1);
-    scheduler.update();
-    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
-    scheduler.handle(updateEvent);
-    
-    FSAppAttempt app = scheduler.getSchedulerApp(attId);
-    assertEquals(0, app.getLiveContainers().size());
-    assertEquals(0, app.getReservedContainers().size());
-    
-    createSchedulingRequestExistingApplication(1024, 2, attId);
-    scheduler.update();
-    scheduler.handle(updateEvent);
-    
-    assertEquals(1, app.getLiveContainers().size());
-    assertEquals(0, app.getReservedContainers().size());
-  }
-  
   @Test
   public void testRemoveNodeUpdatesRootQueueMetrics() throws IOException {
     scheduler.init(conf);


Mime
View raw message