hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1557144 [1/3] - in /hadoop/common/branches/branch-2/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ hadoop-yarn/hadoop-yarn-api/src/main/proto/ hadoop-yarn/hadoop-yarn-common/src/main/j...
Date Fri, 10 Jan 2014 15:05:19 GMT
Author: vinodkv
Date: Fri Jan 10 15:05:18 2014
New Revision: 1557144

URL: http://svn.apache.org/r1557144
Log:
YARN-1490. Introduced the ability to make ResourceManager optionally not kill all containers when an ApplicationMaster exits. Contributed by Jian He.
svn merge --ignore-ancestry -c 1557143 ../../trunk/

Added:
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppStartAttemptEvent.java
      - copied unchanged from r1557143, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppStartAttemptEvent.java
Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerId.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppFailedAttemptEvent.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptRemovedSchedulerEvent.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Fri Jan 10 15:05:18 2014
@@ -40,6 +40,9 @@ Release 2.4.0 - UNRELEASED
     YARN-1029. Added embedded leader election in the ResourceManager. (Karthik
     Kambatla via vinodkv)
 
+    YARN-1490. Introduced the ability to make ResourceManager optionally not kill
+    all containers when an ApplicationMaster exits. (Jian He via vinodkv)
+
   IMPROVEMENTS
 
     YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu)

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java Fri Jan 10 15:05:18 2014
@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability.Stable;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.util.Records;
 
 /**
@@ -57,7 +58,8 @@ public abstract class ApplicationSubmiss
       ApplicationId applicationId, String applicationName, String queue,
       Priority priority, ContainerLaunchContext amContainer,
       boolean isUnmanagedAM, boolean cancelTokensWhenComplete,
-      int maxAppAttempts, Resource resource, String applicationType) {
+      int maxAppAttempts, Resource resource, String applicationType,
+      boolean keepContainers) {
     ApplicationSubmissionContext context =
         Records.newRecord(ApplicationSubmissionContext.class);
     context.setApplicationId(applicationId);
@@ -70,6 +72,7 @@ public abstract class ApplicationSubmiss
     context.setMaxAppAttempts(maxAppAttempts);
     context.setResource(resource);
     context.setApplicationType(applicationType);
+    context.setKeepContainersAcrossApplicationAttempts(keepContainers);
     return context;
   }
 
@@ -79,6 +82,18 @@ public abstract class ApplicationSubmiss
       ApplicationId applicationId, String applicationName, String queue,
       Priority priority, ContainerLaunchContext amContainer,
       boolean isUnmanagedAM, boolean cancelTokensWhenComplete,
+      int maxAppAttempts, Resource resource, String applicationType) {
+    return newInstance(applicationId, applicationName, queue, priority,
+      amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts,
+      resource, null, false);
+  }
+
+  @Public
+  @Stable
+  public static ApplicationSubmissionContext newInstance(
+      ApplicationId applicationId, String applicationName, String queue,
+      Priority priority, ContainerLaunchContext amContainer,
+      boolean isUnmanagedAM, boolean cancelTokensWhenComplete,
       int maxAppAttempts, Resource resource) {
     return newInstance(applicationId, applicationName, queue, priority,
       amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts,
@@ -268,4 +283,35 @@ public abstract class ApplicationSubmiss
   @Public
   @Stable
   public abstract void setApplicationType(String applicationType);
+
+
+  /**
+   * Get the flag which indicates whether to keep containers across application
+   * attempts or not.
+   * 
+   * @return the flag which indicates whether to keep containers across
+   *         application attempts or not.
+   */
+  @Public
+  @Stable
+  public abstract boolean getKeepContainersAcrossApplicationAttempts();
+
+  /**
+   * Set the flag which indicates whether to keep containers across application
+   * attempts.
+   * <p>
+   * If the flag is true, running containers will not be killed when application
+   * attempt fails and these containers will be retrieved by the new application
+   * attempt on registration via
+   * {@link ApplicationMasterProtocol#registerApplicationMaster(RegisterApplicationMasterRequest)}.
+   * </p>
+   * 
+   * @param keepContainers
+   *          the flag which indicates whether to keep containers across
+   *          application attempts.
+   */
+  @Public
+  @Stable
+  public abstract void setKeepContainersAcrossApplicationAttempts(
+      boolean keepContainers);
 }
\ No newline at end of file

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerId.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerId.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerId.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerId.java Fri Jan 10 15:05:18 2014
@@ -46,10 +46,20 @@ public abstract class ContainerId implem
   }
 
   /**
-   * Get the <code>ApplicationAttemptId</code> of the application to which
-   * the <code>Container</code> was assigned.
-   * @return <code>ApplicationAttemptId</code> of the application to which
-   *         the <code>Container</code> was assigned
+   * Get the <code>ApplicationAttemptId</code> of the application to which the
+   * <code>Container</code> was assigned.
+   * <p>
+   * Note: If containers are kept alive across application attempts via
+   * {@link ApplicationSubmissionContext#setKeepContainersAcrossApplicationAttempts(boolean)}
+   * the <code>ContainerId</code> does not necessarily contain the current
+   * running application attempt's <code>ApplicationAttemptId</code>. The
+   * container may have been allocated by a previously exited application
+   * attempt and be managed by the current running attempt, and thus carry the
+   * previous application attempt's <code>ApplicationAttemptId</code>.
+   * </p>
+   * 
+   * @return <code>ApplicationAttemptId</code> of the application to which the
+   *         <code>Container</code> was assigned
    */
   @Public
   @Stable

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto Fri Jan 10 15:05:18 2014
@@ -248,6 +248,7 @@ message ApplicationSubmissionContextProt
   optional int32 maxAppAttempts = 8 [default = 0];
   optional ResourceProto resource = 9;
   optional string applicationType = 10 [default = "YARN"];
+  optional bool keep_containers_across_application_attempts = 11 [default = false];
 }
 
 enum ApplicationAccessTypeProto {

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java Fri Jan 10 15:05:18 2014
@@ -298,6 +298,19 @@ extends ApplicationSubmissionContext {
     this.resource = resource;
   }
 
+  @Override
+  public void
+      setKeepContainersAcrossApplicationAttempts(boolean keepContainers) {
+    maybeInitBuilder();
+    builder.setKeepContainersAcrossApplicationAttempts(keepContainers);
+  }
+
+  @Override
+  public boolean getKeepContainersAcrossApplicationAttempts() {
+    ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
+    return p.getKeepContainersAcrossApplicationAttempts();
+  }
+
   private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
     return new PriorityPBImpl(p);
   }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Fri Jan 10 15:05:18 2014
@@ -421,21 +421,26 @@ public class ApplicationMasterService ex
         LOG.warn("Invalid blacklist request by application " + appAttemptId, e);
         throw e;
       }
-      
-      try {
-        RMServerUtils.validateContainerReleaseRequest(release, appAttemptId);
-      } catch (InvalidContainerReleaseException e) {
-        LOG.warn("Invalid container release by application " + appAttemptId, e);
-        throw e;
+
+      RMApp app =
+          this.rmContext.getRMApps().get(appAttemptId.getApplicationId());
+      // In the case of work-preserving AM restart, it's possible for the
+      // AM to release containers from the earlier attempt.
+      if (!app.getApplicationSubmissionContext()
+        .getKeepContainersAcrossApplicationAttempts()) {
+        try {
+          RMServerUtils.validateContainerReleaseRequest(release, appAttemptId);
+        } catch (InvalidContainerReleaseException e) {
+          LOG.warn("Invalid container release by application " + appAttemptId, e);
+          throw e;
+        }
       }
-      
+
       // Send new requests to appAttempt.
       Allocation allocation =
           this.rScheduler.allocate(appAttemptId, ask, release, 
               blacklistAdditions, blacklistRemovals);
 
-      RMApp app = this.rmContext.getRMApps().get(
-          appAttemptId.getApplicationId());
       RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
       
       AllocateResponse allocateResponse =
@@ -591,4 +596,4 @@ public class ApplicationMasterService ex
       this.response = response;
     }
   }
-}
\ No newline at end of file
+}

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppFailedAttemptEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppFailedAttemptEvent.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppFailedAttemptEvent.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppFailedAttemptEvent.java Fri Jan 10 15:05:18 2014
@@ -23,14 +23,20 @@ import org.apache.hadoop.yarn.api.record
 public class RMAppFailedAttemptEvent extends RMAppEvent {
 
   private final String diagnostics;
+  private final boolean transferStateFromPreviousAttempt;
 
   public RMAppFailedAttemptEvent(ApplicationId appId, RMAppEventType event, 
-      String diagnostics) {
+      String diagnostics, boolean transferStateFromPreviousAttempt) {
     super(appId, event);
     this.diagnostics = diagnostics;
+    this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt;
   }
 
   public String getDiagnostics() {
     return this.diagnostics;
   }
+
+  public boolean getTransferStateFromPreviousAttempt() {
+    return transferStateFromPreviousAttempt;
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Fri Jan 10 15:05:18 2014
@@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppStartAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -76,6 +77,7 @@ import org.apache.hadoop.yarn.state.Stat
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+@SuppressWarnings({ "rawtypes", "unchecked" })
 public class RMAppImpl implements RMApp, Recoverable {
 
   private static final Log LOG = LogFactory.getLog(RMAppImpl.class);
@@ -633,7 +635,7 @@ public class RMAppImpl implements RMApp,
       this.writeLock.unlock();
     }
   }
-  
+
   @Override
   public void recover(RMState state) throws Exception{
     ApplicationState appState = state.getApplicationState().get(getApplicationId());
@@ -646,26 +648,28 @@ public class RMAppImpl implements RMApp,
 
     for(int i=0; i<appState.getAttemptCount(); ++i) {
       // create attempt
-      createNewAttempt(false);
+      createNewAttempt();
       ((RMAppAttemptImpl)this.currentAttempt).recover(state);
     }
   }
 
-  @SuppressWarnings("unchecked")
-  private void createNewAttempt(boolean startAttempt) {
+  private void createNewAttempt() {
     ApplicationAttemptId appAttemptId =
         ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1);
     RMAppAttempt attempt =
         new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
-          submissionContext, conf);
+          submissionContext, conf, maxAppAttempts == attempts.size());
     attempts.put(appAttemptId, attempt);
     currentAttempt = attempt;
-    if(startAttempt) {
-      handler.handle(
-          new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START));
-    }
   }
   
+  private void
+      createAndStartNewAttempt(boolean transferStateFromPreviousAttempt) {
+    createNewAttempt();
+    handler.handle(new RMAppStartAttemptEvent(currentAttempt.getAppAttemptId(),
+      transferStateFromPreviousAttempt));
+  }
+
   private void processNodeUpdate(RMAppNodeUpdateType type, RMNode node) {
     NodeState nodeState = node.getState();
     updatedNodes.add(node);
@@ -688,7 +692,6 @@ public class RMAppImpl implements RMApp,
     };
   }
 
-  @SuppressWarnings("unchecked")
   private static final class RMAppRecoveredTransition implements
       MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
 
@@ -729,7 +732,6 @@ public class RMAppImpl implements RMApp,
 
   private static final class AddApplicationToSchedulerTransition extends
       RMAppTransition {
-    @SuppressWarnings("unchecked")
     @Override
     public void transition(RMAppImpl app, RMAppEvent event) {
       if (event instanceof RMAppNewSavedEvent) {
@@ -751,14 +753,13 @@ public class RMAppImpl implements RMApp,
   private static final class StartAppAttemptTransition extends RMAppTransition {
     @Override
     public void transition(RMAppImpl app, RMAppEvent event) {
-      app.createNewAttempt(true);
+      app.createAndStartNewAttempt(false);
     };
   }
 
   private static final class FinalStateSavedTransition implements
       MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
 
-    @SuppressWarnings({ "rawtypes", "unchecked" })
     @Override
     public RMAppState transition(RMAppImpl app, RMAppEvent event) {
       RMAppUpdateSavedEvent storeEvent = (RMAppUpdateSavedEvent) event;
@@ -959,7 +960,6 @@ public class RMAppImpl implements RMApp,
   }
 
   private static class KillAttemptTransition extends RMAppTransition {
-    @SuppressWarnings("unchecked")
     @Override
     public void transition(RMAppImpl app, RMAppEvent event) {
       app.stateBeforeKilling = app.getState();
@@ -987,7 +987,6 @@ public class RMAppImpl implements RMApp,
       return nodes;
     }
 
-    @SuppressWarnings("unchecked")
     public void transition(RMAppImpl app, RMAppEvent event) {
       Set<NodeId> nodes = getNodesOnWhichAttemptRan(app);
       for (NodeId nodeId : nodes) {
@@ -1019,7 +1018,21 @@ public class RMAppImpl implements RMApp,
     public RMAppState transition(RMAppImpl app, RMAppEvent event) {
       if (!app.submissionContext.getUnmanagedAM()
           && app.attempts.size() < app.maxAppAttempts) {
-        app.createNewAttempt(true);
+        boolean transferStateFromPreviousAttempt = false;
+        RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
+        transferStateFromPreviousAttempt =
+            failedEvent.getTransferStateFromPreviousAttempt();
+
+        RMAppAttempt oldAttempt = app.currentAttempt;
+        app.createAndStartNewAttempt(transferStateFromPreviousAttempt);
+        // Transfer the state from the previous attempt to the current attempt.
+        // Note that the previous failed attempt may still be collecting the
+        // container events from the scheduler and updating its data structures
+        // before the new attempt is created.
+        if (transferStateFromPreviousAttempt) {
+          ((RMAppAttemptImpl) app.currentAttempt)
+            .transferStateFromPreviousAttempt(oldAttempt);
+        }
         return initialState;
       } else {
         app.rememberTargetTransitionsAndStoreState(event,

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Fri Jan 10 15:05:18 2014
@@ -129,9 +129,9 @@ public class RMAppAttemptImpl implements
   private SecretKey clientTokenMasterKey = null;
 
   //nodes on while this attempt's containers ran
-  private final Set<NodeId> ranNodes =
+  private Set<NodeId> ranNodes =
     new HashSet<NodeId>();
-  private final List<ContainerStatus> justFinishedContainers =
+  private List<ContainerStatus> justFinishedContainers =
     new ArrayList<ContainerStatus>();
   private Container masterContainer;
 
@@ -148,7 +148,7 @@ public class RMAppAttemptImpl implements
   private final StringBuilder diagnostics = new StringBuilder();
 
   private Configuration conf;
-  
+  private final boolean isLastAttempt;
   private static final ExpiredTransition EXPIRED_TRANSITION =
       new ExpiredTransition();
 
@@ -330,6 +330,12 @@ public class RMAppAttemptImpl implements
               RMAppAttemptEventType.KILL))
 
       // Transitions from FAILED State
+      // For work-preserving AM restart, a failed attempt still captures
+      // CONTAINER_FINISHED events and records the finished containers for
+      // use by the next new attempt.
+      .addTransition(RMAppAttemptState.FAILED, RMAppAttemptState.FAILED,
+          RMAppAttemptEventType.CONTAINER_FINISHED,
+          new ContainerFinishedAtFailedTransition())
       .addTransition(
           RMAppAttemptState.FAILED,
           RMAppAttemptState.FAILED,
@@ -338,8 +344,7 @@ public class RMAppAttemptImpl implements
               RMAppAttemptEventType.KILL,
               RMAppAttemptEventType.UNREGISTERED,
               RMAppAttemptEventType.STATUS_UPDATE,
-              RMAppAttemptEventType.CONTAINER_ALLOCATED,
-              RMAppAttemptEventType.CONTAINER_FINISHED))
+              RMAppAttemptEventType.CONTAINER_ALLOCATED))
 
       // Transitions from FINISHING State
       .addTransition(RMAppAttemptState.FINISHING,
@@ -390,7 +395,7 @@ public class RMAppAttemptImpl implements
       RMContext rmContext, YarnScheduler scheduler,
       ApplicationMasterService masterService,
       ApplicationSubmissionContext submissionContext,
-      Configuration conf) {
+      Configuration conf, boolean isLastAttempt) {
     this.conf = conf;
     this.applicationAttemptId = appAttemptId;
     this.rmContext = rmContext;
@@ -404,7 +409,7 @@ public class RMAppAttemptImpl implements
     this.writeLock = lock.writeLock();
 
     this.proxiedTrackingUrl = generateProxyUriWithScheme(null);
-    
+    this.isLastAttempt = isLastAttempt;
     this.stateMachine = stateMachineFactory.make(this);
   }
 
@@ -416,7 +421,7 @@ public class RMAppAttemptImpl implements
   @Override
   public ApplicationSubmissionContext getSubmissionContext() {
     return this.submissionContext;
-  } 
+  }
 
   @Override
   public FinalApplicationStatus getFinalApplicationStatus() {
@@ -685,6 +690,11 @@ public class RMAppAttemptImpl implements
     this.startTime = attemptState.getStartTime();
   }
 
+  public void transferStateFromPreviousAttempt(RMAppAttempt attempt) {
+    this.justFinishedContainers = attempt.getJustFinishedContainers();
+    this.ranNodes = attempt.getRanNodes();
+  }
+
   private void recoverAppAttemptCredentials(Credentials appAttemptTokens)
       throws IOException {
     if (appAttemptTokens == null) {
@@ -721,6 +731,12 @@ public class RMAppAttemptImpl implements
     public void transition(RMAppAttemptImpl appAttempt,
         RMAppAttemptEvent event) {
 
+	    boolean transferStateFromPreviousAttempt = false;
+      if (event instanceof RMAppStartAttemptEvent) {
+        transferStateFromPreviousAttempt =
+            ((RMAppStartAttemptEvent) event)
+              .getTransferStateFromPreviousAttempt();
+      }
       appAttempt.startTime = System.currentTimeMillis();
 
       // Register with the ApplicationMasterService
@@ -740,9 +756,10 @@ public class RMAppAttemptImpl implements
           new Token<AMRMTokenIdentifier>(id,
             appAttempt.rmContext.getAMRMTokenSecretManager());
 
-      // Add the applicationAttempt to the scheduler
+      // Add the applicationAttempt to the scheduler and inform the scheduler
+      // whether to transfer the state from the previous attempt.
       appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
-        appAttempt.applicationAttemptId));
+        appAttempt.applicationAttemptId, transferStateFromPreviousAttempt));
     }
   }
 
@@ -981,6 +998,7 @@ public class RMAppAttemptImpl implements
       // Tell the application and the scheduler
       ApplicationId applicationId = appAttemptId.getApplicationId();
       RMAppEvent appEvent = null;
+      boolean keepContainersAcrossAppAttempts = false;
       switch (finalAttemptState) {
         case FINISHED:
         {
@@ -996,7 +1014,7 @@ public class RMAppAttemptImpl implements
           appEvent =
               new RMAppFailedAttemptEvent(applicationId,
                   RMAppEventType.ATTEMPT_KILLED,
-                  "Application killed by user.");
+                  "Application killed by user.", false);
         }
         break;
         case FAILED:
@@ -1004,10 +1022,17 @@ public class RMAppAttemptImpl implements
           // don't leave the tracking URL pointing to a non-existent AM
           appAttempt.setTrackingUrlToRMAppPage();
           appAttempt.invalidateAMHostAndPort();
+          if (appAttempt.submissionContext
+            .getKeepContainersAcrossApplicationAttempts()
+              && !appAttempt.isLastAttempt
+              && !appAttempt.submissionContext.getUnmanagedAM()) {
+            keepContainersAcrossAppAttempts = true;
+          }
           appEvent =
               new RMAppFailedAttemptEvent(applicationId,
-                  RMAppEventType.ATTEMPT_FAILED,
-                  appAttempt.getDiagnostics());
+                RMAppEventType.ATTEMPT_FAILED, appAttempt.getDiagnostics(),
+                keepContainersAcrossAppAttempts);
+
         }
         break;
         default:
@@ -1019,7 +1044,7 @@ public class RMAppAttemptImpl implements
 
       appAttempt.eventHandler.handle(appEvent);
       appAttempt.eventHandler.handle(new AppAttemptRemovedSchedulerEvent(
-        appAttemptId, finalAttemptState));
+        appAttemptId, finalAttemptState, keepContainersAcrossAppAttempts));
       appAttempt.removeCredentials(appAttempt);
     }
   }
@@ -1045,6 +1070,11 @@ public class RMAppAttemptImpl implements
     public void transition(RMAppAttemptImpl appAttempt,
                             RMAppAttemptEvent event) {
       appAttempt.checkAttemptStoreError(event);
+      // TODO Today the unmanaged AM client waits for the app state to be
+      // Accepted before launching the AM. This is broken since we changed to
+      // start the attempt after the application is Accepted. We may need to
+      // introduce an attempt report that the client can rely on to query the
+      // attempt state and choose to launch the unmanaged AM.
       super.transition(appAttempt, event);
     }    
   }
@@ -1346,6 +1376,20 @@ public class RMAppAttemptImpl implements
     }
   }
 
+  private static final class ContainerFinishedAtFailedTransition
+      extends BaseTransition {
+    @Override
+    public void
+        transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
+      RMAppAttemptContainerFinishedEvent containerFinishedEvent =
+          (RMAppAttemptContainerFinishedEvent) event;
+      ContainerStatus containerStatus =
+          containerFinishedEvent.getContainerStatus();
+      // Normal container. Add it to the completed containers list.
+      appAttempt.justFinishedContainers.add(containerStatus);
+    }
+  }
+
   private static class ContainerFinishedFinalStateSavedTransition extends
       BaseTransition {
     @Override

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Fri Jan 10 15:05:18 2014
@@ -59,10 +59,10 @@ public class AppSchedulingInfo {
 
   final Set<Priority> priorities = new TreeSet<Priority>(
       new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
-  final Map<Priority, Map<String, ResourceRequest>> requests = 
+  final Map<Priority, Map<String, ResourceRequest>> requests =
     new HashMap<Priority, Map<String, ResourceRequest>>();
-  final Set<String> blacklist = new HashSet<String>();
-  
+  private Set<String> blacklist = new HashSet<String>();
+
   //private final ApplicationStore store;
   private final ActiveUsersManager activeUsersManager;
   
@@ -399,4 +399,15 @@ public class AppSchedulingInfo {
   public synchronized void setQueue(Queue queue) {
     this.queue = queue;
   }
+
+  public synchronized Set<String> getBlackList() {
+    return this.blacklist;
+  }
+
+  public synchronized void transferStateFromPreviousAppSchedulingInfo(
+      AppSchedulingInfo appInfo) {
+    //    this.priorities = appInfo.getPriorities();
+    //    this.requests = appInfo.getRequests();
+    this.blacklist = appInfo.getBlackList();
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java Fri Jan 10 15:05:18 2014
@@ -26,6 +26,7 @@ public class SchedulerApplication {
 
   private final Queue queue;
   private final String user;
+  private SchedulerApplicationAttempt currentAttempt;
 
   public SchedulerApplication(Queue queue, String user) {
     this.queue = queue;
@@ -39,4 +40,12 @@ public class SchedulerApplication {
   public String getUser() {
     return user;
   }
+
+  public SchedulerApplicationAttempt getCurrentAppAttempt() {
+    return currentAttempt;
+  }
+
+  public void setCurrentAppAttempt(SchedulerApplicationAttempt currentAttempt) {
+    this.currentAttempt = currentAttempt;
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java Fri Jan 10 15:05:18 2014
@@ -64,7 +64,7 @@ public abstract class SchedulerApplicati
 
   protected final AppSchedulingInfo appSchedulingInfo;
   
-  protected final Map<ContainerId, RMContainer> liveContainers =
+  protected Map<ContainerId, RMContainer> liveContainers =
       new HashMap<ContainerId, RMContainer>();
   protected final Map<Priority, Map<NodeId, RMContainer>> reservedContainers = 
       new HashMap<Priority, Map<NodeId, RMContainer>>();
@@ -73,7 +73,7 @@ public abstract class SchedulerApplicati
   
   protected final Resource currentReservation = Resource.newInstance(0, 0);
   private Resource resourceLimit = Resource.newInstance(0, 0);
-  protected final Resource currentConsumption = Resource.newInstance(0, 0);
+  protected Resource currentConsumption = Resource.newInstance(0, 0);
 
   protected List<RMContainer> newlyAllocatedContainers = 
       new ArrayList<RMContainer>();
@@ -407,4 +407,29 @@ public abstract class SchedulerApplicati
         Resources.add(currentConsumption, currentReservation));
   }
 
+  public synchronized Map<ContainerId, RMContainer> getLiveContainersMap() {
+    return this.liveContainers;
+  }
+
+  public synchronized Resource getResourceLimit() {
+    return this.resourceLimit;
+  }
+
+  public synchronized Map<Priority, Long> getLastScheduledContainer() {
+    return this.lastScheduledContainer;
+  }
+
+  public synchronized void transferStateFromPreviousAttempt(
+      SchedulerApplicationAttempt appAttempt) {
+    this.liveContainers = appAttempt.getLiveContainersMap();
+    // this.reReservations = appAttempt.reReservations;
+    this.currentConsumption = appAttempt.getCurrentConsumption();
+    this.resourceLimit = appAttempt.getResourceLimit();
+    // this.currentReservation = appAttempt.currentReservation;
+    // this.newlyAllocatedContainers = appAttempt.newlyAllocatedContainers;
+    // this.schedulingOpportunities = appAttempt.schedulingOpportunities;
+    this.lastScheduledContainer = appAttempt.getLastScheduledContainer();
+    this.appSchedulingInfo
+      .transferStateFromPreviousAppSchedulingInfo(appAttempt.appSchedulingInfo);
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java Fri Jan 10 15:05:18 2014
@@ -19,13 +19,13 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import java.io.IOException;
-import java.util.Collection;
 import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 
 /**
@@ -170,4 +171,13 @@ public interface YarnScheduler extends E
   @LimitedPrivate("yarn")
   @Stable
   public List<ApplicationAttemptId> getAppsInQueue(String queueName);
+
+  /**
+   * Get the container for the given containerId.
+   * @param containerId the id of the container to look up
+   * @return the container for the given containerId.
+   */
+  @LimitedPrivate("yarn")
+  @Unstable
+  public RMContainer getRMContainer(ContainerId containerId);
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Fri Jan 10 15:05:18 2014
@@ -63,14 +63,15 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -194,10 +195,6 @@ public class CapacityScheduler
   protected Map<ApplicationId, SchedulerApplication> applications =
       new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
 
-  @VisibleForTesting
-  protected Map<ApplicationAttemptId, FiCaSchedulerApp> appAttempts = 
-      new ConcurrentHashMap<ApplicationAttemptId, FiCaSchedulerApp>();
-
   private boolean initialized = false;
 
   private ResourceCalculator calculator;
@@ -464,21 +461,27 @@ public class CapacityScheduler
   }
 
   private synchronized void addApplicationAttempt(
-      ApplicationAttemptId applicationAttemptId) {
+      ApplicationAttemptId applicationAttemptId,
+      boolean transferStateFromPreviousAttempt) {
     SchedulerApplication application =
         applications.get(applicationAttemptId.getApplicationId());
     CSQueue queue = (CSQueue) application.getQueue();
 
-    FiCaSchedulerApp SchedulerApp =
+    FiCaSchedulerApp attempt =
         new FiCaSchedulerApp(applicationAttemptId, application.getUser(),
           queue, queue.getActiveUsersManager(), rmContext);
-    appAttempts.put(applicationAttemptId, SchedulerApp);
-    queue.submitApplicationAttempt(SchedulerApp, application.getUser());
+    if (transferStateFromPreviousAttempt) {
+      attempt.transferStateFromPreviousAttempt(application
+        .getCurrentAppAttempt());
+    }
+    application.setCurrentAppAttempt(attempt);
+
+    queue.submitApplicationAttempt(attempt, application.getUser());
     LOG.info("Added Application Attempt " + applicationAttemptId
         + " to scheduler from user " + application.getUser() + " in queue "
         + queue.getQueueName());
-    rmContext.getDispatcher().getEventHandler().handle(
-      new RMAppAttemptEvent(applicationAttemptId,
+    rmContext.getDispatcher().getEventHandler() .handle(
+        new RMAppAttemptEvent(applicationAttemptId,
           RMAppAttemptEventType.ATTEMPT_ADDED));
   }
 
@@ -486,7 +489,8 @@ public class CapacityScheduler
       RMAppState finalState) {
     SchedulerApplication application = applications.get(applicationId);
     if (application == null){
-      // The AppRemovedSchedulerEvent maybe sent on recovery for completed apps.
+      // The AppRemovedSchedulerEvent may be sent on recovery for completed
+      // apps; ignore it.
       return;
     }
     CSQueue queue = (CSQueue) application.getQueue();
@@ -501,52 +505,56 @@ public class CapacityScheduler
 
   private synchronized void doneApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
-      RMAppAttemptState rmAppAttemptFinalState) {
+      RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
     LOG.info("Application Attempt " + applicationAttemptId + " is done." +
     		" finalState=" + rmAppAttemptFinalState);
     
-    FiCaSchedulerApp application = getApplication(applicationAttemptId);
+    FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId);
+    SchedulerApplication application =
+        applications.get(applicationAttemptId.getApplicationId());
 
-    if (application == null) {
-      //      throw new IOException("Unknown application " + applicationId + 
-      //          " has completed!");
+    if (application == null || attempt == null) {
       LOG.info("Unknown application " + applicationAttemptId + " has completed!");
       return;
     }
-    
-    // Release all the running containers 
-    for (RMContainer rmContainer : application.getLiveContainers()) {
-      completedContainer(rmContainer, 
-          SchedulerUtils.createAbnormalContainerStatus(
-              rmContainer.getContainerId(), 
-              SchedulerUtils.COMPLETED_APPLICATION), 
-          RMContainerEventType.KILL);
+
+    // Release all the allocated, acquired, running containers
+    for (RMContainer rmContainer : attempt.getLiveContainers()) {
+      if (keepContainers
+          && rmContainer.getState().equals(RMContainerState.RUNNING)) {
+        // do not kill the running container in the case of work-preserving AM
+        // restart.
+        LOG.info("Skip killing " + rmContainer.getContainerId());
+        continue;
+      }
+      completedContainer(
+        rmContainer,
+        SchedulerUtils.createAbnormalContainerStatus(
+          rmContainer.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION),
+        RMContainerEventType.KILL);
     }
-    
-     // Release all reserved containers
-    for (RMContainer rmContainer : application.getReservedContainers()) {
-      completedContainer(rmContainer, 
-          SchedulerUtils.createAbnormalContainerStatus(
-              rmContainer.getContainerId(), 
-              "Application Complete"), 
-          RMContainerEventType.KILL);
+
+    // Release all reserved containers
+    for (RMContainer rmContainer : attempt.getReservedContainers()) {
+      completedContainer(
+        rmContainer,
+        SchedulerUtils.createAbnormalContainerStatus(
+          rmContainer.getContainerId(), "Application Complete"),
+        RMContainerEventType.KILL);
     }
-    
+
     // Clean up pending requests, metrics etc.
-    application.stop(rmAppAttemptFinalState);
-    
+    attempt.stop(rmAppAttemptFinalState);
+
     // Inform the queue
-    String queueName = application.getQueue().getQueueName();
+    String queueName = attempt.getQueue().getQueueName();
     CSQueue queue = queues.get(queueName);
     if (!(queue instanceof LeafQueue)) {
       LOG.error("Cannot finish application " + "from non-leaf queue: "
           + queueName);
     } else {
-      queue.finishApplicationAttempt(application, queue.getQueueName());
+      queue.finishApplicationAttempt(attempt, queue.getQueueName());
     }
-    
-    // Remove from our data-structure
-    appAttempts.remove(applicationAttemptId);
   }
 
   private static final Allocation EMPTY_ALLOCATION = 
@@ -558,7 +566,7 @@ public class CapacityScheduler
       List<ResourceRequest> ask, List<ContainerId> release, 
       List<String> blacklistAdditions, List<String> blacklistRemovals) {
 
-    FiCaSchedulerApp application = getApplication(applicationAttemptId);
+    FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
     if (application == null) {
       LOG.info("Calling allocate on removed " +
           "or non existant application " + applicationAttemptId);
@@ -700,8 +708,8 @@ public class CapacityScheduler
 
     RMContainer reservedContainer = node.getReservedContainer();
     if (reservedContainer != null) {
-      FiCaSchedulerApp reservedApplication = 
-          getApplication(reservedContainer.getApplicationAttemptId());
+      FiCaSchedulerApp reservedApplication =
+          getCurrentAttemptForContainer(reservedContainer.getContainerId());
       
       // Try to fulfill the reservation
       LOG.info("Trying to fulfill reservation for application " + 
@@ -738,12 +746,11 @@ public class CapacityScheduler
 
   private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
     // Get the application for the finished container
-    ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
-    FiCaSchedulerApp application = getApplication(applicationAttemptId);
+    FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
     if (application == null) {
-      LOG.info("Unknown application: " + applicationAttemptId + 
-          " launched container " + containerId +
-          " on node: " + node);
+      LOG.info("Unknown application "
+          + containerId.getApplicationAttemptId().getApplicationId()
+          + " launched container " + containerId + " on node: " + node);
       this.rmContext.getDispatcher().getEventHandler()
         .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
       return;
@@ -791,7 +798,8 @@ public class CapacityScheduler
     {
       AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
           (AppAttemptAddedSchedulerEvent) event;
-      addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId());
+      addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
+        appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
     }
     break;
     case APP_ATTEMPT_REMOVED:
@@ -799,7 +807,8 @@ public class CapacityScheduler
       AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
           (AppAttemptRemovedSchedulerEvent) event;
       doneApplicationAttempt(appAttemptRemovedEvent.getApplicationAttemptID(),
-        appAttemptRemovedEvent.getFinalAttemptState());
+        appAttemptRemovedEvent.getFinalAttemptState(),
+        appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts());
     }
     break;
     case CONTAINER_EXPIRED:
@@ -874,13 +883,13 @@ public class CapacityScheduler
     Container container = rmContainer.getContainer();
     
     // Get the application for the finished container
-    ApplicationAttemptId applicationAttemptId =
-      container.getId().getApplicationAttemptId();
-    FiCaSchedulerApp application = getApplication(applicationAttemptId);
+    FiCaSchedulerApp application =
+        getCurrentAttemptForContainer(container.getId());
+    ApplicationId appId =
+        container.getId().getApplicationAttemptId().getApplicationId();
     if (application == null) {
-      LOG.info("Container " + container + " of" +
-      		" unknown application " + applicationAttemptId + 
-          " completed with event " + event);
+      LOG.info("Container " + container + " of" + " unknown application "
+          + appId + " completed with event " + event);
       return;
     }
     
@@ -892,28 +901,33 @@ public class CapacityScheduler
     queue.completedContainer(clusterResource, application, node, 
         rmContainer, containerStatus, event, null);
 
-    LOG.info("Application " + applicationAttemptId + 
-        " released container " + container.getId() +
-        " on node: " + node + 
-        " with event: " + event);
+    LOG.info("Application attempt " + application.getApplicationAttemptId()
+        + " released container " + container.getId() + " on node: " + node
+        + " with event: " + event);
   }
 
   @Lock(Lock.NoLock.class)
-  FiCaSchedulerApp getApplication(ApplicationAttemptId applicationAttemptId) {
-    return appAttempts.get(applicationAttemptId);
+  FiCaSchedulerApp getApplicationAttempt(
+      ApplicationAttemptId applicationAttemptId) {
+    SchedulerApplication app =
+        applications.get(applicationAttemptId.getApplicationId());
+    if (app != null) {
+      return (FiCaSchedulerApp) app.getCurrentAppAttempt();
+    }
+    return null;
   }
 
   @Override
   public SchedulerAppReport getSchedulerAppInfo(
       ApplicationAttemptId applicationAttemptId) {
-    FiCaSchedulerApp app = getApplication(applicationAttemptId);
+    FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
     return app == null ? null : new SchedulerAppReport(app);
   }
   
   @Override
   public ApplicationResourceUsageReport getAppResourceUsageReport(
       ApplicationAttemptId applicationAttemptId) {
-    FiCaSchedulerApp app = getApplication(applicationAttemptId);
+    FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
     return app == null ? null : app.getResourceUsageReport();
   }
   
@@ -922,10 +936,22 @@ public class CapacityScheduler
     return nodes.get(nodeId);
   }
 
-  private RMContainer getRMContainer(ContainerId containerId) {
-    FiCaSchedulerApp application = 
-        getApplication(containerId.getApplicationAttemptId());
-    return (application == null) ? null : application.getRMContainer(containerId);
+  @Override
+  public RMContainer getRMContainer(ContainerId containerId) {
+    FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
+    return (attempt == null) ? null : attempt.getRMContainer(containerId);
+  }
+
+  @VisibleForTesting
+  public FiCaSchedulerApp getCurrentAttemptForContainer(
+      ContainerId containerId) {
+    SchedulerApplication app =
+        applications.get(containerId.getApplicationAttemptId()
+          .getApplicationId());
+    if (app != null) {
+      return (FiCaSchedulerApp) app.getCurrentAppAttempt();
+    }
+    return null;
   }
 
   @Override
@@ -958,7 +984,7 @@ public class CapacityScheduler
       LOG.debug("PREEMPT_CONTAINER: application:" + aid.toString() +
           " container: " + cont.toString());
     }
-    FiCaSchedulerApp app = appAttempts.get(aid);
+    FiCaSchedulerApp app = getApplicationAttempt(aid);
     if (app != null) {
       app.addPreemptContainer(cont.getContainerId());
     }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java Fri Jan 10 15:05:18 2014
@@ -219,7 +219,8 @@ public class FiCaSchedulerNode extends S
             " on node " + this.reservedContainer.getReservedNode());
       }
       
-      // Cannot reserve more than one application on a given node!
+      // Cannot reserve more than one application attempt on a given node!
+      // Reservation is still against attempt.
       if (!this.reservedContainer.getContainer().getId().getApplicationAttemptId().equals(
           reservedContainer.getContainer().getId().getApplicationAttemptId())) {
         throw new IllegalStateException("Trying to reserve" +

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java Fri Jan 10 15:05:18 2014
@@ -23,14 +23,21 @@ import org.apache.hadoop.yarn.api.record
 public class AppAttemptAddedSchedulerEvent extends SchedulerEvent {
 
   private final ApplicationAttemptId applicationAttemptId;
+  private final boolean transferStateFromPreviousAttempt;
 
   public AppAttemptAddedSchedulerEvent(
-      ApplicationAttemptId applicationAttemptId) {
+      ApplicationAttemptId applicationAttemptId,
+      boolean transferStateFromPreviousAttempt) {
     super(SchedulerEventType.APP_ATTEMPT_ADDED);
     this.applicationAttemptId = applicationAttemptId;
+    this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt;
   }
 
   public ApplicationAttemptId getApplicationAttemptId() {
     return applicationAttemptId;
   }
+
+  public boolean getTransferStateFromPreviousAttempt() {
+    return transferStateFromPreviousAttempt;
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptRemovedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptRemovedSchedulerEvent.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptRemovedSchedulerEvent.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptRemovedSchedulerEvent.java Fri Jan 10 15:05:18 2014
@@ -25,13 +25,15 @@ public class AppAttemptRemovedSchedulerE
 
   private final ApplicationAttemptId applicationAttemptId;
   private final RMAppAttemptState finalAttemptState;
+  private final boolean keepContainersAcrossAppAttempts;
 
   public AppAttemptRemovedSchedulerEvent(
       ApplicationAttemptId applicationAttemptId,
-      RMAppAttemptState finalAttemptState) {
+      RMAppAttemptState finalAttemptState, boolean keepContainers) {
     super(SchedulerEventType.APP_ATTEMPT_REMOVED);
     this.applicationAttemptId = applicationAttemptId;
     this.finalAttemptState = finalAttemptState;
+    this.keepContainersAcrossAppAttempts = keepContainers;
   }
 
   public ApplicationAttemptId getApplicationAttemptID() {
@@ -41,4 +43,8 @@ public class AppAttemptRemovedSchedulerE
   public RMAppAttemptState getFinalAttemptState() {
     return this.finalAttemptState;
   }
+
+  public boolean getKeepContainersAcrossAppAttempts() {
+    return this.keepContainersAcrossAppAttempts;
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Fri Jan 10 15:05:18 2014
@@ -162,12 +162,6 @@ public class FairScheduler implements Re
   protected Map<ApplicationId, SchedulerApplication> applications =
       new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
 
-  // This stores per-application-attempt scheduling information, indexed by
-  // attempt ID's for fast lookup.
-  @VisibleForTesting
-  protected Map<ApplicationAttemptId, FSSchedulerApp> appAttempts = 
-      new ConcurrentHashMap<ApplicationAttemptId, FSSchedulerApp>();
-
   // Nodes in the cluster, indexed by NodeId
   private Map<NodeId, FSSchedulerNode> nodes = 
       new ConcurrentHashMap<NodeId, FSSchedulerNode>();
@@ -262,10 +256,21 @@ public class FairScheduler implements Re
     return queueMgr;
   }
 
-  private RMContainer getRMContainer(ContainerId containerId) {
-    FSSchedulerApp application = 
-        appAttempts.get(containerId.getApplicationAttemptId());
-    return (application == null) ? null : application.getRMContainer(containerId);
+  @Override
+  public RMContainer getRMContainer(ContainerId containerId) {
+    FSSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
+    return (attempt == null) ? null : attempt.getRMContainer(containerId);
+  }
+
+  private FSSchedulerApp getCurrentAttemptForContainer(
+      ContainerId containerId) {
+    SchedulerApplication app =
+        applications.get(containerId.getApplicationAttemptId()
+          .getApplicationId());
+    if (app != null) {
+      return (FSSchedulerApp) app.getCurrentAppAttempt();
+    }
+    return null;
   }
 
   /**
@@ -640,7 +645,8 @@ public class FairScheduler implements Re
     applications.put(applicationId, application);
 
     LOG.info("Accepted application " + applicationId + " from user: " + user
-        + ", in queue: " + queueName);
+        + ", in queue: " + queueName + ", currently num of applications: "
+        + applications.size());
     rmContext.getDispatcher().getEventHandler()
         .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
   }
@@ -649,31 +655,35 @@ public class FairScheduler implements Re
    * Add a new application attempt to the scheduler.
    */
   protected synchronized void addApplicationAttempt(
-      ApplicationAttemptId applicationAttemptId) {
+      ApplicationAttemptId applicationAttemptId,
+      boolean transferStateFromPreviousAttempt) {
     SchedulerApplication application =
         applications.get(applicationAttemptId.getApplicationId());
     String user = application.getUser();
     FSLeafQueue queue = (FSLeafQueue) application.getQueue();
 
-    FSSchedulerApp schedulerApp =
+    FSSchedulerApp attempt =
         new FSSchedulerApp(applicationAttemptId, user,
             queue, new ActiveUsersManager(getRootQueueMetrics()),
             rmContext);
+    if (transferStateFromPreviousAttempt) {
+      attempt.transferStateFromPreviousAttempt(application
+        .getCurrentAppAttempt());
+    }
+    application.setCurrentAppAttempt(attempt);
 
     boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user);
-    queue.addApp(schedulerApp, runnable);
+    queue.addApp(attempt, runnable);
     if (runnable) {
-      maxRunningEnforcer.trackRunnableApp(schedulerApp);
+      maxRunningEnforcer.trackRunnableApp(attempt);
     } else {
-      maxRunningEnforcer.trackNonRunnableApp(schedulerApp);
+      maxRunningEnforcer.trackNonRunnableApp(attempt);
     }
     
     queue.getMetrics().submitApp(user, applicationAttemptId.getAttemptId());
-    appAttempts.put(applicationAttemptId, schedulerApp);
 
     LOG.info("Added Application Attempt " + applicationAttemptId
-        + " to scheduler from user: " + user + ", currently active: "
-        + appAttempts.size());
+        + " to scheduler from user: " + user);
     rmContext.getDispatcher().getEventHandler().handle(
         new RMAppAttemptEvent(applicationAttemptId,
             RMAppAttemptEventType.ATTEMPT_ADDED));
@@ -709,19 +719,27 @@ public class FairScheduler implements Re
 
   private synchronized void removeApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
-      RMAppAttemptState rmAppAttemptFinalState) {
+      RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
     LOG.info("Application " + applicationAttemptId + " is done." +
         " finalState=" + rmAppAttemptFinalState);
+    SchedulerApplication application =
+        applications.get(applicationAttemptId.getApplicationId());
+    FSSchedulerApp attempt = getSchedulerApp(applicationAttemptId);
 
-    FSSchedulerApp application = appAttempts.get(applicationAttemptId);
-
-    if (application == null) {
+    if (attempt == null || application == null) {
       LOG.info("Unknown application " + applicationAttemptId + " has completed!");
       return;
     }
 
     // Release all the running containers
-    for (RMContainer rmContainer : application.getLiveContainers()) {
+    for (RMContainer rmContainer : attempt.getLiveContainers()) {
+      if (keepContainers
+          && rmContainer.getState().equals(RMContainerState.RUNNING)) {
+        // do not kill the running container in the case of work-preserving AM
+        // restart.
+        LOG.info("Skip killing " + rmContainer.getContainerId());
+        continue;
+      }
       completedContainer(rmContainer,
           SchedulerUtils.createAbnormalContainerStatus(
               rmContainer.getContainerId(),
@@ -730,30 +748,26 @@ public class FairScheduler implements Re
     }
 
     // Release all reserved containers
-    for (RMContainer rmContainer : application.getReservedContainers()) {
+    for (RMContainer rmContainer : attempt.getReservedContainers()) {
       completedContainer(rmContainer,
           SchedulerUtils.createAbnormalContainerStatus(
               rmContainer.getContainerId(),
               "Application Complete"),
-          RMContainerEventType.KILL);
+              RMContainerEventType.KILL);
     }
-
     // Clean up pending requests, metrics etc.
-    application.stop(rmAppAttemptFinalState);
+    attempt.stop(rmAppAttemptFinalState);
 
     // Inform the queue
-    FSLeafQueue queue = queueMgr.getLeafQueue(application.getQueue()
+    FSLeafQueue queue = queueMgr.getLeafQueue(attempt.getQueue()
         .getQueueName(), false);
-    boolean wasRunnable = queue.removeApp(application);
+    boolean wasRunnable = queue.removeApp(attempt);
 
     if (wasRunnable) {
-      maxRunningEnforcer.updateRunnabilityOnAppRemoval(application);
+      maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt);
     } else {
-      maxRunningEnforcer.untrackNonRunnableApp(application);
+      maxRunningEnforcer.untrackNonRunnableApp(attempt);
     }
-    
-    // Remove from our data-structure
-    appAttempts.remove(applicationAttemptId);
   }
 
   /**
@@ -769,11 +783,13 @@ public class FairScheduler implements Re
     Container container = rmContainer.getContainer();
 
     // Get the application for the finished container
-    ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId();
-    FSSchedulerApp application = appAttempts.get(applicationAttemptId);
+    FSSchedulerApp application =
+        getCurrentAttemptForContainer(container.getId());
+    ApplicationId appId =
+        container.getId().getApplicationAttemptId().getApplicationId();
     if (application == null) {
       LOG.info("Container " + container + " of" +
-          " unknown application " + applicationAttemptId +
+          " unknown application attempt " + appId +
           " completed with event " + event);
       return;
     }
@@ -790,10 +806,9 @@ public class FairScheduler implements Re
       updateRootQueueMetrics();
     }
 
-    LOG.info("Application " + applicationAttemptId +
-        " released container " + container.getId() +
-        " on node: " + node +
-        " with event: " + event);
+    LOG.info("Application attempt " + application.getApplicationAttemptId()
+        + " released container " + container.getId() + " on node: " + node
+        + " with event: " + event);
   }
 
   private synchronized void addNode(RMNode node) {
@@ -844,7 +859,7 @@ public class FairScheduler implements Re
       List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {
 
     // Make sure this application exists
-    FSSchedulerApp application = appAttempts.get(appAttemptId);
+    FSSchedulerApp application = getSchedulerApp(appAttemptId);
     if (application == null) {
       LOG.info("Calling allocate on removed " +
           "or non existant application " + appAttemptId);
@@ -914,12 +929,11 @@ public class FairScheduler implements Re
    */
   private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) {
     // Get the application for the finished container
-    ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
-    FSSchedulerApp application = appAttempts.get(applicationAttemptId);
+    FSSchedulerApp application = getCurrentAttemptForContainer(containerId);
     if (application == null) {
-      LOG.info("Unknown application: " + applicationAttemptId +
-          " launched container " + containerId +
-          " on node: " + node);
+      LOG.info("Unknown application "
+          + containerId.getApplicationAttemptId().getApplicationId()
+          + " launched container " + containerId + " on node: " + node);
       return;
     }
 
@@ -1058,28 +1072,34 @@ public class FairScheduler implements Re
   }
   
   public FSSchedulerApp getSchedulerApp(ApplicationAttemptId appAttemptId) {
-    return appAttempts.get(appAttemptId);
+    SchedulerApplication app =
+        applications.get(appAttemptId.getApplicationId());
+    if (app != null) {
+      return (FSSchedulerApp) app.getCurrentAppAttempt();
+    }
+    return null;
   }
   
   @Override
   public SchedulerAppReport getSchedulerAppInfo(
       ApplicationAttemptId appAttemptId) {
-    if (!appAttempts.containsKey(appAttemptId)) {
+    FSSchedulerApp attempt = getSchedulerApp(appAttemptId);
+    if (attempt == null) {
       LOG.error("Request for appInfo of unknown attempt" + appAttemptId);
       return null;
     }
-    return new SchedulerAppReport(appAttempts.get(appAttemptId));
+    return new SchedulerAppReport(attempt);
   }
   
   @Override
   public ApplicationResourceUsageReport getAppResourceUsageReport(
       ApplicationAttemptId appAttemptId) {
-    FSSchedulerApp app = appAttempts.get(appAttemptId);
-    if (app == null) {
+    FSSchedulerApp attempt = getSchedulerApp(appAttemptId);
+    if (attempt == null) {
       LOG.error("Request for appInfo of unknown attempt" + appAttemptId);
       return null;
     }
-    return app.getResourceUsageReport();
+    return attempt.getResourceUsageReport();
   }
   
   /**
@@ -1145,7 +1165,8 @@ public class FairScheduler implements Re
       }
       AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
           (AppAttemptAddedSchedulerEvent) event;
-      addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId());
+      addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
+        appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
       break;
     case APP_ATTEMPT_REMOVED:
       if (!(event instanceof AppAttemptRemovedSchedulerEvent)) {
@@ -1153,8 +1174,10 @@ public class FairScheduler implements Re
       }
       AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
           (AppAttemptRemovedSchedulerEvent) event;
-      removeApplicationAttempt(appAttemptRemovedEvent.getApplicationAttemptID(),
-        appAttemptRemovedEvent.getFinalAttemptState());
+      removeApplicationAttempt(
+          appAttemptRemovedEvent.getApplicationAttemptID(),
+          appAttemptRemovedEvent.getFinalAttemptState(),
+          appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts());
       break;
     case CONTAINER_EXPIRED:
       if (!(event instanceof ContainerExpiredSchedulerEvent)) {



Mime
View raw message