hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cl...@apache.org
Subject svn commit: r1617532 [3/4] - in /hadoop/common/branches/fs-encryption/hadoop-yarn-project: ./ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ hadoop-yarn/hadoop-yarn-api/src/main/proto/ had...
Date Tue, 12 Aug 2014 17:02:15 GMT
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java?rev=1617532&r1=1617531&r2=1617532&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java Tue Aug 12 17:02:07 2014
@@ -35,6 +35,8 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerRequestPBImpl;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -45,6 +47,7 @@ import org.apache.hadoop.yarn.proto.Yarn
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
 import org.apache.hadoop.yarn.server.records.Version;
@@ -90,6 +93,14 @@ public class NMLeveldbStateStoreService 
   private static final String LOCALIZATION_FILECACHE_SUFFIX = "filecache/";
   private static final String LOCALIZATION_APPCACHE_SUFFIX = "appcache/";
 
+  private static final String CONTAINERS_KEY_PREFIX =
+      "ContainerManager/containers/";  // full key: prefix + containerId + one suffix below
+  private static final String CONTAINER_REQUEST_KEY_SUFFIX = "/request";  // value: StartContainerRequestProto bytes
+  private static final String CONTAINER_DIAGS_KEY_SUFFIX = "/diagnostics";  // value: diagnostics text
+  private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched";  // value: empty, key presence is the flag
+  private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed";  // value: empty, key presence is the flag
+  private static final String CONTAINER_EXIT_CODE_KEY_SUFFIX = "/exitcode";  // value: exit code as decimal text
+
   private static final String CURRENT_MASTER_KEY_SUFFIX = "CurrentMasterKey";
   private static final String PREV_MASTER_KEY_SUFFIX = "PreviousMasterKey";
   private static final String NM_TOKENS_KEY_PREFIX = "NMTokens/";
@@ -104,6 +115,8 @@ public class NMLeveldbStateStoreService 
   private static final String CONTAINER_TOKENS_PREV_MASTER_KEY =
       CONTAINER_TOKENS_KEY_PREFIX + PREV_MASTER_KEY_SUFFIX;
 
+  private static final byte[] EMPTY_VALUE = new byte[0];
+
   private DB db;
 
   public NMLeveldbStateStoreService() {
@@ -123,6 +136,160 @@ public class NMLeveldbStateStoreService 
 
 
   @Override
+  public List<RecoveredContainerState> loadContainersState()
+      throws IOException {  // rebuilds one RecoveredContainerState per stored container
+    ArrayList<RecoveredContainerState> containers =
+        new ArrayList<RecoveredContainerState>();
+    LeveldbIterator iter = null;
+    try {
+      iter = new LeveldbIterator(db);
+      iter.seek(bytes(CONTAINERS_KEY_PREFIX));  // position at the first container key
+      // Each pass handles one container; loadContainerState advances iter.
+      while (iter.hasNext()) {
+        Entry<byte[],byte[]> entry = iter.peekNext();
+        String key = asString(entry.getKey());
+        if (!key.startsWith(CONTAINERS_KEY_PREFIX)) {
+          break;  // key is outside the containers prefix
+        }
+        // Keys look like <prefix><containerId>/<record>; find the id's end.
+        int idEndPos = key.indexOf('/', CONTAINERS_KEY_PREFIX.length());
+        if (idEndPos < 0) {
+          throw new IOException("Unable to determine container in key: " + key);
+        }
+        ContainerId containerId = ConverterUtils.toContainerId(
+            key.substring(CONTAINERS_KEY_PREFIX.length(), idEndPos));
+        String keyPrefix = key.substring(0, idEndPos+1);  // keeps the trailing '/'
+        containers.add(loadContainerState(containerId, iter, keyPrefix));
+      }
+    } catch (DBException e) {
+      throw new IOException(e);  // surface store failures as IOException
+    } finally {
+      if (iter != null) {
+        iter.close();
+      }
+    }
+    // One element per container ID found under CONTAINERS_KEY_PREFIX.
+    return containers;
+  }
+
+  private RecoveredContainerState loadContainerState(ContainerId containerId,
+      LeveldbIterator iter, String keyPrefix) throws IOException {  // reads every record under keyPrefix
+    RecoveredContainerState rcs = new RecoveredContainerState();
+    rcs.status = RecoveredContainerStatus.REQUESTED;  // default until other records are seen
+    while (iter.hasNext()) {
+      Entry<byte[],byte[]> entry = iter.peekNext();  // peek so a foreign key stays unconsumed
+      String key = asString(entry.getKey());
+      if (!key.startsWith(keyPrefix)) {
+        break;  // not this container's key; the caller resumes from here
+      }
+      iter.next();  // the peeked entry is ours, consume it
+      // Fold this record into rcs according to the key's suffix.
+      String suffix = key.substring(keyPrefix.length()-1);  // start with '/'
+      if (suffix.equals(CONTAINER_REQUEST_KEY_SUFFIX)) {
+        rcs.startRequest = new StartContainerRequestPBImpl(
+            StartContainerRequestProto.parseFrom(entry.getValue()));
+      } else if (suffix.equals(CONTAINER_DIAGS_KEY_SUFFIX)) {
+        rcs.diagnostics = asString(entry.getValue());
+      } else if (suffix.equals(CONTAINER_LAUNCHED_KEY_SUFFIX)) {
+        if (rcs.status == RecoveredContainerStatus.REQUESTED) {  // keep COMPLETED if already set
+          rcs.status = RecoveredContainerStatus.LAUNCHED;
+        }
+      } else if (suffix.equals(CONTAINER_KILLED_KEY_SUFFIX)) {
+        rcs.killed = true;  // the key's presence is the flag; its value is ignored
+      } else if (suffix.equals(CONTAINER_EXIT_CODE_KEY_SUFFIX)) {
+        rcs.status = RecoveredContainerStatus.COMPLETED;
+        rcs.exitCode = Integer.parseInt(asString(entry.getValue()));  // decimal text; NumberFormatException if malformed
+      } else {
+        throw new IOException("Unexpected container state key: " + key);
+      }
+    }
+    return rcs;  // NOTE(review): the containerId parameter is not used in this method
+  }
+
+  @Override
+  public void storeContainer(ContainerId containerId,
+      StartContainerRequest startRequest) throws IOException {
+    final String requestKey = CONTAINERS_KEY_PREFIX
+        + containerId.toString() + CONTAINER_REQUEST_KEY_SUFFIX;
+    try {
+      db.put(bytes(requestKey),  // value is the request's protobuf encoding
+        ((StartContainerRequestPBImpl) startRequest).getProto().toByteArray());
+    } catch (DBException dbe) {
+      throw new IOException(dbe);
+    }
+  }
+
+  @Override
+  public void storeContainerDiagnostics(ContainerId containerId,
+      StringBuilder diagnostics) throws IOException {
+    final String diagsKey = CONTAINERS_KEY_PREFIX
+        + containerId.toString() + CONTAINER_DIAGS_KEY_SUFFIX;
+    try {
+      db.put(bytes(diagsKey), bytes(diagnostics.toString()));  // overwrites any earlier text
+    } catch (DBException dbe) {
+      throw new IOException(dbe);
+    }
+  }
+
+  @Override
+  public void storeContainerLaunched(ContainerId containerId)
+      throws IOException {
+    final String launchedKey = CONTAINERS_KEY_PREFIX
+        + containerId.toString() + CONTAINER_LAUNCHED_KEY_SUFFIX;
+    try {
+      db.put(bytes(launchedKey), EMPTY_VALUE);  // the key's presence is the marker
+    } catch (DBException dbe) {
+      throw new IOException(dbe);
+    }
+  }
+
+  @Override
+  public void storeContainerKilled(ContainerId containerId)
+      throws IOException {
+    final String killedKey = CONTAINERS_KEY_PREFIX
+        + containerId.toString() + CONTAINER_KILLED_KEY_SUFFIX;
+    try {
+      db.put(bytes(killedKey), EMPTY_VALUE);  // the key's presence is the marker
+    } catch (DBException dbe) {
+      throw new IOException(dbe);
+    }
+  }
+
+  @Override
+  public void storeContainerCompleted(ContainerId containerId,
+      int exitCode) throws IOException {
+    final String exitCodeKey = CONTAINERS_KEY_PREFIX
+        + containerId.toString() + CONTAINER_EXIT_CODE_KEY_SUFFIX;
+    try {
+      db.put(bytes(exitCodeKey), bytes(String.valueOf(exitCode)));  // decimal text
+    } catch (DBException dbe) {
+      throw new IOException(dbe);
+    }
+  }
+
+  @Override
+  public void removeContainer(ContainerId containerId)
+      throws IOException {
+    final String base = CONTAINERS_KEY_PREFIX + containerId.toString();
+    try {
+      WriteBatch deletions = db.createWriteBatch();
+      try {
+        deletions.delete(bytes(base + CONTAINER_REQUEST_KEY_SUFFIX));
+        deletions.delete(bytes(base + CONTAINER_DIAGS_KEY_SUFFIX));
+        deletions.delete(bytes(base + CONTAINER_LAUNCHED_KEY_SUFFIX));
+        deletions.delete(bytes(base + CONTAINER_KILLED_KEY_SUFFIX));
+        deletions.delete(bytes(base + CONTAINER_EXIT_CODE_KEY_SUFFIX));
+        db.write(deletions);  // submit all five deletes as one batch
+      } finally {
+        deletions.close();
+      }
+    } catch (DBException dbe) {
+      throw new IOException(dbe);
+    }
+  }
+
+
+  @Override
   public RecoveredApplicationsState loadApplicationsState()
       throws IOException {
     RecoveredApplicationsState state = new RecoveredApplicationsState();

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java?rev=1617532&r1=1617531&r2=1617532&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java Tue Aug 12 17:02:07 2014
@@ -19,9 +19,11 @@
 package org.apache.hadoop.yarn.server.nodemanager.recovery;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -63,6 +65,42 @@ public class NMNullStateStoreService ext
   }
 
   @Override
+  public List<RecoveredContainerState> loadContainersState()
+      throws IOException {  // always throws: this store's container store methods are no-ops
+    throw new UnsupportedOperationException(
+        "Recovery not supported by this state store");
+  }
+
+  @Override
+  public void storeContainer(ContainerId containerId,
+      StartContainerRequest startRequest) throws IOException {  // no-op: the request is not persisted
+  }
+
+  @Override
+  public void storeContainerDiagnostics(ContainerId containerId,
+      StringBuilder diagnostics) throws IOException {  // no-op: diagnostics are not persisted
+  }
+
+  @Override
+  public void storeContainerLaunched(ContainerId containerId)
+      throws IOException {  // no-op: the launch is not recorded
+  }
+
+  @Override
+  public void storeContainerKilled(ContainerId containerId)
+      throws IOException {  // no-op: the kill request is not recorded
+  }
+
+  @Override
+  public void storeContainerCompleted(ContainerId containerId, int exitCode)
+      throws IOException {  // no-op: the exit code is not recorded
+  }
+
+  @Override
+  public void removeContainer(ContainerId containerId) throws IOException {  // no-op: nothing was stored
+  }
+
+  @Override
   public RecoveredLocalizationState loadLocalizationState()
       throws IOException {
     throw new UnsupportedOperationException(

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java?rev=1617532&r1=1617531&r2=1617532&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java Tue Aug 12 17:02:07 2014
@@ -29,8 +29,10 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
@@ -59,6 +61,40 @@ public abstract class NMStateStoreServic
     }
   }
 
+  public enum RecoveredContainerStatus {  // how far a recovered container had progressed
+    REQUESTED,  // starting status in the leveldb loader, before other records are read
+    LAUNCHED,   // leveldb loader: a launched record exists and no exit code was seen before it
+    COMPLETED   // leveldb loader: an exit code record exists
+  }
+
+  public static class RecoveredContainerState {  // fields are package-private; read access is via the getters
+    RecoveredContainerStatus status;  // no default here; the leveldb loader starts from REQUESTED
+    int exitCode = ContainerExitStatus.INVALID;  // INVALID until an exit code is recovered
+    boolean killed = false;  // set to true by the loader when a kill record is found
+    String diagnostics = "";  // empty unless diagnostics were recovered
+    StartContainerRequest startRequest;  // null unless a start request was recovered
+
+    public RecoveredContainerStatus getStatus() {
+      return status;
+    }
+
+    public int getExitCode() {
+      return exitCode;
+    }
+
+    public boolean getKilled() {
+      return killed;
+    }
+
+    public String getDiagnostics() {
+      return diagnostics;
+    }
+
+    public StartContainerRequest getStartRequest() {
+      return startRequest;
+    }
+  }
+
   public static class LocalResourceTrackerState {
     List<LocalizedResourceProto> localizedResources =
         new ArrayList<LocalizedResourceProto>();
@@ -176,20 +212,101 @@ public abstract class NMStateStoreServic
   }
 
 
+  /**
+   * Load the state of applications
+   * @return recovered state for applications
+   * @throws IOException
+   */
   public abstract RecoveredApplicationsState loadApplicationsState()
       throws IOException;
 
+  /**
+   * Record the start of an application
+   * @param appId the application ID
+   * @param p state to store for the application
+   * @throws IOException
+   */
   public abstract void storeApplication(ApplicationId appId,
       ContainerManagerApplicationProto p) throws IOException;
 
+  /**
+   * Record that an application has finished
+   * @param appId the application ID
+   * @throws IOException
+   */
   public abstract void storeFinishedApplication(ApplicationId appId)
       throws IOException;
 
+  /**
+   * Remove records corresponding to an application
+   * @param appId the application ID
+   * @throws IOException
+   */
   public abstract void removeApplication(ApplicationId appId)
       throws IOException;
 
 
   /**
+   * Load the state of containers
+   * @return recovered state for containers
+   * @throws IOException
+   */
+  public abstract List<RecoveredContainerState> loadContainersState()
+      throws IOException;
+
+  /**
+   * Record a container start request
+   * @param containerId the container ID
+   * @param startRequest the container start request
+   * @throws IOException
+   */
+  public abstract void storeContainer(ContainerId containerId,
+      StartContainerRequest startRequest) throws IOException;
+
+  /**
+   * Record that a container has been launched
+   * @param containerId the container ID
+   * @throws IOException
+   */
+  public abstract void storeContainerLaunched(ContainerId containerId)
+      throws IOException;
+
+  /**
+   * Record that a container has completed
+   * @param containerId the container ID
+   * @param exitCode the exit code from the container
+   * @throws IOException
+   */
+  public abstract void storeContainerCompleted(ContainerId containerId,
+      int exitCode) throws IOException;
+
+  /**
+   * Record a request to kill a container
+   * @param containerId the container ID
+   * @throws IOException
+   */
+  public abstract void storeContainerKilled(ContainerId containerId)
+      throws IOException;
+
+  /**
+   * Record diagnostics for a container
+   * @param containerId the container ID
+   * @param diagnostics the container diagnostics
+   * @throws IOException
+   */
+  public abstract void storeContainerDiagnostics(ContainerId containerId,
+      StringBuilder diagnostics) throws IOException;
+
+  /**
+   * Remove records corresponding to a container
+   * @param containerId the container ID
+   * @throws IOException
+   */
+  public abstract void removeContainer(ContainerId containerId)
+      throws IOException;
+
+
+  /**
    * Load the state of localized resources
    * @return recovered localized resource state
    * @throws IOException
@@ -230,43 +347,111 @@ public abstract class NMStateStoreServic
       ApplicationId appId, Path localPath) throws IOException;
 
 
+  /**
+   * Load the state of the deletion service
+   * @return recovered deletion service state
+   * @throws IOException
+   */
   public abstract RecoveredDeletionServiceState loadDeletionServiceState()
       throws IOException;
 
+  /**
+   * Record a deletion task
+   * @param taskId the deletion task ID
+   * @param taskProto the deletion task protobuf
+   * @throws IOException
+   */
   public abstract void storeDeletionTask(int taskId,
       DeletionServiceDeleteTaskProto taskProto) throws IOException;
 
+  /**
+   * Remove records corresponding to a deletion task
+   * @param taskId the deletion task ID
+   * @throws IOException
+   */
   public abstract void removeDeletionTask(int taskId) throws IOException;
 
 
+  /**
+   * Load the state of NM tokens
+   * @return recovered state of NM tokens
+   * @throws IOException
+   */
   public abstract RecoveredNMTokensState loadNMTokensState()
       throws IOException;
 
+  /**
+   * Record the current NM token master key
+   * @param key the master key
+   * @throws IOException
+   */
   public abstract void storeNMTokenCurrentMasterKey(MasterKey key)
       throws IOException;
 
+  /**
+   * Record the previous NM token master key
+   * @param key the previous master key
+   * @throws IOException
+   */
   public abstract void storeNMTokenPreviousMasterKey(MasterKey key)
       throws IOException;
 
+  /**
+   * Record a master key corresponding to an application
+   * @param attempt the application attempt ID
+   * @param key the master key
+   * @throws IOException
+   */
   public abstract void storeNMTokenApplicationMasterKey(
       ApplicationAttemptId attempt, MasterKey key) throws IOException;
 
+  /**
+   * Remove a master key corresponding to an application
+   * @param attempt the application attempt ID
+   * @throws IOException
+   */
   public abstract void removeNMTokenApplicationMasterKey(
       ApplicationAttemptId attempt) throws IOException;
 
 
+  /**
+   * Load the state of container tokens
+   * @return recovered state of container tokens
+   * @throws IOException
+   */
   public abstract RecoveredContainerTokensState loadContainerTokensState()
       throws IOException;
 
+  /**
+   * Record the current container token master key
+   * @param key the master key
+   * @throws IOException
+   */
   public abstract void storeContainerTokenCurrentMasterKey(MasterKey key)
       throws IOException;
 
+  /**
+   * Record the previous container token master key
+   * @param key the previous master key
+   * @throws IOException
+   */
   public abstract void storeContainerTokenPreviousMasterKey(MasterKey key)
       throws IOException;
 
+  /**
+   * Record the expiration time for a container token
+   * @param containerId the container ID
+   * @param expirationTime the container token expiration time
+   * @throws IOException
+   */
   public abstract void storeContainerToken(ContainerId containerId,
       Long expirationTime) throws IOException;
 
+  /**
+   * Remove records for a container token
+   * @param containerId the container ID
+   * @throws IOException
+   */
   public abstract void removeContainerToken(ContainerId containerId)
       throws IOException;
 

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c?rev=1617532&r1=1617531&r2=1617532&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c Tue Aug 12 17:02:07 2014
@@ -33,6 +33,7 @@
 #include <limits.h>
 #include <sys/stat.h>
 #include <sys/mount.h>
+#include <sys/wait.h>
 
 static const int DEFAULT_MIN_USERID = 1000;
 
@@ -245,6 +246,85 @@ static int write_pid_to_file_as_nm(const
 }
 
 /**
+ * Write the exit code of the container into the exit code file
+ * exit_code_file: Path to exit code file where exit code needs to be written
+ */
+static int write_exit_code_file(const char* exit_code_file, int exit_code) {
+  char *tmp_ecode_file = concatenate("%s.tmp", "exit_code_path", 1,
+      exit_code_file);
+  if (tmp_ecode_file == NULL) {
+    return -1;
+  }
+
+  // create with 700
+  int ecode_fd = open(tmp_ecode_file, O_WRONLY|O_CREAT|O_EXCL, S_IRWXU);
+  if (ecode_fd == -1) {
+    fprintf(LOGFILE, "Can't open file %s - %s\n", tmp_ecode_file,
+           strerror(errno));
+    free(tmp_ecode_file);
+    return -1;
+  }
+
+  char ecode_buf[21];
+  snprintf(ecode_buf, sizeof(ecode_buf), "%d", exit_code);
+  ssize_t written = write(ecode_fd, ecode_buf, strlen(ecode_buf));
+  close(ecode_fd);
+  if (written == -1) {
+    fprintf(LOGFILE, "Failed to write exit code to file %s - %s\n",
+       tmp_ecode_file, strerror(errno));
+    free(tmp_ecode_file);
+    return -1;
+  }
+
+  // rename temp file to actual exit code file
+  // use rename as atomic
+  if (rename(tmp_ecode_file, exit_code_file)) {
+    fprintf(LOGFILE, "Can't move exit code file from %s to %s - %s\n",
+        tmp_ecode_file, exit_code_file, strerror(errno));
+    unlink(tmp_ecode_file);
+    free(tmp_ecode_file);
+    return -1;
+  }
+
+  free(tmp_ecode_file);
+  return 0;
+}
+
+/**
+ * Switch the effective user to nm_uid/nm_gid, wait for the container process
+ * pid to terminate and write its exit code to exit_code_file.
+ * Returns the exit code that was written, or -1 if any of those steps fails.
+ */
+static int wait_and_write_exit_code(pid_t pid, const char* exit_code_file) {
+  int child_status = -1;
+  int exit_code = -1;  // stays -1 if the status is neither a normal exit nor a signal
+  int waitpid_result;
+
+  if (change_effective_user(nm_uid, nm_gid) != 0) {
+    return -1;
+  }
+  do {
+    waitpid_result = waitpid(pid, &child_status, 0);
+  } while (waitpid_result == -1 && errno == EINTR);  // retry when interrupted by a signal
+  if (waitpid_result < 0) {
+    fprintf(LOGFILE, "Error waiting for container process %d - %s\n",
+        pid, strerror(errno));
+    return -1;
+  }
+  if (WIFEXITED(child_status)) {
+    exit_code = WEXITSTATUS(child_status);
+  } else if (WIFSIGNALED(child_status)) {
+    exit_code = 0x80 + WTERMSIG(child_status);  // 128 + terminating signal number
+  } else {
+    fprintf(LOGFILE, "Unable to determine exit status for pid %d\n", pid);
+  }
+  if (write_exit_code_file(exit_code_file, exit_code) < 0) {
+    return -1;
+  }
+  return exit_code;
+}
+
+/**
  * Change the real and effective user and group to abandon the super user
  * priviledges.
  */
@@ -337,6 +417,10 @@ char *get_container_work_directory(const
                      nm_root, user, app_id, container_id);
 }
 
+char *get_exit_code_file(const char* pid_file) {  // builds the path "<pid_file>.exitcode"
+  return concatenate("%s.exitcode", "exit_code_file", 1, pid_file);  // launch_container_as_user frees the result
+}
+
 char *get_container_launcher_file(const char* work_dir) {
   return concatenate("%s/%s", "container launcher", 2, work_dir, CONTAINER_SCRIPT);
 }
@@ -879,6 +963,8 @@ int launch_container_as_user(const char 
   int exit_code = -1;
   char *script_file_dest = NULL;
   char *cred_file_dest = NULL;
+  char *exit_code_file = NULL;
+
   script_file_dest = get_container_launcher_file(work_dir);
   if (script_file_dest == NULL) {
     exit_code = OUT_OF_MEMORY;
@@ -889,6 +975,11 @@ int launch_container_as_user(const char 
     exit_code = OUT_OF_MEMORY;
     goto cleanup;
   }
+  exit_code_file = get_exit_code_file(pid_file);
+  if (NULL == exit_code_file) {
+    exit_code = OUT_OF_MEMORY;
+    goto cleanup;
+  }
 
   // open launch script
   int container_file_source = open_file_as_nm(script_name);
@@ -902,6 +993,13 @@ int launch_container_as_user(const char 
     goto cleanup;
   }
 
+  pid_t child_pid = fork();
+  if (child_pid != 0) {
+    // parent
+    exit_code = wait_and_write_exit_code(child_pid, exit_code_file);
+    goto cleanup;
+  }
+
   // setsid 
   pid_t pid = setsid();
   if (pid == -1) {
@@ -986,6 +1084,7 @@ int launch_container_as_user(const char 
   exit_code = 0;
 
  cleanup:
+  free(exit_code_file);
   free(script_file_dest);
   free(cred_file_dest);
   return exit_code;

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1617532&r1=1617531&r2=1617532&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Tue Aug 12 17:02:07 2014
@@ -201,6 +201,7 @@ public class TestNodeStatusUpdater {
       Dispatcher mockDispatcher = mock(Dispatcher.class);
       EventHandler mockEventHandler = mock(EventHandler.class);
       when(mockDispatcher.getEventHandler()).thenReturn(mockEventHandler);
+      NMStateStoreService stateStore = new NMNullStateStoreService();
       nodeStatus.setResponseId(heartBeatID++);
       Map<ApplicationId, List<ContainerStatus>> appToContainers =
           getAppToContainerStatusMap(nodeStatus.getContainersStatuses());
@@ -226,9 +227,8 @@ public class TestNodeStatusUpdater {
                 firstContainerID, InetAddress.getByName("localhost")
                     .getCanonicalHostName(), 1234, user, resource,
                 currentTime + 10000, 123, "password".getBytes(), currentTime));
-        Container container =
-            new ContainerImpl(conf, mockDispatcher, launchContext, null,
-              mockMetrics, containerToken);
+        Container container = new ContainerImpl(conf, mockDispatcher,
+            stateStore, launchContext, null, mockMetrics, containerToken);
         this.context.getContainers().put(firstContainerID, container);
       } else if (heartBeatID == 2) {
         // Checks on the RM end
@@ -257,9 +257,8 @@ public class TestNodeStatusUpdater {
                 secondContainerID, InetAddress.getByName("localhost")
                     .getCanonicalHostName(), 1234, user, resource,
                 currentTime + 10000, 123, "password".getBytes(), currentTime));
-        Container container =
-            new ContainerImpl(conf, mockDispatcher, launchContext, null,
-              mockMetrics, containerToken);
+        Container container = new ContainerImpl(conf, mockDispatcher,
+            stateStore, launchContext, null, mockMetrics, containerToken);
         this.context.getContainers().put(secondContainerID, container);
       } else if (heartBeatID == 3) {
         // Checks on the RM end
@@ -784,7 +783,7 @@ public class TestNodeStatusUpdater {
     ContainerId cId = ContainerId.newInstance(appAttemptId, 0);               
                                                                               
                                                                               
-    nodeStatusUpdater.updateStoppedContainersInCache(cId);
+    nodeStatusUpdater.addCompletedContainer(cId);
     Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId));     
                                                                               
     long time1 = System.currentTimeMillis();                                  

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java?rev=1617532&r1=1617531&r2=1617532&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java Tue Aug 12 17:02:07 2014
@@ -233,7 +233,7 @@ public abstract class BaseContainerManag
   protected DeletionService createDeletionService() {
     return new DeletionService(exec) {
       @Override
-      public void delete(String user, Path subDir, Path[] baseDirs) {
+      public void delete(String user, Path subDir, Path... baseDirs) {
         // Don't do any deletions.
         LOG.info("Psuedo delete: user - " + user + ", subDir - " + subDir
             + ", baseDirs - " + baseDirs); 

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java?rev=1617532&r1=1617531&r2=1617532&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java Tue Aug 12 17:02:07 2014
@@ -191,7 +191,8 @@ public class TestAuxServices {
     ContainerTokenIdentifier cti = new ContainerTokenIdentifier(
         ContainerId.newInstance(attemptId, 1), "", "",
         Resource.newInstance(1, 1), 0,0,0, Priority.newInstance(0), 0);
-    Container container = new ContainerImpl(null, null, null, null, null, cti);
+    Container container = new ContainerImpl(null, null, null, null, null,
+        null, cti);
     ContainerId containerId = container.getContainerId();
     Resource resource = container.getResource();
     event = new AuxServicesEvent(AuxServicesEventType.CONTAINER_INIT,container);

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java?rev=1617532&r1=1617531&r2=1617532&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java Tue Aug 12 17:02:07 2014
@@ -80,6 +80,7 @@ public class TestContainerManagerRecover
   public void testApplicationRecovery() throws Exception {
     YarnConfiguration conf = new YarnConfiguration();
     conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+    conf.set(YarnConfiguration.NM_ADDRESS, "localhost:1234");
     conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
     conf.set(YarnConfiguration.YARN_ADMIN_ACL, "yarn_admin_user");
     NMStateStoreService stateStore = new NMMemoryStateStoreService();

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java?rev=1617532&r1=1617531&r2=1617532&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java Tue Aug 12 17:02:07 2014
@@ -780,7 +780,8 @@ public class TestContainer {
       }
       when(ctxt.getServiceData()).thenReturn(serviceData);
 
-      c = new ContainerImpl(conf, dispatcher, ctxt, null, metrics, identifier);
+      c = new ContainerImpl(conf, dispatcher, new NMNullStateStoreService(),
+          ctxt, null, metrics, identifier);
       dispatcher.register(ContainerEventType.class,
           new EventHandler<ContainerEvent>() {
             @Override

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java?rev=1617532&r1=1617531&r2=1617532&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java Tue Aug 12 17:02:07 2014
@@ -22,13 +22,16 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
@@ -40,6 +43,7 @@ import org.apache.hadoop.yarn.server.api
 public class NMMemoryStateStoreService extends NMStateStoreService {
   private Map<ApplicationId, ContainerManagerApplicationProto> apps;
   private Set<ApplicationId> finishedApps;
+  private Map<ContainerId, RecoveredContainerState> containerStates;
   private Map<TrackerKey, TrackerState> trackerStates;
   private Map<Integer, DeletionServiceDeleteTaskProto> deleteTasks;
   private RecoveredNMTokensState nmTokenState;
@@ -53,6 +57,7 @@ public class NMMemoryStateStoreService e
   protected void initStorage(Configuration conf) {
     apps = new HashMap<ApplicationId, ContainerManagerApplicationProto>();
     finishedApps = new HashSet<ApplicationId>();
+    containerStates = new HashMap<ContainerId, RecoveredContainerState>();
     nmTokenState = new RecoveredNMTokensState();
     nmTokenState.applicationMasterKeys =
         new HashMap<ApplicationAttemptId, MasterKey>();
@@ -100,6 +105,77 @@ public class NMMemoryStateStoreService e
     finishedApps.remove(appId);
   }
 
+  @Override
+  public List<RecoveredContainerState> loadContainersState()
+      throws IOException {
+    // return copies of the stored states so callers can't modify ours
+    List<RecoveredContainerState> result =
+        new ArrayList<RecoveredContainerState>(containerStates.size());
+    for (RecoveredContainerState rcs : containerStates.values()) {
+      RecoveredContainerState rcsCopy = new RecoveredContainerState();
+      rcsCopy.status = rcs.status;
+      rcsCopy.exitCode = rcs.exitCode;
+      rcsCopy.killed = rcs.killed;
+      rcsCopy.diagnostics = rcs.diagnostics;
+      rcsCopy.startRequest = rcs.startRequest;
+      result.add(rcsCopy);
+    }
+    return result;
+  }
+
+  @Override
+  public void storeContainer(ContainerId containerId,
+      StartContainerRequest startRequest) throws IOException {
+    RecoveredContainerState rcs = new RecoveredContainerState();
+    rcs.startRequest = startRequest;
+    containerStates.put(containerId, rcs);
+  }
+
+  @Override
+  public void storeContainerDiagnostics(ContainerId containerId,
+      StringBuilder diagnostics) throws IOException {
+    RecoveredContainerState rcs = getRecoveredContainerState(containerId);
+    rcs.diagnostics = diagnostics.toString();
+  }
+
+  @Override
+  public void storeContainerLaunched(ContainerId containerId)
+      throws IOException {
+    RecoveredContainerState rcs = getRecoveredContainerState(containerId);
+    if (rcs.exitCode != ContainerExitStatus.INVALID) {
+      throw new IOException("Container already completed");
+    }
+    rcs.status = RecoveredContainerStatus.LAUNCHED;
+  }
+
+  @Override
+  public void storeContainerKilled(ContainerId containerId)
+      throws IOException {
+    RecoveredContainerState rcs = getRecoveredContainerState(containerId);
+    rcs.killed = true;
+  }
+
+  @Override
+  public void storeContainerCompleted(ContainerId containerId, int exitCode)
+      throws IOException {
+    RecoveredContainerState rcs = getRecoveredContainerState(containerId);
+    rcs.status = RecoveredContainerStatus.COMPLETED;
+    rcs.exitCode = exitCode;
+  }
+
+  @Override
+  public void removeContainer(ContainerId containerId) throws IOException {
+    containerStates.remove(containerId);
+  }
+
+  private RecoveredContainerState getRecoveredContainerState(
+      ContainerId containerId) throws IOException {
+    RecoveredContainerState rcs = containerStates.get(containerId);
+    if (rcs == null) {
+      throw new IOException("No start request for " + containerId);
+    }
+    return rcs;
+  }
 
   private LocalResourceTrackerState loadTrackerState(TrackerState ts) {
     LocalResourceTrackerState result = new LocalResourceTrackerState();

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java?rev=1617532&r1=1617531&r2=1617532&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java Tue Aug 12 17:02:07 2014
@@ -25,18 +25,30 @@ import static org.junit.Assert.assertTru
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.service.ServiceStateException;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -44,9 +56,12 @@ import org.apache.hadoop.yarn.proto.Yarn
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
@@ -193,6 +208,115 @@ public class TestNMLeveldbStateStoreServ
   }
 
   @Test
+  public void testContainerStorage() throws IOException {
+    // test empty when no state
+    List<RecoveredContainerState> recoveredContainers =
+        stateStore.loadContainersState();
+    assertTrue(recoveredContainers.isEmpty());
+
+    // create a container request
+    ApplicationId appId = ApplicationId.newInstance(1234, 3);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 4);
+    ContainerId containerId = ContainerId.newInstance(appAttemptId, 5);
+    LocalResource lrsrc = LocalResource.newInstance(
+        URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"),
+        LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L,
+        1234567890L);
+    Map<String, LocalResource> localResources =
+        new HashMap<String, LocalResource>();
+    localResources.put("rsrc", lrsrc);
+    Map<String, String> env = new HashMap<String, String>();
+    env.put("somevar", "someval");
+    List<String> containerCmds = new ArrayList<String>();
+    containerCmds.add("somecmd");
+    containerCmds.add("somearg");
+    Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
+    serviceData.put("someservice",
+        ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 }));
+    ByteBuffer containerTokens =
+        ByteBuffer.wrap(new byte[] { 0x7, 0x8, 0x9, 0xa });
+    Map<ApplicationAccessType, String> acls =
+        new HashMap<ApplicationAccessType, String>();
+    acls.put(ApplicationAccessType.VIEW_APP, "viewuser");
+    acls.put(ApplicationAccessType.MODIFY_APP, "moduser");
+    ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
+        localResources, env, containerCmds, serviceData, containerTokens,
+        acls);
+    Resource containerRsrc = Resource.newInstance(1357, 3);
+    ContainerTokenIdentifier containerTokenId =
+        new ContainerTokenIdentifier(containerId, "host", "user",
+            containerRsrc, 9876543210L, 42, 2468, Priority.newInstance(7),
+            13579);
+    Token containerToken = Token.newInstance(containerTokenId.getBytes(),
+        ContainerTokenIdentifier.KIND.toString(), "password".getBytes(),
+        "tokenservice");
+    StartContainerRequest containerReq =
+        StartContainerRequest.newInstance(clc, containerToken);
+
+    // store a container and verify recovered
+    stateStore.storeContainer(containerId, containerReq);
+    restartStateStore();
+    recoveredContainers = stateStore.loadContainersState();
+    assertEquals(1, recoveredContainers.size());
+    RecoveredContainerState rcs = recoveredContainers.get(0);
+    assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus());
+    assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
+    assertEquals(false, rcs.getKilled());
+    assertEquals(containerReq, rcs.getStartRequest());
+    assertTrue(rcs.getDiagnostics().isEmpty());
+
+    // launch the container, add some diagnostics, and verify recovered
+    StringBuilder diags = new StringBuilder();
+    stateStore.storeContainerLaunched(containerId);
+    diags.append("some diags for container");
+    stateStore.storeContainerDiagnostics(containerId, diags);
+    restartStateStore();
+    recoveredContainers = stateStore.loadContainersState();
+    assertEquals(1, recoveredContainers.size());
+    rcs = recoveredContainers.get(0);
+    assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
+    assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
+    assertEquals(false, rcs.getKilled());
+    assertEquals(containerReq, rcs.getStartRequest());
+    assertEquals(diags.toString(), rcs.getDiagnostics());
+
+    // mark the container killed, add some more diags, and verify recovered
+    diags.append("some more diags for container");
+    stateStore.storeContainerDiagnostics(containerId, diags);
+    stateStore.storeContainerKilled(containerId);
+    restartStateStore();
+    recoveredContainers = stateStore.loadContainersState();
+    assertEquals(1, recoveredContainers.size());
+    rcs = recoveredContainers.get(0);
+    assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
+    assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
+    assertTrue(rcs.getKilled());
+    assertEquals(containerReq, rcs.getStartRequest());
+    assertEquals(diags.toString(), rcs.getDiagnostics());
+
+    // add yet more diags, mark container completed, and verify recovered
+    diags.append("some final diags");
+    stateStore.storeContainerDiagnostics(containerId, diags);
+    stateStore.storeContainerCompleted(containerId, 21);
+    restartStateStore();
+    recoveredContainers = stateStore.loadContainersState();
+    assertEquals(1, recoveredContainers.size());
+    rcs = recoveredContainers.get(0);
+    assertEquals(RecoveredContainerStatus.COMPLETED, rcs.getStatus());
+    assertEquals(21, rcs.getExitCode());
+    assertTrue(rcs.getKilled());
+    assertEquals(containerReq, rcs.getStartRequest());
+    assertEquals(diags.toString(), rcs.getDiagnostics());
+
+    // remove the container and verify not recovered
+    stateStore.removeContainer(containerId);
+    restartStateStore();
+    recoveredContainers = stateStore.loadContainersState();
+    assertTrue(recoveredContainers.isEmpty());
+  }
+
+  @Test
   public void testStartResourceLocalization() throws IOException {
     String user = "somebody";
     ApplicationId appId = ApplicationId.newInstance(1, 1);

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java?rev=1617532&r1=1617531&r2=1617532&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java Tue Aug 12 17:02:07 2014
@@ -209,7 +209,7 @@ public class TestNMWebServer {
             BuilderUtils.newResource(1024, 1), currentTime + 10000L, 123,
             "password".getBytes(), currentTime);
       Container container =
-          new ContainerImpl(conf, dispatcher, launchContext,
+          new ContainerImpl(conf, dispatcher, stateStore, launchContext,
             null, metrics,
             BuilderUtils.newContainerTokenIdentifier(containerToken)) {
 

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1617532&r1=1617531&r2=1617532&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Tue Aug 12 17:02:07 2014
@@ -39,6 +39,7 @@ import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.StringUtils;
@@ -80,6 +81,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
@@ -89,6 +91,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -189,7 +192,7 @@ public class ApplicationMasterService ex
     return result;
   }
 
-  private ApplicationAttemptId authorizeRequest()
+  private AMRMTokenIdentifier authorizeRequest()
       throws YarnException {
 
     UserGroupInformation remoteUgi;
@@ -226,7 +229,7 @@ public class ApplicationMasterService ex
       throw RPCUtil.getRemoteException(message);
     }
 
-    return appTokenIdentifier.getApplicationAttemptId();
+    return appTokenIdentifier;
   }
 
   @Override
@@ -234,7 +237,9 @@ public class ApplicationMasterService ex
       RegisterApplicationMasterRequest request) throws YarnException,
       IOException {
 
-    ApplicationAttemptId applicationAttemptId = authorizeRequest();
+    AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest();
+    ApplicationAttemptId applicationAttemptId =
+        amrmTokenIdentifier.getApplicationAttemptId();
 
     ApplicationId appID = applicationAttemptId.getApplicationId();
     AllocateResponseLock lock = responseMap.get(applicationAttemptId);
@@ -333,7 +338,8 @@ public class ApplicationMasterService ex
       FinishApplicationMasterRequest request) throws YarnException,
       IOException {
 
-    ApplicationAttemptId applicationAttemptId = authorizeRequest();
+    ApplicationAttemptId applicationAttemptId =
+        authorizeRequest().getApplicationAttemptId();
 
     AllocateResponseLock lock = responseMap.get(applicationAttemptId);
     if (lock == null) {
@@ -408,7 +414,10 @@ public class ApplicationMasterService ex
   public AllocateResponse allocate(AllocateRequest request)
       throws YarnException, IOException {
 
-    ApplicationAttemptId appAttemptId = authorizeRequest();
+    AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest();
+
+    ApplicationAttemptId appAttemptId =
+        amrmTokenIdentifier.getApplicationAttemptId();
 
     this.amLivelinessMonitor.receivedPing(appAttemptId);
 
@@ -557,6 +566,23 @@ public class ApplicationMasterService ex
       allocateResponse
           .setPreemptionMessage(generatePreemptionMessage(allocation));
 
+      // update AMRMToken if the token is rolled-up
+      MasterKeyData nextMasterKey =
+          this.rmContext.getAMRMTokenSecretManager().getNextMasterKeyData();
+
+      if (nextMasterKey != null
+          && nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier
+            .getKeyId()) {
+        Token<AMRMTokenIdentifier> amrmToken =
+            rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
+              appAttemptId);
+        ((RMAppAttemptImpl)appAttempt).setAMRMToken(amrmToken);
+        allocateResponse.setAMRMToken(org.apache.hadoop.yarn.api.records.Token
+          .newInstance(amrmToken.getIdentifier(), amrmToken.getKind()
+            .toString(), amrmToken.getPassword(), amrmToken.getService()
+            .toString()));
+      }
+
       /*
        * As we are updating the response inside the lock object so we don't
        * need to worry about unregister call occurring in between (which

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1617532&r1=1617531&r2=1617532&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Tue Aug 12 17:02:07 2014
@@ -461,7 +461,6 @@ public class ResourceManager extends Com
       rmDispatcher.register(RMAppManagerEventType.class, rmAppManager);
 
       clientRM = createClientRMService();
-      rmContext.setClientRMService(clientRM);
       addService(clientRM);
       rmContext.setClientRMService(clientRM);
 

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java?rev=1617532&r1=1617531&r2=1617532&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java Tue Aug 12 17:02:07 2014
@@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
@@ -226,7 +227,7 @@ public class AMLauncher implements Runna
     }
 
     // Add AMRMToken
-    Token<AMRMTokenIdentifier> amrmToken = getAMRMToken();
+    Token<AMRMTokenIdentifier> amrmToken = createAndSetAMRMToken();
     if (amrmToken != null) {
       credentials.addToken(amrmToken.getService(), amrmToken);
     }
@@ -236,8 +237,12 @@ public class AMLauncher implements Runna
   }
 
   @VisibleForTesting
-  protected Token<AMRMTokenIdentifier> getAMRMToken() {
-    return application.getAMRMToken();
+  protected Token<AMRMTokenIdentifier> createAndSetAMRMToken() {
+    Token<AMRMTokenIdentifier> amrmToken =
+        this.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
+          application.getAppAttemptId());
+    ((RMAppAttemptImpl)application).setAMRMToken(amrmToken);
+    return amrmToken;
   }
   
   @SuppressWarnings("unchecked")

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java?rev=1617532&r1=1617531&r2=1617532&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java Tue Aug 12 17:02:07 2014
@@ -71,6 +71,10 @@ import com.google.common.annotations.Vis
  * FileSystem interface. Does not use directories so that simple key-value
  * stores can be used. The retry policy for the real filesystem client must be
  * configured separately to enable retry of filesystem operations when needed.
+ *
+ * Changes from 1.1 to 1.2: AMRMTokenSecretManager state is now saved
+ * separately, storing both the currentMasterkey and nextMasterkey.
+ * Also, AMRMToken has been removed from ApplicationAttemptState.
  */
 public class FileSystemRMStateStore extends RMStateStore {
 
@@ -78,7 +82,7 @@ public class FileSystemRMStateStore exte
 
   protected static final String ROOT_DIR_NAME = "FSRMStateRoot";
   protected static final Version CURRENT_VERSION_INFO = Version
-    .newInstance(1, 1);
+    .newInstance(1, 2);
   protected static final String AMRMTOKEN_SECRET_MANAGER_NODE =
       "AMRMTokenSecretManagerNode";
 

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1617532&r1=1617531&r2=1617532&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Tue Aug 12 17:02:07 2014
@@ -32,7 +32,6 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -45,7 +44,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
@@ -54,13 +52,13 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppUpdateSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
@@ -134,7 +132,8 @@ public abstract class RMStateStore exten
       LOG.info("Storing info for app: " + appId);
       try {
         store.storeApplicationStateInternal(appId, appStateData);
-        store.notifyDoneStoringApplication(appId, null);
+        store.notifyApplication(new RMAppEvent(appId,
+               RMAppEventType.APP_NEW_SAVED));
       } catch (Exception e) {
         LOG.error("Error storing app: " + appId, e);
         store.notifyStoreOperationFailed(e);
@@ -158,7 +157,8 @@ public abstract class RMStateStore exten
       LOG.info("Updating info for app: " + appId);
       try {
         store.updateApplicationStateInternal(appId, appStateData);
-        store.notifyDoneUpdatingApplication(appId, null);
+        store.notifyApplication(new RMAppEvent(appId,
+               RMAppEventType.APP_UPDATE_SAVED));
       } catch (Exception e) {
         LOG.error("Error updating app: " + appId, e);
         store.notifyStoreOperationFailed(e);
@@ -207,8 +207,9 @@ public abstract class RMStateStore exten
         }
         store.storeApplicationAttemptStateInternal(attemptState.getAttemptId(),
             attemptStateData);
-        store.notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
-            null);
+        store.notifyApplicationAttempt(new RMAppAttemptEvent
+               (attemptState.getAttemptId(),
+               RMAppAttemptEventType.ATTEMPT_NEW_SAVED));
       } catch (Exception e) {
         LOG.error("Error storing appAttempt: " + attemptState.getAttemptId(), e);
         store.notifyStoreOperationFailed(e);
@@ -235,8 +236,9 @@ public abstract class RMStateStore exten
         }
         store.updateApplicationAttemptStateInternal(attemptState.getAttemptId(),
             attemptStateData);
-        store.notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(),
-            null);
+        store.notifyApplicationAttempt(new RMAppAttemptEvent
+               (attemptState.getAttemptId(),
+               RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED));
       } catch (Exception e) {
         LOG.error("Error updating appAttempt: " + attemptState.getAttemptId(), e);
         store.notifyStoreOperationFailed(e);
@@ -769,10 +771,7 @@ public abstract class RMStateStore exten
   
   public Credentials getCredentialsFromAppAttempt(RMAppAttempt appAttempt) {
     Credentials credentials = new Credentials();
-    Token<AMRMTokenIdentifier> appToken = appAttempt.getAMRMToken();
-    if(appToken != null){
-      credentials.addToken(AM_RM_TOKEN_SERVICE, appToken);
-    }
+
     SecretKey clientTokenMasterKey =
         appAttempt.getClientTokenMasterKey();
     if(clientTokenMasterKey != null){
@@ -806,47 +805,28 @@ public abstract class RMStateStore exten
     }
     rmDispatcher.getEventHandler().handle(new RMFatalEvent(type, failureCause));
   }
-
+ 
   @SuppressWarnings("unchecked")
   /**
-   * In (@link handleStoreEvent}, this method is called to notify the
-   * application that new application is stored in state store
-   * @param appId id of the application that has been saved
-   * @param storedException the exception that is thrown when storing the
-   * application
-   */
-  private void notifyDoneStoringApplication(ApplicationId appId,
-                                                  Exception storedException) {
-    rmDispatcher.getEventHandler().handle(
-        new RMAppNewSavedEvent(appId, storedException));
-  }
-
-  @SuppressWarnings("unchecked")
-  private void notifyDoneUpdatingApplication(ApplicationId appId,
-      Exception storedException) {
-    rmDispatcher.getEventHandler().handle(
-      new RMAppUpdateSavedEvent(appId, storedException));
+   * Notifies the application that it has been stored or
+   * updated in the state store.
+   * @param event App event containing the app id and event type
+   */
+  private void notifyApplication(RMAppEvent event) {
+    rmDispatcher.getEventHandler().handle(event);
   }
-
+  
   @SuppressWarnings("unchecked")
   /**
-   * In (@link handleStoreEvent}, this method is called to notify the
-   * application attempt that new attempt is stored in state store
-   * @param appAttempt attempt that has been saved
-   */
-  private void notifyDoneStoringApplicationAttempt(ApplicationAttemptId attemptId,
-                                                  Exception storedException) {
-    rmDispatcher.getEventHandler().handle(
-        new RMAppAttemptNewSavedEvent(attemptId, storedException));
-  }
-
-  @SuppressWarnings("unchecked")
-  private void notifyDoneUpdatingApplicationAttempt(ApplicationAttemptId attemptId,
-      Exception updatedException) {
-    rmDispatcher.getEventHandler().handle(
-      new RMAppAttemptUpdateSavedEvent(attemptId, updatedException));
+   * Notifies the application attempt that it has been stored
+   * or updated in the state store.
+   * @param event App attempt event containing the app attempt
+   * id and event type
+   */
+  private void notifyApplicationAttempt(RMAppAttemptEvent event) {
+    rmDispatcher.getEventHandler().handle(event);
   }
-
+  
   /**
    * EventHandler implementation which forward events to the FSRMStateStore
    * This hides the EventHandle methods of the store from its public interface 

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java?rev=1617532&r1=1617531&r2=1617532&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java Tue Aug 12 17:02:07 2014
@@ -78,6 +78,11 @@ import org.apache.zookeeper.server.auth.
 
 import com.google.common.annotations.VisibleForTesting;
 
+/**
+ * Changes from 1.1 to 1.2: AMRMTokenSecretManager state is now saved
+ * separately (the currentMasterkey and nextMasterkey are stored).
+ * Also, AMRMToken has been removed from ApplicationAttemptState.
+ */
 @Private
 @Unstable
 public class ZKRMStateStore extends RMStateStore {
@@ -87,7 +92,7 @@ public class ZKRMStateStore extends RMSt
 
   protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
   protected static final Version CURRENT_VERSION_INFO = Version
-      .newInstance(1, 1);
+      .newInstance(1, 2);
   private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
       "RMDelegationTokensRoot";
   private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME =

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1617532&r1=1617531&r2=1617532&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Tue Aug 12 17:02:07 2014
@@ -820,17 +820,6 @@ public class RMAppImpl implements RMApp,
       RMAppTransition {
     @Override
     public void transition(RMAppImpl app, RMAppEvent event) {
-      if (event instanceof RMAppNewSavedEvent) {
-        RMAppNewSavedEvent storeEvent = (RMAppNewSavedEvent) event;
-        // For HA this exception needs to be handled by giving up
-        // master status if we got fenced
-        if (((RMAppNewSavedEvent) event).getStoredException() != null) {
-          LOG.error(
-            "Failed to store application: " + storeEvent.getApplicationId(),
-            storeEvent.getStoredException());
-          ExitUtil.terminate(1, storeEvent.getStoredException());
-        }
-      }
       app.handler.handle(new AppAddedSchedulerEvent(app.applicationId,
         app.submissionContext.getQueue(), app.user));
     }
@@ -848,13 +837,6 @@ public class RMAppImpl implements RMApp,
 
     @Override
     public RMAppState transition(RMAppImpl app, RMAppEvent event) {
-      RMAppUpdateSavedEvent storeEvent = (RMAppUpdateSavedEvent) event;
-      if (storeEvent.getUpdatedException() != null) {
-        LOG.error("Failed to update the final state of application"
-              + storeEvent.getApplicationId(), storeEvent.getUpdatedException());
-        ExitUtil.terminate(1, storeEvent.getUpdatedException());
-      }
-
       if (app.transitionTodo instanceof SingleArcTransition) {
         ((SingleArcTransition) app.transitionTodo).transition(app,
           app.eventCausingFinalSaving);

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1617532&r1=1617531&r2=1617532&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Tue Aug 12 17:02:07 2014
@@ -37,6 +37,7 @@ import org.apache.commons.lang.StringUti
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -79,11 +80,9 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -397,7 +396,6 @@ public class RMAppAttemptImpl implements
           RMAppAttemptState.KILLED,
           RMAppAttemptState.KILLED,
           EnumSet.of(RMAppAttemptEventType.ATTEMPT_ADDED,
-              RMAppAttemptEventType.EXPIRE,
               RMAppAttemptEventType.LAUNCHED,
               RMAppAttemptEventType.LAUNCH_FAILED,
               RMAppAttemptEventType.EXPIRE,
@@ -559,7 +557,22 @@ public class RMAppAttemptImpl implements
 
   @Override
   public Token<AMRMTokenIdentifier> getAMRMToken() {
-    return this.amrmToken;
+    this.readLock.lock();
+    try {
+      return this.amrmToken;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Private
+  public void setAMRMToken(Token<AMRMTokenIdentifier> lastToken) {
+    this.writeLock.lock();
+    try {
+      this.amrmToken = lastToken;
+    } finally {
+      this.writeLock.unlock();
+    }
   }
 
   @Override
@@ -713,7 +726,8 @@ public class RMAppAttemptImpl implements
       this.attemptMetrics.setIsPreempted();
     }
     setMasterContainer(attemptState.getMasterContainer());
-    recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials());
+    recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials(),
+      attemptState.getState());
     this.recoveredFinalState = attemptState.getState();
     this.originalTrackingUrl = attemptState.getFinalTrackingUrl();
     this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl);
@@ -725,9 +739,11 @@ public class RMAppAttemptImpl implements
     this.justFinishedContainers = attempt.getJustFinishedContainers();
   }
 
-  private void recoverAppAttemptCredentials(Credentials appAttemptTokens)
-      throws IOException {
-    if (appAttemptTokens == null) {
+  private void recoverAppAttemptCredentials(Credentials appAttemptTokens,
+      RMAppAttemptState state) throws IOException {
+    if (appAttemptTokens == null || state == RMAppAttemptState.FAILED
+        || state == RMAppAttemptState.FINISHED
+        || state == RMAppAttemptState.KILLED) {
       return;
     }
 
@@ -738,12 +754,9 @@ public class RMAppAttemptImpl implements
           .registerMasterKey(applicationAttemptId, clientTokenMasterKeyBytes);
     }
 
-    // Only one AMRMToken is stored per-attempt, so this should be fine. Can't
-    // use TokenSelector as service may change - think fail-over.
     this.amrmToken =
-        (Token<AMRMTokenIdentifier>) appAttemptTokens
-          .getToken(RMStateStore.AM_RM_TOKEN_SERVICE);
-    rmContext.getAMRMTokenSecretManager().addPersistedPassword(this.amrmToken);
+        rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
+          applicationAttemptId);
   }
 
   private static class BaseTransition implements
@@ -779,11 +792,6 @@ public class RMAppAttemptImpl implements
               .createMasterKey(appAttempt.applicationAttemptId);
       }
 
-      // create AMRMToken
-      appAttempt.amrmToken =
-          appAttempt.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
-            appAttempt.applicationAttemptId);
-
       // Add the applicationAttempt to the scheduler and inform the scheduler
       // whether to transfer the state from previous attempt.
       appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
@@ -895,7 +903,6 @@ public class RMAppAttemptImpl implements
     @Override
     public void transition(RMAppAttemptImpl appAttempt,
                                                     RMAppAttemptEvent event) {
-      appAttempt.checkAttemptStoreError(event);
       appAttempt.launchAttempt();
     }
   }
@@ -1047,14 +1054,6 @@ public class RMAppAttemptImpl implements
     @Override
     public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
         RMAppAttemptEvent event) {
-      RMAppAttemptUpdateSavedEvent storeEvent = (RMAppAttemptUpdateSavedEvent) event;
-      if (storeEvent.getUpdatedException() != null) {
-        LOG.error("Failed to update the final state of application attempt: "
-            + storeEvent.getApplicationAttemptId(),
-          storeEvent.getUpdatedException());
-        ExitUtil.terminate(1, storeEvent.getUpdatedException());
-      }
-
       RMAppAttemptEvent causeEvent = appAttempt.eventCausingFinalSaving;
 
       if (appAttempt.transitionTodo instanceof SingleArcTransition) {
@@ -1184,12 +1183,11 @@ public class RMAppAttemptImpl implements
     @Override
     public void transition(RMAppAttemptImpl appAttempt,
                             RMAppAttemptEvent event) {
-      appAttempt.checkAttemptStoreError(event);
-      // TODO Today unmanaged AM client is waiting for app state to be Accepted to
-      // launch the AM. This is broken since we changed to start the attempt
-      // after the application is Accepted. We may need to introduce an attempt
-      // report that client can rely on to query the attempt state and choose to
-      // launch the unmanaged AM.
+      // create AMRMToken
+      appAttempt.amrmToken =
+          appAttempt.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
+            appAttempt.applicationAttemptId);
+
       super.transition(appAttempt, event);
     }    
   }
@@ -1677,18 +1675,6 @@ public class RMAppAttemptImpl implements
     rmContext.getAMLivelinessMonitor().register(getAppAttemptId());
   }
   
-  private void checkAttemptStoreError(RMAppAttemptEvent event) {
-    RMAppAttemptNewSavedEvent storeEvent = (RMAppAttemptNewSavedEvent) event;
-    if(storeEvent.getStoredException() != null)
-    {
-      // This needs to be handled for HA and give up master status if we got
-      // fenced
-      LOG.error("Failed to store attempt: " + getAppAttemptId(),
-                storeEvent.getStoredException());
-      ExitUtil.terminate(1, storeEvent.getStoredException());
-    }
-  }
-
   private void storeAttempt() {
     // store attempt data in a non-blocking manner to prevent dispatcher
     // thread starvation and wait for state to be saved

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1617532&r1=1617531&r2=1617532&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Tue Aug 12 17:02:07 2014
@@ -93,9 +93,9 @@ public class RMNodeImpl implements RMNod
   private final RMContext context;
   private final String hostName;
   private final int commandPort;
-  private final int httpPort;
+  private int httpPort;
   private final String nodeAddress; // The containerManager address
-  private final String httpAddress;
+  private String httpAddress;
   private volatile ResourceOption resourceOption;
   private final Node node;
 
@@ -521,37 +521,15 @@ public class RMNodeImpl implements RMNod
 
     @Override
     public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
-      // Kill containers since node is rejoining.
-      rmNode.nodeUpdateQueue.clear();
-      rmNode.context.getDispatcher().getEventHandler().handle(
-          new NodeRemovedSchedulerEvent(rmNode));
-
       RMNodeReconnectEvent reconnectEvent = (RMNodeReconnectEvent) event;
       RMNode newNode = reconnectEvent.getReconnectedNode();
       rmNode.nodeManagerVersion = newNode.getNodeManagerVersion();
-      if (rmNode.getTotalCapability().equals(newNode.getTotalCapability())
-          && rmNode.getHttpPort() == newNode.getHttpPort()) {
-        // Reset heartbeat ID since node just restarted.
-        rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
-        if (rmNode.getState() != NodeState.UNHEALTHY) {
-          // Only add new node if old state is not UNHEALTHY
-          rmNode.context.getDispatcher().getEventHandler().handle(
-              new NodeAddedSchedulerEvent(rmNode));
-        }
-      } else {
-        // Reconnected node differs, so replace old node and start new node
-        switch (rmNode.getState()) {
-        case RUNNING:
-          ClusterMetrics.getMetrics().decrNumActiveNodes();
-          break;
-        case UNHEALTHY:
-          ClusterMetrics.getMetrics().decrNumUnhealthyNMs();
-          break;
-        }
-        rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode);
-        rmNode.context.getDispatcher().getEventHandler().handle(
-            new RMNodeStartedEvent(newNode.getNodeID(), null, null));
-      }
+      rmNode.httpPort = newNode.getHttpPort();
+      rmNode.httpAddress = newNode.getHttpAddress();
+      rmNode.resourceOption = newNode.getResourceOption();
+
+      // Reset heartbeat ID since node just restarted.
+      rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
 
       if (null != reconnectEvent.getRunningApplications()) {
         for (ApplicationId appId : reconnectEvent.getRunningApplications()) {



Mime
View raw message