hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1611223 - in /hadoop/common/branches/branch-2/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/ hadoop-yarn/hadoop-yarn-server/hadoo...
Date Thu, 17 Jul 2014 00:15:29 GMT
Author: vinodkv
Date: Thu Jul 17 00:15:28 2014
New Revision: 1611223

URL: http://svn.apache.org/r1611223
Log:
YARN-2219. Changed ResourceManager to avoid AMs and NMs getting exceptions after RM recovery
but before scheduler learns about apps and app-attempts. Contributed by Jian He.
svn merge --ignore-ancestry -c 1611222 ../../trunk/

Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1611223&r1=1611222&r2=1611223&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Thu Jul 17 00:15:28 2014
@@ -44,6 +44,10 @@ Release 2.6.0 - UNRELEASED
     YARN-2264. Fixed a race condition in DrainDispatcher which may cause random
     test failures. (Li Lu via jianhe)
 
+    YARN-2219. Changed ResourceManager to avoid AMs and NMs getting exceptions
+    after RM recovery but before scheduler learns about apps and app-attempts.
+    (Jian He via vinodkv)
+
 Release 2.5.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1611223&r1=1611222&r2=1611223&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Thu Jul 17 00:15:28 2014
@@ -205,12 +205,6 @@ public class RMAppImpl implements RMApp,
     .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, 
         RMAppEventType.APP_RUNNING_ON_NODE,
         new AppRunningOnNodeTransition())
-    // ACCECPTED state can once again receive APP_ACCEPTED event, because on
-    // recovery the app returns ACCEPTED state and the app once again go
-    // through the scheduler and triggers one more APP_ACCEPTED event at
-    // ACCEPTED state.
-    .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
-        RMAppEventType.APP_ACCEPTED)
 
      // Transitions from RUNNING state
     .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
@@ -789,8 +783,18 @@ public class RMAppImpl implements RMApp,
         return app.recoveredFinalState;
       }
 
-      // Notify scheduler about the app on recovery
-      new AddApplicationToSchedulerTransition().transition(app, event);
+      // No existent attempts means the attempt associated with this app was not
+      // started or started but not yet saved.
+      if (app.attempts.isEmpty()) {
+        app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId,
+          app.submissionContext.getQueue(), app.user));
+        return RMAppState.SUBMITTED;
+      }
+
+      // Add application to scheduler synchronously to guarantee scheduler
+      // knows applications before AM or NM re-registers.
+      app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId,
+        app.submissionContext.getQueue(), app.user, true));
 
       // recover attempts
       app.recoverAppAttempts();
@@ -805,12 +809,6 @@ public class RMAppImpl implements RMApp,
         return RMAppState.ACCEPTED;
       }
 
-      // No existent attempts means the attempt associated with this app was not
-      // started or started but not yet saved.
-      if (app.attempts.isEmpty()) {
-        return RMAppState.SUBMITTED;
-      }
-
       // YARN-1507 is saving the application state after the application is
       // accepted. So after YARN-1507, an app is saved meaning it is accepted.
       // Thus we return ACCECPTED state on recovery.

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1611223&r1=1611222&r2=1611223&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Thu Jul 17 00:15:28 2014
@@ -926,8 +926,10 @@ public class RMAppAttemptImpl implements
           appAttempt.masterService
               .registerAppAttempt(appAttempt.applicationAttemptId);
 
-          appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
-            appAttempt.getAppAttemptId(), false, false));
+          // Add attempt to scheduler synchronously to guarantee scheduler
+          // knows attempts before AM or NM re-registers.
+          appAttempt.scheduler.handle(new AppAttemptAddedSchedulerEvent(
+            appAttempt.getAppAttemptId(), false, true));
         }
 
         /*

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1611223&r1=1611222&r2=1611223&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Thu Jul 17 00:15:28 2014
@@ -521,7 +521,7 @@ public class CapacityScheduler extends
   }
 
   private synchronized void addApplication(ApplicationId applicationId,
-      String queueName, String user) {
+      String queueName, String user, boolean isAppRecovering) {
     // santiy checks.
     CSQueue queue = getQueue(queueName);
     if (queue == null) {
@@ -553,14 +553,20 @@ public class CapacityScheduler extends
     applications.put(applicationId, application);
     LOG.info("Accepted application " + applicationId + " from user: " + user
         + ", in queue: " + queueName);
-    rmContext.getDispatcher().getEventHandler()
+    if (isAppRecovering) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
+      }
+    } else {
+      rmContext.getDispatcher().getEventHandler()
         .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+    }
   }
 
   private synchronized void addApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
       boolean transferStateFromPreviousAttempt,
-      boolean shouldNotifyAttemptAdded) {
+      boolean isAttemptRecovering) {
     SchedulerApplication<FiCaSchedulerApp> application =
         applications.get(applicationAttemptId.getApplicationId());
     CSQueue queue = (CSQueue) application.getQueue();
@@ -578,14 +584,15 @@ public class CapacityScheduler extends
     LOG.info("Added Application Attempt " + applicationAttemptId
         + " to scheduler from user " + application.getUser() + " in queue "
         + queue.getQueueName());
-    if (shouldNotifyAttemptAdded) {
-      rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppAttemptEvent(applicationAttemptId,
-              RMAppAttemptEventType.ATTEMPT_ADDED));
-    } else {
+    if (isAttemptRecovering) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Skipping notifying ATTEMPT_ADDED");
+        LOG.debug(applicationAttemptId
+            + " is recovering. Skipping notifying ATTEMPT_ADDED");
       }
+    } else {
+      rmContext.getDispatcher().getEventHandler().handle(
+        new RMAppAttemptEvent(applicationAttemptId,
+            RMAppAttemptEventType.ATTEMPT_ADDED));
     }
   }
 
@@ -905,7 +912,8 @@ public class CapacityScheduler extends
     {
       AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
       addApplication(appAddedEvent.getApplicationId(),
-        appAddedEvent.getQueue(), appAddedEvent.getUser());
+        appAddedEvent.getQueue(), appAddedEvent.getUser(),
+        appAddedEvent.getIsAppRecovering());
     }
     break;
     case APP_REMOVED:
@@ -921,7 +929,7 @@ public class CapacityScheduler extends
           (AppAttemptAddedSchedulerEvent) event;
       addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
         appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
-        appAttemptAddedEvent.getShouldNotifyAttemptAdded());
+        appAttemptAddedEvent.getIsAttemptRecovering());
     }
     break;
     case APP_ATTEMPT_REMOVED:

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java?rev=1611223&r1=1611222&r2=1611223&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java Thu Jul 17 00:15:28 2014
@@ -25,13 +25,20 @@ public class AppAddedSchedulerEvent exte
   private final ApplicationId applicationId;
   private final String queue;
   private final String user;
+  private final boolean isAppRecovering;
 
   public AppAddedSchedulerEvent(
       ApplicationId applicationId, String queue, String user) {
+    this(applicationId, queue, user, false);
+  }
+
+  public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
+      String user, boolean isAppRecovering) {
     super(SchedulerEventType.APP_ADDED);
     this.applicationId = applicationId;
     this.queue = queue;
     this.user = user;
+    this.isAppRecovering = isAppRecovering;
   }
 
   public ApplicationId getApplicationId() {
@@ -46,4 +53,7 @@ public class AppAddedSchedulerEvent exte
     return user;
   }
 
+  public boolean getIsAppRecovering() {
+    return isAppRecovering;
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java?rev=1611223&r1=1611222&r2=1611223&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java Thu Jul 17 00:15:28 2014
@@ -24,22 +24,22 @@ public class AppAttemptAddedSchedulerEve
 
   private final ApplicationAttemptId applicationAttemptId;
   private final boolean transferStateFromPreviousAttempt;
-  private final boolean shouldNotifyAttemptAdded;
+  private final boolean isAttemptRecovering;
 
   public AppAttemptAddedSchedulerEvent(
       ApplicationAttemptId applicationAttemptId,
       boolean transferStateFromPreviousAttempt) {
-    this(applicationAttemptId, transferStateFromPreviousAttempt, true);
+    this(applicationAttemptId, transferStateFromPreviousAttempt, false);
   }
 
   public AppAttemptAddedSchedulerEvent(
       ApplicationAttemptId applicationAttemptId,
       boolean transferStateFromPreviousAttempt,
-      boolean shouldNotifyAttemptAdded) {
+      boolean isAttemptRecovering) {
     super(SchedulerEventType.APP_ATTEMPT_ADDED);
     this.applicationAttemptId = applicationAttemptId;
     this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt;
-    this.shouldNotifyAttemptAdded = shouldNotifyAttemptAdded;
+    this.isAttemptRecovering = isAttemptRecovering;
   }
 
   public ApplicationAttemptId getApplicationAttemptId() {
@@ -50,7 +50,7 @@ public class AppAttemptAddedSchedulerEve
     return transferStateFromPreviousAttempt;
   }
 
-  public boolean getShouldNotifyAttemptAdded() {
-    return shouldNotifyAttemptAdded;
+  public boolean getIsAttemptRecovering() {
+    return isAttemptRecovering;
   }
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1611223&r1=1611222&r2=1611223&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Thu Jul 17 00:15:28 2014
@@ -566,7 +566,7 @@ public class FairScheduler extends
    * configured limits, but the app will not be marked as runnable.
    */
   protected synchronized void addApplication(ApplicationId applicationId,
-      String queueName, String user) {
+      String queueName, String user, boolean isAppRecovering) {
     if (queueName == null || queueName.isEmpty()) {
       String message = "Reject application " + applicationId +
               " submitted by user " + user + " with an empty queue name.";
@@ -603,8 +603,14 @@ public class FairScheduler extends
     LOG.info("Accepted application " + applicationId + " from user: " + user
         + ", in queue: " + queueName + ", currently num of applications: "
         + applications.size());
-    rmContext.getDispatcher().getEventHandler()
+    if (isAppRecovering) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
+      }
+    } else {
+      rmContext.getDispatcher().getEventHandler()
         .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+    }
   }
 
   /**
@@ -613,7 +619,7 @@ public class FairScheduler extends
   protected synchronized void addApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
       boolean transferStateFromPreviousAttempt,
-      boolean shouldNotifyAttemptAdded) {
+      boolean isAttemptRecovering) {
     SchedulerApplication<FSSchedulerApp> application =
         applications.get(applicationAttemptId.getApplicationId());
     String user = application.getUser();
@@ -642,14 +648,15 @@ public class FairScheduler extends
     LOG.info("Added Application Attempt " + applicationAttemptId
         + " to scheduler from user: " + user);
 
-    if (shouldNotifyAttemptAdded) {
-      rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppAttemptEvent(applicationAttemptId,
-              RMAppAttemptEventType.ATTEMPT_ADDED));
-    } else {
+    if (isAttemptRecovering) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Skipping notifying ATTEMPT_ADDED");
+        LOG.debug(applicationAttemptId
+            + " is recovering. Skipping notifying ATTEMPT_ADDED");
       }
+    } else {
+      rmContext.getDispatcher().getEventHandler().handle(
+        new RMAppAttemptEvent(applicationAttemptId,
+            RMAppAttemptEventType.ATTEMPT_ADDED));
     }
   }
 
@@ -1136,7 +1143,8 @@ public class FairScheduler extends
       }
       AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
       addApplication(appAddedEvent.getApplicationId(),
-        appAddedEvent.getQueue(), appAddedEvent.getUser());
+        appAddedEvent.getQueue(), appAddedEvent.getUser(),
+        appAddedEvent.getIsAppRecovering());
       break;
     case APP_REMOVED:
       if (!(event instanceof AppRemovedSchedulerEvent)) {
@@ -1154,7 +1162,7 @@ public class FairScheduler extends
           (AppAttemptAddedSchedulerEvent) event;
       addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
         appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
-        appAttemptAddedEvent.getShouldNotifyAttemptAdded());
+        appAttemptAddedEvent.getIsAttemptRecovering());
       break;
     case APP_ATTEMPT_REMOVED:
       if (!(event instanceof AppAttemptRemovedSchedulerEvent)) {

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1611223&r1=1611222&r2=1611223&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Thu Jul 17 00:15:28 2014
@@ -356,22 +356,28 @@ public class FifoScheduler extends
 
   @VisibleForTesting
   public synchronized void addApplication(ApplicationId applicationId,
-      String queue, String user) {
+      String queue, String user, boolean isAppRecovering) {
     SchedulerApplication<FiCaSchedulerApp> application =
         new SchedulerApplication<FiCaSchedulerApp>(DEFAULT_QUEUE, user);
     applications.put(applicationId, application);
     metrics.submitApp(user);
     LOG.info("Accepted application " + applicationId + " from user: " + user
         + ", currently num of applications: " + applications.size());
-    rmContext.getDispatcher().getEventHandler()
+    if (isAppRecovering) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
+      }
+    } else {
+      rmContext.getDispatcher().getEventHandler()
         .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+    }
   }
 
   @VisibleForTesting
   public synchronized void
       addApplicationAttempt(ApplicationAttemptId appAttemptId,
           boolean transferStateFromPreviousAttempt,
-          boolean shouldNotifyAttemptAdded) {
+          boolean isAttemptRecovering) {
     SchedulerApplication<FiCaSchedulerApp> application =
         applications.get(appAttemptId.getApplicationId());
     String user = application.getUser();
@@ -389,14 +395,15 @@ public class FifoScheduler extends
     metrics.submitAppAttempt(user);
     LOG.info("Added Application Attempt " + appAttemptId
         + " to scheduler from user " + application.getUser());
-    if (shouldNotifyAttemptAdded) {
-      rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppAttemptEvent(appAttemptId,
-              RMAppAttemptEventType.ATTEMPT_ADDED));
-    } else {
+    if (isAttemptRecovering) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Skipping notifying ATTEMPT_ADDED");
+        LOG.debug(appAttemptId
+            + " is recovering. Skipping notifying ATTEMPT_ADDED");
       }
+    } else {
+      rmContext.getDispatcher().getEventHandler().handle(
+        new RMAppAttemptEvent(appAttemptId,
+            RMAppAttemptEventType.ATTEMPT_ADDED));
     }
   }
 
@@ -772,7 +779,8 @@ public class FifoScheduler extends
     {
       AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
       addApplication(appAddedEvent.getApplicationId(),
-        appAddedEvent.getQueue(), appAddedEvent.getUser());
+        appAddedEvent.getQueue(), appAddedEvent.getUser(),
+        appAddedEvent.getIsAppRecovering());
     }
     break;
     case APP_REMOVED:
@@ -788,7 +796,7 @@ public class FifoScheduler extends
           (AppAttemptAddedSchedulerEvent) event;
       addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
         appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
-        appAttemptAddedEvent.getShouldNotifyAttemptAdded());
+        appAttemptAddedEvent.getIsAttemptRecovering());
     }
     break;
     case APP_ATTEMPT_REMOVED:

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1611223&r1=1611222&r2=1611223&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Thu Jul 17 00:15:28 2014
@@ -228,7 +228,7 @@ public class TestFifoScheduler {
     scheduler.handle(new NodeAddedSchedulerEvent(node));
 
     ApplicationId appId = ApplicationId.newInstance(0, 1);
-    scheduler.addApplication(appId, "queue1", "user1");
+    scheduler.addApplication(appId, "queue1", "user1", true);
 
     NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
     try {

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java?rev=1611223&r1=1611222&r2=1611223&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java Thu Jul 17 00:15:28 2014
@@ -610,6 +610,36 @@ public class TestWorkPreservingRMRestart
         attempt0.getMasterContainer().getId()).isAMContainer());
   }
 
+  @Test (timeout = 20000)
+  public void testRecoverSchedulerAppAndAttemptSynchronously() throws Exception {
+    // start RM
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    // create app and launch the AM
+    RMApp app0 = rm1.submitApp(200);
+    MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1);
+
+    rm2 = new MockRM(conf, memStore);
+    rm2.start();
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    // scheduler app/attempt is immediately available after RM is re-started.
+    Assert.assertNotNull(rm2.getResourceScheduler().getSchedulerAppInfo(
+      am0.getApplicationAttemptId()));
+
+    // getTransferredContainers should not throw NPE.
+    ((AbstractYarnScheduler) rm2.getResourceScheduler())
+      .getTransferredContainers(am0.getApplicationAttemptId());
+
+    List<NMContainerStatus> containers = createNMContainerStatusForApp(am0);
+    nm1.registerNode(containers, null);
+    waitForNumContainersToRecover(2, rm2, am0.getApplicationAttemptId());
+  }
 
   private void asserteMetrics(QueueMetrics qm, int appsSubmitted,
       int appsPending, int appsRunning, int appsCompleted,

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java?rev=1611223&r1=1611222&r2=1611223&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java Thu Jul 17 00:15:28 2014
@@ -147,7 +147,7 @@ public class FairSchedulerTestBase {
       int memory, int vcores, String queueId, String userId, int numContainers,
       int priority) {
     ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
-    scheduler.addApplication(id.getApplicationId(), queueId, userId);
+    scheduler.addApplication(id.getApplicationId(), queueId, userId, true);
     // This conditional is for testAclSubmitApplication where app is rejected
     // and no app is added.
     if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) {

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1611223&r1=1611222&r2=1611223&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Thu Jul 17 00:15:28 2014
@@ -793,13 +793,13 @@ public class TestFairScheduler extends F
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     ApplicationAttemptId id11 = createAppAttemptId(1, 1);
-    scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1");
+    scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1", true);
     scheduler.addApplicationAttempt(id11, false, true);
     ApplicationAttemptId id21 = createAppAttemptId(2, 1);
-    scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1");
+    scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1", true);
     scheduler.addApplicationAttempt(id21, false, true);
     ApplicationAttemptId id22 = createAppAttemptId(2, 2);
-    scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1");
+    scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1", true);
     scheduler.addApplicationAttempt(id22, false, true);
 
     int minReqSize = 
@@ -1561,7 +1561,7 @@ public class TestFairScheduler extends F
     scheduler.handle(nodeEvent2);
     
     ApplicationAttemptId appId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
-    scheduler.addApplication(appId.getApplicationId(), "queue1", "user1");
+    scheduler.addApplication(appId.getApplicationId(), "queue1", "user1", true);
     scheduler.addApplicationAttempt(appId, false, true);
     
     // 1 request with 2 nodes on the same rack. another request with 1 node on
@@ -1843,7 +1843,7 @@ public class TestFairScheduler extends F
 
     ApplicationAttemptId attId =
         ApplicationAttemptId.newInstance(applicationId, this.ATTEMPT_ID++);
-    scheduler.addApplication(attId.getApplicationId(), queue, user);
+    scheduler.addApplication(attId.getApplicationId(), queue, user, true);
 
     numTries = 0;
     while (application.getFinishTime() == 0 && numTries < MAX_TRIES) {
@@ -2720,7 +2720,7 @@ public class TestFairScheduler extends F
     // send application request
     ApplicationAttemptId appAttemptId =
             createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
-    fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11");
+    fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11", true);
     fs.addApplicationAttempt(appAttemptId, false, true);
     List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
     ResourceRequest request =



Mime
View raw message