hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject svn commit: r1309045 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/s...
Date Tue, 03 Apr 2012 17:04:45 GMT
Author: bobby
Date: Tue Apr  3 17:04:44 2012
New Revision: 1309045

URL: http://svn.apache.org/viewvc?rev=1309045&view=rev
Log:
MAPREDUCE-4062. AM Launcher thread can hang forever (tgraves via bobby)

Added:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java
Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagerPBClientImpl.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java?rev=1309045&r1=1309044&r2=1309045&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java	(original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java	Tue Apr  3 17:04:44 2012
@@ -30,11 +30,4 @@ public interface ContainerLauncher 
     CONTAINER_REMOTE_CLEANUP
   }
 
-  // Not a documented config. Only used for tests
-  static final String MR_AM_NM_COMMAND_TIMEOUT = MRJobConfig.MR_AM_PREFIX
-      + "nm-command-timeout";
-  /**
-   *  Maximum of 1 minute timeout for a Node to react to the command
-   */
-  static final int DEFAULT_NM_COMMAND_TIMEOUT = 60000;
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java?rev=1309045&r1=1309044&r2=1309045&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java	(original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java	Tue Apr  3 17:04:44 2012
@@ -23,8 +23,6 @@ import java.nio.ByteBuffer;
 import java.security.PrivilegedAction;
 import java.util.HashSet;
 import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -72,8 +70,6 @@ public class ContainerLauncherImpl exten
 
   static final Log LOG = LogFactory.getLog(ContainerLauncherImpl.class);
 
-  int nmTimeOut;
-
   private ConcurrentHashMap<ContainerId, Container> containers = 
     new ConcurrentHashMap<ContainerId, Container>(); 
   private AppContext context;
@@ -83,7 +79,6 @@ public class ContainerLauncherImpl exten
   private Thread eventHandlingThread;
   protected BlockingQueue<ContainerLauncherEvent> eventQueue =
       new LinkedBlockingQueue<ContainerLauncherEvent>();
-  final Timer commandTimer = new Timer(true);
   YarnRPC rpc;
 
   private Container getContainer(ContainerId id) {
@@ -130,30 +125,18 @@ public class ContainerLauncherImpl exten
             "Container was killed before it was launched");
         return;
       }
-      CommandTimerTask timerTask = new CommandTimerTask(Thread
-          .currentThread(), event);
       
+
       final String containerManagerBindAddr = event.getContainerMgrAddress();
       ContainerId containerID = event.getContainerID();
       ContainerToken containerToken = event.getContainerToken();
 
       ContainerManager proxy = null;
       try {
-        commandTimer.schedule(timerTask, nmTimeOut);
 
         proxy = getCMProxy(containerID, containerManagerBindAddr,
             containerToken);
 
-        // Interrupted during getProxy, but that didn't throw exception
-        if (Thread.interrupted()) {
-          // The timer canceled the command in the mean while.
-          String message = "Container launch failed for " + containerID
-              + " : Start-container for " + event.getContainerID()
-              + " got interrupted. Returning.";
-          this.state = ContainerState.FAILED;
-          sendContainerLaunchFailedMsg(taskAttemptID, message);
-          return;
-        }
         // Construct the actual Container
         ContainerLaunchContext containerLaunchContext =
           event.getContainer();
@@ -164,19 +147,6 @@ public class ContainerLauncherImpl exten
         startRequest.setContainerLaunchContext(containerLaunchContext);
         StartContainerResponse response = proxy.startContainer(startRequest);
 
-        // container started properly. Stop the timer
-        timerTask.cancel();
-        if (Thread.interrupted()) {
-          // The timer canceled the command in the mean while, but
-          // startContainer didn't throw exception
-          String message = "Container launch failed for " + containerID
-              + " : Start-container for " + event.getContainerID()
-              + " got interrupted. Returning.";
-          this.state = ContainerState.FAILED;
-          sendContainerLaunchFailedMsg(taskAttemptID, message);
-          return;
-        }
-
         ByteBuffer portInfo = response
           .getServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID);
         int port = -1;
@@ -198,17 +168,11 @@ public class ContainerLauncherImpl exten
             new TaskAttemptContainerLaunchedEvent(taskAttemptID, port));
         this.state = ContainerState.RUNNING;
       } catch (Throwable t) {
-        if (Thread.interrupted()) {
-          // The timer canceled the command in the mean while.
-          LOG.info("Start-container for " + event.getContainerID()
-              + " got interrupted.");
-        }
         String message = "Container launch failed for " + containerID + " : "
             + StringUtils.stringifyException(t);
         this.state = ContainerState.FAILED;
         sendContainerLaunchFailedMsg(taskAttemptID, message);
       } finally {
-        timerTask.cancel();
         if (proxy != null) {
           ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig());
         }
@@ -220,41 +184,24 @@ public class ContainerLauncherImpl exten
       if(this.state == ContainerState.PREP) {
         this.state = ContainerState.KILLED_BEFORE_LAUNCH;
       } else {
-        CommandTimerTask timerTask = new CommandTimerTask(Thread
-            .currentThread(), event);
-
         final String containerManagerBindAddr = event.getContainerMgrAddress();
         ContainerId containerID = event.getContainerID();
         ContainerToken containerToken = event.getContainerToken();
         TaskAttemptId taskAttemptID = event.getTaskAttemptID();
         LOG.info("KILLING " + taskAttemptID);
-        commandTimer.schedule(timerTask, nmTimeOut);
 
         ContainerManager proxy = null;
         try {
           proxy = getCMProxy(containerID, containerManagerBindAddr,
               containerToken);
 
-          if (Thread.interrupted()) {
-            // The timer canceled the command in the mean while. No need to
-            // return, send cleaned up event anyways.
-            LOG.info("Stop-container for " + event.getContainerID()
-                + " got interrupted.");
-          } else {
             // kill the remote container if already launched
             StopContainerRequest stopRequest = Records
               .newRecord(StopContainerRequest.class);
             stopRequest.setContainerId(event.getContainerID());
             proxy.stopContainer(stopRequest);
-          }
-        } catch (Throwable t) {
 
-          if (Thread.interrupted()) {
-            // The timer canceled the command in the mean while, clear the
-            // interrupt flag
-            LOG.info("Stop-container for " + event.getContainerID()
-                + " got interrupted.");
-          }
+        } catch (Throwable t) {
 
           // ignore the cleanup failure
           String message = "cleanup failed for container "
@@ -264,15 +211,6 @@ public class ContainerLauncherImpl exten
             new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID, message));
           LOG.warn(message);
         } finally {
-          timerTask.cancel();
-          if (Thread.interrupted()) {
-            LOG.info("Stop-container for " + event.getContainerID()
-                + " got interrupted.");
-            // ignore the cleanup failure
-            context.getEventHandler().handle(
-              new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID,
-                "cleanup failed for container " + event.getContainerID()));
-          }
           if (proxy != null) {
             ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig());
           }
@@ -303,8 +241,6 @@ public class ContainerLauncherImpl exten
         MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT,
         MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT);
     LOG.info("Upper limit on the thread pool size is " + this.limitOnPoolSize);
-    this.nmTimeOut = conf.getInt(ContainerLauncher.MR_AM_NM_COMMAND_TIMEOUT,
-        ContainerLauncher.DEFAULT_NM_COMMAND_TIMEOUT);
     this.rpc = createYarnRPC(conf);
     super.init(conf);
   }
@@ -409,44 +345,6 @@ public class ContainerLauncherImpl exten
     return proxy;
   }
 
-  private static class CommandTimerTask extends TimerTask {
-    private final Thread commandThread;
-    protected final String message;
-    private boolean cancelled = false;
-
-    public CommandTimerTask(Thread thread, ContainerLauncherEvent event) {
-      super();
-      this.commandThread = thread;
-      this.message = "Couldn't complete " + event.getType() + " on "
-          + event.getContainerID() + "/" + event.getTaskAttemptID()
-          + ". Interrupting and returning";
-    }
-
-    @Override
-    public void run() {
-      synchronized (this) {
-        if (this.cancelled) {
-          return;
-        }
-        LOG.warn(this.message);
-        StackTraceElement[] trace = this.commandThread.getStackTrace();
-        StringBuilder logMsg = new StringBuilder();
-        for (int i = 0; i < trace.length; i++) {
-          logMsg.append("\n\tat " + trace[i]);
-        }
-        LOG.info("Stack trace of the command-thread: \n" + logMsg.toString());
-        this.commandThread.interrupt();
-      }
-    }
-
-    @Override
-    public boolean cancel() {
-      synchronized (this) {
-        this.cancelled = true;
-        return super.cancel();
-      }
-    }
-  }
 
   /**
    * Setup and start the container on remote nodemanager.

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java?rev=1309045&r1=1309044&r2=1309045&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java	(original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java	Tue Apr  3 17:04:44 2012
@@ -21,6 +21,8 @@ package org.apache.hadoop.mapreduce.v2.a
 import static org.mockito.Mockito.mock;
 
 import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.InetSocketAddress;
 import java.util.Map;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -30,6 +32,7 @@ import junit.framework.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@@ -44,18 +47,39 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.factory.providers.YarnRemoteExceptionFactoryProvider;
+import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.junit.Test;
 
 public class TestContainerLauncher {
 
-  static final Log LOG = LogFactory
-      .getLog(TestContainerLauncher.class);
+  private static final RecordFactory recordFactory = RecordFactoryProvider
+      .getRecordFactory(null);
+  Configuration conf;
+  Server server;
+
+  static final Log LOG = LogFactory.getLog(TestContainerLauncher.class);
 
   @Test
   public void testPoolSize() throws InterruptedException {
@@ -104,10 +128,10 @@ public class TestContainerLauncher {
     Assert.assertEquals(10, containerLauncher.numEventsProcessed.get());
     containerLauncher.finishEventHandling = false;
     for (int i = 0; i < 10; i++) {
-      ContainerId containerId =
-          BuilderUtils.newContainerId(appAttemptId, i + 10);
-      TaskAttemptId taskAttemptId =
-          MRBuilderUtils.newTaskAttemptId(taskId, i + 10);
+      ContainerId containerId = BuilderUtils.newContainerId(appAttemptId,
+          i + 10);
+      TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId,
+          i + 10);
       containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId,
         containerId, "host" + i + ":1234", null,
         ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH));
@@ -119,8 +143,7 @@ public class TestContainerLauncher {
     // Different hosts, there should be an increase in core-thread-pool size to
     // 21(11hosts+10buffer)
     // Core pool size should be 21 but the live pool size should be only 11.
-    containerLauncher.expectedCorePoolSize =
-        11 + ContainerLauncherImpl.INITIAL_POOL_SIZE;
+    containerLauncher.expectedCorePoolSize = 11 + ContainerLauncherImpl.INITIAL_POOL_SIZE;
     containerLauncher.finishEventHandling = false;
     ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 21);
     TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, 21);
@@ -200,26 +223,28 @@ public class TestContainerLauncher {
 
   @Test
   public void testSlowNM() throws Exception {
-    test(false);
-  }
-
-  @Test
-  public void testSlowNMWithInterruptsSwallowed() throws Exception {
-    test(true);
+    test();
   }
 
-  private void test(boolean swallowInterrupts) throws Exception {
+  private void test() throws Exception {
 
-    MRApp app = new MRAppWithSlowNM(swallowInterrupts);
-
-    Configuration conf = new Configuration();
+    conf = new Configuration();
     int maxAttempts = 1;
     conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, maxAttempts);
     conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    // set timeout low for the test
+    conf.setInt("yarn.rpc.nm-command-timeout", 3000);
+    conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class.getName());
+    YarnRPC rpc = YarnRPC.create(conf);
+    String bindAddr = "localhost:0";
+    InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
+    server = rpc.getServer(ContainerManager.class, new DummyContainerManager(),
+        addr, conf, null, 1);
+    server.start();
 
-    // Set low timeout for NM commands
-    conf.setInt(ContainerLauncher.MR_AM_NM_COMMAND_TIMEOUT, 3000);
+    MRApp app = new MRAppWithSlowNM();
 
+    try {
     Job job = app.submit(conf);
     app.waitForState(job, JobState.RUNNING);
 
@@ -231,8 +256,8 @@ public class TestContainerLauncher {
 
     Map<TaskAttemptId, TaskAttempt> attempts = tasks.values().iterator()
         .next().getAttempts();
-    Assert.assertEquals("Num attempts is not correct", maxAttempts, attempts
-        .size());
+      Assert.assertEquals("Num attempts is not correct", maxAttempts,
+          attempts.size());
 
     TaskAttempt attempt = attempts.values().iterator().next();
     app.waitForState(attempt, TaskAttemptState.ASSIGNED);
@@ -241,20 +266,18 @@ public class TestContainerLauncher {
 
     String diagnostics = attempt.getDiagnostics().toString();
     LOG.info("attempt.getDiagnostics: " + diagnostics);
-    if (swallowInterrupts) {
-      Assert.assertEquals("[Container launch failed for "
-          + "container_0_0000_01_000000 : Start-container for "
-          + "container_0_0000_01_000000 got interrupted. Returning.]",
-          diagnostics);
-    } else {
+
       Assert.assertTrue(diagnostics.contains("Container launch failed for "
           + "container_0_0000_01_000000 : "));
-      Assert.assertTrue(diagnostics
-          .contains(": java.lang.InterruptedException"));
-    }
+      Assert
+          .assertTrue(diagnostics
+              .contains("java.net.SocketTimeoutException: 3000 millis timeout while waiting for channel"));
 
+    } finally {
+      server.stop();
     app.stop();
   }
+  }
 
   private final class CustomContainerLauncher extends ContainerLauncherImpl {
 
@@ -317,13 +340,10 @@ public class TestContainerLauncher {
     }
   }
 
-  private static class MRAppWithSlowNM extends MRApp {
-
-    final boolean swallowInterrupts;
+  private class MRAppWithSlowNM extends MRApp {
 
-    public MRAppWithSlowNM(boolean swallowInterrupts) {
+    public MRAppWithSlowNM() {
       super(1, 0, false, "TestContainerLauncher", true);
-      this.swallowInterrupts = swallowInterrupts;
     }
 
     @Override
@@ -333,20 +353,57 @@ public class TestContainerLauncher {
         protected ContainerManager getCMProxy(ContainerId containerID,
             String containerManagerBindAddr, ContainerToken containerToken)
             throws IOException {
+          // make proxy connect to our local containerManager server
+          ContainerManager proxy = (ContainerManager) rpc.getProxy(
+              ContainerManager.class,
+              NetUtils.createSocketAddr("localhost:" + server.getPort()), conf);
+          return proxy;
+        }
+      };
+
+    };
+  }
+
+  public class DummyContainerManager implements ContainerManager {
+
+    private ContainerStatus status = null;
+
+    @Override
+    public GetContainerStatusResponse getContainerStatus(
+        GetContainerStatusRequest request) throws YarnRemoteException {
+      GetContainerStatusResponse response = recordFactory
+          .newRecordInstance(GetContainerStatusResponse.class);
+      response.setStatus(status);
+      return response;
+    }
+
+    @Override
+    public StartContainerResponse startContainer(StartContainerRequest request)
+        throws YarnRemoteException {
+      ContainerLaunchContext container = request.getContainerLaunchContext();
+      StartContainerResponse response = recordFactory
+          .newRecordInstance(StartContainerResponse.class);
+      status = recordFactory.newRecordInstance(ContainerStatus.class);
           try {
-            synchronized (this) {
-              wait(); // Just hang the thread simulating a very slow NM.
+        // make the thread sleep to look like its not going to respond
+        Thread.sleep(15000);
+      } catch (Exception e) {
+        LOG.error(e);
+        throw new UndeclaredThrowableException(e);
             }
-          } catch (InterruptedException e) {
-            LOG.info(e);
-            if (!MRAppWithSlowNM.this.swallowInterrupts) {
-              throw new IOException(e);
+      status.setState(ContainerState.RUNNING);
+      status.setContainerId(container.getContainerId());
+      status.setExitStatus(0);
+      return response;
             }
-            Thread.currentThread().interrupt();
+
+    @Override
+    public StopContainerResponse stopContainer(StopContainerRequest request)
+        throws YarnRemoteException {
+      Exception e = new Exception("Dummy function", new Exception(
+          "Dummy function cause"));
+      throw YarnRemoteExceptionFactoryProvider.getYarnRemoteExceptionFactory(
+          null).createYarnRemoteException(e);
           }
-          return null;
         }
-      };
-    };
   }
-}

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagerPBClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagerPBClientImpl.java?rev=1309045&r1=1309044&r2=1309045&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagerPBClientImpl.java	(original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagerPBClientImpl.java	Tue Apr  3 17:04:44 2012
@@ -24,6 +24,8 @@ import java.net.InetSocketAddress;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ContainerManager;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
@@ -37,6 +39,7 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainerRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainerResponsePBImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.ipc.ProtoOverHadoopRpcEngine;
 import org.apache.hadoop.yarn.proto.ContainerManager.ContainerManagerService;
@@ -48,12 +51,24 @@ import com.google.protobuf.ServiceExcept
 
 public class ContainerManagerPBClientImpl implements ContainerManager {
 
+  // Not a documented config. Only used for tests
+  static final String NM_COMMAND_TIMEOUT = YarnConfiguration.YARN_PREFIX
+      + "rpc.nm-command-timeout";
+  /**
+   *  Maximum of 1 minute timeout for a Node to react to the command
+   */
+  static final int DEFAULT_COMMAND_TIMEOUT = 60000;
+
   private ContainerManagerService.BlockingInterface proxy;
   
   public ContainerManagerPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {
     RPC.setProtocolEngine(conf, ContainerManagerService.BlockingInterface.class, ProtoOverHadoopRpcEngine.class);
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+    int expireIntvl = conf.getInt(NM_COMMAND_TIMEOUT, DEFAULT_COMMAND_TIMEOUT);
     proxy = (ContainerManagerService.BlockingInterface)RPC.getProxy(
-        ContainerManagerService.BlockingInterface.class, clientVersion, addr, conf);
+        ContainerManagerService.BlockingInterface.class, clientVersion, addr, ugi, conf,
+         NetUtils.getDefaultSocketFactory(conf), expireIntvl);
   }
 
   public void close() {

Added: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java?rev=1309045&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java	(added)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java	Tue Apr  3 17:04:44 2012
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn;
+
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.InetSocketAddress;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.factory.providers.YarnRemoteExceptionFactoryProvider;
+import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.junit.Test;
+
+/*
+ * Test that the container launcher rpc times out properly. This is used
+ * by both RM to launch an AM as well as an AM to launch containers.
+ */
+public class TestContainerLaunchRPC {
+
+  static final Log LOG = LogFactory.getLog(TestContainerLaunchRPC.class);
+
+  private static final String EXCEPTION_CAUSE = "java.net.SocketTimeoutException";
+  private static final RecordFactory recordFactory = RecordFactoryProvider
+      .getRecordFactory(null);
+
+  @Test
+  public void testHadoopProtoRPCTimeout() throws Exception {
+    testRPCTimeout(HadoopYarnProtoRPC.class.getName());
+  }
+
+  private void testRPCTimeout(String rpcClass) throws Exception {
+    Configuration conf = new Configuration();
+    // set timeout low for the test
+    conf.setInt("yarn.rpc.nm-command-timeout", 3000);
+
+    conf.set(YarnConfiguration.IPC_RPC_IMPL, rpcClass);
+    YarnRPC rpc = YarnRPC.create(conf);
+    String bindAddr = "localhost:0";
+    InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
+    Server server = rpc.getServer(ContainerManager.class,
+        new DummyContainerManager(), addr, conf, null, 1);
+    server.start();
+    try {
+
+      ContainerManager proxy = (ContainerManager) rpc.getProxy(
+          ContainerManager.class,
+          NetUtils.createSocketAddr("localhost:" + server.getPort()), conf);
+      ContainerLaunchContext containerLaunchContext = recordFactory
+          .newRecordInstance(ContainerLaunchContext.class);
+      containerLaunchContext.setUser("dummy-user");
+      ContainerId containerId = recordFactory
+          .newRecordInstance(ContainerId.class);
+      ApplicationId applicationId = recordFactory
+          .newRecordInstance(ApplicationId.class);
+      ApplicationAttemptId applicationAttemptId = recordFactory
+          .newRecordInstance(ApplicationAttemptId.class);
+      applicationId.setClusterTimestamp(0);
+      applicationId.setId(0);
+      applicationAttemptId.setApplicationId(applicationId);
+      applicationAttemptId.setAttemptId(0);
+      containerId.setApplicationAttemptId(applicationAttemptId);
+      containerId.setId(100);
+      containerLaunchContext.setContainerId(containerId);
+      containerLaunchContext.setResource(recordFactory
+          .newRecordInstance(Resource.class));
+
+      StartContainerRequest scRequest = recordFactory
+          .newRecordInstance(StartContainerRequest.class);
+      scRequest.setContainerLaunchContext(containerLaunchContext);
+      try {
+        proxy.startContainer(scRequest);
+      } catch (Exception e) {
+        LOG.info(StringUtils.stringifyException(e));
+        Assert.assertTrue("Error, exception does not contain: "
+            + EXCEPTION_CAUSE,
+            e.getCause().getMessage().contains(EXCEPTION_CAUSE));
+
+        return;
+      }
+    } finally {
+      server.stop();
+    }
+
+    Assert.fail("timeout exception should have occurred!");
+  }
+
+  public class DummyContainerManager implements ContainerManager {
+
+    private ContainerStatus status = null;
+
+    @Override
+    public GetContainerStatusResponse getContainerStatus(
+        GetContainerStatusRequest request) throws YarnRemoteException {
+      GetContainerStatusResponse response = recordFactory
+          .newRecordInstance(GetContainerStatusResponse.class);
+      response.setStatus(status);
+      return response;
+    }
+
+    @Override
+    public StartContainerResponse startContainer(StartContainerRequest request)
+        throws YarnRemoteException {
+      ContainerLaunchContext container = request.getContainerLaunchContext();
+      StartContainerResponse response = recordFactory
+          .newRecordInstance(StartContainerResponse.class);
+      status = recordFactory.newRecordInstance(ContainerStatus.class);
+      try {
+        // make the thread sleep to look like its not going to respond
+        Thread.sleep(10000);
+      } catch (Exception e) {
+        LOG.error(e);
+        throw new UndeclaredThrowableException(e);
+      }
+      status.setState(ContainerState.RUNNING);
+      status.setContainerId(container.getContainerId());
+      status.setExitStatus(0);
+      return response;
+    }
+
+    @Override
+    public StopContainerResponse stopContainer(StopContainerRequest request)
+        throws YarnRemoteException {
+      Exception e = new Exception("Dummy function", new Exception(
+          "Dummy function cause"));
+      throw YarnRemoteExceptionFactoryProvider.getYarnRemoteExceptionFactory(
+          null).createYarnRemoteException(e);
+    }
+  }
+}



Mime
View raw message