Return-Path: X-Original-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3BEB19A3E for ; Tue, 3 Apr 2012 17:04:22 +0000 (UTC) Received: (qmail 34788 invoked by uid 500); 3 Apr 2012 17:04:22 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 34638 invoked by uid 500); 3 Apr 2012 17:04:22 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 34630 invoked by uid 99); 3 Apr 2012 17:04:22 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Apr 2012 17:04:21 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Apr 2012 17:04:16 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 7BA902388980; Tue, 3 Apr 2012 17:03:54 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1309044 - in /hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/s... Date: Tue, 03 Apr 2012 17:03:54 -0000 To: mapreduce-commits@hadoop.apache.org From: bobby@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120403170354.7BA902388980@eris.apache.org> Author: bobby Date: Tue Apr 3 17:03:53 2012 New Revision: 1309044 URL: http://svn.apache.org/viewvc?rev=1309044&view=rev Log: MAPREDUCE-4062. AM Launcher thread can hang forever (tgraves via bobby) Added: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java - copied unchanged from r1309043, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagerPBClientImpl.java Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1309044&r1=1309043&r2=1309044&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Tue Apr 3 17:03:53 2012 @@ -127,6 +127,8 @@ Release 0.23.3 - UNRELEASED MAPREDUCE-4012 Hadoop Job setup error leaves no useful info to users (when LinuxTaskController is used). (tgraves) + MAPREDUCE-4062. AM Launcher thread can hang forever (tgraves via bobby) + Release 0.23.2 - UNRELEASED INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java?rev=1309044&r1=1309043&r2=1309044&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java Tue Apr 3 17:03:53 2012 @@ -30,11 +30,4 @@ public interface ContainerLauncher CONTAINER_REMOTE_CLEANUP } - // Not a documented config. Only used for tests - static final String MR_AM_NM_COMMAND_TIMEOUT = MRJobConfig.MR_AM_PREFIX - + "nm-command-timeout"; - /** - * Maximum of 1 minute timeout for a Node to react to the command - */ - static final int DEFAULT_NM_COMMAND_TIMEOUT = 60000; } Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java?rev=1309044&r1=1309043&r2=1309044&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java Tue Apr 3 17:03:53 2012 @@ -23,8 +23,6 @@ import java.nio.ByteBuffer; import java.security.PrivilegedAction; import java.util.HashSet; import java.util.Set; -import java.util.Timer; -import java.util.TimerTask; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; @@ -72,8 +70,6 @@ public class ContainerLauncherImpl exten static final Log LOG = LogFactory.getLog(ContainerLauncherImpl.class); - int nmTimeOut; - private ConcurrentHashMap containers = new ConcurrentHashMap(); private AppContext context; @@ -83,7 +79,6 @@ public class ContainerLauncherImpl exten private Thread eventHandlingThread; protected BlockingQueue eventQueue = new LinkedBlockingQueue(); - final Timer commandTimer = new Timer(true); YarnRPC rpc; private Container getContainer(ContainerId id) { @@ -130,30 +125,18 @@ public class ContainerLauncherImpl exten "Container was killed before it was launched"); return; } - CommandTimerTask timerTask = new CommandTimerTask(Thread - .currentThread(), event); + final String containerManagerBindAddr = event.getContainerMgrAddress(); ContainerId containerID = event.getContainerID(); ContainerToken containerToken = event.getContainerToken(); ContainerManager proxy = null; try { - commandTimer.schedule(timerTask, nmTimeOut); proxy = getCMProxy(containerID, containerManagerBindAddr, containerToken); - // Interrupted during getProxy, but that didn't throw exception - if (Thread.interrupted()) { - // The timer canceled the command in the mean while. - String message = "Container launch failed for " + containerID - + " : Start-container for " + event.getContainerID() - + " got interrupted. Returning."; - this.state = ContainerState.FAILED; - sendContainerLaunchFailedMsg(taskAttemptID, message); - return; - } // Construct the actual Container ContainerLaunchContext containerLaunchContext = event.getContainer(); @@ -164,19 +147,6 @@ public class ContainerLauncherImpl exten startRequest.setContainerLaunchContext(containerLaunchContext); StartContainerResponse response = proxy.startContainer(startRequest); - // container started properly. Stop the timer - timerTask.cancel(); - if (Thread.interrupted()) { - // The timer canceled the command in the mean while, but - // startContainer didn't throw exception - String message = "Container launch failed for " + containerID - + " : Start-container for " + event.getContainerID() - + " got interrupted. Returning."; - this.state = ContainerState.FAILED; - sendContainerLaunchFailedMsg(taskAttemptID, message); - return; - } - ByteBuffer portInfo = response .getServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID); int port = -1; @@ -198,17 +168,11 @@ public class ContainerLauncherImpl exten new TaskAttemptContainerLaunchedEvent(taskAttemptID, port)); this.state = ContainerState.RUNNING; } catch (Throwable t) { - if (Thread.interrupted()) { - // The timer canceled the command in the mean while. - LOG.info("Start-container for " + event.getContainerID() - + " got interrupted."); - } String message = "Container launch failed for " + containerID + " : " + StringUtils.stringifyException(t); this.state = ContainerState.FAILED; sendContainerLaunchFailedMsg(taskAttemptID, message); } finally { - timerTask.cancel(); if (proxy != null) { ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig()); } @@ -220,41 +184,24 @@ public class ContainerLauncherImpl exten if(this.state == ContainerState.PREP) { this.state = ContainerState.KILLED_BEFORE_LAUNCH; } else { - CommandTimerTask timerTask = new CommandTimerTask(Thread - .currentThread(), event); - final String containerManagerBindAddr = event.getContainerMgrAddress(); ContainerId containerID = event.getContainerID(); ContainerToken containerToken = event.getContainerToken(); TaskAttemptId taskAttemptID = event.getTaskAttemptID(); LOG.info("KILLING " + taskAttemptID); - commandTimer.schedule(timerTask, nmTimeOut); ContainerManager proxy = null; try { proxy = getCMProxy(containerID, containerManagerBindAddr, containerToken); - if (Thread.interrupted()) { - // The timer canceled the command in the mean while. No need to - // return, send cleaned up event anyways. - LOG.info("Stop-container for " + event.getContainerID() - + " got interrupted."); - } else { // kill the remote container if already launched StopContainerRequest stopRequest = Records .newRecord(StopContainerRequest.class); stopRequest.setContainerId(event.getContainerID()); proxy.stopContainer(stopRequest); - } - } catch (Throwable t) { - if (Thread.interrupted()) { - // The timer canceled the command in the mean while, clear the - // interrupt flag - LOG.info("Stop-container for " + event.getContainerID() - + " got interrupted."); - } + } catch (Throwable t) { // ignore the cleanup failure String message = "cleanup failed for container " @@ -264,15 +211,6 @@ public class ContainerLauncherImpl exten new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID, message)); LOG.warn(message); } finally { - timerTask.cancel(); - if (Thread.interrupted()) { - LOG.info("Stop-container for " + event.getContainerID() - + " got interrupted."); - // ignore the cleanup failure - context.getEventHandler().handle( - new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID, - "cleanup failed for container " + event.getContainerID())); - } if (proxy != null) { ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig()); } @@ -303,8 +241,6 @@ public class ContainerLauncherImpl exten MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT, MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT); LOG.info("Upper limit on the thread pool size is " + this.limitOnPoolSize); - this.nmTimeOut = conf.getInt(ContainerLauncher.MR_AM_NM_COMMAND_TIMEOUT, - ContainerLauncher.DEFAULT_NM_COMMAND_TIMEOUT); this.rpc = createYarnRPC(conf); super.init(conf); } @@ -409,44 +345,6 @@ public class ContainerLauncherImpl exten return proxy; } - private static class CommandTimerTask extends TimerTask { - private final Thread commandThread; - protected final String message; - private boolean cancelled = false; - - public CommandTimerTask(Thread thread, ContainerLauncherEvent event) { - super(); - this.commandThread = thread; - this.message = "Couldn't complete " + event.getType() + " on " - + event.getContainerID() + "/" + event.getTaskAttemptID() - + ". Interrupting and returning"; - } - - @Override - public void run() { - synchronized (this) { - if (this.cancelled) { - return; - } - LOG.warn(this.message); - StackTraceElement[] trace = this.commandThread.getStackTrace(); - StringBuilder logMsg = new StringBuilder(); - for (int i = 0; i < trace.length; i++) { - logMsg.append("\n\tat " + trace[i]); - } - LOG.info("Stack trace of the command-thread: \n" + logMsg.toString()); - this.commandThread.interrupt(); - } - } - - @Override - public boolean cancel() { - synchronized (this) { - this.cancelled = true; - return super.cancel(); - } - } - } /** * Setup and start the container on remote nodemanager. Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java?rev=1309044&r1=1309043&r2=1309044&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java Tue Apr 3 17:03:53 2012 @@ -21,6 +21,8 @@ package org.apache.hadoop.mapreduce.v2.a import static org.mockito.Mockito.mock; import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; @@ -30,6 +32,7 @@ import junit.framework.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.Server; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobState; @@ -44,18 +47,39 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.api.ContainerManager; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerToken; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.factory.providers.YarnRemoteExceptionFactoryProvider; +import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC; +import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.util.BuilderUtils; import org.junit.Test; public class TestContainerLauncher { - static final Log LOG = LogFactory - .getLog(TestContainerLauncher.class); + private static final RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + Configuration conf; + Server server; + + static final Log LOG = LogFactory.getLog(TestContainerLauncher.class); @Test public void testPoolSize() throws InterruptedException { @@ -104,10 +128,10 @@ public class TestContainerLauncher { Assert.assertEquals(10, containerLauncher.numEventsProcessed.get()); containerLauncher.finishEventHandling = false; for (int i = 0; i < 10; i++) { - ContainerId containerId = - BuilderUtils.newContainerId(appAttemptId, i + 10); - TaskAttemptId taskAttemptId = - MRBuilderUtils.newTaskAttemptId(taskId, i + 10); + ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, + i + 10); + TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, + i + 10); containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId, containerId, "host" + i + ":1234", null, ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH)); @@ -119,8 +143,7 @@ public class TestContainerLauncher { // Different hosts, there should be an increase in core-thread-pool size to // 21(11hosts+10buffer) // Core pool size should be 21 but the live pool size should be only 11. - containerLauncher.expectedCorePoolSize = - 11 + ContainerLauncherImpl.INITIAL_POOL_SIZE; + containerLauncher.expectedCorePoolSize = 11 + ContainerLauncherImpl.INITIAL_POOL_SIZE; containerLauncher.finishEventHandling = false; ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 21); TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, 21); @@ -200,26 +223,28 @@ public class TestContainerLauncher { @Test public void testSlowNM() throws Exception { - test(false); - } - - @Test - public void testSlowNMWithInterruptsSwallowed() throws Exception { - test(true); + test(); } - private void test(boolean swallowInterrupts) throws Exception { + private void test() throws Exception { - MRApp app = new MRAppWithSlowNM(swallowInterrupts); - - Configuration conf = new Configuration(); + conf = new Configuration(); int maxAttempts = 1; conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, maxAttempts); conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); + // set timeout low for the test + conf.setInt("yarn.rpc.nm-command-timeout", 3000); + conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class.getName()); + YarnRPC rpc = YarnRPC.create(conf); + String bindAddr = "localhost:0"; + InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr); + server = rpc.getServer(ContainerManager.class, new DummyContainerManager(), + addr, conf, null, 1); + server.start(); - // Set low timeout for NM commands - conf.setInt(ContainerLauncher.MR_AM_NM_COMMAND_TIMEOUT, 3000); + MRApp app = new MRAppWithSlowNM(); + try { Job job = app.submit(conf); app.waitForState(job, JobState.RUNNING); @@ -231,8 +256,8 @@ public class TestContainerLauncher { Map attempts = tasks.values().iterator() .next().getAttempts(); - Assert.assertEquals("Num attempts is not correct", maxAttempts, attempts - .size()); + Assert.assertEquals("Num attempts is not correct", maxAttempts, + attempts.size()); TaskAttempt attempt = attempts.values().iterator().next(); app.waitForState(attempt, TaskAttemptState.ASSIGNED); @@ -241,20 +266,18 @@ public class TestContainerLauncher { String diagnostics = attempt.getDiagnostics().toString(); LOG.info("attempt.getDiagnostics: " + diagnostics); - if (swallowInterrupts) { - Assert.assertEquals("[Container launch failed for " - + "container_0_0000_01_000000 : Start-container for " - + "container_0_0000_01_000000 got interrupted. Returning.]", - diagnostics); - } else { + Assert.assertTrue(diagnostics.contains("Container launch failed for " + "container_0_0000_01_000000 : ")); - Assert.assertTrue(diagnostics - .contains(": java.lang.InterruptedException")); - } + Assert + .assertTrue(diagnostics + .contains("java.net.SocketTimeoutException: 3000 millis timeout while waiting for channel")); + } finally { + server.stop(); app.stop(); } + } private final class CustomContainerLauncher extends ContainerLauncherImpl { @@ -317,13 +340,10 @@ public class TestContainerLauncher { } } - private static class MRAppWithSlowNM extends MRApp { - - final boolean swallowInterrupts; + private class MRAppWithSlowNM extends MRApp { - public MRAppWithSlowNM(boolean swallowInterrupts) { + public MRAppWithSlowNM() { super(1, 0, false, "TestContainerLauncher", true); - this.swallowInterrupts = swallowInterrupts; } @Override @@ -333,20 +353,57 @@ public class TestContainerLauncher { protected ContainerManager getCMProxy(ContainerId containerID, String containerManagerBindAddr, ContainerToken containerToken) throws IOException { + // make proxy connect to our local containerManager server + ContainerManager proxy = (ContainerManager) rpc.getProxy( + ContainerManager.class, + NetUtils.createSocketAddr("localhost:" + server.getPort()), conf); + return proxy; + } + }; + + }; + } + + public class DummyContainerManager implements ContainerManager { + + private ContainerStatus status = null; + + @Override + public GetContainerStatusResponse getContainerStatus( + GetContainerStatusRequest request) throws YarnRemoteException { + GetContainerStatusResponse response = recordFactory + .newRecordInstance(GetContainerStatusResponse.class); + response.setStatus(status); + return response; + } + + @Override + public StartContainerResponse startContainer(StartContainerRequest request) + throws YarnRemoteException { + ContainerLaunchContext container = request.getContainerLaunchContext(); + StartContainerResponse response = recordFactory + .newRecordInstance(StartContainerResponse.class); + status = recordFactory.newRecordInstance(ContainerStatus.class); try { - synchronized (this) { - wait(); // Just hang the thread simulating a very slow NM. + // make the thread sleep to look like its not going to respond + Thread.sleep(15000); + } catch (Exception e) { + LOG.error(e); + throw new UndeclaredThrowableException(e); } - } catch (InterruptedException e) { - LOG.info(e); - if (!MRAppWithSlowNM.this.swallowInterrupts) { - throw new IOException(e); + status.setState(ContainerState.RUNNING); + status.setContainerId(container.getContainerId()); + status.setExitStatus(0); + return response; } - Thread.currentThread().interrupt(); + + @Override + public StopContainerResponse stopContainer(StopContainerRequest request) + throws YarnRemoteException { + Exception e = new Exception("Dummy function", new Exception( + "Dummy function cause")); + throw YarnRemoteExceptionFactoryProvider.getYarnRemoteExceptionFactory( + null).createYarnRemoteException(e); } - return null; } - }; - }; } -} Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagerPBClientImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagerPBClientImpl.java?rev=1309044&r1=1309043&r2=1309044&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagerPBClientImpl.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagerPBClientImpl.java Tue Apr 3 17:03:53 2012 @@ -24,6 +24,8 @@ import java.net.InetSocketAddress; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ContainerManager; import org.apache.hadoop.yarn.api.ContainerManagerPB; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest; @@ -38,6 +40,7 @@ import org.apache.hadoop.yarn.api.protoc import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainerRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainerResponsePBImpl; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusRequestProto; @@ -48,12 +51,25 @@ import com.google.protobuf.ServiceExcept public class ContainerManagerPBClientImpl implements ContainerManager { + // Not a documented config. Only used for tests + static final String NM_COMMAND_TIMEOUT = YarnConfiguration.YARN_PREFIX + + "rpc.nm-command-timeout"; + + /** + * Maximum of 1 minute timeout for a Node to react to the command + */ + static final int DEFAULT_COMMAND_TIMEOUT = 60000; + private ContainerManagerPB proxy; public ContainerManagerPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException { RPC.setProtocolEngine(conf, ContainerManagerPB.class, ProtobufRpcEngine.class); + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + + int expireIntvl = conf.getInt(NM_COMMAND_TIMEOUT, DEFAULT_COMMAND_TIMEOUT); proxy = (ContainerManagerPB)RPC.getProxy( - ContainerManagerPB.class, clientVersion, addr, conf); + ContainerManagerPB.class, clientVersion, addr, ugi, conf, + NetUtils.getDefaultSocketFactory(conf), expireIntvl); } public void close() {