hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bil...@apache.org
Subject hadoop git commit: YARN-2674. Fix distributed shell AM container relaunch during RM work preserving restart. Contributed by Shane Kumpf
Date Tue, 01 May 2018 15:04:13 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk d6139c510 -> 4e1382aca


YARN-2674. Fix distributed shell AM container relaunch during RM work preserving restart.
Contributed by Shane Kumpf


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4e1382ac
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4e1382ac
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4e1382ac

Branch: refs/heads/trunk
Commit: 4e1382aca4cf23ca229bdd24e0f143c22449b329
Parents: d6139c5
Author: Billie Rinaldi <billie@apache.org>
Authored: Mon Apr 30 14:34:51 2018 -0700
Committer: Billie Rinaldi <billie@apache.org>
Committed: Tue May 1 07:27:47 2018 -0700

----------------------------------------------------------------------
 .../distributedshell/ApplicationMaster.java     | 68 +++++++++++++-------
 .../distributedshell/TestDSAppMaster.java       |  8 +--
 2 files changed, 46 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e1382ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index 75f4073..cca5676 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -31,6 +31,7 @@ import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -105,6 +106,7 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.client.api.TimelineV2Client;
@@ -1060,32 +1062,48 @@ public class ApplicationMaster {
     public void onContainersAllocated(List<Container> allocatedContainers) {
       LOG.info("Got response from RM for container ask, allocatedCnt="
           + allocatedContainers.size());
-      numAllocatedContainers.addAndGet(allocatedContainers.size());
       for (Container allocatedContainer : allocatedContainers) {
-        String yarnShellId = Integer.toString(yarnShellIdCounter);
-        yarnShellIdCounter++;
-        LOG.info("Launching shell command on a new container."
-            + ", containerId=" + allocatedContainer.getId()
-            + ", yarnShellId=" + yarnShellId
-            + ", containerNode=" + allocatedContainer.getNodeId().getHost()
-            + ":" + allocatedContainer.getNodeId().getPort()
-            + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
-            + ", containerResourceMemory"
-            + allocatedContainer.getResource().getMemorySize()
-            + ", containerResourceVirtualCores"
-            + allocatedContainer.getResource().getVirtualCores());
-        // + ", containerToken"
-        // +allocatedContainer.getContainerToken().getIdentifier().toString());
-
-        Thread launchThread = createLaunchContainerThread(allocatedContainer,
-            yarnShellId);
-
-        // launch and start the container on a separate thread to keep
-        // the main thread unblocked
-        // as all containers may not be allocated at one go.
-        launchThreads.add(launchThread);
-        launchedContainers.add(allocatedContainer.getId());
-        launchThread.start();
+        if (numAllocatedContainers.get() == numTotalContainers) {
+          LOG.info("The requested number of containers have been allocated."
+              + " Releasing the extra container allocation from the RM.");
+          amRMClient.releaseAssignedContainer(allocatedContainer.getId());
+        } else {
+          numAllocatedContainers.addAndGet(1);
+          String yarnShellId = Integer.toString(yarnShellIdCounter);
+          yarnShellIdCounter++;
+          LOG.info(
+              "Launching shell command on a new container."
+                  + ", containerId=" + allocatedContainer.getId()
+                  + ", yarnShellId=" + yarnShellId
+                  + ", containerNode="
+                  + allocatedContainer.getNodeId().getHost()
+                  + ":" + allocatedContainer.getNodeId().getPort()
+                  + ", containerNodeURI="
+                  + allocatedContainer.getNodeHttpAddress()
+                  + ", containerResourceMemory"
+                  + allocatedContainer.getResource().getMemorySize()
+                  + ", containerResourceVirtualCores"
+                  + allocatedContainer.getResource().getVirtualCores());
+
+          Thread launchThread =
+              createLaunchContainerThread(allocatedContainer, yarnShellId);
+
+          // launch and start the container on a separate thread to keep
+          // the main thread unblocked
+          // as all containers may not be allocated at one go.
+          launchThreads.add(launchThread);
+          launchedContainers.add(allocatedContainer.getId());
+          launchThread.start();
+
+          // Remove the corresponding request
+          Collection<AMRMClient.ContainerRequest> requests =
+              amRMClient.getMatchingRequests(
+                  allocatedContainer.getAllocationRequestId());
+          if (requests.iterator().hasNext()) {
+            AMRMClient.ContainerRequest request = requests.iterator().next();
+            amRMClient.removeContainerRequest(request);
+          }
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e1382ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java
index f11bdf8..f2a8041 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java
@@ -106,7 +106,6 @@ public class TestDSAppMaster {
     handler.onContainersAllocated(containers);
     Assert.assertEquals("Wrong container allocation count", 1,
         master.getAllocatedContainers());
-    Mockito.verifyZeroInteractions(mockClient);
     Assert.assertEquals("Incorrect number of threads launched", 1,
         master.threadsLaunched);
     Assert.assertEquals("Incorrect YARN Shell IDs",
@@ -121,15 +120,14 @@ public class TestDSAppMaster {
     ContainerId id4 = BuilderUtils.newContainerId(1, 1, 1, 4);
     containers.add(generateContainer(id4));
     handler.onContainersAllocated(containers);
-    Assert.assertEquals("Wrong final container allocation count", 4,
+    Assert.assertEquals("Wrong final container allocation count", 2,
         master.getAllocatedContainers());
 
-    Assert.assertEquals("Incorrect number of threads launched", 4,
+    Assert.assertEquals("Incorrect number of threads launched", 2,
         master.threadsLaunched);
 
     Assert.assertEquals("Incorrect YARN Shell IDs",
-        Arrays.asList("1", "2", "3", "4"), master.yarnShellIds);
-
+        Arrays.asList("1", "2"), master.yarnShellIds);
     // make sure we handle completion events correctly
     List<ContainerStatus> status = new ArrayList<>();
     status.add(generateContainerStatus(id1, ContainerExitStatus.SUCCESS));


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message