hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lium...@apache.org
Subject [15/48] hadoop git commit: YARN-4752. Improved preemption in FairScheduler. (kasha)
Date Wed, 30 Nov 2016 05:01:05 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/10468529/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
index 8e6272a..992b75d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
@@ -17,14 +17,6 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
-import org.junit.Assert;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -39,7 +31,9 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -50,9 +44,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptM
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import org.junit.Assert;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
 public class FairSchedulerTestBase {
   public final static String TEST_DIR =
       new File(System.getProperty("test.build.data", "/tmp")).getAbsolutePath();
@@ -70,9 +72,14 @@ public class FairSchedulerTestBase {
   private static final int SLEEP_DURATION = 10;
   private static final int SLEEP_RETRIES = 1000;
 
+  /**
+   * The list of nodes added to the cluster using the {@link #addNode} method.
+   */
+  protected final List<RMNode> rmNodes = new ArrayList<>();
+
   // Helper methods
   public Configuration createConfiguration() {
-    Configuration conf = new YarnConfiguration();
+    conf = new YarnConfiguration();
     conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
         ResourceScheduler.class);
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
@@ -280,4 +287,18 @@ public class FairSchedulerTestBase {
     Assert.assertEquals(resource.getVirtualCores(),
         app.getCurrentConsumption().getVirtualCores());
   }
+
+  /**
+   * Add a node to the cluster and track the nodes in {@link #rmNodes}.
+   * @param memory memory capacity of the node
+   * @param cores cpu capacity of the node
+   */
+  protected void addNode(int memory, int cores) {
+    int id = rmNodes.size() + 1;
+    RMNode node =
+        MockNodes.newNodeInfo(1, Resources.createResource(memory, cores), id,
+            "127.0.0." + id);
+    scheduler.handle(new NodeAddedSchedulerEvent(node));
+    rmNodes.add(node);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/10468529/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java
new file mode 100644
index 0000000..25780cd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A {@link FairScheduler} for tests whose preemption thread is replaced by a
+ * {@link MockPreemptionThread} that only records the starved apps it takes.
+ */
+public class FairSchedulerWithMockPreemption extends FairScheduler {
+  @Override
+  protected void createPreemptionThread() {
+    preemptionThread = new MockPreemptionThread(this);
+  }
+
+  /**
+   * Takes apps off the starved-apps queue and counts them. The counters are
+   * updated by this thread and read by the test thread, so all access to them
+   * is guarded by {@code lock}.
+   */
+  static class MockPreemptionThread extends FSPreemptionThread {
+    private final Object lock = new Object();
+    private final Set<FSAppAttempt> appsAdded = new HashSet<>();
+    private int totalAppsAdded = 0;
+
+    MockPreemptionThread(FairScheduler scheduler) {
+      super(scheduler);
+    }
+
+    @Override
+    public void run() {
+      while (!Thread.interrupted()) {
+        try {
+          FSAppAttempt app = context.getStarvedApps().take();
+          synchronized (lock) {
+            appsAdded.add(app);
+            totalAppsAdded++;
+          }
+        } catch (InterruptedException e) {
+          // Interrupted while taking the next starved app: stop the thread.
+          return;
+        }
+      }
+    }
+
+    /** @return the number of distinct apps taken so far */
+    int uniqueAppsAdded() {
+      synchronized (lock) {
+        return appsAdded.size();
+      }
+    }
+
+    /** @return the number of apps taken so far, counting repeats */
+    int totalAppsAdded() {
+      synchronized (lock) {
+        return totalAppsAdded;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/10468529/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
index 5a170cf..e802f42 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
@@ -86,11 +86,6 @@ public class FakeSchedulable implements Schedulable {
   }
 
   @Override
-  public RMContainer preemptContainer() {
-    return null;
-  }
-
-  @Override
   public Resource getFairShare() {
     return this.fairShare;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/10468529/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
new file mode 100644
index 0000000..a5b2d86
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+
+import org.junit.After;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+/**
+ * Test class to verify identification of app starvation
+ */
+public class TestFSAppStarvation extends FairSchedulerTestBase {
+
+  private static final File ALLOC_FILE = new File(TEST_DIR, "test-QUEUES");
+
+  // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
+  private static final int NODE_CAPACITY_MULTIPLE = 4;
+  private static final String[] QUEUES =
+      {"no-preemption", "minshare", "fairshare.child", "drf.child"};
+
+  private FairSchedulerWithMockPreemption.MockPreemptionThread preemptionThread;
+
+  @Before
+  public void setup() {
+    createConfiguration();
+    conf.set(YarnConfiguration.RM_SCHEDULER,
+        FairSchedulerWithMockPreemption.class.getCanonicalName());
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
+        ALLOC_FILE.getAbsolutePath());
+    conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true);
+    conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
+  }
+
+  @After
+  public void teardown() {
+    ALLOC_FILE.delete();
+    conf = null;
+    if (resourceManager != null) {
+      resourceManager.stop();
+      resourceManager = null;
+    }
+  }
+
+  /*
+   * Test to verify application starvation is computed only when preemption
+   * is enabled.
+   */
+  @Test
+  public void testPreemptionDisabled() throws Exception {
+    conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, false);
+
+    setupClusterAndSubmitJobs();
+
+    assertNull("Found starved apps even when preemption is turned off",
+        scheduler.getContext().getStarvedApps());
+  }
+
+  /*
+   * Test to verify application starvation is computed correctly when
+   * preemption is turned on, and again on a subsequent update.
+   */
+  @Test
+  public void testPreemptionEnabled() throws Exception {
+    setupClusterAndSubmitJobs();
+
+    assertNotNull("FSContext does not have an FSStarvedApps instance",
+        scheduler.getContext().getStarvedApps());
+    assertEquals("Expecting 3 starved applications, one each for the "
+            + "minshare, fairshare.child and drf.child queues",
+        3, preemptionThread.uniqueAppsAdded());
+
+    // Verify the apps get added again on a subsequent update
+    scheduler.update();
+    Thread.yield();
+
+    verifyLeafQueueStarvation();
+    assertTrue("Starved apps were not added again on a subsequent update",
+        preemptionThread.totalAppsAdded() > preemptionThread.uniqueAppsAdded());
+  }
+
+  /*
+   * Test to verify app starvation is computed only when the cluster
+   * utilization is over the preemption threshold.
+   */
+  @Test
+  public void testClusterUtilizationThreshold() throws Exception {
+    // Set preemption threshold to 1.1, so the utilization is always lower
+    conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 1.1f);
+
+    setupClusterAndSubmitJobs();
+
+    assertNotNull("FSContext does not have an FSStarvedApps instance",
+        scheduler.getContext().getStarvedApps());
+    assertEquals("Found starved apps when preemption threshold is over 100%", 0,
+        preemptionThread.totalAppsAdded());
+  }
+
+  private void verifyLeafQueueStarvation() {
+    for (String q : QUEUES) {
+      if (!q.equals("no-preemption")) {
+        boolean isStarved =
+            scheduler.getQueueManager().getLeafQueue(q, false).isStarved();
+        assertTrue(isStarved);
+      }
+    }
+  }
+
+  private void setupClusterAndSubmitJobs() throws Exception {
+    setupStarvedCluster();
+    submitAppsToEachLeafQueue();
+    sendEnoughNodeUpdatesToAssignFully();
+
+    // Sleep to hit the preemption timeouts
+    Thread.sleep(10);
+
+    // Scheduler update to populate starved apps
+    scheduler.update();
+
+    // Yield so MockPreemptionThread can process the apps (a hint, not a wait)
+    Thread.yield();
+  }
+
+  /**
+   * Setup the cluster for starvation testing:
+   * 1. Create FS allocation file
+   * 2. Create and start MockRM
+   * 3. Add two nodes to the cluster
+   * 4. Submit an app that uses up all resources on the cluster
+   */
+  private void setupStarvedCluster() throws IOException {
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+
+    // Default queue
+    out.println("<queue name=\"default\">");
+    out.println("</queue>");
+
+    // Queue with preemption disabled
+    out.println("<queue name=\"no-preemption\">");
+    out.println("<fairSharePreemptionThreshold>0" +
+        "</fairSharePreemptionThreshold>");
+    out.println("</queue>");
+
+    // Queue with minshare preemption enabled
+    out.println("<queue name=\"minshare\">");
+    out.println("<fairSharePreemptionThreshold>0" +
+        "</fairSharePreemptionThreshold>");
+    out.println("<minSharePreemptionTimeout>0" +
+        "</minSharePreemptionTimeout>");
+    out.println("<minResources>2048mb,2vcores</minResources>");
+    out.println("</queue>");
+
+    // FAIR queue with fairshare preemption enabled
+    out.println("<queue name=\"fairshare\">");
+    out.println("<fairSharePreemptionThreshold>1" +
+        "</fairSharePreemptionThreshold>");
+    out.println("<fairSharePreemptionTimeout>0" +
+        "</fairSharePreemptionTimeout>");
+    out.println("<schedulingPolicy>fair</schedulingPolicy>");
+    addChildQueue(out);
+    out.println("</queue>");
+
+    // DRF queue with fairshare preemption enabled
+    out.println("<queue name=\"drf\">");
+    out.println("<fairSharePreemptionThreshold>1" +
+        "</fairSharePreemptionThreshold>");
+    out.println("<fairSharePreemptionTimeout>0" +
+        "</fairSharePreemptionTimeout>");
+    out.println("<schedulingPolicy>drf</schedulingPolicy>");
+    addChildQueue(out);
+    out.println("</queue>");
+
+    out.println("</allocations>");
+    out.close();
+
+    assertTrue("Allocation file does not exist, not running the test",
+        ALLOC_FILE.exists());
+
+    resourceManager = new MockRM(conf);
+    resourceManager.start();
+    scheduler = (FairScheduler) resourceManager.getResourceScheduler();
+    preemptionThread = (FairSchedulerWithMockPreemption.MockPreemptionThread)
+        scheduler.preemptionThread;
+
+    // Create and add two nodes to the cluster
+    addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE);
+    addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE);
+
+    // Create an app that takes up all the resources on the cluster
+    ApplicationAttemptId app
+        = createSchedulingRequest(1024, 1, "root.default", "default", 8);
+
+    scheduler.update();
+    sendEnoughNodeUpdatesToAssignFully();
+
+    assertEquals(8, scheduler.getSchedulerApp(app).getLiveContainers().size());
+  }
+
+  private void addChildQueue(PrintWriter out) {
+    // Child queue with the same fair-share preemption settings as its parent
+    out.println("<queue name=\"child\">");
+    out.println("<fairSharePreemptionThreshold>1" +
+        "</fairSharePreemptionThreshold>");
+    out.println("<fairSharePreemptionTimeout>0" +
+        "</fairSharePreemptionTimeout>");
+    out.println("</queue>");
+  }
+
+  private void submitAppsToEachLeafQueue() {
+    for (String queue : QUEUES) {
+      createSchedulingRequest(1024, 1, "root." + queue, "user", 1);
+    }
+    scheduler.update();
+  }
+
+  private void sendEnoughNodeUpdatesToAssignFully() {
+    for (RMNode node : rmNodes) {
+      NodeUpdateSchedulerEvent nodeUpdateSchedulerEvent =
+          new NodeUpdateSchedulerEvent(node);
+      for (int i = 0; i < NODE_CAPACITY_MULTIPLE; i++) {
+        scheduler.handle(nodeUpdateSchedulerEvent);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/10468529/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
index 0a2ce81..98de8db 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
@@ -106,12 +105,8 @@ public class TestFSLeafQueue extends FairSchedulerTestBase {
     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
     out.println("<?xml version=\"1.0\"?>");
     out.println("<allocations>");
-    out.println("<queue name=\"queueA\">");
-    out.println("<minResources>2048mb,0vcores</minResources>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueB\">");
-    out.println("<minResources>2048mb,0vcores</minResources>");
-    out.println("</queue>");
+    out.println("<queue name=\"queueA\"></queue>");
+    out.println("<queue name=\"queueB\"></queue>");
     out.println("</allocations>");
     out.close();
 
@@ -144,162 +139,6 @@ public class TestFSLeafQueue extends FairSchedulerTestBase {
     scheduler.update();
     Collection<FSLeafQueue> queues = scheduler.getQueueManager().getLeafQueues();
     assertEquals(3, queues.size());
-
-    // Queue A should be above min share, B below.
-    FSLeafQueue queueA =
-        scheduler.getQueueManager().getLeafQueue("queueA", false);
-    FSLeafQueue queueB =
-        scheduler.getQueueManager().getLeafQueue("queueB", false);
-    assertFalse(queueA.isStarvedForMinShare());
-    assertTrue(queueB.isStarvedForMinShare());
-
-    // Node checks in again, should allocate for B
-    scheduler.handle(nodeEvent2);
-    // Now B should have min share ( = demand here)
-    assertFalse(queueB.isStarvedForMinShare());
-  }
-
-  @Test (timeout = 5000)
-  public void testIsStarvedForFairShare() throws Exception {
-    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
-    out.println("<?xml version=\"1.0\"?>");
-    out.println("<allocations>");
-    out.println("<queue name=\"queueA\">");
-    out.println("<weight>.2</weight>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueB\">");
-    out.println("<weight>.8</weight>");
-    out.println("<fairSharePreemptionThreshold>.4</fairSharePreemptionThreshold>");
-    out.println("<queue name=\"queueB1\">");
-    out.println("</queue>");
-    out.println("<queue name=\"queueB2\">");
-    out.println("<fairSharePreemptionThreshold>.6</fairSharePreemptionThreshold>");
-    out.println("</queue>");
-    out.println("</queue>");
-    out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
-    out.println("</allocations>");
-    out.close();
-
-    resourceManager = new MockRM(conf);
-    resourceManager.start();
-    scheduler = (FairScheduler) resourceManager.getResourceScheduler();
-
-    // Add one big node (only care about aggregate capacity)
-    RMNode node1 =
-        MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024, 10), 1,
-            "127.0.0.1");
-    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
-    scheduler.handle(nodeEvent1);
-
-    scheduler.update();
-
-    // Queue A wants 4 * 1024. Node update gives this all to A
-    createSchedulingRequest(1 * 1024, "queueA", "user1", 4);
-    scheduler.update();
-    NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1);
-    for (int i = 0; i < 4; i ++) {
-      scheduler.handle(nodeEvent2);
-    }
-
-    QueueManager queueMgr = scheduler.getQueueManager();
-    FSLeafQueue queueA = queueMgr.getLeafQueue("queueA", false);
-    assertEquals(4 * 1024, queueA.getResourceUsage().getMemorySize());
-
-    // Both queue B1 and queue B2 want 3 * 1024
-    createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 3);
-    createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 3);
-    scheduler.update();
-    for (int i = 0; i < 4; i ++) {
-      scheduler.handle(nodeEvent2);
-    }
-
-    FSLeafQueue queueB1 = queueMgr.getLeafQueue("queueB.queueB1", false);
-    FSLeafQueue queueB2 = queueMgr.getLeafQueue("queueB.queueB2", false);
-    assertEquals(2 * 1024, queueB1.getResourceUsage().getMemorySize());
-    assertEquals(2 * 1024, queueB2.getResourceUsage().getMemorySize());
-
-    // For queue B1, the fairSharePreemptionThreshold is 0.4, and the fair share
-    // threshold is 1.6 * 1024
-    assertFalse(queueB1.isStarvedForFairShare());
-
-    // For queue B2, the fairSharePreemptionThreshold is 0.6, and the fair share
-    // threshold is 2.4 * 1024
-    assertTrue(queueB2.isStarvedForFairShare());
-
-    // Node checks in again
-    scheduler.handle(nodeEvent2);
-    scheduler.handle(nodeEvent2);
-    assertEquals(3 * 1024, queueB1.getResourceUsage().getMemorySize());
-    assertEquals(3 * 1024, queueB2.getResourceUsage().getMemorySize());
-
-    // Both queue B1 and queue B2 usages go to 3 * 1024
-    assertFalse(queueB1.isStarvedForFairShare());
-    assertFalse(queueB2.isStarvedForFairShare());
-  }
-
-  @Test (timeout = 5000)
-  public void testIsStarvedForFairShareDRF() throws Exception {
-    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
-    out.println("<?xml version=\"1.0\"?>");
-    out.println("<allocations>");
-    out.println("<queue name=\"queueA\">");
-    out.println("<weight>.5</weight>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueB\">");
-    out.println("<weight>.5</weight>");
-    out.println("</queue>");
-    out.println("<defaultFairSharePreemptionThreshold>1</defaultFairSharePreemptionThreshold>");
-    out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
-    out.println("</allocations>");
-    out.close();
-
-    resourceManager = new MockRM(conf);
-    resourceManager.start();
-    scheduler = (FairScheduler) resourceManager.getResourceScheduler();
-
-    // Add one big node (only care about aggregate capacity)
-    RMNode node1 =
-            MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024, 10), 1,
-                    "127.0.0.1");
-    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
-    scheduler.handle(nodeEvent1);
-
-    scheduler.update();
-
-    // Queue A wants 7 * 1024, 1. Node update gives this all to A
-    createSchedulingRequest(7 * 1024, 1, "queueA", "user1", 1);
-    scheduler.update();
-    NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1);
-    scheduler.handle(nodeEvent2);
-
-    QueueManager queueMgr = scheduler.getQueueManager();
-    FSLeafQueue queueA = queueMgr.getLeafQueue("queueA", false);
-    assertEquals(7 * 1024, queueA.getResourceUsage().getMemorySize());
-    assertEquals(1, queueA.getResourceUsage().getVirtualCores());
-
-    // Queue B has 3 reqs :
-    // 1) 2 * 1024, 5 .. which will be granted
-    // 2) 1 * 1024, 1 .. which will be granted
-    // 3) 1 * 1024, 1 .. which wont
-    createSchedulingRequest(2 * 1024, 5, "queueB", "user1", 1);
-    createSchedulingRequest(1 * 1024, 2, "queueB", "user1", 2);
-    scheduler.update();
-    for (int i = 0; i < 3; i ++) {
-      scheduler.handle(nodeEvent2);
-    }
-
-    FSLeafQueue queueB = queueMgr.getLeafQueue("queueB", false);
-    assertEquals(3 * 1024, queueB.getResourceUsage().getMemorySize());
-    assertEquals(6, queueB.getResourceUsage().getVirtualCores());
-
-    scheduler.update();
-
-    // Verify that Queue us not starved for fair share..
-    // Since the Starvation logic now uses DRF when the policy = drf, The
-    // Queue should not be starved
-    assertFalse(queueB.isStarvedForFairShare());
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message