Return-Path: X-Original-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E694F71A3 for ; Thu, 1 Dec 2011 08:35:54 +0000 (UTC) Received: (qmail 77725 invoked by uid 500); 1 Dec 2011 08:35:54 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 77641 invoked by uid 500); 1 Dec 2011 08:35:49 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 77629 invoked by uid 99); 1 Dec 2011 08:35:45 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 01 Dec 2011 08:35:45 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 01 Dec 2011 08:35:42 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 98EF0238897A; Thu, 1 Dec 2011 08:35:22 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1208994 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apac... 
Date: Thu, 01 Dec 2011 08:35:21 -0000 To: mapreduce-commits@hadoop.apache.org From: mahadev@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20111201083522.98EF0238897A@eris.apache.org> Author: mahadev Date: Thu Dec 1 08:35:20 2011 New Revision: 1208994 URL: http://svn.apache.org/viewvc?rev=1208994&view=rev Log: MAPREDUCE-3463. Second AM fails to recover properly when first AM is killed with java.lang.IllegalArgumentException causing lost job. (Siddharth Seth via mahadev) Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1208994&r1=1208993&r2=1208994&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Thu Dec 1 08:35:20 2011 @@ -204,6 +204,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3488. Streaming jobs are failing because the main class isnt set in the pom files. (mahadev) + + MAPREDUCE-3463. 
Second AM fails to recover properly when first AM is killed with + java.lang.IllegalArgumentException causing lost job. (Siddharth Seth via mahadev) Release 0.23.0 - 2011-11-01 Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1208994&r1=1208993&r2=1208994&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Thu Dec 1 08:35:20 2011 @@ -217,8 +217,7 @@ public class MRAppMaster extends Composi && appAttemptID.getAttemptId() > 1) { LOG.info("Recovery is enabled. " + "Will try to recover from previous life on best effort basis."); - recoveryServ = new RecoveryService(appAttemptID, clock, - committer); + recoveryServ = createRecoveryService(context); addIfService(recoveryServ); dispatcher = recoveryServ.getDispatcher(); clock = recoveryServ.getClock(); @@ -425,6 +424,15 @@ public class MRAppMaster extends Composi return new JobFinishEventHandler(); } + /** + * Create the recovery service. + * @return an instance of the recovery service. + */ + protected Recovery createRecoveryService(AppContext appContext) { + return new RecoveryService(appContext.getApplicationAttemptId(), + appContext.getClock(), getCommitter()); + } + /** Create and initialize (but don't start) a single job. 
*/ protected Job createJob(Configuration conf) { Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1208994&r1=1208993&r2=1208994&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Thu Dec 1 08:35:20 2011 @@ -76,8 +76,6 @@ import org.apache.hadoop.yarn.event.Asyn import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.service.CompositeService; import org.apache.hadoop.yarn.service.Service; import org.apache.hadoop.yarn.util.BuilderUtils; @@ -97,8 +95,6 @@ import org.apache.hadoop.yarn.util.Conve public class RecoveryService extends CompositeService implements Recovery { - private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - private static final Log LOG = LogFactory.getLog(RecoveryService.class); private final ApplicationAttemptId applicationAttemptId; @@ -120,7 +116,7 @@ public class RecoveryService extends Com super("RecoveringDispatcher"); this.applicationAttemptId = applicationAttemptId; this.committer = committer; - this.dispatcher = new 
RecoveryDispatcher(); + this.dispatcher = createRecoveryDispatcher(); this.clock = new ControlledClock(clock); addService((Service) dispatcher); } @@ -209,17 +205,32 @@ public class RecoveryService extends Com LOG.info("Read completed tasks from history " + completedTasks.size()); } + + protected Dispatcher createRecoveryDispatcher() { + return new RecoveryDispatcher(); + } + + protected Dispatcher createRecoveryDispatcher(boolean exitOnException) { + return new RecoveryDispatcher(exitOnException); + } + @SuppressWarnings("rawtypes") class RecoveryDispatcher extends AsyncDispatcher { private final EventHandler actualHandler; private final EventHandler handler; - RecoveryDispatcher() { + RecoveryDispatcher(boolean exitOnException) { + super(exitOnException); actualHandler = super.getEventHandler(); handler = new InterceptingEventHandler(actualHandler); } + RecoveryDispatcher() { + this(false); + } + @Override + @SuppressWarnings("unchecked") public void dispatch(Event event) { if (recoveryMode) { if (event.getType() == TaskAttemptEventType.TA_CONTAINER_LAUNCHED) { @@ -267,6 +278,10 @@ public class RecoveryService extends Com } } } + realDispatch(event); + } + + public void realDispatch(Event event) { super.dispatch(event); } @@ -281,6 +296,7 @@ public class RecoveryService extends Com return taskInfo.getAllTaskAttempts().get(TypeConverter.fromYarn(id)); } + @SuppressWarnings({"rawtypes", "unchecked"}) private class InterceptingEventHandler implements EventHandler { EventHandler actualHandler; @@ -407,7 +423,9 @@ public class RecoveryService extends Com LOG.info("Sending assigned event to " + yarnAttemptID); ContainerId cId = attemptInfo.getContainerId(); - NodeId nodeId = ConverterUtils.toNodeId(attemptInfo.getHostname()); + NodeId nodeId = + ConverterUtils.toNodeId(attemptInfo.getHostname() + ":" + + attemptInfo.getPort()); // Resource/Priority/ApplicationACLs are only needed while launching the // container on an NM, these are already completed tasks, so setting 
them // to null Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1208994&r1=1208993&r2=1208994&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Thu Dec 1 08:35:20 2011 @@ -52,7 +52,12 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; +import org.apache.hadoop.mapreduce.v2.app.recover.Recovery; +import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.Clock; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.junit.Test; @@ -408,6 +413,13 @@ public class TestRecovery { } @Override + protected Recovery createRecoveryService(AppContext appContext) { + return new RecoveryServiceWithCustomDispatcher( + appContext.getApplicationAttemptId(), appContext.getClock(), + getCommitter()); + } + + @Override protected ContainerLauncher createContainerLauncher(AppContext context) { MockContainerLauncher launcher = new MockContainerLauncher(); launcher.shufflePort = 
5467; @@ -422,7 +434,22 @@ public class TestRecovery { return eventHandler; } } - + + class RecoveryServiceWithCustomDispatcher extends RecoveryService { + + public RecoveryServiceWithCustomDispatcher( + ApplicationAttemptId applicationAttemptId, Clock clock, + OutputCommitter committer) { + super(applicationAttemptId, clock, committer); + } + + @Override + public Dispatcher createRecoveryDispatcher() { + return super.createRecoveryDispatcher(false); + } + + } + public static void main(String[] arg) throws Exception { TestRecovery test = new TestRecovery(); test.testCrashed(); Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java?rev=1208994&r1=1208993&r2=1208994&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java Thu Dec 1 08:35:20 2011 @@ -45,18 +45,25 @@ public class AsyncDispatcher extends Abs private Thread eventHandlingThread; protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers; + private boolean exitOnDispatchException; public AsyncDispatcher() { this(new HashMap<Class<? extends Enum>, EventHandler>(), - new LinkedBlockingQueue<Event>()); + new LinkedBlockingQueue<Event>(), true); + } + + public AsyncDispatcher(boolean exitOnException) { + this(new HashMap<Class<? extends Enum>, EventHandler>(), + new LinkedBlockingQueue<Event>(), exitOnException); } AsyncDispatcher( Map<Class<? extends Enum>, EventHandler> eventDispatchers, - BlockingQueue<Event> eventQueue) { + BlockingQueue<Event> eventQueue, boolean exitOnException) { super("Dispatcher"); this.eventQueue = 
eventQueue; this.eventDispatchers = eventDispatchers; + this.exitOnDispatchException = exitOnException; } Runnable createThread() { @@ -118,7 +125,9 @@ public class AsyncDispatcher extends Abs catch (Throwable t) { //TODO Maybe log the state of the queue LOG.fatal("Error in dispatcher thread. Exiting..", t); - System.exit(-1); + if (exitOnDispatchException) { + System.exit(-1); + } } } Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java?rev=1208994&r1=1208993&r2=1208994&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java Thu Dec 1 08:35:20 2011 @@ -36,7 +36,7 @@ public class DrainDispatcher extends Asy } private DrainDispatcher(BlockingQueue<Event> eventQueue) { - super(new HashMap<Class<? extends Enum>, EventHandler>(), eventQueue); + super(new HashMap<Class<? extends Enum>, EventHandler>(), eventQueue, true); this.queue = eventQueue; }