hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aengin...@apache.org
Subject [28/33] hadoop git commit: YARN-6207. Move application across queues should handle delayed event processing. Contributed by Bibin A Chundatt.
Date Wed, 08 Mar 2017 23:46:26 GMT
YARN-6207. Move application across queues should handle delayed event processing. Contributed
by Bibin A Chundatt.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7e68257f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7e68257f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7e68257f

Branch: refs/heads/HDFS-7240
Commit: 7e68257ffafc86d7f6926603ea6d3deda66914d4
Parents: afc2c43
Author: Sunil G <sunilg@apache.org>
Authored: Wed Mar 8 12:04:30 2017 +0530
Committer: Anu Engineer <aengineer@apache.org>
Committed: Wed Mar 8 15:34:03 2017 -0800

----------------------------------------------------------------------
 .../scheduler/SchedulerApplicationAttempt.java  |   5 +-
 .../scheduler/capacity/CapacityScheduler.java   |  69 ++++---
 .../capacity/TestCapacityScheduler.java         | 200 +++++++++++++++++++
 3 files changed, 248 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e68257f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index f894a40..91e29d5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -1069,6 +1069,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
       QueueMetrics newMetrics = newQueue.getMetrics();
       String newQueueName = newQueue.getQueueName();
       String user = getUser();
+
       for (RMContainer liveContainer : liveContainers.values()) {
         Resource resource = liveContainer.getContainer().getResource();
         ((RMContainerImpl) liveContainer).setQueueName(newQueueName);
@@ -1084,7 +1085,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
         }
       }
 
-      appSchedulingInfo.move(newQueue);
+      if (!isStopped) {
+        appSchedulingInfo.move(newQueue);
+      }
       this.queue = newQueue;
     } finally {
       writeLock.unlock();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e68257f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 20ea607..f6e7942 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -1939,36 +1939,47 @@ public class CapacityScheduler extends
       String targetQueueName) throws YarnException {
     try {
       writeLock.lock();
-      FiCaSchedulerApp app = getApplicationAttempt(
-          ApplicationAttemptId.newInstance(appId, 0));
-      String sourceQueueName = app.getQueue().getQueueName();
-      LeafQueue source = this.queueManager.getAndCheckLeafQueue(
-          sourceQueueName);
+      SchedulerApplication<FiCaSchedulerApp> application =
+          applications.get(appId);
+      if (application == null) {
+        throw new YarnException("App to be moved " + appId + " not found.");
+      }
+      String sourceQueueName = application.getQueue().getQueueName();
+      LeafQueue source =
+          this.queueManager.getAndCheckLeafQueue(sourceQueueName);
       String destQueueName = handleMoveToPlanQueue(targetQueueName);
       LeafQueue dest = this.queueManager.getAndCheckLeafQueue(destQueueName);
 
-      String user = app.getUser();
+      String user = application.getUser();
       try {
         dest.submitApplication(appId, user, destQueueName);
       } catch (AccessControlException e) {
         throw new YarnException(e);
       }
-      // Move all live containers
-      for (RMContainer rmContainer : app.getLiveContainers()) {
-        source.detachContainer(getClusterResource(), app, rmContainer);
-        // attach the Container to another queue
-        dest.attachContainer(getClusterResource(), app, rmContainer);
+
+      FiCaSchedulerApp app = application.getCurrentAppAttempt();
+      if (app != null) {
+        // Move all live containers even when stopped.
+        // For transferStateFromPreviousAttempt required
+        for (RMContainer rmContainer : app.getLiveContainers()) {
+          source.detachContainer(getClusterResource(), app, rmContainer);
+          // attach the Container to another queue
+          dest.attachContainer(getClusterResource(), app, rmContainer);
+        }
+        if (!app.isStopped()) {
+          source.finishApplicationAttempt(app, sourceQueueName);
+          // Submit to a new queue
+          dest.submitApplicationAttempt(app, user);
+        }
+        // Finish app & update metrics
+        app.move(dest);
       }
+      source.appFinished();
       // Detach the application..
-      source.finishApplicationAttempt(app, sourceQueueName);
-      source.getParent().finishApplication(appId, app.getUser());
-      // Finish app & update metrics
-      app.move(dest);
-      // Submit to a new queue
-      dest.submitApplicationAttempt(app, user);
-      applications.get(appId).setQueue(dest);
-      LOG.info("App: " + app.getApplicationId() + " successfully moved from "
-          + sourceQueueName + " to: " + destQueueName);
+      source.getParent().finishApplication(appId, user);
+      application.setQueue(dest);
+      LOG.info("App: " + appId + " successfully moved from " + sourceQueueName
+          + " to: " + destQueueName);
       return targetQueueName;
     } finally {
       writeLock.unlock();
@@ -1980,15 +1991,23 @@ public class CapacityScheduler extends
       String newQueue) throws YarnException {
     try {
       writeLock.lock();
-      FiCaSchedulerApp app = getApplicationAttempt(
-          ApplicationAttemptId.newInstance(appId, 0));
-      String sourceQueueName = app.getQueue().getQueueName();
+      SchedulerApplication<FiCaSchedulerApp> application =
+          applications.get(appId);
+      if (application == null) {
+        throw new YarnException("App to be moved " + appId + " not found.");
+      }
+      String sourceQueueName = application.getQueue().getQueueName();
       this.queueManager.getAndCheckLeafQueue(sourceQueueName);
       String destQueueName = handleMoveToPlanQueue(newQueue);
       LeafQueue dest = this.queueManager.getAndCheckLeafQueue(destQueueName);
       // Validation check - ACLs, submission limits for user & queue
-      String user = app.getUser();
-      checkQueuePartition(app, dest);
+      String user = application.getUser();
+      // Check active partition only when attempt is available
+      FiCaSchedulerApp appAttempt =
+          getApplicationAttempt(ApplicationAttemptId.newInstance(appId, 0));
+      if (null != appAttempt) {
+        checkQueuePartition(appAttempt, dest);
+      }
       try {
         dest.validateSubmitApplication(appId, user, destQueueName);
       } catch (AccessControlException e) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e68257f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 2b60ecf..293bac2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -110,12 +111,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@@ -2209,6 +2212,203 @@ public class TestCapacityScheduler {
     rm.stop();
   }
 
+  @Test(timeout = 60000)
+  public void testMoveAttemptNotAdded() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    MockRM rm = new MockRM(getCapacityConfiguration(conf));
+    rm.start();
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+    ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
+    ApplicationAttemptId appAttemptId =
+        BuilderUtils.newApplicationAttemptId(appId, 1);
+
+    RMAppAttemptMetrics attemptMetric =
+        new RMAppAttemptMetrics(appAttemptId, rm.getRMContext());
+    RMAppImpl app = mock(RMAppImpl.class);
+    when(app.getApplicationId()).thenReturn(appId);
+    RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
+    Container container = mock(Container.class);
+    when(attempt.getMasterContainer()).thenReturn(container);
+    ApplicationSubmissionContext submissionContext =
+        mock(ApplicationSubmissionContext.class);
+    when(attempt.getSubmissionContext()).thenReturn(submissionContext);
+    when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
+    when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
+    when(app.getCurrentAppAttempt()).thenReturn(attempt);
+
+    rm.getRMContext().getRMApps().put(appId, app);
+
+    SchedulerEvent addAppEvent =
+        new AppAddedSchedulerEvent(appId, "a1", "user");
+    try {
+      cs.moveApplication(appId, "b1");
+      fail("Move should throw exception app not available");
+    } catch (YarnException e) {
+      assertEquals("App to be moved application_100_0001 not found.",
+          e.getMessage());
+    }
+    cs.handle(addAppEvent);
+    cs.moveApplication(appId, "b1");
+    SchedulerEvent addAttemptEvent =
+        new AppAttemptAddedSchedulerEvent(appAttemptId, false);
+    cs.handle(addAttemptEvent);
+    CSQueue rootQ = cs.getRootQueue();
+    CSQueue queueB = cs.getQueue("b");
+    CSQueue queueA = cs.getQueue("a");
+    CSQueue queueA1 = cs.getQueue("a1");
+    CSQueue queueB1 = cs.getQueue("b1");
+    Assert.assertEquals(1, rootQ.getNumApplications());
+    Assert.assertEquals(0, queueA.getNumApplications());
+    Assert.assertEquals(1, queueB.getNumApplications());
+    Assert.assertEquals(0, queueA1.getNumApplications());
+    Assert.assertEquals(1, queueB1.getNumApplications());
+
+    rm.close();
+  }
+
+  @Test
+  public void testRemoveAttemptMoveAdded() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        CapacityScheduler.class);
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
+    // Create Mock RM
+    MockRM rm = new MockRM(getCapacityConfiguration(conf));
+    CapacityScheduler sch = (CapacityScheduler) rm.getResourceScheduler();
+    // add node
+    Resource newResource = Resource.newInstance(4 * GB, 1);
+    RMNode node = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.1");
+    SchedulerEvent addNode = new NodeAddedSchedulerEvent(node);
+    sch.handle(addNode);
+    // create appid
+    ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
+    ApplicationAttemptId appAttemptId =
+        BuilderUtils.newApplicationAttemptId(appId, 1);
+
+    RMAppAttemptMetrics attemptMetric =
+        new RMAppAttemptMetrics(appAttemptId, rm.getRMContext());
+    RMAppImpl app = mock(RMAppImpl.class);
+    when(app.getApplicationId()).thenReturn(appId);
+    RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
+    Container container = mock(Container.class);
+    when(attempt.getMasterContainer()).thenReturn(container);
+    ApplicationSubmissionContext submissionContext =
+        mock(ApplicationSubmissionContext.class);
+    when(attempt.getSubmissionContext()).thenReturn(submissionContext);
+    when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
+    when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
+    when(app.getCurrentAppAttempt()).thenReturn(attempt);
+
+    rm.getRMContext().getRMApps().put(appId, app);
+    // Add application
+    SchedulerEvent addAppEvent =
+        new AppAddedSchedulerEvent(appId, "a1", "user");
+    sch.handle(addAppEvent);
+    // Add application attempt
+    SchedulerEvent addAttemptEvent =
+        new AppAttemptAddedSchedulerEvent(appAttemptId, false);
+    sch.handle(addAttemptEvent);
+    // get Queues
+    CSQueue queueA1 = sch.getQueue("a1");
+    CSQueue queueB = sch.getQueue("b");
+    CSQueue queueB1 = sch.getQueue("b1");
+
+    // add Running rm container and simulate live containers to a1
+    ContainerId newContainerId = ContainerId.newContainerId(appAttemptId, 2);
+    RMContainerImpl rmContainer = mock(RMContainerImpl.class);
+    when(rmContainer.getState()).thenReturn(RMContainerState.RUNNING);
+    Container container2 = mock(Container.class);
+    when(rmContainer.getContainer()).thenReturn(container2);
+    Resource resource = Resource.newInstance(1024, 1);
+    when(container2.getResource()).thenReturn(resource);
+    when(rmContainer.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
+    when(container2.getNodeId()).thenReturn(node.getNodeID());
+    when(container2.getId()).thenReturn(newContainerId);
+    when(rmContainer.getNodeLabelExpression())
+        .thenReturn(RMNodeLabelsManager.NO_LABEL);
+    when(rmContainer.getContainerId()).thenReturn(newContainerId);
+    sch.getApplicationAttempt(appAttemptId).getLiveContainersMap()
+        .put(newContainerId, rmContainer);
+    QueueMetrics queueA1M = queueA1.getMetrics();
+    queueA1M.incrPendingResources("user1", 1, resource);
+    queueA1M.allocateResources("user1", resource);
+    // remove attempt
+    sch.handle(new AppAttemptRemovedSchedulerEvent(appAttemptId,
+        RMAppAttemptState.KILLED, true));
+    // Move application to queue b1
+    sch.moveApplication(appId, "b1");
+    // Check queue metrics after move
+    Assert.assertEquals(0, queueA1.getNumApplications());
+    Assert.assertEquals(1, queueB.getNumApplications());
+    Assert.assertEquals(0, queueB1.getNumApplications());
+
+    // Release attempt add event
+    ApplicationAttemptId appAttemptId2 =
+        BuilderUtils.newApplicationAttemptId(appId, 2);
+    SchedulerEvent addAttemptEvent2 =
+        new AppAttemptAddedSchedulerEvent(appAttemptId2, true);
+    sch.handle(addAttemptEvent2);
+
+    // Check metrics after attempt added
+    Assert.assertEquals(0, queueA1.getNumApplications());
+    Assert.assertEquals(1, queueB.getNumApplications());
+    Assert.assertEquals(1, queueB1.getNumApplications());
+
+
+    QueueMetrics queueB1M = queueB1.getMetrics();
+    QueueMetrics queueBM = queueB.getMetrics();
+    // Verify allocation MB of current state
+    Assert.assertEquals(0, queueA1M.getAllocatedMB());
+    Assert.assertEquals(0, queueA1M.getAllocatedVirtualCores());
+    Assert.assertEquals(1024, queueB1M.getAllocatedMB());
+    Assert.assertEquals(1, queueB1M.getAllocatedVirtualCores());
+
+    // remove attempt
+    sch.handle(new AppAttemptRemovedSchedulerEvent(appAttemptId2,
+        RMAppAttemptState.FINISHED, false));
+
+    Assert.assertEquals(0, queueA1M.getAllocatedMB());
+    Assert.assertEquals(0, queueA1M.getAllocatedVirtualCores());
+    Assert.assertEquals(0, queueB1M.getAllocatedMB());
+    Assert.assertEquals(0, queueB1M.getAllocatedVirtualCores());
+
+    verifyQueueMetrics(queueB1M);
+    verifyQueueMetrics(queueBM);
+    // Verify queue A1 metrics
+    verifyQueueMetrics(queueA1M);
+    rm.close();
+  }
+
+  private void verifyQueueMetrics(QueueMetrics queue) {
+    Assert.assertEquals(0, queue.getPendingMB());
+    Assert.assertEquals(0, queue.getActiveUsers());
+    Assert.assertEquals(0, queue.getActiveApps());
+    Assert.assertEquals(0, queue.getAppsPending());
+    Assert.assertEquals(0, queue.getAppsRunning());
+    Assert.assertEquals(0, queue.getAllocatedMB());
+    Assert.assertEquals(0, queue.getAllocatedVirtualCores());
+  }
+
+  private Configuration getCapacityConfiguration(Configuration config) {
+    CapacitySchedulerConfiguration conf =
+        new CapacitySchedulerConfiguration(config);
+
+    // Define top-level queues
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[] {"a", "b"});
+    conf.setCapacity(A, 50);
+    conf.setCapacity(B, 50);
+    conf.setQueues(A, new String[] {"a1", "a2"});
+    conf.setCapacity(A1, 50);
+    conf.setCapacity(A2, 50);
+    conf.setQueues(B, new String[] {"b1"});
+    conf.setCapacity(B1, 100);
+    return conf;
+  }
+
   @Test
   public void testKillAllAppsInQueue() throws Exception {
     MockRM rm = setUpMove();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message