From: vinodkv@apache.org
To: mapreduce-commits@hadoop.apache.org
Reply-To: mapreduce-dev@hadoop.apache.org
Subject: svn commit: r1229451 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java...
Date: Tue, 10 Jan 2012 02:15:49 -0000
Message-Id: <20120110021549.539022388860@eris.apache.org>

Author: vinodkv
Date: Tue Jan 10 02:15:48 2012
New Revision: 1229451

URL: http://svn.apache.org/viewvc?rev=1229451&view=rev
Log:
MAPREDUCE-3312. Modified MR AM to not send a stop-container request for a container that isn't launched at all. Contributed by Robert Joseph Evans.
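In short, the patch makes the AM track each container's lifecycle and send stop-container only for containers that were actually started; a kill that arrives first just marks the container as killed-before-launch locally. The snippet below is a condensed, illustrative sketch of that guard (hypothetical simplified names; no RPC, timer, or event-dispatch handling), not the patch itself; the real change is the ContainerLauncherImpl diff further down.

// Illustrative sketch only (hypothetical simplified names); see the
// ContainerLauncherImpl changes below for the actual implementation.
enum ContainerState { PREP, RUNNING, DONE, FAILED, KILLED_BEFORE_LAUNCH }

class TrackedContainer {
  private ContainerState state = ContainerState.PREP;

  // A launch arriving after a kill is failed locally; start-container is
  // never sent to the NodeManager for it.
  synchronized void launch(Runnable startContainerRpc) {
    if (state == ContainerState.KILLED_BEFORE_LAUNCH) {
      state = ContainerState.DONE;
      return;
    }
    startContainerRpc.run();
    state = ContainerState.RUNNING;
  }

  // A kill arriving before the launch only flips local state; stop-container
  // is sent only when the container was actually started.
  synchronized void kill(Runnable stopContainerRpc) {
    if (state == ContainerState.PREP) {
      state = ContainerState.KILLED_BEFORE_LAUNCH;
    } else {
      stopContainerRpc.run();
      state = ContainerState.DONE;
    }
  }
}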
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1229451&r1=1229450&r2=1229451&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Tue Jan 10 02:15:48 2012 @@ -165,6 +165,9 @@ Release 0.23.1 - Unreleased for the thread loop interval separate from task-timeout configuration property. (Siddharth Seth via vinodkv) + MAPREDUCE-3312. Modified MR AM to not send a stop-container request for + a container that isn't launched at all. (Robert Joseph Evans via vinodkv) + OPTIMIZATIONS MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java?rev=1229451&r1=1229450&r2=1229451&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java Tue Jan 10 02:15:48 2012 @@ -26,6 +26,7 @@ import java.util.Set; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -44,8 +45,6 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; -import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; -import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -75,16 +74,217 @@ public class ContainerLauncherImpl exten int nmTimeOut; + private ConcurrentHashMap 
containers = + new ConcurrentHashMap(); private AppContext context; protected ThreadPoolExecutor launcherPool; protected static final int INITIAL_POOL_SIZE = 10; private int limitOnPoolSize; private Thread eventHandlingThread; - private BlockingQueue eventQueue = + protected BlockingQueue eventQueue = new LinkedBlockingQueue(); final Timer commandTimer = new Timer(true); YarnRPC rpc; + private Container getContainer(ContainerId id) { + Container c = containers.get(id); + if(c == null) { + c = new Container(); + Container old = containers.putIfAbsent(id, c); + if(old != null) { + c = old; + } + } + return c; + } + + private void removeContainerIfDone(ContainerId id) { + Container c = containers.get(id); + if(c != null && c.isCompletelyDone()) { + containers.remove(id); + } + } + + private static enum ContainerState { + PREP, FAILED, RUNNING, DONE, KILLED_BEFORE_LAUNCH + } + + private class Container { + private ContainerState state; + + public Container() { + this.state = ContainerState.PREP; + } + + public synchronized boolean isCompletelyDone() { + return state == ContainerState.DONE || state == ContainerState.FAILED; + } + + @SuppressWarnings("unchecked") + public synchronized void launch(ContainerRemoteLaunchEvent event) { + TaskAttemptId taskAttemptID = event.getTaskAttemptID(); + LOG.info("Launching " + taskAttemptID); + if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) { + state = ContainerState.DONE; + sendContainerLaunchFailedMsg(taskAttemptID, + "Container was killed before it was launched"); + return; + } + CommandTimerTask timerTask = new CommandTimerTask(Thread + .currentThread(), event); + + final String containerManagerBindAddr = event.getContainerMgrAddress(); + ContainerId containerID = event.getContainerID(); + ContainerToken containerToken = event.getContainerToken(); + + ContainerManager proxy = null; + try { + commandTimer.schedule(timerTask, nmTimeOut); + + proxy = getCMProxy(containerID, containerManagerBindAddr, + containerToken); + + // Interrupted during getProxy, but that didn't throw exception + if (Thread.interrupted()) { + // The timer canceled the command in the mean while. + String message = "Container launch failed for " + containerID + + " : Start-container for " + event.getContainerID() + + " got interrupted. Returning."; + this.state = ContainerState.FAILED; + sendContainerLaunchFailedMsg(taskAttemptID, message); + return; + } + // Construct the actual Container + ContainerLaunchContext containerLaunchContext = + event.getContainer(); + + // Now launch the actual container + StartContainerRequest startRequest = Records + .newRecord(StartContainerRequest.class); + startRequest.setContainerLaunchContext(containerLaunchContext); + StartContainerResponse response = proxy.startContainer(startRequest); + + // container started properly. Stop the timer + timerTask.cancel(); + if (Thread.interrupted()) { + // The timer canceled the command in the mean while, but + // startContainer didn't throw exception + String message = "Container launch failed for " + containerID + + " : Start-container for " + event.getContainerID() + + " got interrupted. 
Returning."; + this.state = ContainerState.FAILED; + sendContainerLaunchFailedMsg(taskAttemptID, message); + return; + } + + ByteBuffer portInfo = response + .getServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID); + int port = -1; + if(portInfo != null) { + port = ShuffleHandler.deserializeMetaData(portInfo); + } + LOG.info("Shuffle port returned by ContainerManager for " + + taskAttemptID + " : " + port); + + if(port < 0) { + this.state = ContainerState.FAILED; + throw new IllegalStateException("Invalid shuffle port number " + + port + " returned for " + taskAttemptID); + } + + // after launching, send launched event to task attempt to move + // it from ASSIGNED to RUNNING state + context.getEventHandler().handle( + new TaskAttemptContainerLaunchedEvent(taskAttemptID, port)); + this.state = ContainerState.RUNNING; + } catch (Throwable t) { + if (Thread.interrupted()) { + // The timer canceled the command in the mean while. + LOG.info("Start-container for " + event.getContainerID() + + " got interrupted."); + } + String message = "Container launch failed for " + containerID + " : " + + StringUtils.stringifyException(t); + this.state = ContainerState.FAILED; + sendContainerLaunchFailedMsg(taskAttemptID, message); + } finally { + timerTask.cancel(); + if (proxy != null) { + ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig()); + } + } + } + + @SuppressWarnings("unchecked") + public synchronized void kill(ContainerLauncherEvent event) { + if(this.state == ContainerState.PREP) { + this.state = ContainerState.KILLED_BEFORE_LAUNCH; + } else { + CommandTimerTask timerTask = new CommandTimerTask(Thread + .currentThread(), event); + + final String containerManagerBindAddr = event.getContainerMgrAddress(); + ContainerId containerID = event.getContainerID(); + ContainerToken containerToken = event.getContainerToken(); + TaskAttemptId taskAttemptID = event.getTaskAttemptID(); + LOG.info("KILLING " + taskAttemptID); + commandTimer.schedule(timerTask, nmTimeOut); + + ContainerManager proxy = null; + try { + proxy = getCMProxy(containerID, containerManagerBindAddr, + containerToken); + + if (Thread.interrupted()) { + // The timer canceled the command in the mean while. No need to + // return, send cleaned up event anyways. 
+ LOG.info("Stop-container for " + event.getContainerID() + + " got interrupted."); + } else { + // kill the remote container if already launched + StopContainerRequest stopRequest = Records + .newRecord(StopContainerRequest.class); + stopRequest.setContainerId(event.getContainerID()); + proxy.stopContainer(stopRequest); + } + } catch (Throwable t) { + + if (Thread.interrupted()) { + // The timer canceled the command in the mean while, clear the + // interrupt flag + LOG.info("Stop-container for " + event.getContainerID() + + " got interrupted."); + } + + // ignore the cleanup failure + String message = "cleanup failed for container " + + event.getContainerID() + " : " + + StringUtils.stringifyException(t); + context.getEventHandler().handle( + new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID, message)); + LOG.warn(message); + } finally { + timerTask.cancel(); + if (Thread.interrupted()) { + LOG.info("Stop-container for " + event.getContainerID() + + " got interrupted."); + // ignore the cleanup failure + context.getEventHandler().handle( + new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID, + "cleanup failed for container " + event.getContainerID())); + } + if (proxy != null) { + ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig()); + } + } + this.state = ContainerState.DONE; + } + // after killing, send killed event to task attempt + context.getEventHandler().handle( + new TaskAttemptEvent(event.getTaskAttemptID(), + TaskAttemptEventType.TA_CONTAINER_CLEANED)); + } + } // To track numNodes. Set allNodes = new HashSet(); @@ -105,9 +305,13 @@ public class ContainerLauncherImpl exten LOG.info("Upper limit on the thread pool size is " + this.limitOnPoolSize); this.nmTimeOut = conf.getInt(ContainerLauncher.MR_AM_NM_COMMAND_TIMEOUT, ContainerLauncher.DEFAULT_NM_COMMAND_TIMEOUT); - this.rpc = YarnRPC.create(conf); + this.rpc = createYarnRPC(conf); super.init(conf); } + + protected YarnRPC createYarnRPC(Configuration conf) { + return YarnRPC.create(conf); + } public void start() { @@ -119,7 +323,7 @@ public class ContainerLauncherImpl exten Integer.MAX_VALUE, 1, TimeUnit.HOURS, new LinkedBlockingQueue(), tf); - eventHandlingThread = new Thread(new Runnable() { + eventHandlingThread = new Thread() { @Override public void run() { ContainerLauncherEvent event = null; @@ -162,7 +366,7 @@ public class ContainerLauncherImpl exten // NodeManager into a single connection } } - }); + }; eventHandlingThread.setName("ContainerLauncher Event Handler"); eventHandlingThread.start(); super.start(); @@ -255,175 +459,28 @@ public class ContainerLauncherImpl exten this.event = event; } - @SuppressWarnings("unchecked") @Override public void run() { LOG.info("Processing the event " + event.toString()); // Load ContainerManager tokens before creating a connection. // TODO: Do it only once per NodeManager. 
- final String containerManagerBindAddr = event.getContainerMgrAddress(); ContainerId containerID = event.getContainerID(); - ContainerToken containerToken = event.getContainerToken(); - TaskAttemptId taskAttemptID = event.getTaskAttemptID(); - - ContainerManager proxy = null; - - CommandTimerTask timerTask = new CommandTimerTask(Thread - .currentThread(), event); + Container c = getContainer(containerID); switch(event.getType()) { case CONTAINER_REMOTE_LAUNCH: ContainerRemoteLaunchEvent launchEvent = (ContainerRemoteLaunchEvent) event; - - try { - commandTimer.schedule(timerTask, nmTimeOut); - - proxy = getCMProxy(containerID, containerManagerBindAddr, - containerToken); - - // Interrupted during getProxy, but that didn't throw exception - if (Thread.interrupted()) { - // The timer cancelled the command in the mean while. - String message = "Container launch failed for " + containerID - + " : Start-container for " + event.getContainerID() - + " got interrupted. Returning."; - sendContainerLaunchFailedMsg(taskAttemptID, message); - return; - } - - // Construct the actual Container - ContainerLaunchContext containerLaunchContext = - launchEvent.getContainer(); - - // Now launch the actual container - StartContainerRequest startRequest = Records - .newRecord(StartContainerRequest.class); - startRequest.setContainerLaunchContext(containerLaunchContext); - StartContainerResponse response = proxy.startContainer(startRequest); - - // container started properly. Stop the timer - timerTask.cancel(); - if (Thread.interrupted()) { - // The timer cancelled the command in the mean while, but - // startContainer didn't throw exception - String message = "Container launch failed for " + containerID - + " : Start-container for " + event.getContainerID() - + " got interrupted. Returning."; - sendContainerLaunchFailedMsg(taskAttemptID, message); - return; - } - - ByteBuffer portInfo = response - .getServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID); - int port = -1; - if(portInfo != null) { - port = ShuffleHandler.deserializeMetaData(portInfo); - } - LOG.info("Shuffle port returned by ContainerManager for " - + taskAttemptID + " : " + port); - - if(port < 0) { - throw new IllegalStateException("Invalid shuffle port number " - + port + " returned for " + taskAttemptID); - } - - // after launching, send launched event to task attempt to move - // it from ASSIGNED to RUNNING state - context.getEventHandler().handle( - new TaskAttemptContainerLaunchedEvent(taskAttemptID, port)); - } catch (Throwable t) { - if (Thread.interrupted()) { - // The timer cancelled the command in the mean while. - LOG.info("Start-container for " + event.getContainerID() - + " got interrupted."); - } - String message = "Container launch failed for " + containerID - + " : " + StringUtils.stringifyException(t); - sendContainerLaunchFailedMsg(taskAttemptID, message); - } finally { - timerTask.cancel(); - if (proxy != null) { - ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig()); - } - } - + c.launch(launchEvent); break; case CONTAINER_REMOTE_CLEANUP: - // We will have to remove the launch event if it is still in eventQueue - // and not yet processed - if (eventQueue.contains(event)) { - eventQueue.remove(event); // TODO: Any synchro needed?
- //deallocate the container - context.getEventHandler().handle( - new ContainerAllocatorEvent(taskAttemptID, - ContainerAllocator.EventType.CONTAINER_DEALLOCATE)); - } else { - - try { - commandTimer.schedule(timerTask, nmTimeOut); - - proxy = getCMProxy(containerID, containerManagerBindAddr, - containerToken); - - if (Thread.interrupted()) { - // The timer cancelled the command in the mean while. No need to - // return, send cleanedup event anyways. - LOG.info("Stop-container for " + event.getContainerID() - + " got interrupted."); - } else { - - // TODO:check whether container is launched - - // kill the remote container if already launched - StopContainerRequest stopRequest = Records - .newRecord(StopContainerRequest.class); - stopRequest.setContainerId(event.getContainerID()); - proxy.stopContainer(stopRequest); - } - } catch (Throwable t) { - - if (Thread.interrupted()) { - // The timer cancelled the command in the mean while, clear the - // interrupt flag - LOG.info("Stop-container for " + event.getContainerID() - + " got interrupted."); - } - - // ignore the cleanup failure - String message = "cleanup failed for container " - + event.getContainerID() + " : " - + StringUtils.stringifyException(t); - context.getEventHandler() - .handle( - new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID, - message)); - LOG.warn(message); - } finally { - timerTask.cancel(); - if (Thread.interrupted()) { - LOG.info("Stop-container for " + event.getContainerID() - + " got interrupted."); - // ignore the cleanup failure - context.getEventHandler() - .handle(new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID, - "cleanup failed for container " + event.getContainerID())); - } - if (proxy != null) { - ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig()); - } - } - - // after killing, send killed event to taskattempt - context.getEventHandler().handle( - new TaskAttemptEvent(event.getTaskAttemptID(), - TaskAttemptEventType.TA_CONTAINER_CLEANED)); - } + c.kill(event); break; } + removeContainerIfDone(containerID); } } Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java?rev=1229451&r1=1229450&r2=1229451&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java Tue Jan 10 02:15:48 2012 @@ -46,6 +46,11 @@ public class ContainerRemoteLaunchEvent public Task getRemoteTask() { return this.task; } + + @Override + public int hashCode() { + return super.hashCode(); + } @Override public boolean equals(Object obj) { Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java?rev=1229451&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java (added) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java Tue Jan 10 02:15:48 2012 @@ -0,0 +1,223 @@ +package org.apache.hadoop.mapreduce.v2.app.launcher; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.net.InetSocketAddress; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.ShuffleHandler; +import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskType; +import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher.EventType; +import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; +import org.apache.hadoop.yarn.api.ContainerManager; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.util.BuilderUtils; +import org.junit.Test; + +public class TestContainerLauncherImpl { + static final Log LOG = LogFactory.getLog(TestContainerLauncherImpl.class); + private static final RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + + + private static class ContainerLauncherImplUnderTest extends + ContainerLauncherImpl { + + private YarnRPC rpc; + + public ContainerLauncherImplUnderTest(AppContext context, YarnRPC rpc) { + super(context); + this.rpc = rpc; + } + + @Override + protected YarnRPC createYarnRPC(Configuration conf) { + return rpc; + } + + public void waitForPoolToIdle() throws InterruptedException { + //I wish that we did not need the sleep, but it is here so that we are sure + // That the other thread had time to insert the event into the queue and + // start processing it. For some reason we were getting interrupted + // exceptions within eventQueue without this sleep. 
+ Thread.sleep(100l); + LOG.debug("POOL SIZE 1: "+this.eventQueue.size()+ + " POOL SIZE 2: "+this.launcherPool.getQueue().size()+ + " ACTIVE COUNT: "+ this.launcherPool.getActiveCount()); + while(!this.eventQueue.isEmpty() || + !this.launcherPool.getQueue().isEmpty() || + this.launcherPool.getActiveCount() > 0) { + Thread.sleep(100l); + LOG.debug("POOL SIZE 1: "+this.eventQueue.size()+ + " POOL SIZE 2: "+this.launcherPool.getQueue().size()+ + " ACTIVE COUNT: "+ this.launcherPool.getActiveCount()); + } + LOG.debug("POOL SIZE 1: "+this.eventQueue.size()+ + " POOL SIZE 2: "+this.launcherPool.getQueue().size()+ + " ACTIVE COUNT: "+ this.launcherPool.getActiveCount()); + } + } + + public static ContainerId makeContainerId(long ts, int appId, int attemptId, + int id) { + return BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId( + BuilderUtils.newApplicationId(ts, appId), attemptId), id); + } + + public static TaskAttemptId makeTaskAttemptId(long ts, int appId, int taskId, + TaskType taskType, int id) { + ApplicationId aID = BuilderUtils.newApplicationId(ts, appId); + JobId jID = MRBuilderUtils.newJobId(aID, id); + TaskId tID = MRBuilderUtils.newTaskId(jID, taskId, taskType); + return MRBuilderUtils.newTaskAttemptId(tID, id); + } + + @Test + public void testHandle() throws Exception { + LOG.info("STARTING testHandle"); + YarnRPC mockRpc = mock(YarnRPC.class); + AppContext mockContext = mock(AppContext.class); + @SuppressWarnings("rawtypes") + EventHandler mockEventHandler = mock(EventHandler.class); + when(mockContext.getEventHandler()).thenReturn(mockEventHandler); + + ContainerManager mockCM = mock(ContainerManager.class); + when(mockRpc.getProxy(eq(ContainerManager.class), + any(InetSocketAddress.class), any(Configuration.class))) + .thenReturn(mockCM); + + ContainerLauncherImplUnderTest ut = + new ContainerLauncherImplUnderTest(mockContext, mockRpc); + + Configuration conf = new Configuration(); + ut.init(conf); + ut.start(); + try { + ContainerId contId = makeContainerId(0l, 0, 0, 1); + TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0); + String cmAddress = "127.0.0.1:8000"; + StartContainerResponse startResp = + recordFactory.newRecordInstance(StartContainerResponse.class); + startResp.setServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID, + ShuffleHandler.serializeMetaData(80)); + + + LOG.info("inserting launch event"); + ContainerRemoteLaunchEvent mockLaunchEvent = + mock(ContainerRemoteLaunchEvent.class); + when(mockLaunchEvent.getType()) + .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH); + when(mockLaunchEvent.getContainerID()) + .thenReturn(contId); + when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId); + when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress); + when(mockCM.startContainer(any(StartContainerRequest.class))).thenReturn(startResp); + ut.handle(mockLaunchEvent); + + ut.waitForPoolToIdle(); + + verify(mockCM).startContainer(any(StartContainerRequest.class)); + + LOG.info("inserting cleanup event"); + ContainerLauncherEvent mockCleanupEvent = + mock(ContainerLauncherEvent.class); + when(mockCleanupEvent.getType()) + .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP); + when(mockCleanupEvent.getContainerID()) + .thenReturn(contId); + when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId); + when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress); + ut.handle(mockCleanupEvent); + + ut.waitForPoolToIdle(); + + verify(mockCM).stopContainer(any(StopContainerRequest.class)); + } 
finally { + ut.stop(); + } + } + + @Test + public void testOutOfOrder() throws Exception { + LOG.info("STARTING testOutOfOrder"); + YarnRPC mockRpc = mock(YarnRPC.class); + AppContext mockContext = mock(AppContext.class); + @SuppressWarnings("rawtypes") + EventHandler mockEventHandler = mock(EventHandler.class); + when(mockContext.getEventHandler()).thenReturn(mockEventHandler); + + ContainerManager mockCM = mock(ContainerManager.class); + when(mockRpc.getProxy(eq(ContainerManager.class), + any(InetSocketAddress.class), any(Configuration.class))) + .thenReturn(mockCM); + + ContainerLauncherImplUnderTest ut = + new ContainerLauncherImplUnderTest(mockContext, mockRpc); + + Configuration conf = new Configuration(); + ut.init(conf); + ut.start(); + try { + ContainerId contId = makeContainerId(0l, 0, 0, 1); + TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0); + String cmAddress = "127.0.0.1:8000"; + StartContainerResponse startResp = + recordFactory.newRecordInstance(StartContainerResponse.class); + startResp.setServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID, + ShuffleHandler.serializeMetaData(80)); + + LOG.info("inserting cleanup event"); + ContainerLauncherEvent mockCleanupEvent = + mock(ContainerLauncherEvent.class); + when(mockCleanupEvent.getType()) + .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP); + when(mockCleanupEvent.getContainerID()) + .thenReturn(contId); + when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId); + when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress); + ut.handle(mockCleanupEvent); + + ut.waitForPoolToIdle(); + + verify(mockCM, never()).stopContainer(any(StopContainerRequest.class)); + + LOG.info("inserting launch event"); + ContainerRemoteLaunchEvent mockLaunchEvent = + mock(ContainerRemoteLaunchEvent.class); + when(mockLaunchEvent.getType()) + .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH); + when(mockLaunchEvent.getContainerID()) + .thenReturn(contId); + when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId); + when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress); + when(mockCM.startContainer(any(StartContainerRequest.class))).thenReturn(startResp); + ut.handle(mockLaunchEvent); + + ut.waitForPoolToIdle(); + + verify(mockCM, never()).startContainer(any(StartContainerRequest.class)); + } finally { + ut.stop(); + } + } +} Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1229451&r1=1229450&r2=1229451&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Tue Jan 10 02:15:48 2012 @@ -164,7 +164,7 @@ public class ShuffleHandler extends Abst * @param port the port to be sent to the ApplciationMaster * @return the serialized form of the port. 
*/ - static ByteBuffer serializeMetaData(int port) throws IOException { + public static ByteBuffer serializeMetaData(int port) throws IOException { //TODO these bytes should be versioned DataOutputBuffer port_dob = new DataOutputBuffer(); port_dob.writeInt(port);
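For context on the last hunk: serializeMetaData() is made public so the new TestContainerLauncherImpl can fabricate the shuffle-port service response that ContainerLauncherImpl reads back after startContainer(). A minimal sketch of that round trip, assuming only the two ShuffleHandler helpers already used in this commit (serializeMetaData and deserializeMetaData) and a hypothetical demo class name, looks like:

import java.nio.ByteBuffer;
import org.apache.hadoop.mapred.ShuffleHandler;

// Sketch only: mirrors how the test serializes a shuffle port into the
// service response and how the AM-side launch path deserializes it again.
public class ShufflePortRoundTrip {
  public static void main(String[] args) throws Exception {
    ByteBuffer meta = ShuffleHandler.serializeMetaData(8080); // NodeManager/aux-service side
    int port = ShuffleHandler.deserializeMetaData(meta);      // AM side, as in ContainerLauncherImpl
    System.out.println("shuffle port = " + port);             // expected: 8080
  }
}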