hadoop-mapreduce-commits mailing list archives

From: ss...@apache.org
Subject: svn commit: r1229395 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ hadoop-mapreduce-client/hadoop-mapreduce-client-ap...
Date: Mon, 09 Jan 2012 22:21:42 GMT
Author: sseth
Date: Mon Jan  9 22:21:42 2012
New Revision: 1229395

URL: http://svn.apache.org/viewvc?rev=1229395&view=rev
Log:
merge MAPREDUCE-3616 from trunk

Added:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/
      - copied from r1229394, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
      - copied unchanged from r1229394, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
Removed:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestContainerLauncher.java
Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1229395&r1=1229394&r2=1229395&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Mon Jan  9 22:21:42 2012
@@ -376,6 +376,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3624. Remove unnecessary dependency on JDK's tools.jar. (mahadev
     via acmurthy)
 
+    MAPREDUCE-3616. Thread pool for launching containers in MR AM not
+    expanding as expected. (vinodkv via sseth)
+
 Release 0.23.0 - 2011-11-01 
 
   INCOMPATIBLE CHANGES

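For context, MAPREDUCE-3616 above concerns the ContainerLauncher thread pool in the MR AM not growing with the number of NodeManagers it has to talk to. Below is a minimal standalone sketch of the capped resizing rule that the ContainerLauncherImpl change applies; class and variable names here are illustrative, not the actual AM code.

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    // Illustrative sketch only; does not mirror ContainerLauncherImpl exactly.
    public class PoolSizingSketch {
      static final int INITIAL_POOL_SIZE = 10;

      public static void main(String[] args) {
        // Stand-in value for MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT.
        int limitOnPoolSize = 500;
        ThreadPoolExecutor launcherPool = new ThreadPoolExecutor(
            INITIAL_POOL_SIZE, Integer.MAX_VALUE, 1, TimeUnit.HOURS,
            new LinkedBlockingQueue<Runnable>());

        int numNodes = 700; // distinct NodeManagers seen so far
        int poolSize = launcherPool.getCorePoolSize();
        int idealPoolSize = Math.min(limitOnPoolSize, numNodes);

        if (poolSize < idealPoolSize) {
          // Add the INITIAL_POOL_SIZE buffer so the pool is not resized on
          // every event, but never exceed the configured limit.
          int newPoolSize = Math.min(limitOnPoolSize,
              idealPoolSize + INITIAL_POOL_SIZE);
          launcherPool.setCorePoolSize(newPoolSize);
        }

        // Prints 500: capped at the limit rather than 510.
        System.out.println("core pool size = " + launcherPool.getCorePoolSize());
        launcherPool.shutdown();
      }
    }
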
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java?rev=1229395&r1=1229394&r2=1229395&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java Mon Jan  9 22:21:42 2012
@@ -76,8 +76,8 @@ public class ContainerLauncherImpl exten
   int nmTimeOut;
 
   private AppContext context;
-  private ThreadPoolExecutor launcherPool;
-  private static final int INITIAL_POOL_SIZE = 10;
+  protected ThreadPoolExecutor launcherPool;
+  protected static final int INITIAL_POOL_SIZE = 10;
   private int limitOnPoolSize;
   private Thread eventHandlingThread;
   private BlockingQueue<ContainerLauncherEvent> eventQueue =
@@ -102,6 +102,7 @@ public class ContainerLauncherImpl exten
     this.limitOnPoolSize = conf.getInt(
         MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT,
         MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT);
+    LOG.info("Upper limit on the thread pool size is " + this.limitOnPoolSize);
     this.nmTimeOut = conf.getInt(ContainerLauncher.MR_AM_NM_COMMAND_TIMEOUT,
         ContainerLauncher.DEFAULT_NM_COMMAND_TIMEOUT);
     this.rpc = YarnRPC.create(conf);
@@ -141,20 +142,21 @@ public class ContainerLauncherImpl exten
             int numNodes = allNodes.size();
             int idealPoolSize = Math.min(limitOnPoolSize, numNodes);
 
-            if (poolSize <= idealPoolSize) {
+            if (poolSize < idealPoolSize) {
               // Bump up the pool size to idealPoolSize+INITIAL_POOL_SIZE, the
               // later is just a buffer so we are not always increasing the
               // pool-size
-              int newPoolSize = idealPoolSize + INITIAL_POOL_SIZE;
-              LOG.info("Setting ContainerLauncher pool size to "
-                  + newPoolSize);
+              int newPoolSize = Math.min(limitOnPoolSize, idealPoolSize
+                  + INITIAL_POOL_SIZE);
+              LOG.info("Setting ContainerLauncher pool size to " + newPoolSize
+                  + " as number-of-nodes to talk to is " + numNodes);
               launcherPool.setCorePoolSize(newPoolSize);
             }
           }
 
           // the events from the queue are handled in parallel
           // using a thread pool
-          launcherPool.execute(new EventProcessor(event));
+          launcherPool.execute(createEventProcessor(event));
 
           // TODO: Group launching of multiple containers to a single
           // NodeManager into a single connection
@@ -172,14 +174,16 @@ public class ContainerLauncherImpl exten
     super.stop();
   }
 
+  protected EventProcessor createEventProcessor(ContainerLauncherEvent event) {
+    return new EventProcessor(event);
+  }
+
   protected ContainerManager getCMProxy(ContainerId containerID,
       final String containerManagerBindAddr, ContainerToken containerToken)
       throws IOException {
 
     UserGroupInformation user = UserGroupInformation.getCurrentUser();
 
-    this.allNodes.add(containerManagerBindAddr);
-
     if (UserGroupInformation.isSecurityEnabled()) {
       Token<ContainerTokenIdentifier> token = new Token<ContainerTokenIdentifier>(
           containerToken.getIdentifier().array(), containerToken
@@ -244,7 +248,7 @@ public class ContainerLauncherImpl exten
   /**
    * Setup and start the container on remote nodemanager.
    */
-  private class EventProcessor implements Runnable {
+  class EventProcessor implements Runnable {
     private ContainerLauncherEvent event;
 
     EventProcessor(ContainerLauncherEvent event) {
@@ -280,7 +284,7 @@ public class ContainerLauncherImpl exten
           proxy = getCMProxy(containerID, containerManagerBindAddr,
               containerToken);
 
-          // Interruped during getProxy, but that didn't throw exception
+          // Interrupted during getProxy, but that didn't throw exception
           if (Thread.interrupted()) {
             // The timer cancelled the command in the mean while.
             String message = "Container launch failed for " + containerID
@@ -438,6 +442,7 @@ public class ContainerLauncherImpl exten
   public void handle(ContainerLauncherEvent event) {
     try {
       eventQueue.put(event);
+      this.allNodes.add(event.getContainerMgrAddress());
     } catch (InterruptedException e) {
       throw new YarnException(e);
     }

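The change above also makes EventProcessor package-visible and routes its construction through the new protected createEventProcessor() hook, which is what lets the relocated TestContainerLauncher substitute its own processor. A hedged sketch of how a subclass in the same package might use that hook; the stub below is hypothetical, not taken from the test, and assumes ContainerLauncherImpl exposes a constructor taking an AppContext.

    package org.apache.hadoop.mapreduce.v2.app.launcher;

    import org.apache.hadoop.mapreduce.v2.app.AppContext;

    // Hypothetical test helper; the real TestContainerLauncher stub may differ.
    // It must live in this package because EventProcessor is package-visible.
    class StubbedContainerLauncher extends ContainerLauncherImpl {

      StubbedContainerLauncher(AppContext context) {
        super(context); // assumed constructor signature
      }

      @Override
      protected EventProcessor createEventProcessor(ContainerLauncherEvent event) {
        // Hand back a processor the test controls instead of one that opens a
        // real connection to a NodeManager.
        return new EventProcessor(event) {
          @Override
          public void run() {
            // e.g. record the event for assertions, or simulate a slow launch
          }
        };
      }
    }
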
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java?rev=1229395&r1=1229394&r2=1229395&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java Mon Jan  9 22:21:42 2012
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.net.URI;
+import java.net.URL;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -181,23 +182,31 @@ public class MRApps extends Apps {
       String mrAppGeneratedClasspathFile = "mrapp-generated-classpath";
       classpathFileStream =
           thisClassLoader.getResourceAsStream(mrAppGeneratedClasspathFile);
+
       // Put the file itself on classpath for tasks.
-      String classpathElement = thisClassLoader.getResource(mrAppGeneratedClasspathFile).getFile();
-      if (classpathElement.contains("!")) {
-        classpathElement = classpathElement.substring(0, classpathElement.indexOf("!"));
+      URL classpathResource = thisClassLoader
+        .getResource(mrAppGeneratedClasspathFile);
+      if (classpathResource != null) {
+        String classpathElement = classpathResource.getFile();
+        if (classpathElement.contains("!")) {
+          classpathElement = classpathElement.substring(0,
+            classpathElement.indexOf("!"));
+        } else {
+          classpathElement = new File(classpathElement).getParent();
+        }
+        Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
+          classpathElement);
       }
-      else {
-        classpathElement = new File(classpathElement).getParent();
+
+      if (classpathFileStream != null) {
+        reader = new BufferedReader(new InputStreamReader(classpathFileStream));
+        String cp = reader.readLine();
+        if (cp != null) {
+          Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
+            cp.trim());
+        }
       }
-      Apps.addToEnvironment(
-          environment,
-          Environment.CLASSPATH.name(), classpathElement);
-
-      reader = new BufferedReader(new InputStreamReader(classpathFileStream));
-      String cp = reader.readLine();
-      if (cp != null) {
-        Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), cp.trim());
-      }      
+
       // Add standard Hadoop classes
       for (String c : ApplicationConstants.APPLICATION_CLASSPATH) {
         Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), c);

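The MRApps change above makes the classpath setup tolerant of the mrapp-generated-classpath resource being absent: both getResource() and getResourceAsStream() are now checked for null before use. A small standalone sketch of the same pattern, with illustrative names rather than the real MRApps/Apps helpers.

    import java.io.BufferedReader;
    import java.io.File;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.net.URL;

    // Standalone illustration of the null-safe lookup pattern; not MRApps itself.
    public class ClasspathResourceSketch {

      public static void main(String[] args) throws IOException {
        ClassLoader loader = ClasspathResourceSketch.class.getClassLoader();
        String name = "mrapp-generated-classpath";

        // Only derive a classpath element if the resource actually exists.
        URL resource = loader.getResource(name);
        if (resource != null) {
          String element = resource.getFile();
          // Inside a jar the path looks like "/path/app.jar!/entry"; keep the jar.
          if (element.contains("!")) {
            element = element.substring(0, element.indexOf("!"));
          } else {
            element = new File(element).getParent();
          }
          System.out.println("would add to CLASSPATH: " + element);
        }

        // Only read the generated classpath file if the stream could be opened.
        InputStream in = loader.getResourceAsStream(name);
        if (in != null) {
          BufferedReader reader = new BufferedReader(new InputStreamReader(in));
          try {
            String cp = reader.readLine();
            if (cp != null) {
              System.out.println("would add to CLASSPATH: " + cp.trim());
            }
          } finally {
            reader.close();
          }
        }
      }
    }
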

