Subject: svn commit: r1611222 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-serve...
Date: Thu, 17 Jul 2014 00:14:56 -0000
To: yarn-commits@hadoop.apache.org
From: vinodkv@apache.org

Author: vinodkv
Date: Thu Jul 17 00:14:56 2014
New Revision: 1611222

URL: http://svn.apache.org/r1611222
Log:
YARN-2219. Changed ResourceManager to avoid AMs and NMs getting exceptions after RM recovery but before scheduler learns about apps and app-attempts. Contributed by Jian He.

Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1611222&r1=1611221&r2=1611222&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Thu Jul 17 00:14:56 2014
@@ -62,6 +62,10 @@ Release 2.6.0 - UNRELEASED
     YARN-2264. Fixed a race condition in DrainDispatcher which may cause random
     test failures. (Li Lu via jianhe)
 
+    YARN-2219. Changed ResourceManager to avoid AMs and NMs getting exceptions
+    after RM recovery but before scheduler learns about apps and app-attempts.
+    (Jian He via vinodkv)
+
 Release 2.5.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1611222&r1=1611221&r2=1611222&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Thu Jul 17 00:14:56 2014
@@ -205,12 +205,6 @@ public class RMAppImpl implements RMApp,
     .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
         RMAppEventType.APP_RUNNING_ON_NODE,
         new AppRunningOnNodeTransition())
-     // ACCECPTED state can once again receive APP_ACCEPTED event, because on
-     // recovery the app returns ACCEPTED state and the app once again go
-     // through the scheduler and triggers one more APP_ACCEPTED event at
-     // ACCEPTED state.
-    .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
-        RMAppEventType.APP_ACCEPTED)
 
      // Transitions from RUNNING state
     .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
@@ -789,8 +783,18 @@ public class RMAppImpl implements RMApp,
         return app.recoveredFinalState;
       }
 
-      // Notify scheduler about the app on recovery
-      new AddApplicationToSchedulerTransition().transition(app, event);
+      // No existent attempts means the attempt associated with this app was not
+      // started or started but not yet saved.
+      if (app.attempts.isEmpty()) {
+        app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId,
+          app.submissionContext.getQueue(), app.user));
+        return RMAppState.SUBMITTED;
+      }
+
+      // Add application to scheduler synchronously to guarantee scheduler
+      // knows applications before AM or NM re-registers.
+      app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId,
+        app.submissionContext.getQueue(), app.user, true));
 
       // recover attempts
       app.recoverAppAttempts();
@@ -805,12 +809,6 @@ public class RMAppImpl implements RMApp,
         return RMAppState.ACCEPTED;
       }
 
-      // No existent attempts means the attempt associated with this app was not
-      // started or started but not yet saved.
-      if (app.attempts.isEmpty()) {
-        return RMAppState.SUBMITTED;
-      }
-
       // YARN-1507 is saving the application state after the application is
       // accepted. So after YARN-1507, an app is saved meaning it is accepted.
       // Thus we return ACCECPTED state on recovery.

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1611222&r1=1611221&r2=1611222&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Thu Jul 17 00:14:56 2014
@@ -926,8 +926,10 @@ public class RMAppAttemptImpl implements
       appAttempt.masterService
           .registerAppAttempt(appAttempt.applicationAttemptId);
 
-      appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
-        appAttempt.getAppAttemptId(), false, false));
+      // Add attempt to scheduler synchronously to guarantee scheduler
+      // knows attempts before AM or NM re-registers.
+      appAttempt.scheduler.handle(new AppAttemptAddedSchedulerEvent(
+          appAttempt.getAppAttemptId(), false, true));
     }
 
     /*

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1611222&r1=1611221&r2=1611222&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Thu Jul 17 00:14:56 2014
@@ -521,7 +521,7 @@ public class CapacityScheduler extends
   }
 
   private synchronized void addApplication(ApplicationId applicationId,
-      String queueName, String user) {
+      String queueName, String user, boolean isAppRecovering) {
     // santiy checks.
     CSQueue queue = getQueue(queueName);
     if (queue == null) {
@@ -553,14 +553,20 @@ public class CapacityScheduler extends
     applications.put(applicationId, application);
     LOG.info("Accepted application " + applicationId + " from user: " + user
         + ", in queue: " + queueName);
-    rmContext.getDispatcher().getEventHandler()
+    if (isAppRecovering) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
+      }
+    } else {
+      rmContext.getDispatcher().getEventHandler()
         .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+    }
   }
 
   private synchronized void addApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
       boolean transferStateFromPreviousAttempt,
-      boolean shouldNotifyAttemptAdded) {
+      boolean isAttemptRecovering) {
     SchedulerApplication application =
         applications.get(applicationAttemptId.getApplicationId());
     CSQueue queue = (CSQueue) application.getQueue();
@@ -578,14 +584,15 @@ public class CapacityScheduler extends
     LOG.info("Added Application Attempt " + applicationAttemptId
         + " to scheduler from user " + application.getUser() + " in queue "
         + queue.getQueueName());
-    if (shouldNotifyAttemptAdded) {
-      rmContext.getDispatcher().getEventHandler().handle(
-        new RMAppAttemptEvent(applicationAttemptId,
-          RMAppAttemptEventType.ATTEMPT_ADDED));
-    } else {
+    if (isAttemptRecovering) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Skipping notifying ATTEMPT_ADDED");
+        LOG.debug(applicationAttemptId
+            + " is recovering. Skipping notifying ATTEMPT_ADDED");
       }
+    } else {
+      rmContext.getDispatcher().getEventHandler().handle(
+        new RMAppAttemptEvent(applicationAttemptId,
+          RMAppAttemptEventType.ATTEMPT_ADDED));
     }
   }
 
@@ -905,7 +912,8 @@ public class CapacityScheduler extends
     {
       AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
       addApplication(appAddedEvent.getApplicationId(),
-        appAddedEvent.getQueue(), appAddedEvent.getUser());
+        appAddedEvent.getQueue(), appAddedEvent.getUser(),
+        appAddedEvent.getIsAppRecovering());
     }
     break;
     case APP_REMOVED:
@@ -921,7 +929,7 @@ public class CapacityScheduler extends
           (AppAttemptAddedSchedulerEvent) event;
       addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
         appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
-        appAttemptAddedEvent.getShouldNotifyAttemptAdded());
+        appAttemptAddedEvent.getIsAttemptRecovering());
     }
     break;
     case APP_ATTEMPT_REMOVED:

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java?rev=1611222&r1=1611221&r2=1611222&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java Thu Jul 17 00:14:56 2014
@@ -25,13 +25,20 @@ public class AppAddedSchedulerEvent exte
   private final ApplicationId applicationId;
   private final String queue;
   private final String user;
+  private final boolean isAppRecovering;
 
   public AppAddedSchedulerEvent(
       ApplicationId applicationId, String queue, String user) {
+    this(applicationId, queue, user, false);
+  }
+
+  public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
+      String user, boolean isAppRecovering) {
     super(SchedulerEventType.APP_ADDED);
     this.applicationId = applicationId;
     this.queue = queue;
     this.user = user;
+    this.isAppRecovering = isAppRecovering;
   }
 
   public ApplicationId getApplicationId() {
@@ -46,4 +53,7 @@ public class AppAddedSchedulerEvent exte
     return user;
   }
 
+  public boolean getIsAppRecovering() {
+    return isAppRecovering;
+  }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java?rev=1611222&r1=1611221&r2=1611222&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java Thu Jul 17 00:14:56 2014
@@ -24,22 +24,22 @@ public class AppAttemptAddedSchedulerEve
   private final ApplicationAttemptId applicationAttemptId;
   private final boolean transferStateFromPreviousAttempt;
-  private final boolean shouldNotifyAttemptAdded;
+  private final boolean isAttemptRecovering;
 
   public AppAttemptAddedSchedulerEvent(
       ApplicationAttemptId applicationAttemptId,
       boolean transferStateFromPreviousAttempt) {
-    this(applicationAttemptId, transferStateFromPreviousAttempt, true);
+    this(applicationAttemptId, transferStateFromPreviousAttempt, false);
   }
 
   public AppAttemptAddedSchedulerEvent(
       ApplicationAttemptId applicationAttemptId,
       boolean transferStateFromPreviousAttempt,
-      boolean shouldNotifyAttemptAdded) {
+      boolean isAttemptRecovering) {
     super(SchedulerEventType.APP_ATTEMPT_ADDED);
     this.applicationAttemptId = applicationAttemptId;
     this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt;
-    this.shouldNotifyAttemptAdded = shouldNotifyAttemptAdded;
+    this.isAttemptRecovering = isAttemptRecovering;
   }
 
   public ApplicationAttemptId getApplicationAttemptId() {
@@ -50,7 +50,7 @@ public class AppAttemptAddedSchedulerEve
     return transferStateFromPreviousAttempt;
   }
 
-  public boolean getShouldNotifyAttemptAdded() {
-    return shouldNotifyAttemptAdded;
+  public boolean getIsAttemptRecovering() {
+    return isAttemptRecovering;
   }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1611222&r1=1611221&r2=1611222&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Thu Jul 17 00:14:56 2014
@@ -566,7 +566,7 @@ public class FairScheduler extends
    * configured limits, but the app will not be marked as runnable.
    */
   protected synchronized void addApplication(ApplicationId applicationId,
-      String queueName, String user) {
+      String queueName, String user, boolean isAppRecovering) {
     if (queueName == null || queueName.isEmpty()) {
       String message = "Reject application " + applicationId +
               " submitted by user " + user + " with an empty queue name.";
@@ -603,8 +603,14 @@ public class FairScheduler extends
     LOG.info("Accepted application " + applicationId + " from user: " + user
         + ", in queue: " + queueName + ", currently num of applications: "
         + applications.size());
-    rmContext.getDispatcher().getEventHandler()
+    if (isAppRecovering) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
+      }
+    } else {
+      rmContext.getDispatcher().getEventHandler()
         .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+    }
   }
 
   /**
@@ -613,7 +619,7 @@ public class FairScheduler extends
   protected synchronized void addApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
       boolean transferStateFromPreviousAttempt,
-      boolean shouldNotifyAttemptAdded) {
+      boolean isAttemptRecovering) {
     SchedulerApplication application =
         applications.get(applicationAttemptId.getApplicationId());
     String user = application.getUser();
@@ -642,14 +648,15 @@ public class FairScheduler extends
     LOG.info("Added Application Attempt " + applicationAttemptId
         + " to scheduler from user: " + user);
-    if (shouldNotifyAttemptAdded) {
-      rmContext.getDispatcher().getEventHandler().handle(
-        new RMAppAttemptEvent(applicationAttemptId,
-          RMAppAttemptEventType.ATTEMPT_ADDED));
-    } else {
+    if (isAttemptRecovering) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Skipping notifying ATTEMPT_ADDED");
+        LOG.debug(applicationAttemptId
+            + " is recovering. Skipping notifying ATTEMPT_ADDED");
       }
+    } else {
+      rmContext.getDispatcher().getEventHandler().handle(
+        new RMAppAttemptEvent(applicationAttemptId,
+          RMAppAttemptEventType.ATTEMPT_ADDED));
     }
   }
 
@@ -1136,7 +1143,8 @@ public class FairScheduler extends
       }
       AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
       addApplication(appAddedEvent.getApplicationId(),
-        appAddedEvent.getQueue(), appAddedEvent.getUser());
+        appAddedEvent.getQueue(), appAddedEvent.getUser(),
+        appAddedEvent.getIsAppRecovering());
       break;
     case APP_REMOVED:
       if (!(event instanceof AppRemovedSchedulerEvent)) {
@@ -1154,7 +1162,7 @@ public class FairScheduler extends
           (AppAttemptAddedSchedulerEvent) event;
       addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
         appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
-        appAttemptAddedEvent.getShouldNotifyAttemptAdded());
+        appAttemptAddedEvent.getIsAttemptRecovering());
       break;
     case APP_ATTEMPT_REMOVED:
       if (!(event instanceof AppAttemptRemovedSchedulerEvent)) {

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1611222&r1=1611221&r2=1611222&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Thu Jul 17 00:14:56 2014
@@ -356,22 +356,28 @@ public class FifoScheduler extends
 
   @VisibleForTesting
   public synchronized void addApplication(ApplicationId applicationId,
-      String queue, String user) {
+      String queue, String user, boolean isAppRecovering) {
     SchedulerApplication application =
         new SchedulerApplication(DEFAULT_QUEUE, user);
     applications.put(applicationId, application);
     metrics.submitApp(user);
     LOG.info("Accepted application " + applicationId + " from user: " + user
         + ", currently num of applications: " + applications.size());
-    rmContext.getDispatcher().getEventHandler()
+    if (isAppRecovering) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
+      }
+    } else {
+      rmContext.getDispatcher().getEventHandler()
         .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+    }
   }
 
   @VisibleForTesting
   public synchronized void
       addApplicationAttempt(ApplicationAttemptId appAttemptId,
           boolean transferStateFromPreviousAttempt,
-          boolean shouldNotifyAttemptAdded) {
+          boolean isAttemptRecovering) {
     SchedulerApplication application =
         applications.get(appAttemptId.getApplicationId());
     String user = application.getUser();
@@ -389,14 +395,15 @@ public class FifoScheduler extends
     metrics.submitAppAttempt(user);
     LOG.info("Added Application Attempt " + appAttemptId
         + " to scheduler from user " + application.getUser());
-    if (shouldNotifyAttemptAdded) {
-      rmContext.getDispatcher().getEventHandler().handle(
-        new RMAppAttemptEvent(appAttemptId,
-          RMAppAttemptEventType.ATTEMPT_ADDED));
-    } else {
+    if (isAttemptRecovering) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Skipping notifying ATTEMPT_ADDED");
+        LOG.debug(appAttemptId
+            + " is recovering. Skipping notifying ATTEMPT_ADDED");
       }
+    } else {
+      rmContext.getDispatcher().getEventHandler().handle(
+        new RMAppAttemptEvent(appAttemptId,
+          RMAppAttemptEventType.ATTEMPT_ADDED));
     }
   }
 
@@ -772,7 +779,8 @@ public class FifoScheduler extends
     {
       AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
       addApplication(appAddedEvent.getApplicationId(),
-        appAddedEvent.getQueue(), appAddedEvent.getUser());
+        appAddedEvent.getQueue(), appAddedEvent.getUser(),
+        appAddedEvent.getIsAppRecovering());
     }
     break;
     case APP_REMOVED:
@@ -788,7 +796,7 @@ public class FifoScheduler extends
           (AppAttemptAddedSchedulerEvent) event;
       addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
         appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
-        appAttemptAddedEvent.getShouldNotifyAttemptAdded());
+        appAttemptAddedEvent.getIsAttemptRecovering());
     }
     break;
     case APP_ATTEMPT_REMOVED:

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1611222&r1=1611221&r2=1611222&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Thu Jul 17 00:14:56 2014
@@ -228,7 +228,7 @@ public class TestFifoScheduler {
     scheduler.handle(new NodeAddedSchedulerEvent(node));
 
     ApplicationId appId = ApplicationId.newInstance(0, 1);
-    scheduler.addApplication(appId, "queue1", "user1");
+    scheduler.addApplication(appId, "queue1", "user1", true);
 
     NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
     try {

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java?rev=1611222&r1=1611221&r2=1611222&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java Thu Jul 17 00:14:56 2014
@@ -610,6 +610,36 @@ public class TestWorkPreservingRMRestart
         attempt0.getMasterContainer().getId()).isAMContainer());
   }
 
+  @Test (timeout = 20000)
+  public void testRecoverSchedulerAppAndAttemptSynchronously() throws Exception {
+    // start RM
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    // create app and launch the AM
+    RMApp app0 = rm1.submitApp(200);
+    MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1);
+
+    rm2 = new MockRM(conf, memStore);
+    rm2.start();
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    // scheduler app/attempt is immediately available after RM is re-started.
+    Assert.assertNotNull(rm2.getResourceScheduler().getSchedulerAppInfo(
+      am0.getApplicationAttemptId()));
+
+    // getTransferredContainers should not throw NPE.
+    ((AbstractYarnScheduler) rm2.getResourceScheduler())
+      .getTransferredContainers(am0.getApplicationAttemptId());
+
+    List containers = createNMContainerStatusForApp(am0);
+    nm1.registerNode(containers, null);
+    waitForNumContainersToRecover(2, rm2, am0.getApplicationAttemptId());
+  }
 
   private void asserteMetrics(QueueMetrics qm, int appsSubmitted,
       int appsPending, int appsRunning, int appsCompleted,

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java?rev=1611222&r1=1611221&r2=1611222&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java Thu Jul 17 00:14:56 2014
@@ -147,7 +147,7 @@ public class FairSchedulerTestBase {
       int memory, int vcores, String queueId, String userId, int numContainers,
       int priority) {
     ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
-    scheduler.addApplication(id.getApplicationId(), queueId, userId);
+    scheduler.addApplication(id.getApplicationId(), queueId, userId, true);
     // This conditional is for testAclSubmitApplication where app is rejected
     // and no app is added.
     if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) {

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1611222&r1=1611221&r2=1611222&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Thu Jul 17 00:14:56 2014
@@ -793,13 +793,13 @@ public class TestFairScheduler extends F
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     ApplicationAttemptId id11 = createAppAttemptId(1, 1);
-    scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1");
+    scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1", true);
     scheduler.addApplicationAttempt(id11, false, true);
     ApplicationAttemptId id21 = createAppAttemptId(2, 1);
-    scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1");
+    scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1", true);
     scheduler.addApplicationAttempt(id21, false, true);
     ApplicationAttemptId id22 = createAppAttemptId(2, 2);
-    scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1");
+    scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1", true);
     scheduler.addApplicationAttempt(id22, false, true);
 
     int minReqSize =
@@ -1561,7 +1561,7 @@ public class TestFairScheduler extends F
     scheduler.handle(nodeEvent2);
 
     ApplicationAttemptId appId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
-    scheduler.addApplication(appId.getApplicationId(), "queue1", "user1");
+    scheduler.addApplication(appId.getApplicationId(), "queue1", "user1", true);
     scheduler.addApplicationAttempt(appId, false, true);
 
     // 1 request with 2 nodes on the same rack. another request with 1 node on
@@ -1843,7 +1843,7 @@ public class TestFairScheduler extends F
     ApplicationAttemptId attId =
         ApplicationAttemptId.newInstance(applicationId, this.ATTEMPT_ID++);
 
-    scheduler.addApplication(attId.getApplicationId(), queue, user);
+    scheduler.addApplication(attId.getApplicationId(), queue, user, true);
 
     numTries = 0;
     while (application.getFinishTime() == 0 && numTries < MAX_TRIES) {
@@ -2720,7 +2720,7 @@ public class TestFairScheduler extends F
     // send application request
     ApplicationAttemptId appAttemptId =
         createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
-    fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11");
+    fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11", true);
     fs.addApplicationAttempt(appAttemptId, false, true);
     List ask = new ArrayList();
     ResourceRequest request =