hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aengin...@apache.org
Subject [08/50] [abbrv] hadoop git commit: YARN-4287. Capacity Scheduler: Rack Locality improvement (Nathan Roberts via wangda)
Date Sat, 21 Nov 2015 07:56:12 GMT
YARN-4287. Capacity Scheduler: Rack Locality improvement (Nathan Roberts via wangda)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/796638d9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/796638d9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/796638d9

Branch: refs/heads/HDFS-7240
Commit: 796638d9bc86235b9f3e5d1a3a9a25bbf5c04d1c
Parents: 0ca8df7
Author: Wangda Tan <wangda@apache.org>
Authored: Thu Nov 12 11:09:37 2015 -0800
Committer: Wangda Tan <wangda@apache.org>
Committed: Thu Nov 12 11:09:37 2015 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  2 +
 .../scheduler/AppSchedulingInfo.java            |  4 +-
 .../resourcemanager/scheduler/QueueMetrics.java | 37 +++++++++++
 .../CapacitySchedulerConfiguration.java         | 14 +++-
 .../scheduler/capacity/LeafQueue.java           |  7 ++
 .../allocator/AbstractContainerAllocator.java   |  2 +-
 .../allocator/RegularContainerAllocator.java    | 17 +++--
 .../scheduler/TestQueueMetrics.java             | 55 ++++++++++++++++
 .../scheduler/capacity/TestLeafQueue.java       | 67 ++++++++++++++++++--
 9 files changed, 191 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/796638d9/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 33b4c24..52c6669 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -554,6 +554,8 @@ Release 2.8.0 - UNRELEASED
 
     YARN-4310. FairScheduler: Log skipping reservation messages at DEBUG level (asuresh)
 
+    YARN-4287. Capacity Scheduler: Rack Locality improvement (Nathan Roberts via wangda)
+
   OPTIMIZATIONS
 
     YARN-3339. TestDockerContainerExecutor should pull a single image and not

http://git-wip-us.apache.org/repos/asf/hadoop/blob/796638d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 3156086..c5f8cd1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -595,9 +595,11 @@ public class AppSchedulingInfo {
           + " container=" + container.getId()
           + " host=" + container.getNodeId().toString()
           + " user=" + user
-          + " resource=" + request.getCapability());
+          + " resource=" + request.getCapability()
+          + " type=" + type);
     }
     metrics.allocateResources(user, 1, request.getCapability(), true);
+    metrics.incrNodeTypeAggregations(user, type);
     return resourceRequests;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/796638d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
index d94b621..68ae364 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
@@ -64,6 +64,12 @@ public class QueueMetrics implements MetricsSource {
   @Metric("Allocated CPU in virtual cores") MutableGaugeInt allocatedVCores;
   @Metric("# of allocated containers") MutableGaugeInt allocatedContainers;
   @Metric("Aggregate # of allocated containers") MutableCounterLong aggregateContainersAllocated;
+  @Metric("Aggregate # of allocated node-local containers")
+    MutableCounterLong aggregateNodeLocalContainersAllocated;
+  @Metric("Aggregate # of allocated rack-local containers")
+    MutableCounterLong aggregateRackLocalContainersAllocated;
+  @Metric("Aggregate # of allocated off-switch containers")
+    MutableCounterLong aggregateOffSwitchContainersAllocated;
   @Metric("Aggregate # of released containers") MutableCounterLong aggregateContainersReleased;
   @Metric("Available memory in MB") MutableGaugeInt availableMB;
   @Metric("Available CPU in virtual cores") MutableGaugeInt availableVCores;
@@ -379,6 +385,25 @@ public class QueueMetrics implements MetricsSource {
     pendingVCores.decr(res.getVirtualCores() * Math.max(containers, 1));
   }
 
+  public void incrNodeTypeAggregations(String user, NodeType type) {
+    if (type == NodeType.NODE_LOCAL) {
+      aggregateNodeLocalContainersAllocated.incr();
+    } else if (type == NodeType.RACK_LOCAL) {
+      aggregateRackLocalContainersAllocated.incr();
+    } else if (type == NodeType.OFF_SWITCH) {
+      aggregateOffSwitchContainersAllocated.incr();
+    } else {
+      return;
+    }
+    QueueMetrics userMetrics = getUserMetrics(user);
+    if (userMetrics != null) {
+      userMetrics.incrNodeTypeAggregations(user, type);
+    }
+    if (parent != null) {
+      parent.incrNodeTypeAggregations(user, type);
+    }
+  }
+
   public void allocateResources(String user, int containers, Resource res,
       boolean decrPending) {
     // if #containers = 0, means change container resource
@@ -562,6 +587,18 @@ public class QueueMetrics implements MetricsSource {
     return aggregateContainersAllocated.value();
   }
 
+  public long getAggregateNodeLocalContainersAllocated() {
+    return aggregateNodeLocalContainersAllocated.value();
+  }
+
+  public long getAggregateRackLocalContainersAllocated() {
+    return aggregateRackLocalContainersAllocated.value();
+  }
+
+  public long getAggregateOffSwitchContainersAllocated() {
+    return aggregateOffSwitchContainersAllocated.value();
+  }
+
   public long getAggegatedReleasedContainers() {
     return aggregateContainersReleased.value();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/796638d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index a99122d..1e62b44 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -185,6 +185,13 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   public static final int DEFAULT_NODE_LOCALITY_DELAY = 40;
 
   @Private
+  public static final String RACK_LOCALITY_FULL_RESET =
+      PREFIX + "rack-locality-full-reset";
+
+  @Private
+  public static final boolean DEFAULT_RACK_LOCALITY_FULL_RESET = true;
+
+  @Private
   public static final String SCHEDULE_ASYNCHRONOUSLY_PREFIX =
       PREFIX + "schedule-asynchronously";
 
@@ -664,7 +671,12 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   public int getNodeLocalityDelay() {
     return getInt(NODE_LOCALITY_DELAY, DEFAULT_NODE_LOCALITY_DELAY);
   }
-  
+
+  public boolean getRackLocalityFullReset() {
+    return getBoolean(RACK_LOCALITY_FULL_RESET,
+        DEFAULT_RACK_LOCALITY_FULL_RESET);
+  }
+
   public ResourceCalculator getResourceCalculator() {
     return ReflectionUtils.newInstance(
         getClass(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/796638d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index d92a821..ef2bf7e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -86,6 +86,7 @@ public class LeafQueue extends AbstractCSQueue {
   private float maxAMResourcePerQueuePercent;
 
   private volatile int nodeLocalityDelay;
+  private volatile boolean rackLocalityFullReset;
 
   Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap =
       new HashMap<ApplicationAttemptId, FiCaSchedulerApp>();
@@ -190,6 +191,7 @@ public class LeafQueue extends AbstractCSQueue {
     }
     
     nodeLocalityDelay = conf.getNodeLocalityDelay();
+    rackLocalityFullReset = conf.getRackLocalityFullReset();
 
     // re-init this since max allocation could have changed
     this.minimumAllocationFactor =
@@ -1003,6 +1005,11 @@ public class LeafQueue extends AbstractCSQueue {
   }
 
   @Lock(NoLock.class)
+  public boolean getRackLocalityFullReset() {
+    return rackLocalityFullReset;
+  }
+
+  @Lock(NoLock.class)
   private Resource computeUserLimit(FiCaSchedulerApp application,
       Resource clusterResource, User user,
       String nodePartition, SchedulingMode schedulingMode) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/796638d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
index b986b1f..ee01bd1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
@@ -89,7 +89,7 @@ public abstract class AbstractContainerAllocator {
         LOG.info("assignedContainer" + " application attempt="
             + application.getApplicationAttemptId() + " container="
             + updatedContainer.getId() + " queue=" + this + " clusterResource="
-            + clusterResource);
+            + clusterResource + " type=" + assignment.getType());
 
         application
             .getCSLeafQueue()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/796638d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index fd99d29..6bd4d17 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -246,8 +246,11 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
       float localityWaitFactor =
           getLocalityWaitFactor(priority, rmContext.getScheduler()
               .getNumClusterNodes());
-
-      return ((requiredContainers * localityWaitFactor) < missedOpportunities);
+      // Cap the delay by the number of nodes in the cluster. Under most conditions
+      // this means we will consider each node in the cluster before
+      // accepting an off-switch assignment.
+      return (Math.min(rmContext.getScheduler().getNumClusterNodes(),
+          (requiredContainers * localityWaitFactor)) < missedOpportunities);
     }
 
     // Check if we need containers on this rack
@@ -643,9 +646,15 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Resetting scheduling opportunities");
         }
-        application.resetSchedulingOpportunities(priority);
+        // Only reset scheduling opportunities for RACK_LOCAL if configured
+        // to do so. Not resetting means we will continue to schedule
+        // RACK_LOCAL without delay.
+        if (allocationResult.containerNodeType == NodeType.NODE_LOCAL
+            || application.getCSLeafQueue().getRackLocalityFullReset()) {
+          application.resetSchedulingOpportunities(priority);
+        }
       }
-      
+
       // Non-exclusive scheduling opportunity is different: we need reset
       // it every time to make sure non-labeled resource request will be
       // most likely allocated on non-labeled nodes first.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/796638d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
index 8ad71d2..864620f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
@@ -198,6 +198,53 @@ public class TestQueueMetrics {
     checkApps(userSource, 1, 0, 0, 1, 0, 0, true);
   }
 
+
+  @Test public void testNodeTypeMetrics() {
+    String parentQueueName = "root";
+    String leafQueueName = "root.leaf";
+    String user = "alice";
+
+    QueueMetrics parentMetrics =
+      QueueMetrics.forQueue(ms, parentQueueName, null, true, conf);
+    Queue parentQueue = make(stub(Queue.class).returning(parentMetrics).
+        from.getMetrics());
+    QueueMetrics metrics =
+      QueueMetrics.forQueue(ms, leafQueueName, parentQueue, true, conf);
+    MetricsSource parentQueueSource = queueSource(ms, parentQueueName);
+    MetricsSource queueSource = queueSource(ms, leafQueueName);
+    //AppSchedulingInfo app = mockApp(user);
+
+    metrics.submitApp(user);
+    MetricsSource userSource = userSource(ms, leafQueueName, user);
+    MetricsSource parentUserSource = userSource(ms, parentQueueName, user);
+
+    metrics.incrNodeTypeAggregations(user, NodeType.NODE_LOCAL);
+    checkAggregatedNodeTypes(queueSource,1L,0L,0L);
+    checkAggregatedNodeTypes(parentQueueSource,1L,0L,0L);
+    checkAggregatedNodeTypes(userSource,1L,0L,0L);
+    checkAggregatedNodeTypes(parentUserSource,1L,0L,0L);
+
+    metrics.incrNodeTypeAggregations(user, NodeType.RACK_LOCAL);
+    checkAggregatedNodeTypes(queueSource,1L,1L,0L);
+    checkAggregatedNodeTypes(parentQueueSource,1L,1L,0L);
+    checkAggregatedNodeTypes(userSource,1L,1L,0L);
+    checkAggregatedNodeTypes(parentUserSource,1L,1L,0L);
+
+    metrics.incrNodeTypeAggregations(user, NodeType.OFF_SWITCH);
+    checkAggregatedNodeTypes(queueSource,1L,1L,1L);
+    checkAggregatedNodeTypes(parentQueueSource,1L,1L,1L);
+    checkAggregatedNodeTypes(userSource,1L,1L,1L);
+    checkAggregatedNodeTypes(parentUserSource,1L,1L,1L);
+
+    metrics.incrNodeTypeAggregations(user, NodeType.OFF_SWITCH);
+    checkAggregatedNodeTypes(queueSource,1L,1L,2L);
+    checkAggregatedNodeTypes(parentQueueSource,1L,1L,2L);
+    checkAggregatedNodeTypes(userSource,1L,1L,2L);
+    checkAggregatedNodeTypes(parentUserSource,1L,1L,2L);
+
+  }
+
+
   @Test public void testTwoLevelWithUserMetrics() {
     String parentQueueName = "root";
     String leafQueueName = "root.leaf";
@@ -367,6 +414,14 @@ public class TestQueueMetrics {
     assertGauge("ReservedContainers", reservedCtnrs, rb);
   }
 
+  public static void checkAggregatedNodeTypes(MetricsSource source,
+      long nodeLocal, long rackLocal, long offSwitch) {
+    MetricsRecordBuilder rb = getMetrics(source);
+    assertCounter("AggregateNodeLocalContainersAllocated", nodeLocal, rb);
+    assertCounter("AggregateRackLocalContainersAllocated", rackLocal, rb);
+    assertCounter("AggregateOffSwitchContainersAllocated", offSwitch, rb);
+  }
+
   private static AppSchedulingInfo mockApp(String user) {
     AppSchedulingInfo app = mock(AppSchedulingInfo.class);
     when(app.getUser()).thenReturn(user);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/796638d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index b85c697..d3365de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -1506,8 +1506,8 @@ public class TestLeafQueue {
   @Test
   public void testLocalityScheduling() throws Exception {
 
-    // Manipulate queue 'a'
-    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+    // Manipulate queue 'b'
+    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(B));
 
     // User
     String user_0 = "user_0";
@@ -1614,34 +1614,87 @@ public class TestLeafQueue {
         TestUtils.createResourceRequest(host_1, 1*GB, 1, 
             true, priority, recordFactory));
     app_0_requests_0.add(
-        TestUtils.createResourceRequest(rack_1, 1*GB, 1, 
+        TestUtils.createResourceRequest(rack_1, 1*GB, 3, 
             true, priority, recordFactory));
     app_0_requests_0.add(
-        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, // one extra
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 4, // one extra
             true, priority, recordFactory));
     app_0.updateResourceRequests(app_0_requests_0);
-    assertEquals(2, app_0.getTotalRequiredResources(priority));
+    assertEquals(4, app_0.getTotalRequiredResources(priority));
     
     String host_3 = "127.0.0.4"; // on rack_1
     FiCaSchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB);
     
     // Rack-delay
+    doReturn(true).when(a).getRackLocalityFullReset();
     doReturn(1).when(a).getNodeLocalityDelay();
     
     // Shouldn't assign RACK_LOCAL yet
     assignment = a.assignContainers(clusterResource, node_3,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(1, app_0.getSchedulingOpportunities(priority));
-    assertEquals(2, app_0.getTotalRequiredResources(priority));
+    assertEquals(4, app_0.getTotalRequiredResources(priority));
 
     // Should assign RACK_LOCAL now
     assignment = a.assignContainers(clusterResource, node_3,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verifyContainerAllocated(assignment, NodeType.RACK_LOCAL);
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
+    assertEquals(3, app_0.getTotalRequiredResources(priority));
+    
+    // Shouldn't assign RACK_LOCAL because schedulingOpportunities should have gotten reset.
+    assignment = a.assignContainers(clusterResource, node_3,
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    assertEquals(1, app_0.getSchedulingOpportunities(priority));
+    assertEquals(3, app_0.getTotalRequiredResources(priority));
+
+    // Next time we schedule RACK_LOCAL, don't reset
+    doReturn(false).when(a).getRackLocalityFullReset();
+
+    // Should assign RACK_LOCAL now
+    assignment = a.assignContainers(clusterResource, node_3,
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    verifyContainerAllocated(assignment, NodeType.RACK_LOCAL);
+    assertEquals(2, app_0.getSchedulingOpportunities(priority)); // should NOT reset
+    assertEquals(2, app_0.getTotalRequiredResources(priority));
+
+    // Another RACK_LOCAL since schedulingOpportunities not reset
+    assignment = a.assignContainers(clusterResource, node_3,
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    verifyContainerAllocated(assignment, NodeType.RACK_LOCAL);
+    assertEquals(3, app_0.getSchedulingOpportunities(priority)); // should NOT reset
     assertEquals(1, app_0.getTotalRequiredResources(priority));
+    
+    // Add a request larger than cluster size to verify
+    // OFF_SWITCH delay is capped by cluster size
+    app_0.resetSchedulingOpportunities(priority);
+    app_0_requests_0.clear();
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(host_0, 1*GB, 100,
+            true, priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(rack_0, 1*GB, 100,
+            true, priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 100,
+            true, priority, recordFactory));
+    app_0.updateResourceRequests(app_0_requests_0);
+
+    // Start with off switch. 3 nodes in cluster so shouldn't allocate first 3
+    for (int i = 0; i < numNodes; i++) {
+      assignment =
+          a.assignContainers(clusterResource, node_2, new ResourceLimits(
+              clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+      verifyNoContainerAllocated(assignment);
+      assertEquals(i+1, app_0.getSchedulingOpportunities(priority));
+    }
+    // delay should be capped at numNodes so next one should allocate
+    assignment = a.assignContainers(clusterResource, node_2,
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    verifyContainerAllocated(assignment, NodeType.OFF_SWITCH);
+    assertEquals(numNodes+1, app_0.getSchedulingOpportunities(priority));
   }
-  
+
   @Test
   public void testApplicationPriorityScheduling() throws Exception {
     // Manipulate queue 'a'


Mime
View raw message