hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjs...@apache.org
Subject [33/50] [abbrv] hadoop git commit: YARN-2821. Fixed a problem that DistributedShell AM may hang if restarted. Contributed by Varun Vasudev
Date Fri, 22 May 2015 21:41:51 GMT
YARN-2821. Fixed a problem where the DistributedShell AM could hang if restarted. Contributed by
Varun Vasudev

Conflicts:
	hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d2057047
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d2057047
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d2057047

Branch: refs/heads/YARN-2928
Commit: d2057047a4cb606c1b2d05003cdb82128fd528fc
Parents: 8d096be
Author: Jian He <jianhe@apache.org>
Authored: Tue May 19 14:20:31 2015 -0700
Committer: Zhijie Shen <zjshen@apache.org>
Committed: Fri May 22 12:18:19 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../pom.xml                                     |   5 +
 .../distributedshell/ApplicationMaster.java     |  88 ++++++++++---
 .../distributedshell/TestDSAppMaster.java       | 130 +++++++++++++++++++
 4 files changed, 205 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2057047/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index e5a0b95..7fbc09a 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -501,6 +501,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3302. TestDockerContainerExecutor should run automatically if it can
     detect docker in the usual place (Ravindra Kumar Naik via raviprak)
 
+    YARN-2821. Fixed a problem that DistributedShell AM may hang if restarted.
+    (Varun Vasudev via jianhe)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2057047/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml
index 18e325c..713f12b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml
@@ -126,6 +126,11 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2057047/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index add34af..13bf500 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -30,10 +30,12 @@ import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Vector;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -177,7 +179,7 @@ public class ApplicationMaster {
   public static enum DSEvent {
     DS_APP_ATTEMPT_START, DS_APP_ATTEMPT_END, DS_CONTAINER_START, DS_CONTAINER_END
   }
-  
+
   @VisibleForTesting
   @Private
   public static enum DSEntity {
@@ -220,7 +222,7 @@ public class ApplicationMaster {
   private static ExecutorService threadPool =
       Executors.newCachedThreadPool(
           new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
-          .build());
+              .build());
 
   // App Master configuration
   // No. of containers to run shell command on
@@ -290,6 +292,10 @@ public class ApplicationMaster {
   private final String linux_bash_command = "bash";
   private final String windows_command = "cmd /c";
 
+  @VisibleForTesting
+  protected final Set<ContainerId> launchedContainers =
+      Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());
+
   /**
    * @param args Command line args
    */
@@ -304,7 +310,7 @@ public class ApplicationMaster {
       }
       appMaster.run();
       result = appMaster.finish();
-      
+
       shutdownAndAwaitTermination();
     } catch (Throwable t) {
       LOG.fatal("Error running ApplicationMaster", t);
@@ -319,7 +325,7 @@ public class ApplicationMaster {
       System.exit(2);
     }
   }
-  
+
   //TODO remove threadPool after adding non-blocking call in TimelineClient
   private static void shutdownAndAwaitTermination() {
     threadPool.shutdown();
@@ -639,7 +645,7 @@ public class ApplicationMaster {
     // resource manager
     int maxMem = response.getMaximumResourceCapability().getMemory();
     LOG.info("Max mem capability of resources in this cluster " + maxMem);
-    
+
     int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
     LOG.info("Max vcores capability of resources in this cluster " + maxVCores);
 
@@ -662,8 +668,12 @@ public class ApplicationMaster {
         response.getContainersFromPreviousAttempts();
     LOG.info(appAttemptID + " received " + previousAMRunningContainers.size()
       + " previous attempts' running containers on AM registration.");
+    for(Container container: previousAMRunningContainers) {
+      launchedContainers.add(container.getId());
+    }
     numAllocatedContainers.addAndGet(previousAMRunningContainers.size());
 
+
     int numTotalContainersToRequest =
         numTotalContainers - previousAMRunningContainers.size();
     // Setup ask for containers from RM
@@ -758,7 +768,7 @@ public class ApplicationMaster {
     FinalApplicationStatus appStatus;
     String appMessage = null;
     boolean success = true;
-    if (numFailedContainers.get() == 0 && 
+    if (numFailedContainers.get() == 0 &&
         numCompletedContainers.get() == numTotalContainers) {
       appStatus = FinalApplicationStatus.SUCCEEDED;
     } else {
@@ -777,7 +787,7 @@ public class ApplicationMaster {
     } catch (IOException e) {
       LOG.error("Failed to unregister application", e);
     }
-    
+
     amRMClient.stop();
 
     // Stop Timeline Client
@@ -787,8 +797,9 @@ public class ApplicationMaster {
 
     return success;
   }
-  
-  private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
+
+  @VisibleForTesting
+  class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
     @SuppressWarnings("unchecked")
     @Override
     public void onContainersCompleted(List<ContainerStatus> completedContainers) {
@@ -803,6 +814,14 @@ public class ApplicationMaster {
 
         // non complete containers should not be here
         assert (containerStatus.getState() == ContainerState.COMPLETE);
+        // ignore containers we know nothing about - probably from a previous
+        // attempt
+        if (!launchedContainers.contains(containerStatus.getContainerId())) {
+          LOG.info("Ignoring completed status of "
+              + containerStatus.getContainerId()
+              + "; unknown container(probably launched by previous attempt)");
+          continue;
+        }
 
         // increment counters for completed/failed containers
         int exitStatus = containerStatus.getExitStatus();
@@ -838,7 +857,7 @@ public class ApplicationMaster {
           }
         }
       }
-      
+
       // ask for more containers if any failed
       int askCount = numTotalContainers - numRequestedContainers.get();
       numRequestedContainers.addAndGet(askCount);
@@ -849,7 +868,7 @@ public class ApplicationMaster {
           amRMClient.addContainerRequest(containerAsk);
         }
       }
-      
+
       if (numCompletedContainers.get() == numTotalContainers) {
         done = true;
       }
@@ -873,14 +892,13 @@ public class ApplicationMaster {
         // + ", containerToken"
         // +allocatedContainer.getContainerToken().getIdentifier().toString());
 
-        LaunchContainerRunnable runnableLaunchContainer =
-            new LaunchContainerRunnable(allocatedContainer, containerListener);
-        Thread launchThread = new Thread(runnableLaunchContainer);
+        Thread launchThread = createLaunchContainerThread(allocatedContainer);
 
         // launch and start the container on a separate thread to keep
         // the main thread unblocked
         // as all containers may not be allocated at one go.
         launchThreads.add(launchThread);
+        launchedContainers.add(allocatedContainer.getId());
         launchThread.start();
       }
     }
@@ -1160,7 +1178,7 @@ public class ApplicationMaster {
       org.apache.commons.io.IOUtils.closeQuietly(ds);
     }
   }
-  
+
   private static void publishContainerStartEvent(
       final TimelineClient timelineClient, Container container, String domainId,
       UserGroupInformation ugi) {
@@ -1249,7 +1267,8 @@ public class ApplicationMaster {
   private static void publishContainerStartEventOnNewTimelineServiceBase(
       final TimelineClient timelineClient, Container container, String domainId,
       UserGroupInformation ugi) {
-    final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
+    final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
+        entity =
         new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
     entity.setId(container.getId().toString());
     entity.setType(DSEntity.DS_CONTAINER.toString());
@@ -1277,8 +1296,8 @@ public class ApplicationMaster {
       final String domainId, final UserGroupInformation ugi) {
     Runnable publishWrapper = new Runnable() {
       public void run() {
-          publishContainerEndEventOnNewTimelineServiceBase(timelineClient,
-              container, domainId, ugi);
+        publishContainerEndEventOnNewTimelineServiceBase(timelineClient,
+            container, domainId, ugi);
       }
     };
     threadPool.execute(publishWrapper);
@@ -1287,14 +1306,15 @@ public class ApplicationMaster {
   private static void publishContainerEndEventOnNewTimelineServiceBase(
       final TimelineClient timelineClient, final ContainerStatus container,
       final String domainId, final UserGroupInformation ugi) {
-    final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
+    final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
+        entity =
         new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
     entity.setId(container.getContainerId().toString());
     entity.setType(DSEntity.DS_CONTAINER.toString());
     //entity.setDomainId(domainId);
     entity.addInfo("user", ugi.getShortUserName());
     org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
-        new  org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
+        new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
     event.setTimestamp(System.currentTimeMillis());
     event.setId(DSEvent.DS_CONTAINER_END.toString());
     event.addInfo("State", container.getState().name());
@@ -1325,7 +1345,8 @@ public class ApplicationMaster {
   private static void publishApplicationAttemptEventOnNewTimelineServiceBase(
       final TimelineClient timelineClient, String appAttemptId,
       DSEvent appEvent, String domainId, UserGroupInformation ugi) {
-    final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
+    final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
+        entity =
         new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
     entity.setId(appAttemptId);
     entity.setType(DSEntity.DS_APP_ATTEMPT.toString());
@@ -1347,4 +1368,29 @@ public class ApplicationMaster {
     }
   }
 
+  RMCallbackHandler getRMCallbackHandler() {
+    return new RMCallbackHandler();
+  }
+
+  @VisibleForTesting
+  void setAmRMClient(AMRMClientAsync client) {
+    this.amRMClient = client;
+  }
+
+  @VisibleForTesting
+  int getNumCompletedContainers() {
+    return numCompletedContainers.get();
+  }
+
+  @VisibleForTesting
+  boolean getDone() {
+    return done;
+  }
+
+  @VisibleForTesting
+  Thread createLaunchContainerThread(Container allocatedContainer) {
+    LaunchContainerRunnable runnableLaunchContainer =
+        new LaunchContainerRunnable(allocatedContainer, containerListener);
+    return new Thread(runnableLaunchContainer);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2057047/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java
index 11e840a..0fed14d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java
@@ -20,13 +20,143 @@ package org.apache.hadoop.yarn.applications.distributedshell;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A bunch of tests to make sure that the container allocations
+ * and releases occur correctly.
+ */
 public class TestDSAppMaster {
 
+  static class TestAppMaster extends ApplicationMaster {
+    private int threadsLaunched = 0;
+
+    @Override
+    Thread createLaunchContainerThread(Container allocatedContainer) {
+      threadsLaunched++;
+      // registration in launchedContainers is left to onContainersAllocated
+      return new Thread();
+    }
+
+    void setNumTotalContainers(int numTotalContainers) {
+      this.numTotalContainers = numTotalContainers;
+    }
+
+    int getAllocatedContainers() {
+      return this.numAllocatedContainers.get();
+    }
+
+    @Override
+    void startTimelineClient(final Configuration conf) throws YarnException,
+        IOException, InterruptedException {
+      timelineClient = null;
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testDSAppMasterAllocateHandler() throws Exception {
+    TestAppMaster master = new TestAppMaster();
+    int targetContainers = 2;
+    AMRMClientAsync mockClient = Mockito.mock(AMRMClientAsync.class);
+    master.setAmRMClient(mockClient);
+    master.setNumTotalContainers(targetContainers);
+    Mockito.doNothing().when(mockClient)
+        .addContainerRequest(Matchers.any(AMRMClient.ContainerRequest.class));
+
+    ApplicationMaster.RMCallbackHandler handler = master.getRMCallbackHandler();
+
+    List<Container> containers = new ArrayList<>(1);
+    ContainerId id1 = BuilderUtils.newContainerId(1, 1, 1, 1);
+    containers.add(generateContainer(id1));
+
+    master.numRequestedContainers.set(targetContainers);
+
+    // first allocate a single container, everything should be fine
+    handler.onContainersAllocated(containers);
+    Assert.assertEquals("Wrong container allocation count", 1,
+        master.getAllocatedContainers());
+    Mockito.verifyZeroInteractions(mockClient);
+    Assert.assertEquals("Incorrect number of threads launched", 1,
+        master.threadsLaunched);
+
+    // now send 3 extra containers
+    containers.clear();
+    ContainerId id2 = BuilderUtils.newContainerId(1, 1, 1, 2);
+    containers.add(generateContainer(id2));
+    ContainerId id3 = BuilderUtils.newContainerId(1, 1, 1, 3);
+    containers.add(generateContainer(id3));
+    ContainerId id4 = BuilderUtils.newContainerId(1, 1, 1, 4);
+    containers.add(generateContainer(id4));
+    handler.onContainersAllocated(containers);
+    Assert.assertEquals("Wrong final container allocation count", 4,
+        master.getAllocatedContainers());
+    Assert.assertEquals("Incorrect number of threads launched", 4,
+        master.threadsLaunched);
+
+    // make sure we handle completion events correctly
+    List<ContainerStatus> status = new ArrayList<>();
+    status.add(generateContainerStatus(id1, ContainerExitStatus.SUCCESS));
+    status.add(generateContainerStatus(id2, ContainerExitStatus.SUCCESS));
+    status.add(generateContainerStatus(id3, ContainerExitStatus.ABORTED));
+    status.add(generateContainerStatus(id4, ContainerExitStatus.ABORTED));
+    handler.onContainersCompleted(status);
+
+    Assert.assertEquals("Unexpected number of completed containers",
+        targetContainers, master.getNumCompletedContainers());
+    Assert.assertTrue("Master didn't finish containers as expected",
+        master.getDone());
+
+    // feed completion events for a container that was never launched; they
+    // must be ignored, leaving the counters and the done flag untouched
+    status = new ArrayList<>();
+    ContainerId id5 = BuilderUtils.newContainerId(1, 1, 1, 5);
+    status.add(generateContainerStatus(id5, ContainerExitStatus.ABORTED));
+    handler.onContainersCompleted(status);
+    Assert.assertEquals("Unexpected number of completed containers",
+        targetContainers, master.getNumCompletedContainers());
+    Assert.assertTrue("Master didn't finish containers as expected",
+        master.getDone());
+    status.add(generateContainerStatus(id5, ContainerExitStatus.SUCCESS));
+    handler.onContainersCompleted(status);
+    Assert.assertEquals("Unexpected number of completed containers",
+        targetContainers, master.getNumCompletedContainers());
+    Assert.assertTrue("Master didn't finish containers as expected",
+        master.getDone());
+  }
+
+  private Container generateContainer(ContainerId cid) {
+    return Container.newInstance(cid, NodeId.newInstance("host", 5000),
+      "host:80", Resource.newInstance(1024, 1), Priority.newInstance(0), null);
+  }
+
+  private ContainerStatus
+      generateContainerStatus(ContainerId id, int exitStatus) {
+    return ContainerStatus.newInstance(id, ContainerState.COMPLETE, "",
+      exitStatus);
+  }
+
   @Test
   public void testTimelineClientInDSAppMaster() throws Exception {
     ApplicationMaster appMaster = new ApplicationMaster();


Mime
View raw message