hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From su...@apache.org
Subject svn commit: r1609878 [3/7] - in /hadoop/common/branches/YARN-1051/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ex...
Date Sat, 12 Jul 2014 02:25:04 GMT
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java Sat Jul 12 02:24:40 2014
@@ -82,6 +82,7 @@ public class RMContextImpl implements RM
   private ApplicationMasterService applicationMasterService;
   private RMApplicationHistoryWriter rmApplicationHistoryWriter;
   private ConfigurationProvider configurationProvider;
+  private int epoch;
 
   /**
    * Default constructor. To be used in conjunction with setter methods for
@@ -359,4 +360,13 @@ public class RMContextImpl implements RM
       ConfigurationProvider configurationProvider) {
     this.configurationProvider = configurationProvider;
   }
+
+  @Override
+  public int getEpoch() {
+    return this.epoch;
+  }
+
+ void setEpoch(int epoch) {
+    this.epoch = epoch;
+  }
 }
\ No newline at end of file

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Sat Jul 12 02:24:40 2014
@@ -482,6 +482,9 @@ public class ResourceManager extends Com
       if(recoveryEnabled) {
         try {
           rmStore.checkVersion();
+          if (rmContext.isWorkPreservingRecoveryEnabled()) {
+            rmContext.setEpoch(rmStore.getAndIncrementEpoch());
+          }
           RMState state = rmStore.loadState();
           recover(state);
         } catch (Exception e) {
@@ -1032,12 +1035,17 @@ public class ResourceManager extends Com
     StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG);
     try {
       Configuration conf = new YarnConfiguration();
-      ResourceManager resourceManager = new ResourceManager();
-      ShutdownHookManager.get().addShutdownHook(
-        new CompositeServiceShutdownHook(resourceManager),
-        SHUTDOWN_HOOK_PRIORITY);
-      resourceManager.init(conf);
-      resourceManager.start();
+      // If -format, then delete RMStateStore; else startup normally
+      if (argv.length == 1 && argv[0].equals("-format")) {
+        deleteRMStateStore(conf);
+      } else {
+        ResourceManager resourceManager = new ResourceManager();
+        ShutdownHookManager.get().addShutdownHook(
+          new CompositeServiceShutdownHook(resourceManager),
+          SHUTDOWN_HOOK_PRIORITY);
+        resourceManager.init(conf);
+        resourceManager.start();
+      }
     } catch (Throwable t) {
       LOG.fatal("Error starting ResourceManager", t);
       System.exit(-1);
@@ -1074,4 +1082,23 @@ public class ResourceManager extends Com
     return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
       YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT);
   }
+
+  /**
+   * Deletes the RMStateStore
+   *
+   * @param conf
+   * @throws Exception
+   */
+  private static void deleteRMStateStore(Configuration conf) throws Exception {
+    RMStateStore rmStore = RMStateStoreFactory.getStore(conf);
+    rmStore.init(conf);
+    rmStore.start();
+    try {
+      LOG.info("Deleting ResourceManager state store...");
+      rmStore.deleteStore();
+      LOG.info("State store deleted");
+    } finally {
+      rmStore.stop();
+    }
+  }
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java Sat Jul 12 02:24:40 2014
@@ -269,7 +269,7 @@ public class RMApplicationHistoryWriter 
         new WritingContainerStartEvent(container.getContainerId(),
           ContainerStartData.newInstance(container.getContainerId(),
             container.getAllocatedResource(), container.getAllocatedNode(),
-            container.getAllocatedPriority(), container.getStartTime())));
+            container.getAllocatedPriority(), container.getCreationTime())));
     }
   }
 

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java Sat Jul 12 02:24:40 2014
@@ -111,7 +111,7 @@ public class ProportionalCapacityPreempt
   public static final String NATURAL_TERMINATION_FACTOR =
       "yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor";
 
-  //the dispatcher to send preempt and kill events
+  // the dispatcher to send preempt and kill events
   public EventHandler<ContainerPreemptEvent> dispatcher;
 
   private final Clock clock;
@@ -437,8 +437,9 @@ public class ProportionalCapacityPreempt
   private Map<ApplicationAttemptId,Set<RMContainer>> getContainersToPreempt(
       List<TempQueue> queues, Resource clusterResource) {
 
-    Map<ApplicationAttemptId,Set<RMContainer>> list =
+    Map<ApplicationAttemptId,Set<RMContainer>> preemptMap =
         new HashMap<ApplicationAttemptId,Set<RMContainer>>();
+    List<RMContainer> skippedAMContainerlist = new ArrayList<RMContainer>();
 
     for (TempQueue qT : queues) {
       // we act only if we are violating balance by more than
@@ -449,26 +450,83 @@ public class ProportionalCapacityPreempt
         // accounts for natural termination of containers
         Resource resToObtain =
           Resources.multiply(qT.toBePreempted, naturalTerminationFactor);
+        Resource skippedAMSize = Resource.newInstance(0, 0);
 
         // lock the leafqueue while we scan applications and unreserve
-        synchronized(qT.leafQueue) {
-          NavigableSet<FiCaSchedulerApp> ns =
-            (NavigableSet<FiCaSchedulerApp>) qT.leafQueue.getApplications();
+        synchronized (qT.leafQueue) {
+          NavigableSet<FiCaSchedulerApp> ns = 
+              (NavigableSet<FiCaSchedulerApp>) qT.leafQueue.getApplications();
           Iterator<FiCaSchedulerApp> desc = ns.descendingIterator();
           qT.actuallyPreempted = Resources.clone(resToObtain);
           while (desc.hasNext()) {
             FiCaSchedulerApp fc = desc.next();
-            if (Resources.lessThanOrEqual(rc, clusterResource,
-                resToObtain, Resources.none())) {
+            if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain,
+                Resources.none())) {
               break;
             }
-            list.put(fc.getApplicationAttemptId(),
-            preemptFrom(fc, clusterResource, resToObtain));
+            preemptMap.put(
+                fc.getApplicationAttemptId(),
+                preemptFrom(fc, clusterResource, resToObtain,
+                    skippedAMContainerlist, skippedAMSize));
           }
+          Resource maxAMCapacityForThisQueue = Resources.multiply(
+              Resources.multiply(clusterResource,
+                  qT.leafQueue.getAbsoluteCapacity()),
+              qT.leafQueue.getMaxAMResourcePerQueuePercent());
+
+          // Can try preempting AMContainers (still saving atmost
+          // maxAMCapacityForThisQueue AMResource's) if more resources are
+          // required to be preempted from this Queue.
+          preemptAMContainers(clusterResource, preemptMap,
+              skippedAMContainerlist, resToObtain, skippedAMSize,
+              maxAMCapacityForThisQueue);
         }
       }
     }
-    return list;
+    return preemptMap;
+  }
+
+  /**
+   * As more resources are needed for preemption, saved AMContainers has to be
+   * rescanned. Such AMContainers can be preempted based on resToObtain, but 
+   * maxAMCapacityForThisQueue resources will be still retained.
+   *  
+   * @param clusterResource
+   * @param preemptMap
+   * @param skippedAMContainerlist
+   * @param resToObtain
+   * @param skippedAMSize
+   * @param maxAMCapacityForThisQueue
+   */
+  private void preemptAMContainers(Resource clusterResource,
+      Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
+      List<RMContainer> skippedAMContainerlist, Resource resToObtain,
+      Resource skippedAMSize, Resource maxAMCapacityForThisQueue) {
+    for (RMContainer c : skippedAMContainerlist) {
+      // Got required amount of resources for preemption, can stop now
+      if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain,
+          Resources.none())) {
+        break;
+      }
+      // Once skippedAMSize reaches down to maxAMCapacityForThisQueue,
+      // container selection iteration for preemption will be stopped. 
+      if (Resources.lessThanOrEqual(rc, clusterResource, skippedAMSize,
+          maxAMCapacityForThisQueue)) {
+        break;
+      }
+      Set<RMContainer> contToPrempt = preemptMap.get(c
+          .getApplicationAttemptId());
+      if (null == contToPrempt) {
+        contToPrempt = new HashSet<RMContainer>();
+        preemptMap.put(c.getApplicationAttemptId(), contToPrempt);
+      }
+      contToPrempt.add(c);
+      
+      Resources.subtractFrom(resToObtain, c.getContainer().getResource());
+      Resources.subtractFrom(skippedAMSize, c.getContainer()
+          .getResource());
+    }
+    skippedAMContainerlist.clear();
   }
 
   /**
@@ -480,8 +538,9 @@ public class ProportionalCapacityPreempt
    * @param rsrcPreempt
    * @return
    */
-  private Set<RMContainer> preemptFrom(
-      FiCaSchedulerApp app, Resource clusterResource, Resource rsrcPreempt) {
+  private Set<RMContainer> preemptFrom(FiCaSchedulerApp app,
+      Resource clusterResource, Resource rsrcPreempt,
+      List<RMContainer> skippedAMContainerlist, Resource skippedAMSize) {
     Set<RMContainer> ret = new HashSet<RMContainer>();
     ApplicationAttemptId appId = app.getApplicationAttemptId();
 
@@ -513,6 +572,12 @@ public class ProportionalCapacityPreempt
             rsrcPreempt, Resources.none())) {
         return ret;
       }
+      // Skip AM Container from preemption for now.
+      if (c.isAMContainer()) {
+        skippedAMContainerlist.add(c);
+        Resources.addTo(skippedAMSize, c.getContainer().getResource());
+        continue;
+      }
       ret.add(c);
       Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource());
     }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java Sat Jul 12 02:24:40 2014
@@ -43,15 +43,19 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.EpochProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
+
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
@@ -71,7 +75,7 @@ public class FileSystemRMStateStore exte
 
   protected static final String ROOT_DIR_NAME = "FSRMStateRoot";
   protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion
-    .newInstance(1, 0);
+    .newInstance(1, 1);
 
   protected FileSystem fs;
 
@@ -145,7 +149,30 @@ public class FileSystemRMStateStore exte
       writeFile(versionNodePath, data);
     }
   }
-
+  
+  @Override
+  public synchronized int getAndIncrementEpoch() throws Exception {
+    Path epochNodePath = getNodePath(rootDirPath, EPOCH_NODE);
+    int currentEpoch = 0;
+    if (fs.exists(epochNodePath)) {
+      // load current epoch
+      FileStatus status = fs.getFileStatus(epochNodePath);
+      byte[] data = readFile(epochNodePath, status.getLen());
+      Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data));
+      currentEpoch = epoch.getEpoch();
+      // increment epoch and store it
+      byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
+          .toByteArray();
+      updateFile(epochNodePath, storeData);
+    } else {
+      // initialize epoch file with 1 for the next time.
+      byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
+          .toByteArray();
+      writeFile(epochNodePath, storeData);
+    }
+    return currentEpoch;
+  }
+  
   @Override
   public synchronized RMState loadState() throws Exception {
     RMState rmState = new RMState();
@@ -216,7 +243,8 @@ public class FileSystemRMStateStore exte
                   attemptStateData.getState(),
                   attemptStateData.getFinalTrackingUrl(),
                   attemptStateData.getDiagnostics(),
-                  attemptStateData.getFinalApplicationStatus());
+                  attemptStateData.getFinalApplicationStatus(),
+                  attemptStateData.getAMContainerExitStatus());
 
             // assert child node name is same as application attempt id
             assert attemptId.equals(attemptState.getAttemptId());
@@ -487,6 +515,13 @@ public class FileSystemRMStateStore exte
     deleteFile(nodeCreatePath);
   }
 
+  @Override
+  public synchronized void deleteStore() throws IOException {
+    if (fs.exists(rootDirPath)) {
+      fs.delete(rootDirPath, true);
+    }
+  }
+
   private Path getAppDir(Path root, String appId) {
     return getNodePath(root, appId);
   }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java Sat Jul 12 02:24:40 2014
@@ -43,6 +43,8 @@ import com.google.common.annotations.Vis
 public class MemoryRMStateStore extends RMStateStore {
   
   RMState state = new RMState();
+  private int epoch = 0;
+  
   @VisibleForTesting
   public RMState getState() {
     return state;
@@ -53,6 +55,13 @@ public class MemoryRMStateStore extends 
   }
 
   @Override
+  public synchronized int getAndIncrementEpoch() throws Exception {
+    int currentEpoch = epoch;
+    epoch = epoch + 1;
+    return currentEpoch;
+  }
+
+  @Override
   public synchronized RMState loadState() throws Exception {
     // return a copy of the state to allow for modification of the real state
     RMState returnState = new RMState();
@@ -152,7 +161,8 @@ public class MemoryRMStateStore extends 
           attemptStateData.getStartTime(), attemptStateData.getState(),
           attemptStateData.getFinalTrackingUrl(),
           attemptStateData.getDiagnostics(),
-          attemptStateData.getFinalApplicationStatus());
+          attemptStateData.getFinalApplicationStatus(),
+          attemptStateData.getAMContainerExitStatus());
 
     ApplicationState appState =
         state.getApplicationState().get(
@@ -257,4 +267,8 @@ public class MemoryRMStateStore extends 
     return null;
   }
 
+  @Override
+  public void deleteStore() throws Exception {
+  }
+
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java Sat Jul 12 02:24:40 2014
@@ -48,6 +48,11 @@ public class NullRMStateStore extends RM
   }
 
   @Override
+  public synchronized int getAndIncrementEpoch() throws Exception {
+    return 0;
+  }
+
+  @Override
   public RMState loadState() throws Exception {
     throw new UnsupportedOperationException("Cannot load state from null store");
   }
@@ -133,4 +138,9 @@ public class NullRMStateStore extends RM
     return null;
   }
 
+  @Override
+  public void deleteStore() throws Exception {
+    // Do nothing
+  }
+
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Sat Jul 12 02:24:40 2014
@@ -39,11 +39,13 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
@@ -84,6 +86,7 @@ public abstract class RMStateStore exten
   protected static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX =
       "RMDTSequenceNumber_";
   protected static final String VERSION_NODE = "RMVersionNode";
+  protected static final String EPOCH_NODE = "EpochNode";
 
   public static final Log LOG = LogFactory.getLog(RMStateStore.class);
 
@@ -258,19 +261,21 @@ public abstract class RMStateStore exten
     RMAppAttemptState state;
     String finalTrackingUrl = "N/A";
     String diagnostics;
+    int exitStatus = ContainerExitStatus.INVALID;
     FinalApplicationStatus amUnregisteredFinalStatus;
 
     public ApplicationAttemptState(ApplicationAttemptId attemptId,
         Container masterContainer, Credentials appAttemptCredentials,
         long startTime) {
       this(attemptId, masterContainer, appAttemptCredentials, startTime, null,
-        null, "", null);
+        null, "", null, ContainerExitStatus.INVALID);
     }
 
     public ApplicationAttemptState(ApplicationAttemptId attemptId,
         Container masterContainer, Credentials appAttemptCredentials,
         long startTime, RMAppAttemptState state, String finalTrackingUrl,
-        String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus) {
+        String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus,
+        int exitStatus) {
       this.attemptId = attemptId;
       this.masterContainer = masterContainer;
       this.appAttemptCredentials = appAttemptCredentials;
@@ -279,6 +284,7 @@ public abstract class RMStateStore exten
       this.finalTrackingUrl = finalTrackingUrl;
       this.diagnostics = diagnostics == null ? "" : diagnostics;
       this.amUnregisteredFinalStatus = amUnregisteredFinalStatus;
+      this.exitStatus = exitStatus;
     }
 
     public Container getMasterContainer() {
@@ -305,6 +311,9 @@ public abstract class RMStateStore exten
     public FinalApplicationStatus getFinalApplicationStatus() {
       return amUnregisteredFinalStatus;
     }
+    public int getAMContainerExitStatus(){
+      return this.exitStatus;
+    }
   }
   
   /**
@@ -513,6 +522,12 @@ public abstract class RMStateStore exten
    */
   protected abstract RMStateVersion getCurrentVersion();
 
+
+  /**
+   * Get the current epoch of RM and increment the value.
+   */
+  public abstract int getAndIncrementEpoch() throws Exception;
+  
   /**
    * Blocking API
    * The derived class must recover state from the store and return a new 
@@ -830,4 +845,10 @@ public abstract class RMStateStore exten
       handleStoreEvent(event);
     }
   }
+
+  /**
+   * Derived classes must implement this method to delete the state store
+   * @throws Exception
+   */
+  public abstract void deleteStore() throws Exception;
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java Sat Jul 12 02:24:40 2014
@@ -44,16 +44,21 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.EpochProto;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMZKUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
+
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.zookeeper.CreateMode;
@@ -81,7 +86,7 @@ public class ZKRMStateStore extends RMSt
 
   protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
   protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion
-      .newInstance(1, 0);
+      .newInstance(1, 1);
   private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
       "RMDelegationTokensRoot";
   private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME =
@@ -102,6 +107,7 @@ public class ZKRMStateStore extends RMSt
    *
    * ROOT_DIR_PATH
    * |--- VERSION_INFO
+   * |--- EPOCH_NODE
    * |--- RM_ZK_FENCING_LOCK
    * |--- RM_APP_ROOT
    * |     |----- (#ApplicationId1)
@@ -273,20 +279,21 @@ public class ZKRMStateStore extends RMSt
 
   private void createRootDir(final String rootPath) throws Exception {
     // For root dirs, we shouldn't use the doMulti helper methods
-    try {
-      new ZKAction<String>() {
-        @Override
-        public String run() throws KeeperException, InterruptedException {
+    new ZKAction<String>() {
+      @Override
+      public String run() throws KeeperException, InterruptedException {
+        try {
           return zkClient.create(rootPath, null, zkAcl, CreateMode.PERSISTENT);
+        } catch (KeeperException ke) {
+          if (ke.code() == Code.NODEEXISTS) {
+            LOG.debug(rootPath + "znode already exists!");
+            return null;
+          } else {
+            throw ke;
+          }
         }
-      }.runWithRetries();
-    } catch (KeeperException ke) {
-      if (ke.code() == Code.NODEEXISTS) {
-        LOG.debug(rootPath + "znode already exists!");
-      } else {
-        throw ke;
       }
-    }
+    }.runWithRetries();
   }
 
   private void logRootNodeAcls(String prefix) throws Exception {
@@ -392,6 +399,28 @@ public class ZKRMStateStore extends RMSt
   }
 
   @Override
+  public synchronized int getAndIncrementEpoch() throws Exception {
+    String epochNodePath = getNodePath(zkRootNodePath, EPOCH_NODE);
+    int currentEpoch = 0;
+    if (existsWithRetries(epochNodePath, true) != null) {
+      // load current epoch
+      byte[] data = getDataWithRetries(epochNodePath, true);
+      Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data));
+      currentEpoch = epoch.getEpoch();
+      // increment epoch and store it
+      byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
+          .toByteArray();
+      setDataWithRetries(epochNodePath, storeData, -1);
+    } else {
+      // initialize epoch node with 1 for the next time.
+      byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
+          .toByteArray();
+      createWithRetries(epochNodePath, storeData, zkAcl, CreateMode.PERSISTENT);
+    }
+    return currentEpoch;
+  }
+
+  @Override
   public synchronized RMState loadState() throws Exception {
     RMState rmState = new RMState();
     // recover DelegationTokenSecretManager
@@ -538,12 +567,12 @@ public class ZKRMStateStore extends RMSt
 
         ApplicationAttemptState attemptState =
             new ApplicationAttemptState(attemptId,
-                attemptStateData.getMasterContainer(), credentials,
-                attemptStateData.getStartTime(),
-                attemptStateData.getState(),
-                attemptStateData.getFinalTrackingUrl(),
-                attemptStateData.getDiagnostics(),
-                attemptStateData.getFinalApplicationStatus());
+              attemptStateData.getMasterContainer(), credentials,
+              attemptStateData.getStartTime(), attemptStateData.getState(),
+              attemptStateData.getFinalTrackingUrl(),
+              attemptStateData.getDiagnostics(),
+              attemptStateData.getFinalApplicationStatus(),
+              attemptStateData.getAMContainerExitStatus());
 
         appState.attempts.put(attemptState.getAttemptId(), attemptState);
       }
@@ -776,6 +805,13 @@ public class ZKRMStateStore extends RMSt
     }
   }
 
+  @Override
+  public synchronized void deleteStore() throws Exception {
+    if (existsWithRetries(zkRootNodePath, true) != null) {
+      deleteWithRetries(zkRootNodePath, true);
+    }
+  }
+
   // ZK related code
   /**
    * Watcher implementation which forward events to the ZKRMStateStore This
@@ -930,6 +966,29 @@ public class ZKRMStateStore extends RMSt
     }.runWithRetries();
   }
 
+  private void deleteWithRetries(
+      final String path, final boolean watch) throws Exception {
+    new ZKAction<Void>() {
+      @Override
+      Void run() throws KeeperException, InterruptedException {
+        recursiveDeleteWithRetriesHelper(path, watch);
+        return null;
+      }
+    }.runWithRetries();
+  }
+
+  /**
+   * Helper method that deletes znodes recursively
+   */
+  private void recursiveDeleteWithRetriesHelper(String path, boolean watch)
+          throws KeeperException, InterruptedException {
+    List<String> children = zkClient.getChildren(path, watch);
+    for (String child : children) {
+      recursiveDeleteWithRetriesHelper(path + "/" + child, false);
+    }
+    zkClient.delete(path, -1);
+  }
+
   /**
    * Helper class that periodically attempts creating a znode to ensure that
    * this RM continues to be the Active.

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java Sat Jul 12 02:24:40 2014
@@ -43,7 +43,7 @@ public abstract class ApplicationAttempt
       ApplicationAttemptId attemptId, Container container,
       ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState,
       String finalTrackingUrl, String diagnostics,
-      FinalApplicationStatus amUnregisteredFinalStatus) {
+      FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus) {
     ApplicationAttemptStateData attemptStateData =
         Records.newRecord(ApplicationAttemptStateData.class);
     attemptStateData.setAttemptId(attemptId);
@@ -54,6 +54,7 @@ public abstract class ApplicationAttempt
     attemptStateData.setDiagnostics(diagnostics);
     attemptStateData.setStartTime(startTime);
     attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus);
+    attemptStateData.setAMContainerExitStatus(exitStatus);
     return attemptStateData;
   }
 
@@ -67,11 +68,11 @@ public abstract class ApplicationAttempt
       appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
     }
     return newInstance(attemptState.getAttemptId(),
-        attemptState.getMasterContainer(), appAttemptTokens,
-        attemptState.getStartTime(), attemptState.getState(),
-        attemptState.getFinalTrackingUrl(),
-        attemptState.getDiagnostics(),
-        attemptState.getFinalApplicationStatus());
+      attemptState.getMasterContainer(), appAttemptTokens,
+      attemptState.getStartTime(), attemptState.getState(),
+      attemptState.getFinalTrackingUrl(), attemptState.getDiagnostics(),
+      attemptState.getFinalApplicationStatus(),
+      attemptState.getAMContainerExitStatus());
   }
 
   public abstract ApplicationAttemptStateDataProto getProto();
@@ -150,5 +151,10 @@ public abstract class ApplicationAttempt
    */
   public abstract FinalApplicationStatus getFinalApplicationStatus();
 
-  public abstract void setFinalApplicationStatus(FinalApplicationStatus finishState);
+  public abstract void setFinalApplicationStatus(
+      FinalApplicationStatus finishState);
+
+  public abstract int getAMContainerExitStatus();
+
+  public abstract void setAMContainerExitStatus(int exitStatus);
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java Sat Jul 12 02:24:40 2014
@@ -253,6 +253,19 @@ public class ApplicationAttemptStateData
   }
 
   @Override
+  public int getAMContainerExitStatus() {
+    ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
+    return p.getAmContainerExitStatus();
+  }
+
+  @Override
+  public void setAMContainerExitStatus(int exitStatus) {
+    maybeInitBuilder();
+    builder.setAmContainerExitStatus(exitStatus);
+  }
+
+
+  @Override
   public boolean equals(Object other) {
     if (other == null)
       return false;
@@ -281,5 +294,4 @@ public class ApplicationAttemptStateData
   private FinalApplicationStatus convertFromProtoFormat(FinalApplicationStatusProto s) {
     return ProtoUtils.convertFromProtoFormat(s);
   }
-
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java Sat Jul 12 02:24:40 2014
@@ -223,4 +223,11 @@ public interface RMApp extends EventHand
    * @return the external user-facing state of ApplicationMaster.
    */
   YarnApplicationState createApplicationState();
+  
+  /**
+   * Get RMAppMetrics of the {@link RMApp}.
+   * 
+   * @return metrics
+   */
+  RMAppMetrics getRMAppMetrics();
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Sat Jul 12 02:24:40 2014
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -65,6 +66,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppStartAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -686,7 +688,12 @@ public class RMAppImpl implements RMApp,
         ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1);
     RMAppAttempt attempt =
         new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
-          submissionContext, conf, maxAppAttempts == attempts.size());
+          submissionContext, conf,
+          // The newly created attempt may be the last attempt if (number of
+          // previously failed attempts (which should not include preempted,
+          // hardware error and NM resync) + 1) equals the max-attempt
+          // limit.
+          maxAppAttempts == (getNumFailedAppAttempts() + 1));
     attempts.put(appAttemptId, attempt);
     currentAttempt = attempt;
   }
@@ -794,7 +801,7 @@ public class RMAppImpl implements RMApp,
           && (app.currentAttempt.getState() == RMAppAttemptState.KILLED
               || app.currentAttempt.getState() == RMAppAttemptState.FINISHED
               || (app.currentAttempt.getState() == RMAppAttemptState.FAILED
-                  && app.attempts.size() == app.maxAppAttempts))) {
+                  && app.getNumFailedAppAttempts() == app.maxAppAttempts))) {
         return RMAppState.ACCEPTED;
       }
 
@@ -885,7 +892,7 @@ public class RMAppImpl implements RMApp,
       msg = "Unmanaged application " + this.getApplicationId()
               + " failed due to " + failedEvent.getDiagnostics()
               + ". Failing the application.";
-    } else if (this.attempts.size() >= this.maxAppAttempts) {
+    } else if (getNumFailedAppAttempts() >= this.maxAppAttempts) {
       msg = "Application " + this.getApplicationId() + " failed "
               + this.maxAppAttempts + " times due to "
               + failedEvent.getDiagnostics() + ". Failing the application.";
@@ -1102,6 +1109,18 @@ public class RMAppImpl implements RMApp,
     };
   }
 
+  private int getNumFailedAppAttempts() {
+    int completedAttempts = 0;
+    // Do not count AM preemption, hardware failures or NM resync
+    // as attempt failure.
+    for (RMAppAttempt attempt : attempts.values()) {
+      if (attempt.shouldCountTowardsMaxAttemptRetry()) {
+        completedAttempts++;
+      }
+    }
+    return completedAttempts;
+  }
+
   private static final class AttemptFailedTransition implements
       MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
 
@@ -1113,8 +1132,9 @@ public class RMAppImpl implements RMApp,
 
     @Override
     public RMAppState transition(RMAppImpl app, RMAppEvent event) {
+
       if (!app.submissionContext.getUnmanagedAM()
-          && app.attempts.size() < app.maxAppAttempts) {
+          && app.getNumFailedAppAttempts() < app.maxAppAttempts) {
         boolean transferStateFromPreviousAttempt = false;
         RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
         transferStateFromPreviousAttempt =
@@ -1185,4 +1205,25 @@ public class RMAppImpl implements RMApp,
   public Set<NodeId> getRanNodes() {
     return ranNodes;
   }
+  
+  @Override
+  public RMAppMetrics getRMAppMetrics() {
+    Resource resourcePreempted = Resource.newInstance(0, 0);
+    int numAMContainerPreempted = 0;
+    int numNonAMContainerPreempted = 0;
+    for (RMAppAttempt attempt : attempts.values()) {
+      if (null != attempt) {
+        RMAppAttemptMetrics attemptMetrics =
+            attempt.getRMAppAttemptMetrics();
+        Resources.addTo(resourcePreempted,
+            attemptMetrics.getResourcePreempted());
+        numAMContainerPreempted += attemptMetrics.getIsPreempted() ? 1 : 0;
+        numNonAMContainerPreempted +=
+            attemptMetrics.getNumNonAMContainersPreempted();
+      }
+    }
+
+    return new RMAppMetrics(resourcePreempted,
+        numNonAMContainerPreempted, numAMContainerPreempted);
+  }
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java Sat Jul 12 02:24:40 2014
@@ -196,4 +196,21 @@ public interface RMAppAttempt extends Ev
    */
   ApplicationAttemptReport createApplicationAttemptReport();
 
+  /**
+   * Return the flag which indicates whether the attempt failure should be
+   * counted towards the attempt retry count.
+   * These failure types should not be counted towards the attempt retry count:
+   * <ul>
+   * <li>preempted by the scheduler.</li>
+   * <li>hardware failures, such as NM failing, lost NM and NM disk errors.</li>
+   * <li>killed by RM because of RM restart or failover.</li>
+   * </ul>
+   */
+  boolean shouldCountTowardsMaxAttemptRetry();
+  
+  /**
+   * Get metrics from the {@link RMAppAttempt}
+   * @return metrics
+   */
+  RMAppAttemptMetrics getRMAppAttemptMetrics();
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Sat Jul 12 02:24:40 2014
@@ -48,11 +48,11 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -84,6 +84,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@@ -146,9 +147,15 @@ public class RMAppAttemptImpl implements
   // if an RMAppAttemptUnregistrationEvent occurs
   private FinalApplicationStatus finalStatus = null;
   private final StringBuilder diagnostics = new StringBuilder();
+  private int amContainerExitStatus = ContainerExitStatus.INVALID;
 
   private Configuration conf;
-  private final boolean isLastAttempt;
+  // Since AM preemption, hardware error and NM resync are not counted towards
+  // the AM failure count, even if this flag is true, a new attempt can still be
+  // re-created if this attempt eventually fails because of preemption,
+  // hardware error or NM resync. So this flag only indicates that this may be
+  // the last attempt.
+  private final boolean maybeLastAttempt;
   private static final ExpiredTransition EXPIRED_TRANSITION =
       new ExpiredTransition();
 
@@ -157,6 +164,8 @@ public class RMAppAttemptImpl implements
   private RMAppAttemptState recoveredFinalState;
   private RMAppAttemptState stateBeforeFinalSaving;
   private Object transitionTodo;
+  
+  private RMAppAttemptMetrics attemptMetrics = null;
 
   private static final StateMachineFactory<RMAppAttemptImpl,
                                            RMAppAttemptState,
@@ -220,6 +229,12 @@ public class RMAppAttemptImpl implements
           RMAppAttemptEventType.KILL,
           new FinalSavingTransition(new BaseFinalTransition(
             RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
+      .addTransition(RMAppAttemptState.ALLOCATED_SAVING, 
+          RMAppAttemptState.FINAL_SAVING,
+          RMAppAttemptEventType.CONTAINER_FINISHED,
+          new FinalSavingTransition(
+            new AMContainerCrashedBeforeRunningTransition(), 
+            RMAppAttemptState.FAILED))
 
        // Transitions from LAUNCHED_UNMANAGED_SAVING State
       .addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, 
@@ -329,7 +344,7 @@ public class RMAppAttemptImpl implements
       // use by the next new attempt.
       .addTransition(RMAppAttemptState.FAILED, RMAppAttemptState.FAILED,
           RMAppAttemptEventType.CONTAINER_FINISHED,
-          new ContainerFinishedAtFailedTransition())
+          new ContainerFinishedAtFinalStateTransition())
       .addTransition(
           RMAppAttemptState.FAILED,
           RMAppAttemptState.FAILED,
@@ -365,8 +380,11 @@ public class RMAppAttemptImpl implements
               RMAppAttemptEventType.EXPIRE,
               RMAppAttemptEventType.UNREGISTERED,
               RMAppAttemptEventType.CONTAINER_ALLOCATED,
-              RMAppAttemptEventType.CONTAINER_FINISHED,
               RMAppAttemptEventType.KILL))
+      .addTransition(RMAppAttemptState.FINISHED, 
+          RMAppAttemptState.FINISHED, 
+          RMAppAttemptEventType.CONTAINER_FINISHED, 
+          new ContainerFinishedAtFinalStateTransition())
 
       // Transitions from KILLED State
       .addTransition(
@@ -379,17 +397,20 @@ public class RMAppAttemptImpl implements
               RMAppAttemptEventType.EXPIRE,
               RMAppAttemptEventType.REGISTERED,
               RMAppAttemptEventType.CONTAINER_ALLOCATED,
-              RMAppAttemptEventType.CONTAINER_FINISHED,
               RMAppAttemptEventType.UNREGISTERED,
               RMAppAttemptEventType.KILL,
               RMAppAttemptEventType.STATUS_UPDATE))
+      .addTransition(RMAppAttemptState.KILLED, 
+          RMAppAttemptState.KILLED, 
+          RMAppAttemptEventType.CONTAINER_FINISHED, 
+          new ContainerFinishedAtFinalStateTransition())
     .installTopology();
 
   public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
       RMContext rmContext, YarnScheduler scheduler,
       ApplicationMasterService masterService,
       ApplicationSubmissionContext submissionContext,
-      Configuration conf, boolean isLastAttempt) {
+      Configuration conf, boolean maybeLastAttempt) {
     this.conf = conf;
     this.applicationAttemptId = appAttemptId;
     this.rmContext = rmContext;
@@ -403,8 +424,9 @@ public class RMAppAttemptImpl implements
     this.writeLock = lock.writeLock();
 
     this.proxiedTrackingUrl = generateProxyUriWithScheme(null);
-    this.isLastAttempt = isLastAttempt;
+    this.maybeLastAttempt = maybeLastAttempt;
     this.stateMachine = stateMachineFactory.make(this);
+    this.attemptMetrics = new RMAppAttemptMetrics(applicationAttemptId);
   }
 
   @Override
@@ -565,6 +587,15 @@ public class RMAppAttemptImpl implements
     }
   }
 
+  public int getAMContainerExitStatus() {
+    this.readLock.lock();
+    try {
+      return this.amContainerExitStatus;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
   @Override
   public float getProgress() {
     this.readLock.lock();
@@ -671,6 +702,10 @@ public class RMAppAttemptImpl implements
         + attemptState.getState());
     diagnostics.append("Attempt recovered after RM restart");
     diagnostics.append(attemptState.getDiagnostics());
+    this.amContainerExitStatus = attemptState.getAMContainerExitStatus();
+    if (amContainerExitStatus == ContainerExitStatus.PREEMPTED) {
+      this.attemptMetrics.setIsPreempted();
+    }
     setMasterContainer(attemptState.getMasterContainer());
     recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials());
     this.recoveredFinalState = attemptState.getState();
@@ -816,7 +851,10 @@ public class RMAppAttemptImpl implements
 
       // Set the masterContainer
       appAttempt.setMasterContainer(amContainerAllocation.getContainers()
-        .get(0));
+          .get(0));
+      RMContainerImpl rmMasterContainer = (RMContainerImpl)appAttempt.scheduler
+          .getRMContainer(appAttempt.getMasterContainer().getId());
+      rmMasterContainer.setAMContainer(true);
       // The node set in NMTokenSecrentManager is used for marking whether the
       // NMToken has been issued for this node to the AM.
       // When AM container was allocated to RM itself, the node which allocates
@@ -884,8 +922,12 @@ public class RMAppAttemptImpl implements
       } else {
         // Add the current attempt to the scheduler.
         if (appAttempt.rmContext.isWorkPreservingRecoveryEnabled()) {
+          // Need to register an app attempt before AM can register
+          appAttempt.masterService
+              .registerAppAttempt(appAttempt.applicationAttemptId);
+
           appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
-            appAttempt.getAppAttemptId(), false));
+            appAttempt.getAppAttemptId(), false, false));
         }
 
         /*
@@ -931,7 +973,7 @@ public class RMAppAttemptImpl implements
     String diags = null;
     String finalTrackingUrl = null;
     FinalApplicationStatus finalStatus = null;
-
+    int exitStatus = ContainerExitStatus.INVALID;
     switch (event.getType()) {
     case LAUNCH_FAILED:
       RMAppAttemptLaunchFailedEvent launchFaileEvent =
@@ -952,6 +994,7 @@ public class RMAppAttemptImpl implements
       RMAppAttemptContainerFinishedEvent finishEvent =
           (RMAppAttemptContainerFinishedEvent) event;
       diags = getAMContainerCrashedDiagnostics(finishEvent);
+      exitStatus = finishEvent.getContainerStatus().getExitStatus();
       break;
     case KILL:
       break;
@@ -966,9 +1009,10 @@ public class RMAppAttemptImpl implements
     ApplicationAttemptState attemptState =
         new ApplicationAttemptState(applicationAttemptId, getMasterContainer(),
           rmStore.getCredentialsFromAppAttempt(this), startTime,
-          stateToBeStored, finalTrackingUrl, diags, finalStatus);
+          stateToBeStored, finalTrackingUrl, diags, finalStatus, exitStatus);
     LOG.info("Updating application attempt " + applicationAttemptId
-        + " with final state: " + targetedFinalState);
+        + " with final state: " + targetedFinalState + ", and exit status: "
+        + exitStatus);
     rmStore.updateApplicationAttemptState(attemptState);
   }
 
@@ -1061,11 +1105,20 @@ public class RMAppAttemptImpl implements
           // don't leave the tracking URL pointing to a non-existent AM
           appAttempt.setTrackingUrlToRMAppPage();
           appAttempt.invalidateAMHostAndPort();
+
           if (appAttempt.submissionContext
             .getKeepContainersAcrossApplicationAttempts()
-              && !appAttempt.isLastAttempt
               && !appAttempt.submissionContext.getUnmanagedAM()) {
-            keepContainersAcrossAppAttempts = true;
+            // See if we should retain containers for non-unmanaged applications
+            if (!appAttempt.shouldCountTowardsMaxAttemptRetry()) {
+              // Preemption, hardware failures and NM resync don't count towards
+              // app failures, so we should retain containers.
+              keepContainersAcrossAppAttempts = true;
+            } else if (!appAttempt.maybeLastAttempt) {
+              // Not preemption, hardware failure or NM resync, and this is not
+              // the last attempt either - keep containers.
+              keepContainersAcrossAppAttempts = true;
+            }
           }
           appEvent =
               new RMAppFailedAttemptEvent(applicationId,
@@ -1105,7 +1158,21 @@ public class RMAppAttemptImpl implements
         appAttempt.getClientTokenMasterKey());
     }
   }
-  
+
+  @Override
+  public boolean shouldCountTowardsMaxAttemptRetry() {
+    try {
+      this.readLock.lock();
+      int exitStatus = getAMContainerExitStatus();
+      return !(exitStatus == ContainerExitStatus.PREEMPTED
+          || exitStatus == ContainerExitStatus.ABORTED
+          || exitStatus == ContainerExitStatus.DISKS_FAILED
+          || exitStatus == ContainerExitStatus.KILLED_BY_RESOURCEMANAGER);
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
   private static final class UnmanagedAMAttemptSavedTransition 
                                                 extends AMLaunchedTransition {
     @Override
@@ -1208,22 +1275,30 @@ public class RMAppAttemptImpl implements
       appAttempt.rmContext.getAMLivelinessMonitor().unregister(
           appAttempt.getAppAttemptId());
 
-      // Setup diagnostic message
-      appAttempt.diagnostics
-        .append(getAMContainerCrashedDiagnostics(finishEvent));
+      // Setup diagnostic message and exit status
+      appAttempt.setAMContainerCrashedDiagnosticsAndExitStatus(finishEvent);
+
       // Tell the app, scheduler
       super.transition(appAttempt, finishEvent);
     }
   }
 
-  private static String getAMContainerCrashedDiagnostics(
+  private void setAMContainerCrashedDiagnosticsAndExitStatus(
+      RMAppAttemptContainerFinishedEvent finishEvent) {
+    ContainerStatus status = finishEvent.getContainerStatus();
+    String diagnostics = getAMContainerCrashedDiagnostics(finishEvent);
+    this.diagnostics.append(diagnostics);
+    this.amContainerExitStatus = status.getExitStatus();
+  }
+
+  private String getAMContainerCrashedDiagnostics(
       RMAppAttemptContainerFinishedEvent finishEvent) {
     ContainerStatus status = finishEvent.getContainerStatus();
     String diagnostics =
         "AM Container for " + finishEvent.getApplicationAttemptId()
-            + " exited with " + " exitCode: " + status.getExitStatus()
-            + " due to: " + status.getDiagnostics() + "."
-            + "Failing this attempt.";
+        + " exited with " + " exitCode: " + status.getExitStatus() + ". "
+        + "Check application tracking page: " + this.getTrackingUrl()
+        + " . Then, click on links to logs of each attempt for detailed output. ";
     return diagnostics;
   }
 
@@ -1396,8 +1471,8 @@ public class RMAppAttemptImpl implements
     public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
         RMAppAttemptEvent event) {
 
-      RMAppAttemptContainerFinishedEvent containerFinishedEvent
-        = (RMAppAttemptContainerFinishedEvent) event;
+      RMAppAttemptContainerFinishedEvent containerFinishedEvent =
+          (RMAppAttemptContainerFinishedEvent) event;
       ContainerStatus containerStatus =
           containerFinishedEvent.getContainerStatus();
 
@@ -1405,26 +1480,28 @@ public class RMAppAttemptImpl implements
       // the AMContainer, AppAttempt fails
       if (appAttempt.masterContainer != null
           && appAttempt.masterContainer.getId().equals(
-            containerStatus.getContainerId())) {
+              containerStatus.getContainerId())) {
+
         // Remember the follow up transition and save the final attempt state.
         appAttempt.rememberTargetTransitionsAndStoreState(event,
-          transitionToDo, RMAppAttemptState.FAILED, RMAppAttemptState.FAILED);
+            transitionToDo, RMAppAttemptState.FAILED, RMAppAttemptState.FAILED);
         return RMAppAttemptState.FINAL_SAVING;
       }
 
-      // Normal container.Put it in completedcontainers list
+      // Normal container.Put it in completed containers list
       appAttempt.justFinishedContainers.add(containerStatus);
       return this.currentState;
     }
   }
 
-  private static final class ContainerFinishedAtFailedTransition
+  private static final class ContainerFinishedAtFinalStateTransition
       extends BaseTransition {
     @Override
     public void
         transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
       RMAppAttemptContainerFinishedEvent containerFinishedEvent =
           (RMAppAttemptContainerFinishedEvent) event;
+      
       ContainerStatus containerStatus =
           containerFinishedEvent.getContainerStatus();
       // Normal container. Add it in completed containers list
@@ -1437,13 +1514,12 @@ public class RMAppAttemptImpl implements
     @Override
     public void
         transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
-      RMAppAttemptContainerFinishedEvent containerFinishedEvent =
+      RMAppAttemptContainerFinishedEvent finishEvent =
           (RMAppAttemptContainerFinishedEvent) event;
       // container associated with AM. must not be unmanaged
       assert appAttempt.submissionContext.getUnmanagedAM() == false;
-      // Setup diagnostic message
-      appAttempt.diagnostics
-        .append(getAMContainerCrashedDiagnostics(containerFinishedEvent));
+      // Setup diagnostic message and exit status
+      appAttempt.setAMContainerCrashedDiagnosticsAndExitStatus(finishEvent);
       new FinalTransition(RMAppAttemptState.FAILED).transition(appAttempt,
         event);
     }
@@ -1644,4 +1720,16 @@ public class RMAppAttemptImpl implements
     }
     return attemptReport;
   }
+
+  // for testing
+  public boolean mayBeLastAttempt() {
+    return maybeLastAttempt;
+  }
+
+  @Override
+  public RMAppAttemptMetrics getRMAppAttemptMetrics() {
+    // didn't use read/write lock here because RMAppAttemptMetrics has its own
+    // lock
+    return attemptMetrics;
+  }
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java Sat Jul 12 02:24:40 2014
@@ -58,7 +58,7 @@ public interface RMContainer extends Eve
 
   Priority getAllocatedPriority();
 
-  long getStartTime();
+  long getCreationTime();
 
   long getFinishTime();
 
@@ -71,5 +71,7 @@ public interface RMContainer extends Eve
   ContainerState getContainerState();
   
   ContainerReport createContainerReport();
+  
+  boolean isAMContainer();
 
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java Sat Jul 12 02:24:40 2014
@@ -27,6 +27,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerReport;
 import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -38,6 +39,7 @@ import org.apache.hadoop.yarn.event.Even
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
@@ -152,26 +154,32 @@ public class RMContainerImpl implements 
   private Resource reservedResource;
   private NodeId reservedNode;
   private Priority reservedPriority;
-  private long startTime;
+  private long creationTime;
   private long finishTime;
   private ContainerStatus finishedStatus;
+  private boolean isAMContainer;
 
-
-
+  public RMContainerImpl(Container container,
+      ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
+      RMContext rmContext) {
+    this(container, appAttemptId, nodeId, user, rmContext, System
+      .currentTimeMillis());
+  }
 
   public RMContainerImpl(Container container,
       ApplicationAttemptId appAttemptId, NodeId nodeId,
-      String user, RMContext rmContext) {
+      String user, RMContext rmContext, long creationTime) {
     this.stateMachine = stateMachineFactory.make(this);
     this.containerId = container.getId();
     this.nodeId = nodeId;
     this.container = container;
     this.appAttemptId = appAttemptId;
     this.user = user;
-    this.startTime = System.currentTimeMillis();
+    this.creationTime = creationTime;
     this.rmContext = rmContext;
     this.eventHandler = rmContext.getDispatcher().getEventHandler();
     this.containerAllocationExpirer = rmContext.getContainerAllocationExpirer();
+    this.isAMContainer = false;
     
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     this.readLock = lock.readLock();
@@ -237,8 +245,8 @@ public class RMContainerImpl implements 
   }
 
   @Override
-  public long getStartTime() {
-    return startTime;
+  public long getCreationTime() {
+    return creationTime;
   }
 
   @Override
@@ -310,6 +318,25 @@ public class RMContainerImpl implements 
   }
   
   @Override
+  public boolean isAMContainer() {
+    readLock.lock();
+    try {
+      return isAMContainer;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public void setAMContainer(boolean isAMContainer) {
+    writeLock.lock();
+    try {
+      this.isAMContainer = isAMContainer;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+  
+  @Override
   public void handle(RMContainerEvent event) {
     LOG.debug("Processing " + event.getContainerId() + " of type " + event.getType());
     try {
@@ -433,10 +460,30 @@ public class RMContainerImpl implements 
       container.finishTime = System.currentTimeMillis();
       container.finishedStatus = finishedEvent.getRemoteContainerStatus();
       // Inform AppAttempt
+      // container.getContainer() can return null when a RMContainer is a
+      // reserved container
+      updateMetricsIfPreempted(container);
+
       container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent(
-          container.appAttemptId, finishedEvent.getRemoteContainerStatus()));
-      container.rmContext.getRMApplicationHistoryWriter()
-          .containerFinished(container);
+        container.appAttemptId, finishedEvent.getRemoteContainerStatus()));
+
+      container.rmContext.getRMApplicationHistoryWriter().containerFinished(
+        container);
+    }
+
+    private static void updateMetricsIfPreempted(RMContainerImpl container) {
+      // If this is a preempted container, update preemption metrics
+      if (ContainerExitStatus.PREEMPTED == container.finishedStatus
+        .getExitStatus()) {
+
+        Resource resource = container.getContainer().getResource();
+        RMAppAttempt rmAttempt =
+            container.rmContext.getRMApps()
+              .get(container.getApplicationAttemptId().getApplicationId())
+              .getCurrentAppAttempt();
+        rmAttempt.getRMAppAttemptMetrics().updatePreemptionInfo(resource,
+          container);
+      }
     }
   }
 
@@ -478,7 +525,7 @@ public class RMContainerImpl implements 
     try {
       containerReport = ContainerReport.newInstance(this.getContainerId(),
           this.getAllocatedResource(), this.getAllocatedNode(),
-          this.getAllocatedPriority(), this.getStartTime(),
+          this.getAllocatedPriority(), this.getCreationTime(),
           this.getFinishTime(), this.getDiagnosticsInfo(), this.getLogURL(),
           this.getContainerExitStatus(), this.getContainerState());
     } finally {
@@ -486,5 +533,4 @@ public class RMContainerImpl implements 
     }
     return containerReport;
   }
-
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java Sat Jul 12 02:24:40 2014
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
@@ -242,20 +243,35 @@ public abstract class AbstractYarnSchedu
 
       // recover scheduler attempt
       schedulerAttempt.recoverContainer(rmContainer);
+            
+      // set master container for the current running AMContainer for this
+      // attempt.
+      RMAppAttempt appAttempt = rmApp.getCurrentAppAttempt();
+      if (appAttempt != null) {
+        Container masterContainer = appAttempt.getMasterContainer();
+
+        // Mark current running AMContainer's RMContainer based on the master
+        // container ID stored in AppAttempt.
+        if (masterContainer != null
+            && masterContainer.getId().equals(rmContainer.getContainerId())) {
+          ((RMContainerImpl)rmContainer).setAMContainer(true);
+        }
+      }
     }
   }
 
-  private RMContainer recoverAndCreateContainer(NMContainerStatus report,
+  private RMContainer recoverAndCreateContainer(NMContainerStatus status,
       RMNode node) {
     Container container =
-        Container.newInstance(report.getContainerId(), node.getNodeID(),
-          node.getHttpAddress(), report.getAllocatedResource(),
-          report.getPriority(), null);
+        Container.newInstance(status.getContainerId(), node.getNodeID(),
+          node.getHttpAddress(), status.getAllocatedResource(),
+          status.getPriority(), null);
     ApplicationAttemptId attemptId =
         container.getId().getApplicationAttemptId();
     RMContainer rmContainer =
         new RMContainerImpl(container, attemptId, node.getNodeID(),
-          applications.get(attemptId.getApplicationId()).getUser(), rmContext);
+          applications.get(attemptId.getApplicationId()).getUser(), rmContext,
+          status.getCreationTime());
     return rmContainer;
   }
 

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Sat Jul 12 02:24:40 2014
@@ -57,7 +57,10 @@ public class AppSchedulingInfo {
   private final String queueName;
   Queue queue;
   final String user;
-  private final AtomicInteger containerIdCounter = new AtomicInteger(0);
+  // TODO make containerIdCounter long: epochs >= 512 overflow the int sign bit
+  private final AtomicInteger containerIdCounter;
+  private static final int EPOCH_BIT_MASK = 0x3ff;
+  private static final int EPOCH_BIT_SHIFT = 22;
 
   final Set<Priority> priorities = new TreeSet<Priority>(
       new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
@@ -70,15 +73,19 @@ public class AppSchedulingInfo {
   
   /* Allocated by scheduler */
   boolean pending = true; // for app metrics
-
+  
+ 
   public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
-      String user, Queue queue, ActiveUsersManager activeUsersManager) {
+      String user, Queue queue, ActiveUsersManager activeUsersManager,
+      int epoch) {
     this.applicationAttemptId = appAttemptId;
     this.applicationId = appAttemptId.getApplicationId();
     this.queue = queue;
     this.queueName = queue.getQueueName();
     this.user = user;
     this.activeUsersManager = activeUsersManager;
+    this.containerIdCounter = new AtomicInteger(
+        (epoch & EPOCH_BIT_MASK) << EPOCH_BIT_SHIFT);
   }
 
   public ApplicationId getApplicationId() {
@@ -413,9 +420,6 @@ public class AppSchedulingInfo {
   }
 
   public synchronized void recoverContainer(RMContainer rmContainer) {
-    // ContainerIdCounter on recovery will be addressed in YARN-2052
-    this.containerIdCounter.incrementAndGet();
-
     QueueMetrics metrics = queue.getMetrics();
     if (pending) {
       // If there was any container to recover, the application was

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java Sat Jul 12 02:24:40 2014
@@ -17,6 +17,7 @@
 */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
+import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -106,13 +108,14 @@ public class SchedulerApplicationAttempt
   public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId, 
       String user, Queue queue, ActiveUsersManager activeUsersManager,
       RMContext rmContext) {
+    Preconditions.checkNotNull(rmContext, "RMContext should not be null");
     this.rmContext = rmContext;
     this.appSchedulingInfo = 
         new AppSchedulingInfo(applicationAttemptId, user, queue,  
-            activeUsersManager);
+            activeUsersManager, rmContext.getEpoch());
     this.queue = queue;
-
-    if (rmContext != null && rmContext.getRMApps() != null &&
+    
+    if (rmContext.getRMApps() != null &&
         rmContext.getRMApps()
             .containsKey(applicationAttemptId.getApplicationId())) {
       ApplicationSubmissionContext appSubmissionContext =
@@ -414,7 +417,8 @@ public class SchedulerApplicationAttempt
         // create container token and NMToken altogether.
         container.setContainerToken(rmContext.getContainerTokenSecretManager()
           .createContainerToken(container.getId(), container.getNodeId(),
-            getUser(), container.getResource()));
+            getUser(), container.getResource(), container.getPriority(),
+            rmContainer.getCreationTime()));
         NMToken nmToken =
             rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(),
               getApplicationAttemptId(), container);

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Sat Jul 12 02:24:40 2014
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import com.google.common.base.Preconditions;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
@@ -30,6 +31,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -180,7 +182,7 @@ public class CapacityScheduler extends
 
   private Map<String, CSQueue> queues = new ConcurrentHashMap<String, CSQueue>();
 
-  private int numNodeManagers = 0;
+  private AtomicInteger numNodeManagers = new AtomicInteger(0);
 
   private ResourceCalculator calculator;
   private boolean usePortForNodeName;
@@ -236,8 +238,8 @@ public class CapacityScheduler extends
   }
 
   @Override
-  public synchronized int getNumClusterNodes() {
-    return numNodeManagers;
+  public int getNumClusterNodes() {
+    return numNodeManagers.get();
   }
 
   @Override
@@ -557,7 +559,8 @@ public class CapacityScheduler extends
 
   private synchronized void addApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
-      boolean transferStateFromPreviousAttempt) {
+      boolean transferStateFromPreviousAttempt,
+      boolean shouldNotifyAttemptAdded) {
     SchedulerApplication<FiCaSchedulerApp> application =
         applications.get(applicationAttemptId.getApplicationId());
     CSQueue queue = (CSQueue) application.getQueue();
@@ -575,9 +578,15 @@ public class CapacityScheduler extends
     LOG.info("Added Application Attempt " + applicationAttemptId
         + " to scheduler from user " + application.getUser() + " in queue "
         + queue.getQueueName());
-    rmContext.getDispatcher().getEventHandler() .handle(
-        new RMAppAttemptEvent(applicationAttemptId,
-          RMAppAttemptEventType.ATTEMPT_ADDED));
+    if (shouldNotifyAttemptAdded) {
+      rmContext.getDispatcher().getEventHandler().handle(
+          new RMAppAttemptEvent(applicationAttemptId,
+              RMAppAttemptEventType.ATTEMPT_ADDED));
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skipping notifying ATTEMPT_ADDED");
+      }
+    }
   }
 
   private synchronized void doneApplication(ApplicationId applicationId,
@@ -911,7 +920,8 @@ public class CapacityScheduler extends
       AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
           (AppAttemptAddedSchedulerEvent) event;
       addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
-        appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
+        appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
+        appAttemptAddedEvent.getShouldNotifyAttemptAdded());
     }
     break;
     case APP_ATTEMPT_REMOVED:
@@ -945,11 +955,11 @@ public class CapacityScheduler extends
         usePortForNodeName));
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
     root.updateClusterResource(clusterResource);
-    ++numNodeManagers;
+    int numNodes = numNodeManagers.incrementAndGet();
     LOG.info("Added node " + nodeManager.getNodeAddress() + 
         " clusterResource: " + clusterResource);
 
-    if (scheduleAsynchronously && numNodeManagers == 1) {
+    if (scheduleAsynchronously && numNodes == 1) {
       asyncSchedulerThread.beginSchedule();
     }
   }
@@ -961,9 +971,9 @@ public class CapacityScheduler extends
     }
     Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability());
     root.updateClusterResource(clusterResource);
-    --numNodeManagers;
+    int numNodes = numNodeManagers.decrementAndGet();
 
-    if (scheduleAsynchronously && numNodeManagers == 0) {
+    if (scheduleAsynchronously && numNodes == 0) {
       asyncSchedulerThread.suspendSchedule();
     }
     
@@ -1076,14 +1086,12 @@ public class CapacityScheduler extends
 
   @Override
   public void killContainer(RMContainer cont) {
-    if(LOG.isDebugEnabled()){
+    if (LOG.isDebugEnabled()) {
       LOG.debug("KILL_CONTAINER: container" + cont.toString());
     }
-    completedContainer(cont,
-        SchedulerUtils.createPreemptedContainerStatus(
-            cont.getContainerId(),"Container being forcibly preempted:"
-        + cont.getContainerId()),
-        RMContainerEventType.KILL);
+    completedContainer(cont, SchedulerUtils.createPreemptedContainerStatus(
+      cont.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER),
+      RMContainerEventType.KILL);
   }
 
   @Override



Mime
View raw message