hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1189880 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ hadoop-mapreduce-client/hadoop-mapreduce-client-ap...
Date Thu, 27 Oct 2011 17:32:40 GMT
Author: acmurthy
Date: Thu Oct 27 17:32:39 2011
New Revision: 1189880

URL: http://svn.apache.org/viewvc?rev=1189880&view=rev
Log:
Merge -c 1189879 from trunk to branch-0.23 to fix MAPREDUCE-3228.

Added:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestContainerLauncher.java
      - copied unchanged from r1189879, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestContainerLauncher.java
Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1189880&r1=1189879&r2=1189880&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Thu Oct 27 17:32:39 2011
@@ -1752,6 +1752,9 @@ Release 0.23.0 - Unreleased
 
     MAPREDUCE-3281. Fixed a bug in TestLinuxContainerExecutorWithMocks. (vinodkv)
 
+    MAPREDUCE-3228. Fixed MR AM to timeout RPCs to bad NodeManagers. (vinodkv
+    via acmurthy)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java?rev=1189880&r1=1189879&r2=1189880&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java Thu Oct 27 17:32:39 2011
@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapreduce.v2.app.launcher;
 
 
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.yarn.event.EventHandler;
 
 public interface ContainerLauncher 
@@ -28,4 +29,12 @@ public interface ContainerLauncher 
     CONTAINER_REMOTE_LAUNCH,
     CONTAINER_REMOTE_CLEANUP
   }
+
+  // Not a documented config. Only used for tests
+  static final String MR_AM_NM_COMMAND_TIMEOUT = MRJobConfig.MR_AM_PREFIX
+      + "nm-command-timeout";
+  /**
+   *  Maximum of 1 minute timeout for a Node to react to the command
+   */
+  static final int DEFAULT_NM__COMMAND_TIMEOUT = 60000;
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java?rev=1189880&r1=1189879&r2=1189880&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java Thu Oct 27 17:32:39 2011
@@ -21,6 +21,8 @@ package org.apache.hadoop.mapreduce.v2.a
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedAction;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -69,7 +71,9 @@ import com.google.common.util.concurrent
 public class ContainerLauncherImpl extends AbstractService implements
     ContainerLauncher {
 
-  private static final Log LOG = LogFactory.getLog(ContainerLauncherImpl.class);
+  static final Log LOG = LogFactory.getLog(ContainerLauncherImpl.class);
+
+  int nmTimeOut;
 
   private AppContext context;
   private ThreadPoolExecutor launcherPool;
@@ -95,14 +99,17 @@ public class ContainerLauncherImpl exten
     this.limitOnPoolSize = conf.getInt(
         MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT,
         MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT);
+    this.nmTimeOut = conf.getInt(ContainerLauncher.MR_AM_NM_COMMAND_TIMEOUT,
+        ContainerLauncher.DEFAULT_NM__COMMAND_TIMEOUT);
     super.init(conf);
   }
 
   public void start() {
+
+    ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
+        "ContainerLauncher #%d").setDaemon(true).build();
+
     // Start with a default core-pool size of 10 and change it dynamically.
-    ThreadFactory tf = new ThreadFactoryBuilder()
-      .setNameFormat("ContainerLauncher #%d")
-      .build();
     launcherPool = new ThreadPoolExecutor(INITIAL_POOL_SIZE,
         Integer.MAX_VALUE, 1, TimeUnit.HOURS,
         new LinkedBlockingQueue<Runnable>(),
@@ -156,11 +163,11 @@ public class ContainerLauncherImpl exten
 
   public void stop() {
     eventHandlingThread.interrupt();
-    launcherPool.shutdown();
+    launcherPool.shutdownNow();
     super.stop();
   }
 
-  protected ContainerManager getCMProxy(ContainerId containerID,
+  protected ContainerManager getCMProxy(
       final String containerManagerBindAddr, ContainerToken containerToken)
       throws IOException {
 
@@ -193,6 +200,27 @@ public class ContainerLauncherImpl exten
     return proxy;
   }
 
+  private static class CommandTimer extends TimerTask {
+    private final Thread commandThread;
+    protected final ContainerLauncherEvent event;
+    protected final String message;
+
+    public CommandTimer(Thread thread, ContainerLauncherEvent event) {
+      this.commandThread = thread;
+      this.event = event;
+      this.message = "Couldn't complete " + event.getType() + " on "
+          + event.getContainerID() + "/" + event.getTaskAttemptID()
+          + ". Interrupting and returning";
+    }
+
+    
+    @Override
+    public void run() {
+      LOG.warn(this.message);
+      this.commandThread.interrupt();
+    }
+  }
+
   /**
    * Setup and start the container on remote nodemanager.
    */
@@ -213,27 +241,53 @@ public class ContainerLauncherImpl exten
       final String containerManagerBindAddr = event.getContainerMgrAddress();
       ContainerId containerID = event.getContainerID();
       ContainerToken containerToken = event.getContainerToken();
+      TaskAttemptId taskAttemptID = event.getTaskAttemptID();
+
+      Timer timer = new Timer(true);
 
       switch(event.getType()) {
 
       case CONTAINER_REMOTE_LAUNCH:
-        ContainerRemoteLaunchEvent launchEv = (ContainerRemoteLaunchEvent) event;
+        ContainerRemoteLaunchEvent launchEvent
+            = (ContainerRemoteLaunchEvent) event;
 
-        TaskAttemptId taskAttemptID = launchEv.getTaskAttemptID();
         try {
-          
-          ContainerManager proxy = 
-            getCMProxy(containerID, containerManagerBindAddr, containerToken);
-          
+          timer.schedule(new CommandTimer(Thread.currentThread(), event),
+              nmTimeOut);
+
+          ContainerManager proxy = getCMProxy(containerManagerBindAddr,
+              containerToken);
+
+          // Interruped during getProxy, but that didn't throw exception
+          if (Thread.currentThread().isInterrupted()) {
+            // The timer cancelled the command in the mean while.
+            String message = "Start-container for " + event.getContainerID()
+                + " got interrupted. Returning.";
+            sendContainerLaunchFailedMsg(taskAttemptID, message);
+            return;
+          }
+
           // Construct the actual Container
           ContainerLaunchContext containerLaunchContext =
-              launchEv.getContainer();
+              launchEvent.getContainer();
 
           // Now launch the actual container
           StartContainerRequest startRequest = recordFactory
               .newRecordInstance(StartContainerRequest.class);
           startRequest.setContainerLaunchContext(containerLaunchContext);
           StartContainerResponse response = proxy.startContainer(startRequest);
+
+          // container started properly. Stop the timer
+          timer.cancel();
+          if (Thread.currentThread().isInterrupted()) {
+            // The timer cancelled the command in the mean while, but
+            // startContainer didn't throw exception
+            String message = "Start-container for " + event.getContainerID()
+                + " got interrupted. Returning.";
+            sendContainerLaunchFailedMsg(taskAttemptID, message);
+            return;
+          }
+
           ByteBuffer portInfo = response
               .getServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID);
           int port = -1;
@@ -255,12 +309,9 @@ public class ContainerLauncherImpl exten
         } catch (Throwable t) {
           String message = "Container launch failed for " + containerID
               + " : " + StringUtils.stringifyException(t);
-          LOG.error(message);
-          context.getEventHandler().handle(
-              new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID, message));
-          context.getEventHandler().handle(
-              new TaskAttemptEvent(taskAttemptID,
-                  TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
+          sendContainerLaunchFailedMsg(taskAttemptID, message);
+        } finally {
+          timer.cancel();
         }
 
         break;
@@ -272,24 +323,44 @@ public class ContainerLauncherImpl exten
           eventQueue.remove(event); // TODO: Any synchro needed?
           //deallocate the container
           context.getEventHandler().handle(
-              new ContainerAllocatorEvent(event.getTaskAttemptID(),
-              ContainerAllocator.EventType.CONTAINER_DEALLOCATE));
+              new ContainerAllocatorEvent(taskAttemptID,
+                  ContainerAllocator.EventType.CONTAINER_DEALLOCATE));
         } else {
+
           try {
-            ContainerManager proxy = 
-              getCMProxy(containerID, containerManagerBindAddr, containerToken);
-            // TODO:check whether container is launched
-
-            // kill the remote container if already launched
-            StopContainerRequest stopRequest = recordFactory
-                .newRecordInstance(StopContainerRequest.class);
-            stopRequest.setContainerId(event.getContainerID());
-            proxy.stopContainer(stopRequest);
+            timer.schedule(new CommandTimer(Thread.currentThread(), event),
+                nmTimeOut);
+
+            ContainerManager proxy = getCMProxy(containerManagerBindAddr,
+                containerToken);
 
+            if (Thread.currentThread().isInterrupted()) {
+              // The timer cancelled the command in the mean while. No need to
+              // return, send cleanedup event anyways.
+              LOG.info("Stop-container for " + event.getContainerID()
+                  + " got interrupted.");
+            } else {
+
+              // TODO:check whether container is launched
+
+              // kill the remote container if already launched
+              StopContainerRequest stopRequest = recordFactory
+                  .newRecordInstance(StopContainerRequest.class);
+              stopRequest.setContainerId(event.getContainerID());
+              proxy.stopContainer(stopRequest);
+            }
           } catch (Throwable t) {
-            //ignore the cleanup failure
-            LOG.warn("cleanup failed for container " + event.getContainerID() ,
-                t);
+            // ignore the cleanup failure
+            String message = "cleanup failed for container "
+                + event.getContainerID() + " : "
+                + StringUtils.stringifyException(t);
+            context.getEventHandler()
+                .handle(
+                    new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID,
+                        message));
+            LOG.warn(message);
+          } finally {
+            timer.cancel();
           }
 
           // after killing, send killed event to taskattempt
@@ -300,7 +371,17 @@ public class ContainerLauncherImpl exten
         break;
       }
     }
-    
+  }
+
+  @SuppressWarnings("unchecked")
+  void sendContainerLaunchFailedMsg(TaskAttemptId taskAttemptID,
+      String message) {
+    LOG.error(message);
+    context.getEventHandler().handle(
+        new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID, message));
+    context.getEventHandler().handle(
+        new TaskAttemptEvent(taskAttemptID,
+            TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
   }
 
   @Override

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java?rev=1189880&r1=1189879&r2=1189880&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java Thu Oct 27 17:32:39 2011
@@ -41,7 +41,6 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
 import org.apache.hadoop.yarn.api.ContainerManager;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerToken;
 import org.junit.Test;
 
@@ -219,7 +218,7 @@ public class TestFail {
         }
 
         @Override
-        protected ContainerManager getCMProxy(ContainerId containerID,
+        protected ContainerManager getCMProxy(
             String containerManagerBindAddr, ContainerToken containerToken)
             throws IOException {
           try {



Mime
View raw message