hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From asur...@apache.org
Subject [06/15] hadoop git commit: YARN-5918. Handle Opportunistic scheduling allocate request failure when NM is lost. (Bibin A Chundatt via asuresh)
Date Fri, 06 Jan 2017 19:34:36 GMT
YARN-5918. Handle Opportunistic scheduling allocate request failure when NM is lost. (Bibin A Chundatt via asuresh)

(cherry picked from commit 005850b28feb2f7bb8c2844d11e3f9d21b45d754)
(cherry picked from commit cbff10b4147f98a89b393519b17e16385294af07)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/44774eb2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/44774eb2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/44774eb2

Branch: refs/heads/branch-2
Commit: 44774eb21c64d725e2880860c998a1cdccfbb488
Parents: 368565f
Author: Arun Suresh <asuresh@apache.org>
Authored: Wed Nov 23 09:53:31 2016 -0800
Committer: Arun Suresh <asuresh@apache.org>
Committed: Fri Jan 6 11:15:09 2017 -0800

----------------------------------------------------------------------
 ...pportunisticContainerAllocatorAMService.java | 13 ++-
 .../distributed/NodeQueueLoadMonitor.java       |  8 +-
 ...pportunisticContainerAllocatorAMService.java | 97 +++++++++++++++++++-
 3 files changed, 110 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/44774eb2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
index 1f83127..ada1a63 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor;
 
@@ -409,15 +410,19 @@ public class OpportunisticContainerAllocatorAMService
   private List<RemoteNode> convertToRemoteNodes(List<NodeId> nodeIds) {
     ArrayList<RemoteNode> retNodes = new ArrayList<>();
     for (NodeId nId : nodeIds) {
-      retNodes.add(convertToRemoteNode(nId));
+      RemoteNode remoteNode = convertToRemoteNode(nId);
+      if (null != remoteNode) {
+        retNodes.add(remoteNode);
+      }
     }
     return retNodes;
   }
 
   private RemoteNode convertToRemoteNode(NodeId nodeId) {
-    return RemoteNode.newInstance(nodeId,
-        ((AbstractYarnScheduler)rmContext.getScheduler()).getNode(nodeId)
-            .getHttpAddress());
+    SchedulerNode node =
+        ((AbstractYarnScheduler) rmContext.getScheduler()).getNode(nodeId);
+    return node != null ? RemoteNode.newInstance(nodeId, node.getHttpAddress())
+        : null;
   }
 
   private Resource createMaxContainerResource() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/44774eb2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
index 232b4ad..dec55ca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
@@ -165,9 +165,11 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
   }
 
   @Override
-  public void addNode(List<NMContainerStatus> containerStatuses, RMNode
-      rmNode) {
-    LOG.debug("Node added event from: " + rmNode.getNode().getName());
+  public void addNode(List<NMContainerStatus> containerStatuses,
+      RMNode rmNode) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Node added event from: " + rmNode.getNode().getName());
+    }
     // Ignoring this currently : at least one NODE_UPDATE heartbeat is
     // required to ensure node eligibility.
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/44774eb2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
index deaee3f..4ed92f8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
@@ -61,11 +61,23 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateRequestPBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateResponsePBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
-
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
+import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -77,6 +89,89 @@ import java.util.List;
  */
 public class TestOpportunisticContainerAllocatorAMService {
 
+  private static final int GB = 1024;
+
+  @Test(timeout = 60000)
+  public void testNodeRemovalDuringAllocate() throws Exception {
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    YarnConfiguration conf = new YarnConfiguration(csConf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(
+        YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
+    conf.setInt(
+        YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, 100);
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+    MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+    nm1.registerNode();
+    nm2.registerNode();
+    OpportunisticContainerAllocatorAMService amservice =
+        (OpportunisticContainerAllocatorAMService) rm
+            .getApplicationMasterService();
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    ApplicationAttemptId attemptId =
+        app1.getCurrentAppAttempt().getAppAttemptId();
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
+    ResourceScheduler scheduler = rm.getResourceScheduler();
+    RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true);
+    ((RMNodeImpl) rmNode1)
+        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+    ((RMNodeImpl) rmNode2)
+        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+    OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler)
+        .getApplicationAttempt(attemptId).getOpportunisticContainerContext();
+    // Send add and update node events to AM Service.
+    amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
+    amservice.handle(new NodeAddedSchedulerEvent(rmNode2));
+    amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    amservice.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    // Both node 1 and node 2 will be applicable for scheduling.
+    for (int i = 0; i < 10; i++) {
+      am1.allocate(
+          Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
+              "*", Resources.createResource(1 * GB), 2)),
+          null);
+      if (ctxt.getNodeMap().size() == 2) {
+        break;
+      }
+      Thread.sleep(50);
+    }
+    Assert.assertEquals(2, ctxt.getNodeMap().size());
+    // Remove node from scheduler but not from AM Service.
+    scheduler.handle(new NodeRemovedSchedulerEvent(rmNode1));
+    // After removal of node 1, only 1 node will be applicable for scheduling.
+    for (int i = 0; i < 10; i++) {
+      try {
+        am1.allocate(
+            Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
+                "*", Resources.createResource(1 * GB), 2)),
+            null);
+      } catch (Exception e) {
+        Assert.fail("Allocate request should be handled on node removal");
+      }
+      if (ctxt.getNodeMap().size() == 1) {
+        break;
+      }
+      Thread.sleep(50);
+    }
+    Assert.assertEquals(1, ctxt.getNodeMap().size());
+  }
+
+  private OpportunisticContainersStatus getOppurtunisticStatus(int waitTime,
+      int queueLength) {
+    OpportunisticContainersStatus status1 =
+        Mockito.mock(OpportunisticContainersStatus.class);
+    Mockito.when(status1.getEstimatedQueueWaitTime()).thenReturn(waitTime);
+    Mockito.when(status1.getWaitQueueLength()).thenReturn(queueLength);
+    return status1;
+  }
+
   // Test if the OpportunisticContainerAllocatorAMService can handle both
   // DSProtocol as well as AMProtocol clients
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message