hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject svn commit: r1612403 [1/3] - in /hadoop/common/branches/fs-encryption/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/...
Date Mon, 21 Jul 2014 21:45:00 GMT
Author: wang
Date: Mon Jul 21 21:44:50 2014
New Revision: 1612403

URL: http://svn.apache.org/r1612403
Log:
Merge from trunk to branch

Added:
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/records/
      - copied from r1612402, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/records/
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/security/
      - copied from r1612402, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/security/
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/DelegationToken.java
      - copied unchanged from r1612402, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/DelegationToken.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesDelegationTokens.java
      - copied unchanged from r1612402, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesDelegationTokens.java
Modified:
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/bin/yarn-daemon.sh
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenIdentifier.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java
    hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ResourceManagerRest.apt.vm

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/CHANGES.txt?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/CHANGES.txt Mon Jul 21 21:44:50 2014
@@ -46,6 +46,16 @@ Release 2.6.0 - UNRELEASED
     YARN-2228. Augmented TimelineServer to load pseudo authentication filter when
     authentication = simple. (Zhijie Shen via vinodkv)
 
+    YARN-1341. Recover NMTokens upon nodemanager restart. (Jason Lowe via 
+    junping_du)
+
+    YARN-2208. AMRMTokenManager need to have a way to roll over AMRMToken. (xgong)
+
+    YARN-2323. FairShareComparator creates too many Resource objects (Hong Zhiguo
+    via Sandy Ryza)
+
+    YARN-2045. Data persisted in NM should be versioned (Junping Du via jlowe)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -59,6 +69,16 @@ Release 2.6.0 - UNRELEASED
     YARN-2260. Fixed ResourceManager's RMNode to correctly remember containers
     when nodes resync during work-preserving RM restart. (Jian He via vinodkv)
 
+    YARN-2264. Fixed a race condition in DrainDispatcher which may cause random
+    test failures. (Li Lu via jianhe)
+
+    YARN-2219. Changed ResourceManager to avoid AMs and NMs getting exceptions
+    after RM recovery but before scheduler learns about apps and app-attempts.
+    (Jian He via vinodkv)
+
+    YARN-2244. FairScheduler missing handling of containers for unknown 
+    application attempts. (Anubhav Dhoot via kasha)
+
 Release 2.5.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -95,6 +115,9 @@ Release 2.5.0 - UNRELEASED
     YARN-1713. Added get-new-app and submit-app functionality to RM web services.
     (Varun Vasudev via vinodkv)
 
+    YARN-2233. Implemented ResourceManager web-services to create, renew and
+    cancel delegation tokens. (Varun Vasudev via vinodkv)
+
   IMPROVEMENTS
 
     YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via
@@ -259,6 +282,9 @@ Release 2.5.0 - UNRELEASED
     YARN-2241. ZKRMStateStore: On startup, show nicer messages if znodes already 
     exist. (Robert Kanter via kasha)
 
+	YARN-1408 Preemption caused Invalid State Event: ACQUIRED at KILLED and 
+	caused a task timeout for 30mins. (Sunil G via mayank)
+
   OPTIMIZATIONS
 
   BUG FIXES 

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/bin/yarn-daemon.sh
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/bin/yarn-daemon.sh?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/bin/yarn-daemon.sh (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/bin/yarn-daemon.sh Mon Jul 21 21:44:50 2014
@@ -145,6 +145,7 @@ case $startStop in
       else
         echo no $command to stop
       fi
+      rm -f $pid
     else
       echo no $command to stop
     fi

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenIdentifier.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenIdentifier.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenIdentifier.java Mon Jul 21 21:44:50 2014
@@ -44,6 +44,7 @@ public class AMRMTokenIdentifier extends
   public static final Text KIND_NAME = new Text("YARN_AM_RM_TOKEN");
 
   private ApplicationAttemptId applicationAttemptId;
+  private int keyId = Integer.MIN_VALUE;
 
   public AMRMTokenIdentifier() {
   }
@@ -53,6 +54,13 @@ public class AMRMTokenIdentifier extends
     this.applicationAttemptId = appAttemptId;
   }
 
+  public AMRMTokenIdentifier(ApplicationAttemptId appAttemptId,
+      int masterKeyId) {
+    this();
+    this.applicationAttemptId = appAttemptId;
+    this.keyId = masterKeyId;
+  }
+
   @Private
   public ApplicationAttemptId getApplicationAttemptId() {
     return this.applicationAttemptId;
@@ -64,6 +72,7 @@ public class AMRMTokenIdentifier extends
     out.writeLong(appId.getClusterTimestamp());
     out.writeInt(appId.getId());
     out.writeInt(this.applicationAttemptId.getAttemptId());
+    out.writeInt(this.keyId);
   }
 
   @Override
@@ -75,6 +84,7 @@ public class AMRMTokenIdentifier extends
         ApplicationId.newInstance(clusterTimeStamp, appId);
     this.applicationAttemptId =
         ApplicationAttemptId.newInstance(applicationId, attemptId);
+    this.keyId = in.readInt();
   }
 
   @Override
@@ -92,6 +102,10 @@ public class AMRMTokenIdentifier extends
         .toString());
   }
 
+  public int getKeyId() {
+    return this.keyId;
+  }
+
   // TODO: Needed?
   @InterfaceAudience.Private
   public static class Renewer extends Token.TrivialRenewer {

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java Mon Jul 21 21:44:50 2014
@@ -28,6 +28,7 @@ public class DrainDispatcher extends Asy
 // and similar grotesqueries
   private volatile boolean drained = false;
   private final BlockingQueue<Event> queue;
+  final Object mutex;
 
   public DrainDispatcher() {
     this(new LinkedBlockingQueue<Event>());
@@ -36,6 +37,7 @@ public class DrainDispatcher extends Asy
   private DrainDispatcher(BlockingQueue<Event> eventQueue) {
     super(eventQueue);
     this.queue = eventQueue;
+    this.mutex = this;
   }
 
   /**
@@ -53,8 +55,10 @@ public class DrainDispatcher extends Asy
       @Override
       public void run() {
         while (!Thread.currentThread().isInterrupted()) {
-          // !drained if dispatch queued new events on this dispatcher
-          drained = queue.isEmpty();
+          synchronized (mutex) {
+            // !drained if dispatch queued new events on this dispatcher
+            drained = queue.isEmpty();
+          }
           Event event;
           try {
             event = queue.take();
@@ -75,8 +79,10 @@ public class DrainDispatcher extends Asy
     return new EventHandler() {
       @Override
       public void handle(Event event) {
-        drained = false;
-        actual.handle(event);
+        synchronized (mutex) {
+          actual.handle(event);
+          drained = false;
+        }
       }
     };
   }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java Mon Jul 21 21:44:50 2014
@@ -42,7 +42,7 @@ public class BaseNMTokenSecretManager ex
   private static Log LOG = LogFactory
       .getLog(BaseNMTokenSecretManager.class);
 
-  private int serialNo = new SecureRandom().nextInt();
+  protected int serialNo = new SecureRandom().nextInt();
 
   protected final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
   protected final Lock readLock = readWriteLock.readLock();

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Mon Jul 21 21:44:50 2014
@@ -169,6 +169,15 @@ public class NodeManager extends Composi
     }
   }
 
+  private void recoverTokens(NMTokenSecretManagerInNM nmTokenSecretManager,
+      NMContainerTokenSecretManager containerTokenSecretManager)
+          throws IOException {
+    if (nmStore.canRecover()) {
+      nmTokenSecretManager.recover(nmStore.loadNMTokenState());
+      // TODO: recover containerTokenSecretManager
+    }
+  }
+
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
 
@@ -184,7 +193,9 @@ public class NodeManager extends Composi
         new NMContainerTokenSecretManager(conf);
 
     NMTokenSecretManagerInNM nmTokenSecretManager =
-        new NMTokenSecretManagerInNM();
+        new NMTokenSecretManagerInNM(nmStore);
+
+    recoverTokens(nmTokenSecretManager, containerTokenSecretManager);
     
     this.aclsManager = new ApplicationACLsManager(conf);
 

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java Mon Jul 21 21:44:50 2014
@@ -35,11 +35,18 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.NMDBSchemaVersionProto;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.records.impl.pb.NMDBSchemaVersionPBImpl;
 import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.fusesource.leveldbjni.JniDBFactory;
@@ -50,14 +57,18 @@ import org.iq80.leveldb.Logger;
 import org.iq80.leveldb.Options;
 import org.iq80.leveldb.WriteBatch;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class NMLeveldbStateStoreService extends NMStateStoreService {
 
   public static final Log LOG =
       LogFactory.getLog(NMLeveldbStateStoreService.class);
 
   private static final String DB_NAME = "yarn-nm-state";
-  private static final String DB_SCHEMA_VERSION_KEY = "schema-version";
-  private static final String DB_SCHEMA_VERSION = "1.0";
+  private static final String DB_SCHEMA_VERSION_KEY = "nm-schema-version";
+  
+  private static final NMDBSchemaVersion CURRENT_VERSION_INFO = NMDBSchemaVersion
+      .newInstance(1, 0);
 
   private static final String DELETION_TASK_KEY_PREFIX =
       "DeletionService/deltask_";
@@ -72,6 +83,14 @@ public class NMLeveldbStateStoreService 
   private static final String LOCALIZATION_FILECACHE_SUFFIX = "filecache/";
   private static final String LOCALIZATION_APPCACHE_SUFFIX = "appcache/";
 
+  private static final String CURRENT_MASTER_KEY_SUFFIX = "CurrentMasterKey";
+  private static final String PREV_MASTER_KEY_SUFFIX = "PreviousMasterKey";
+  private static final String NM_TOKENS_KEY_PREFIX = "NMTokens/";
+  private static final String NM_TOKENS_CURRENT_MASTER_KEY =
+      NM_TOKENS_KEY_PREFIX + CURRENT_MASTER_KEY_SUFFIX;
+  private static final String NM_TOKENS_PREV_MASTER_KEY =
+      NM_TOKENS_KEY_PREFIX + PREV_MASTER_KEY_SUFFIX;
+
   private DB db;
 
   public NMLeveldbStateStoreService() {
@@ -368,6 +387,93 @@ public class NMLeveldbStateStoreService 
 
 
   @Override
+  public RecoveredNMTokenState loadNMTokenState() throws IOException {
+    RecoveredNMTokenState state = new RecoveredNMTokenState();
+    state.applicationMasterKeys =
+        new HashMap<ApplicationAttemptId, MasterKey>();
+    LeveldbIterator iter = null;
+    try {
+      iter = new LeveldbIterator(db);
+      iter.seek(bytes(NM_TOKENS_KEY_PREFIX));
+      while (iter.hasNext()) {
+        Entry<byte[], byte[]> entry = iter.next();
+        String fullKey = asString(entry.getKey());
+        if (!fullKey.startsWith(NM_TOKENS_KEY_PREFIX)) {
+          break;
+        }
+        String key = fullKey.substring(NM_TOKENS_KEY_PREFIX.length());
+        if (key.equals(CURRENT_MASTER_KEY_SUFFIX)) {
+          state.currentMasterKey = parseMasterKey(entry.getValue());
+        } else if (key.equals(PREV_MASTER_KEY_SUFFIX)) {
+          state.previousMasterKey = parseMasterKey(entry.getValue());
+        } else if (key.startsWith(
+            ApplicationAttemptId.appAttemptIdStrPrefix)) {
+          ApplicationAttemptId attempt;
+          try {
+            attempt = ConverterUtils.toApplicationAttemptId(key);
+          } catch (IllegalArgumentException e) {
+            throw new IOException("Bad application master key state for "
+                + fullKey, e);
+          }
+          state.applicationMasterKeys.put(attempt,
+              parseMasterKey(entry.getValue()));
+        }
+      }
+    } catch (DBException e) {
+      throw new IOException(e.getMessage(), e);
+    } finally {
+      if (iter != null) {
+        iter.close();
+      }
+    }
+    return state;
+  }
+
+  @Override
+  public void storeNMTokenCurrentMasterKey(MasterKey key)
+      throws IOException {
+    storeMasterKey(NM_TOKENS_CURRENT_MASTER_KEY, key);
+  }
+
+  @Override
+  public void storeNMTokenPreviousMasterKey(MasterKey key)
+      throws IOException {
+    storeMasterKey(NM_TOKENS_PREV_MASTER_KEY, key);
+  }
+
+  @Override
+  public void storeNMTokenApplicationMasterKey(
+      ApplicationAttemptId attempt, MasterKey key) throws IOException {
+    storeMasterKey(NM_TOKENS_KEY_PREFIX + attempt, key);
+  }
+
+  @Override
+  public void removeNMTokenApplicationMasterKey(
+      ApplicationAttemptId attempt) throws IOException {
+    String key = NM_TOKENS_KEY_PREFIX + attempt;
+    try {
+      db.delete(bytes(key));
+    } catch (DBException e) {
+      throw new IOException(e.getMessage(), e);
+    }
+  }
+
+  private MasterKey parseMasterKey(byte[] keyData) throws IOException {
+    return new MasterKeyPBImpl(MasterKeyProto.parseFrom(keyData));
+  }
+
+  private void storeMasterKey(String dbKey, MasterKey key)
+      throws IOException {
+    MasterKeyPBImpl pb = (MasterKeyPBImpl) key;
+    try {
+      db.put(bytes(dbKey), pb.getProto().toByteArray());
+    } catch (DBException e) {
+      throw new IOException(e.getMessage(), e);
+    }
+  }
+
+
+  @Override
   protected void initStorage(Configuration conf)
       throws IOException {
     Path storeRoot = createStorageDir(conf);
@@ -376,22 +482,16 @@ public class NMLeveldbStateStoreService 
     options.logger(new LeveldbLogger());
     LOG.info("Using state database at " + storeRoot + " for recovery");
     File dbfile = new File(storeRoot.toString());
-    byte[] schemaVersionData = null;
     try {
       db = JniDBFactory.factory.open(dbfile, options);
-      try {
-        schemaVersionData = db.get(bytes(DB_SCHEMA_VERSION_KEY));
-      } catch (DBException e) {
-        throw new IOException(e.getMessage(), e);
-      }
     } catch (NativeDB.DBException e) {
       if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
         LOG.info("Creating state database at " + dbfile);
         options.createIfMissing(true);
         try {
           db = JniDBFactory.factory.open(dbfile, options);
-          schemaVersionData = bytes(DB_SCHEMA_VERSION);
-          db.put(bytes(DB_SCHEMA_VERSION_KEY), schemaVersionData);
+          // store version
+          storeVersion();
         } catch (DBException dbErr) {
           throw new IOException(dbErr.getMessage(), dbErr);
         }
@@ -399,16 +499,7 @@ public class NMLeveldbStateStoreService 
         throw e;
       }
     }
-    if (schemaVersionData != null) {
-      String schemaVersion = asString(schemaVersionData);
-      // only support exact schema matches for now
-      if (!DB_SCHEMA_VERSION.equals(schemaVersion)) {
-        throw new IOException("Incompatible state database schema, found "
-            + schemaVersion + " expected " + DB_SCHEMA_VERSION);
-      }
-    } else {
-      throw new IOException("State database schema version not found");
-    }
+    checkVersion();
   }
 
   private Path createStorageDir(Configuration conf) throws IOException {
@@ -433,4 +524,68 @@ public class NMLeveldbStateStoreService 
       LOG.info(message);
     }
   }
+
+
+  /**
+   * Reads the schema version recorded in the state store.
+   *
+   * @return the stored version, or version 1.0 when no version was recorded
+   * @throws IOException if the database read fails or the stored bytes cannot
+   *     be parsed as an NMDBSchemaVersionProto
+   */
+  NMDBSchemaVersion loadVersion() throws IOException {
+    byte[] data;
+    try {
+      data = db.get(bytes(DB_SCHEMA_VERSION_KEY));
+    } catch (DBException e) {
+      // Report leveldb failures as IOException, like the other DB accessors.
+      throw new IOException(e.getMessage(), e);
+    }
+    // if version is not stored previously, treat it as 1.0.
+    if (data == null || data.length == 0) {
+      return NMDBSchemaVersion.newInstance(1, 0);
+    }
+    return new NMDBSchemaVersionPBImpl(
+        NMDBSchemaVersionProto.parseFrom(data));
+  }
+
+  /** Records CURRENT_VERSION_INFO as the schema version of the state store. */
+  private void storeVersion() throws IOException {
+    dbStoreVersion(CURRENT_VERSION_INFO);
+  }
+
+  /**
+   * Records an arbitrary schema version. Only used for test.
+   *
+   * @param state the version to write
+   */
+  @VisibleForTesting
+  void storeVersion(NMDBSchemaVersion state) throws IOException {
+    dbStoreVersion(state);
+  }
+
+  /**
+   * Serializes {@code state} through its protobuf form and writes it under
+   * DB_SCHEMA_VERSION_KEY. Leveldb failures are rethrown as IOException.
+   */
+  private void dbStoreVersion(NMDBSchemaVersion state) throws IOException {
+    NMDBSchemaVersionPBImpl versionPb = (NMDBSchemaVersionPBImpl) state;
+    byte[] serialized = versionPb.getProto().toByteArray();
+    try {
+      db.put(bytes(DB_SCHEMA_VERSION_KEY), serialized);
+    } catch (DBException dbErr) {
+      throw new IOException(dbErr.getMessage(), dbErr);
+    }
+  }
+
+  /** @return the schema version this NodeManager writes and expects. */
+  NMDBSchemaVersion getCurrentVersion() {
+    return CURRENT_VERSION_INFO;
+  }
+
+  /**
+   * Verifies that the NM state stored on disk can be used by this
+   * NodeManager, and refreshes the stored version info when that is safe.
+   * <ol>
+   * <li>Versioning scheme: major.minor, e.g. 1.0, 1.1, 1.2 ... 1.25, 2.0.</li>
+   * <li>Any incompatible change of state-store is a major upgrade, and any
+   *     compatible change of state-store is a minor upgrade.</li>
+   * <li>For a minor upgrade, say 1.1 to 1.2: overwrite the version info and
+   *     proceed as normal.</li>
+   * <li>For a major upgrade, say 1.2 to 2.0: throw an exception and instruct
+   *     the user to run a separate upgrade tool to upgrade the NM state, or to
+   *     remove the incompatible old state.</li>
+   * </ol>
+   *
+   * @throws IOException if the stored version cannot be determined or is
+   *     incompatible with {@link #getCurrentVersion()}, or if the store
+   *     cannot be read or updated
+   */
+  private void checkVersion() throws IOException {
+    NMDBSchemaVersion loadedVersion = loadVersion();
+    NMDBSchemaVersion currentVersion = getCurrentVersion();
+    LOG.info("Loaded NM state version info " + loadedVersion);
+    if (loadedVersion == null) {
+      // Fail with a descriptive error rather than a NullPointerException.
+      throw new IOException("Unable to load NM state version, expecting "
+          + currentVersion);
+    }
+    if (loadedVersion.equals(currentVersion)) {
+      return;
+    }
+    if (!loadedVersion.isCompatibleTo(currentVersion)) {
+      throw new IOException(
+        "Incompatible version for NM state: expecting NM state version " 
+            + currentVersion + ", but loading version " + loadedVersion);
+    }
+    // Compatible (minor) upgrade: record the version we now write.
+    LOG.info("Storing NM state version info " + currentVersion);
+    storeVersion();
+  }
+  
 }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java Mon Jul 21 21:44:50 2014
@@ -22,10 +22,12 @@ import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
 
 // The state store to use when state isn't being stored
 public class NMNullStateStoreService extends NMStateStoreService {
@@ -78,6 +80,32 @@ public class NMNullStateStoreService ext
   }
 
   @Override
+  public RecoveredNMTokenState loadNMTokenState() throws IOException {
+    // This store never persists state, so there is nothing to recover.
+    throw new UnsupportedOperationException(
+        "Recovery not supported by this state store");
+  }
+
+  @Override
+  public void storeNMTokenCurrentMasterKey(MasterKey key)
+      throws IOException {
+    // Intentionally empty: the key is not persisted.
+  }
+
+  @Override
+  public void storeNMTokenPreviousMasterKey(MasterKey key)
+      throws IOException {
+    // Intentionally empty: the key is not persisted.
+  }
+
+  @Override
+  public void storeNMTokenApplicationMasterKey(ApplicationAttemptId attempt,
+      MasterKey key) throws IOException {
+    // Intentionally empty: the key is not persisted.
+  }
+
+  @Override
+  public void removeNMTokenApplicationMasterKey(ApplicationAttemptId attempt)
+      throws IOException {
+    // Intentionally empty: nothing was persisted, so nothing to remove.
+  }
+
+  @Override
   protected void initStorage(Configuration conf) throws IOException {
   }
 

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java Mon Jul 21 21:44:50 2014
@@ -29,10 +29,12 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
 
 @Private
 @Unstable
@@ -100,6 +102,24 @@ public abstract class NMStateStoreServic
     }
   }
 
+  /**
+   * NM token secret manager state read back from a state store: the current
+   * and previous master keys, plus the master key tracked for each
+   * application attempt. The fields are package-private so that store
+   * implementations in this package can fill them in directly.
+   */
+  public static class RecoveredNMTokenState {
+    MasterKey currentMasterKey;
+    MasterKey previousMasterKey;
+    Map<ApplicationAttemptId, MasterKey> applicationMasterKeys;
+
+    /** @return the recovered current master key; may be null */
+    public MasterKey getCurrentMasterKey() {
+      return this.currentMasterKey;
+    }
+
+    /** @return the recovered previous master key; may be null */
+    public MasterKey getPreviousMasterKey() {
+      return this.previousMasterKey;
+    }
+
+    /**
+     * @return the master key per application attempt; the map is returned
+     *     as held, not copied
+     */
+    public Map<ApplicationAttemptId, MasterKey> getApplicationMasterKeys() {
+      return this.applicationMasterKeys;
+    }
+  }
+
   /** Initialize the state storage */
   @Override
   public void serviceInit(Configuration conf) throws IOException {
@@ -173,6 +193,21 @@ public abstract class NMStateStoreServic
   public abstract void removeDeletionTask(int taskId) throws IOException;
 
 
+  /**
+   * Load the persisted state of the NM token secret manager.
+   * Stores that do not support recovery may throw
+   * UnsupportedOperationException (NMNullStateStoreService does).
+   * @return the recovered current, previous and per-attempt master keys
+   * @throws IOException if the state cannot be read
+   */
+  public abstract RecoveredNMTokenState loadNMTokenState() throws IOException;
+
+  /**
+   * Persist the master key currently used for NM tokens.
+   * @param key the new current master key
+   * @throws IOException if the key cannot be stored
+   */
+  public abstract void storeNMTokenCurrentMasterKey(MasterKey key)
+      throws IOException;
+
+  /**
+   * Persist the master key that preceded the current one.
+   * @param key the new previous master key
+   * @throws IOException if the key cannot be stored
+   */
+  public abstract void storeNMTokenPreviousMasterKey(MasterKey key)
+      throws IOException;
+
+  /**
+   * Persist the master key associated with an application attempt.
+   * @param attempt the application attempt
+   * @param key the master key used by that attempt
+   * @throws IOException if the key cannot be stored
+   */
+  public abstract void storeNMTokenApplicationMasterKey(
+      ApplicationAttemptId attempt, MasterKey key) throws IOException;
+
+  /**
+   * Remove the persisted master key of an application attempt.
+   * @param attempt the application attempt whose key is removed
+   * @throws IOException if the key cannot be removed
+   */
+  public abstract void removeNMTokenApplicationMasterKey(
+      ApplicationAttemptId attempt) throws IOException;
+
+
   protected abstract void initStorage(Configuration conf) throws IOException;
 
   protected abstract void startStorage() throws IOException;

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java Mon Jul 21 21:44:50 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.security;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -31,6 +32,9 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.security.NMTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokenState;
 import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
 import org.apache.hadoop.yarn.server.security.MasterKeyData;
 
@@ -45,16 +49,78 @@ public class NMTokenSecretManagerInNM ex
   
   private final Map<ApplicationAttemptId, MasterKeyData> oldMasterKeys;
   private final Map<ApplicationId, List<ApplicationAttemptId>> appToAppAttemptMap;
+  private final NMStateStoreService stateStore;
   private NodeId nodeId;                                                      
   
-  
+  /**
+   * Creates a secret manager backed by an NMNullStateStoreService, so key
+   * changes are not persisted.
+   */
   public NMTokenSecretManagerInNM() {
+    this(new NMNullStateStoreService());
+  }
+
+  /**
+   * Creates a secret manager that records master-key changes in the given
+   * state store.
+   * @param stateStore store that receives current/previous/per-attempt keys
+   */
+  public NMTokenSecretManagerInNM(NMStateStoreService stateStore) {
     this.oldMasterKeys =
         new HashMap<ApplicationAttemptId, MasterKeyData>();
     appToAppAttemptMap =         
         new HashMap<ApplicationId, List<ApplicationAttemptId>>();
+    this.stateStore = stateStore;
   }
   
+  /**
+   * Restores the secret manager from persisted state: the current and
+   * previous master keys, the per-attempt master keys (merged into any keys
+   * already held), and the application-to-attempts index derived from them.
+   *
+   * @param state recovered state, presumably obtained from
+   *     {@link NMStateStoreService#loadNMTokenState()}
+   */
+  public synchronized void recover(RecoveredNMTokenState state)
+      throws IOException {
+    MasterKey storedCurrent = state.getCurrentMasterKey();
+    MasterKey storedPrevious = state.getPreviousMasterKey();
+    if (storedCurrent != null) {
+      super.currentMasterKey = new MasterKeyData(storedCurrent,
+          createSecretKey(storedCurrent.getBytes().array()));
+    }
+    if (storedPrevious != null) {
+      this.previousMasterKey = new MasterKeyData(storedPrevious,
+          createSecretKey(storedPrevious.getBytes().array()));
+    }
+
+    // restore the serial number from the current master key
+    if (super.currentMasterKey != null) {
+      super.serialNo = super.currentMasterKey.getMasterKey().getKeyId() + 1;
+    }
+
+    for (Map.Entry<ApplicationAttemptId, MasterKey> stored :
+         state.getApplicationMasterKeys().entrySet()) {
+      MasterKey attemptKey = stored.getValue();
+      oldMasterKeys.put(stored.getKey(), new MasterKeyData(attemptKey,
+          createSecretKey(attemptKey.getBytes().array())));
+    }
+
+    // Rebuild the app -> attempts index from every attempt key now held.
+    appToAppAttemptMap.clear();
+    for (ApplicationAttemptId attemptId : oldMasterKeys.keySet()) {
+      ApplicationId appId = attemptId.getApplicationId();
+      if (!appToAppAttemptMap.containsKey(appId)) {
+        appToAppAttemptMap.put(appId, new ArrayList<ApplicationAttemptId>());
+      }
+      appToAppAttemptMap.get(appId).add(attemptId);
+    }
+  }
+
+  /**
+   * Installs {@code key} as the current master key and records it in the
+   * state store. A failure to persist is logged, not propagated.
+   */
+  private void updateCurrentMasterKey(MasterKeyData key) {
+    super.currentMasterKey = key;
+    MasterKey rawKey = key.getMasterKey();
+    try {
+      stateStore.storeNMTokenCurrentMasterKey(rawKey);
+    } catch (IOException ioe) {
+      LOG.error("Unable to update current master key in state store", ioe);
+    }
+  }
+
+  /**
+   * Installs {@code key} as the previous master key and records it in the
+   * state store. A failure to persist is logged, not propagated.
+   */
+  private void updatePreviousMasterKey(MasterKeyData key) {
+    this.previousMasterKey = key;
+    MasterKey rawKey = key.getMasterKey();
+    try {
+      stateStore.storeNMTokenPreviousMasterKey(rawKey);
+    } catch (IOException ioe) {
+      LOG.error("Unable to update previous master key in state store", ioe);
+    }
+  }
+
   /**
    * Used by NodeManagers to create a token-secret-manager with the key
    * obtained from the RM. This can happen during registration or when the RM
@@ -62,20 +128,16 @@ public class NMTokenSecretManagerInNM ex
    */
   @Private
   public synchronized void setMasterKey(MasterKey masterKey) {
-    LOG.info("Rolling master-key for nm-tokens, got key with id :"
-        + masterKey.getKeyId());
-    if (super.currentMasterKey == null) {
-      super.currentMasterKey =
-          new MasterKeyData(masterKey, createSecretKey(masterKey.getBytes()
-            .array()));
-    } else {
-      if (super.currentMasterKey.getMasterKey().getKeyId() != masterKey
-        .getKeyId()) {
-        this.previousMasterKey = super.currentMasterKey;
-        super.currentMasterKey =
-            new MasterKeyData(masterKey, createSecretKey(masterKey.getBytes()
-              .array()));
+    // Update keys only if the key has changed.
+    if (super.currentMasterKey == null || super.currentMasterKey.getMasterKey()
+          .getKeyId() != masterKey.getKeyId()) {
+      // This manager rolls NM-token keys (not container-token keys).
+      LOG.info("Rolling master-key for nm-tokens, got key with id "
+          + masterKey.getKeyId());
+      if (super.currentMasterKey != null) {
+        // Keep the outgoing key: tokens whose key id matches the previous
+        // key are still accepted when an attempt is first seen.
+        updatePreviousMasterKey(super.currentMasterKey);
       }
+      updateCurrentMasterKey(new MasterKeyData(masterKey,
+          createSecretKey(masterKey.getBytes().array())));
     }
   }
 
@@ -128,7 +190,7 @@ public class NMTokenSecretManagerInNM ex
       LOG.debug("Removing application attempts NMToken keys for application "
           + appId);
       for (ApplicationAttemptId appAttemptId : appAttemptList) {
-        this.oldMasterKeys.remove(appAttemptId);
+        removeAppAttemptKey(appAttemptId);
       }
       appToAppAttemptMap.remove(appId);
     } else {
@@ -164,11 +226,11 @@ public class NMTokenSecretManagerInNM ex
           + identifier.getApplicationAttemptId().toString());
       if (identifier.getKeyId() == currentMasterKey.getMasterKey()
         .getKeyId()) {
-        oldMasterKeys.put(appAttemptId, currentMasterKey);
+        updateAppAttemptKey(appAttemptId, currentMasterKey);
       } else if (previousMasterKey != null
           && identifier.getKeyId() == previousMasterKey.getMasterKey()
             .getKeyId()) {
-        oldMasterKeys.put(appAttemptId, previousMasterKey);
+        updateAppAttemptKey(appAttemptId, previousMasterKey);
       } else {
         throw new InvalidToken(
           "Older NMToken should not be used while starting the container.");
@@ -193,4 +255,24 @@ public class NMTokenSecretManagerInNM ex
   public synchronized NodeId getNodeId() {
     return this.nodeId;
   }
+
+  /**
+   * Remembers which master key {@code appAttemptId} uses and persists it.
+   * Persistence failures are logged and otherwise ignored.
+   */
+  private void updateAppAttemptKey(ApplicationAttemptId appAttemptId,
+      MasterKeyData keyData) {
+    oldMasterKeys.put(appAttemptId, keyData);
+    MasterKey rawKey = keyData.getMasterKey();
+    try {
+      stateStore.storeNMTokenApplicationMasterKey(appAttemptId, rawKey);
+    } catch (IOException ioe) {
+      LOG.error("Unable to store master key for application " + appAttemptId,
+          ioe);
+    }
+  }
+
+  /**
+   * Forgets the master key of {@code appAttemptId}, in memory and in the
+   * state store. Persistence failures are logged and otherwise ignored.
+   */
+  private void removeAppAttemptKey(ApplicationAttemptId appAttemptId) {
+    oldMasterKeys.remove(appAttemptId);
+    try {
+      stateStore.removeNMTokenApplicationMasterKey(appAttemptId);
+    } catch (IOException ioe) {
+      LOG.error("Unable to remove master key for application " + appAttemptId,
+          ioe);
+    }
+  }
 }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto Mon Jul 21 21:44:50 2014
@@ -38,3 +38,9 @@ message LocalizedResourceProto {
   optional string localPath = 2;
   optional int64 size = 3;
 }
+
+// Schema version (major.minor) of the NM recovery state store.
+// NMLeveldbStateStoreService#checkVersion compares it with the version the
+// NodeManager expects.
+message NMDBSchemaVersionProto {
+  optional int32 majorVersion = 1;
+  optional int32 minorVersion = 2;
+}
+

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java Mon Jul 21 21:44:50 2014
@@ -25,14 +25,18 @@ import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
 
 public class NMMemoryStateStoreService extends NMStateStoreService {
   private Map<TrackerKey, TrackerState> trackerStates;
   private Map<Integer, DeletionServiceDeleteTaskProto> deleteTasks;
+  private RecoveredNMTokenState nmTokenState;
 
   public NMMemoryStateStoreService() {
     super(NMMemoryStateStoreService.class.getName());
@@ -113,8 +117,12 @@ public class NMMemoryStateStoreService e
 
   @Override
   protected void initStorage(Configuration conf) {
+    nmTokenState = new RecoveredNMTokenState();
+    nmTokenState.applicationMasterKeys =
+        new HashMap<ApplicationAttemptId, MasterKey>();
     trackerStates = new HashMap<TrackerKey, TrackerState>();
     deleteTasks = new HashMap<Integer, DeletionServiceDeleteTaskProto>();
+
   }
 
   @Override
@@ -148,6 +156,47 @@ public class NMMemoryStateStoreService e
   }
 
 
+  @Override
+  public RecoveredNMTokenState loadNMTokenState() throws IOException {
+    // Hand back a copy so callers cannot mutate the stored state.
+    RecoveredNMTokenState snapshot = new RecoveredNMTokenState();
+    snapshot.applicationMasterKeys =
+        new HashMap<ApplicationAttemptId, MasterKey>(
+            nmTokenState.applicationMasterKeys);
+    snapshot.previousMasterKey = nmTokenState.previousMasterKey;
+    snapshot.currentMasterKey = nmTokenState.currentMasterKey;
+    return snapshot;
+  }
+
+  @Override
+  public void storeNMTokenCurrentMasterKey(MasterKey key)
+      throws IOException {
+    nmTokenState.currentMasterKey = copyMasterKey(key);
+  }
+
+  @Override
+  public void storeNMTokenPreviousMasterKey(MasterKey key)
+      throws IOException {
+    nmTokenState.previousMasterKey = copyMasterKey(key);
+  }
+
+  @Override
+  public void storeNMTokenApplicationMasterKey(ApplicationAttemptId attempt,
+      MasterKey key) throws IOException {
+    nmTokenState.applicationMasterKeys.put(attempt, copyMasterKey(key));
+  }
+
+  @Override
+  public void removeNMTokenApplicationMasterKey(ApplicationAttemptId attempt)
+      throws IOException {
+    nmTokenState.applicationMasterKeys.remove(attempt);
+  }
+
+  /**
+   * Rebuilds {@code key} from its protobuf form so the stored instance is
+   * independent of the caller's object. Expects a MasterKeyPBImpl.
+   */
+  private static MasterKey copyMasterKey(MasterKey key) {
+    MasterKeyPBImpl source = (MasterKeyPBImpl) key;
+    return new MasterKeyPBImpl(source.getProto());
+  }
+
+
   private static class TrackerState {
     Map<Path, LocalResourceProto> inProgressMap =
         new HashMap<Path, LocalResourceProto>();

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java Mon Jul 21 21:44:50 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.no
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
@@ -28,6 +29,8 @@ import java.util.Map;
 
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.ServiceStateException;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -37,12 +40,17 @@ import org.apache.hadoop.yarn.conf.YarnC
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokenState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
+import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -97,6 +105,36 @@ public class TestNMLeveldbStateStoreServ
     assertTrue(stateStore.canRecover());
     verifyEmptyState();
   }
+  
+  /**
+   * Verifies the schema version handling of the leveldb store: a compatible
+   * (same major) stored version is overwritten on restart, and an
+   * incompatible (different major) one makes the restart fail.
+   */
+  @Test
+  public void testCheckVersion() throws IOException {
+    // default version: a freshly created store records the current version
+    NMDBSchemaVersion defaultVersion = stateStore.getCurrentVersion();
+    Assert.assertEquals(defaultVersion, stateStore.loadVersion());
+
+    // compatible version
+    NMDBSchemaVersion compatibleVersion =
+        NMDBSchemaVersion.newInstance(defaultVersion.getMajorVersion(),
+          defaultVersion.getMinorVersion() + 2);
+    stateStore.storeVersion(compatibleVersion);
+    Assert.assertEquals(compatibleVersion, stateStore.loadVersion());
+    restartStateStore();
+    // restarting re-runs the version check, which overwrites the compatible
+    // version with the current one
+    Assert.assertEquals(defaultVersion, stateStore.loadVersion());
+
+    // incompatible version
+    NMDBSchemaVersion incompatibleVersion =
+      NMDBSchemaVersion.newInstance(defaultVersion.getMajorVersion() + 1,
+          defaultVersion.getMinorVersion());
+    stateStore.storeVersion(incompatibleVersion);
+    try {
+      restartStateStore();
+      Assert.fail("Incompatible version, should expect fail here.");
+    } catch (ServiceStateException e) {
+      // NOTE(review): presumably the service framework wraps the
+      // IOException thrown by the version check; confirm
+      Assert.assertTrue("Exception message mismatch", 
+        e.getMessage().contains("Incompatible version for NM state:"));
+    }
+  }
 
   @Test
   public void testStartResourceLocalization() throws IOException {
@@ -460,4 +498,80 @@ public class TestNMLeveldbStateStoreServ
     state = stateStore.loadDeletionServiceState();
     assertTrue(state.getTasks().isEmpty());
   }
+
+  /**
+   * Stores NM token master keys step by step and checks after each
+   * restartStateStore() that loadNMTokenState() returns exactly what was
+   * stored, including updates and removals of per-attempt keys.
+   */
+  @Test
+  public void testNMTokenStorage() throws IOException {
+    // test empty when no state
+    RecoveredNMTokenState state = stateStore.loadNMTokenState();
+    assertNull(state.getCurrentMasterKey());
+    assertNull(state.getPreviousMasterKey());
+    assertTrue(state.getApplicationMasterKeys().isEmpty());
+
+    // store a master key and verify recovered
+    NMTokenSecretManagerForTest secretMgr = new NMTokenSecretManagerForTest();
+    MasterKey currentKey = secretMgr.generateKey();
+    stateStore.storeNMTokenCurrentMasterKey(currentKey);
+    restartStateStore();
+    state = stateStore.loadNMTokenState();
+    assertEquals(currentKey, state.getCurrentMasterKey());
+    assertNull(state.getPreviousMasterKey());
+    assertTrue(state.getApplicationMasterKeys().isEmpty());
+
+    // store a previous key and verify recovered
+    MasterKey prevKey = secretMgr.generateKey();
+    stateStore.storeNMTokenPreviousMasterKey(prevKey);
+    restartStateStore();
+    state = stateStore.loadNMTokenState();
+    assertEquals(currentKey, state.getCurrentMasterKey());
+    assertEquals(prevKey, state.getPreviousMasterKey());
+    assertTrue(state.getApplicationMasterKeys().isEmpty());
+
+    // store a few application keys and verify recovered
+    ApplicationAttemptId attempt1 = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(1, 1), 1);
+    MasterKey attemptKey1 = secretMgr.generateKey();
+    stateStore.storeNMTokenApplicationMasterKey(attempt1, attemptKey1);
+    ApplicationAttemptId attempt2 = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(2, 3), 4);
+    MasterKey attemptKey2 = secretMgr.generateKey();
+    stateStore.storeNMTokenApplicationMasterKey(attempt2, attemptKey2);
+    restartStateStore();
+    state = stateStore.loadNMTokenState();
+    assertEquals(currentKey, state.getCurrentMasterKey());
+    assertEquals(prevKey, state.getPreviousMasterKey());
+    Map<ApplicationAttemptId, MasterKey> loadedAppKeys =
+        state.getApplicationMasterKeys();
+    assertEquals(2, loadedAppKeys.size());
+    assertEquals(attemptKey1, loadedAppKeys.get(attempt1));
+    assertEquals(attemptKey2, loadedAppKeys.get(attempt2));
+
+    // add/update/remove keys and verify recovered
+    ApplicationAttemptId attempt3 = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(5, 6), 7);
+    MasterKey attemptKey3 = secretMgr.generateKey();
+    stateStore.storeNMTokenApplicationMasterKey(attempt3, attemptKey3);
+    stateStore.removeNMTokenApplicationMasterKey(attempt1);
+    // overwrite attempt2's entry with a different key to exercise updates
+    attemptKey2 = prevKey;
+    stateStore.storeNMTokenApplicationMasterKey(attempt2, attemptKey2);
+    // roll the keys: the old current key becomes the previous key
+    prevKey = currentKey;
+    stateStore.storeNMTokenPreviousMasterKey(prevKey);
+    currentKey = secretMgr.generateKey();
+    stateStore.storeNMTokenCurrentMasterKey(currentKey);
+    restartStateStore();
+    state = stateStore.loadNMTokenState();
+    assertEquals(currentKey, state.getCurrentMasterKey());
+    assertEquals(prevKey, state.getPreviousMasterKey());
+    loadedAppKeys = state.getApplicationMasterKeys();
+    assertEquals(2, loadedAppKeys.size());
+    assertNull(loadedAppKeys.get(attempt1));
+    assertEquals(attemptKey2, loadedAppKeys.get(attempt2));
+    assertEquals(attemptKey3, loadedAppKeys.get(attempt3));
+  }
+
+  private static class NMTokenSecretManagerForTest extends
+      BaseNMTokenSecretManager {
+    public MasterKey generateKey() {
+      return createNewMasterKey().getMasterKey();
+    }
+  }
 }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml Mon Jul 21 21:44:50 2014
@@ -194,6 +194,21 @@
 
     <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
     <dependency>
+      <!-- NOTE(review): the dependency:analyze comment directly above this
+           element was originally written for jersey-test-framework-grizzly2
+           (it is repeated before that dependency below); confirm whether it
+           also applies to hadoop-minikdc. -->
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minikdc</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <!-- Test-scoped test-jar of hadoop-auth, pinned to the project version. -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-auth</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+      <version>${project.version}</version>
+    </dependency>
+
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
       <groupId>com.sun.jersey.jersey-test-framework</groupId>
       <artifactId>jersey-test-framework-grizzly2</artifactId>
       <scope>test</scope>

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Mon Jul 21 21:44:50 2014
@@ -205,12 +205,6 @@ public class RMAppImpl implements RMApp,
     .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, 
         RMAppEventType.APP_RUNNING_ON_NODE,
         new AppRunningOnNodeTransition())
-    // ACCECPTED state can once again receive APP_ACCEPTED event, because on
-    // recovery the app returns ACCEPTED state and the app once again go
-    // through the scheduler and triggers one more APP_ACCEPTED event at
-    // ACCEPTED state.
-    .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
-        RMAppEventType.APP_ACCEPTED)
 
      // Transitions from RUNNING state
     .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
@@ -789,8 +783,18 @@ public class RMAppImpl implements RMApp,
         return app.recoveredFinalState;
       }
 
-      // Notify scheduler about the app on recovery
-      new AddApplicationToSchedulerTransition().transition(app, event);
+      // No existent attempts means the attempt associated with this app was not
+      // started or started but not yet saved.
+      if (app.attempts.isEmpty()) {
+        app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId,
+          app.submissionContext.getQueue(), app.user));
+        return RMAppState.SUBMITTED;
+      }
+
+      // Add application to scheduler synchronously to guarantee scheduler
+      // knows applications before AM or NM re-registers.
+      app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId,
+        app.submissionContext.getQueue(), app.user, true));
 
       // recover attempts
       app.recoverAppAttempts();
@@ -805,12 +809,6 @@ public class RMAppImpl implements RMApp,
         return RMAppState.ACCEPTED;
       }
 
-      // No existent attempts means the attempt associated with this app was not
-      // started or started but not yet saved.
-      if (app.attempts.isEmpty()) {
-        return RMAppState.SUBMITTED;
-      }
-
       // YARN-1507 is saving the application state after the application is
       // accepted. So after YARN-1507, an app is saved meaning it is accepted.
       // Thus we return ACCECPTED state on recovery.

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Mon Jul 21 21:44:50 2014
@@ -774,11 +774,9 @@ public class RMAppAttemptImpl implements
       }
 
       // create AMRMToken
-      AMRMTokenIdentifier id =
-          new AMRMTokenIdentifier(appAttempt.applicationAttemptId);
       appAttempt.amrmToken =
-          new Token<AMRMTokenIdentifier>(id,
-            appAttempt.rmContext.getAMRMTokenSecretManager());
+          appAttempt.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
+            appAttempt.applicationAttemptId);
 
       // Add the applicationAttempt to the scheduler and inform the scheduler
       // whether to transfer the state from previous attempt.
@@ -926,8 +924,10 @@ public class RMAppAttemptImpl implements
           appAttempt.masterService
               .registerAppAttempt(appAttempt.applicationAttemptId);
 
-          appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
-            appAttempt.getAppAttemptId(), false, false));
+          // Add attempt to scheduler synchronously to guarantee scheduler
+          // knows attempts before AM or NM re-registers.
+          appAttempt.scheduler.handle(new AppAttemptAddedSchedulerEvent(
+            appAttempt.getAppAttemptId(), false, true));
         }
 
         /*

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java Mon Jul 21 21:44:50 2014
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
 
+import java.util.List;
+
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -26,6 +28,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
 
 /**
@@ -73,5 +76,7 @@ public interface RMContainer extends Eve
   ContainerReport createContainerReport();
   
   boolean isAMContainer();
+  
+  /**
+   * Returns the resource requests recorded for this container, so that they
+   * can be handed back to the scheduler if the container is preempted before
+   * the AM pulls it.
+   *
+   * @return the recorded requests, or {@code null} when none are recorded
+   *         (RMContainerImpl starts with {@code null} and clears the list
+   *         again once the container is acquired)
+   */
+  List<ResourceRequest> getResourceRequests();
 
 }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java Mon Jul 21 21:44:50 2014
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
 
 import java.util.EnumSet;
+import java.util.List;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -35,6 +36,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -158,6 +160,7 @@ public class RMContainerImpl implements 
   private long finishTime;
   private ContainerStatus finishedStatus;
   private boolean isAMContainer;
+  private List<ResourceRequest> resourceRequests;
 
   public RMContainerImpl(Container container,
       ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
@@ -180,7 +183,8 @@ public class RMContainerImpl implements 
     this.eventHandler = rmContext.getDispatcher().getEventHandler();
     this.containerAllocationExpirer = rmContext.getContainerAllocationExpirer();
     this.isAMContainer = false;
-    
+    this.resourceRequests = null;
+
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     this.readLock = lock.readLock();
     this.writeLock = lock.writeLock();
@@ -311,6 +315,25 @@ public class RMContainerImpl implements 
       readLock.unlock();
     }
   }
+  
+  /**
+   * Returns the resource requests recorded for this container, read under
+   * the container's read lock.
+   *
+   * @return the stored list itself (not a copy), or {@code null} if no list
+   *         has been set or it has been cleared
+   */
+  @Override
+  public List<ResourceRequest> getResourceRequests() {
+    try {
+      // NOTE(review): the usual idiom calls lock() before entering the try,
+      // so the finally block can never unlock a lock that was not acquired.
+      readLock.lock();
+      return resourceRequests;
+    } finally {
+      readLock.unlock();
+    }
+  }
+  
+  /**
+   * Records the resource requests associated with this container, under the
+   * container's write lock. Passing {@code null} clears any recorded requests.
+   *
+   * @param requests the requests to store; stored by reference, not copied
+   */
+  public void setResourceRequests(List<ResourceRequest> requests) {
+    try {
+      writeLock.lock();
+      this.resourceRequests = requests;
+    } finally {
+      writeLock.unlock();
+    }
+  }
 
   @Override
   public String toString() {
@@ -432,6 +455,9 @@ public class RMContainerImpl implements 
 
     @Override
     public void transition(RMContainerImpl container, RMContainerEvent event) {
+      // Clear ResourceRequest stored in RMContainer
+      container.setResourceRequests(null);
+      
       // Register with containerAllocationExpirer.
       container.containerAllocationExpirer.register(container.getContainerId());
 

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java Mon Jul 21 21:44:50 2014
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -122,6 +123,23 @@ public abstract class AbstractYarnSchedu
     return maximumAllocation;
   }
 
+  /**
+   * Handles a report that a container has been launched on a node.
+   * If the scheduler knows the container's current application attempt, that
+   * attempt is notified. Otherwise the launch is logged and an
+   * {@code RMNodeCleanContainerEvent} is dispatched for the container.
+   *
+   * @param containerId the container reported as launched
+   * @param node the node the container was launched on
+   */
+  protected void containerLaunchedOnNode(ContainerId containerId,
+                                         SchedulerNode node) {
+    // Get the application attempt for the launched container
+    SchedulerApplicationAttempt application = getCurrentAttemptForContainer
+        (containerId);
+    if (application == null) {
+      // No known attempt owns this container: log it and dispatch a
+      // clean-container event for it on this node.
+      LOG.info("Unknown application "
+          + containerId.getApplicationAttemptId().getApplicationId()
+          + " launched container " + containerId + " on node: " + node);
+      this.rmContext.getDispatcher().getEventHandler()
+        .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
+      return;
+    }
+
+    application.containerLaunchedOnNode(containerId, node.getNodeID());
+  }
+
   public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
     SchedulerApplication<T> app =
         applications.get(applicationAttemptId.getApplicationId());
@@ -275,6 +293,27 @@ public abstract class AbstractYarnSchedu
     return rmContainer;
   }
 
+  /**
+   * Recovers the resource requests stored in an RMContainer when the
+   * container is preempted before the AM has pulled it, by handing them back
+   * to the container's current application attempt. If the AM has already
+   * pulled the container, the RMContainer holds no requests and nothing is
+   * recovered.
+   *
+   * @param rmContainer the preempted container whose stored requests should
+   *        be re-added to the scheduler
+   */
+  protected void recoverResourceRequestForContainer(RMContainer rmContainer) {
+    List<ResourceRequest> requests = rmContainer.getResourceRequests();
+
+    // Once the container has moved to ACQUIRED its stored requests are
+    // cleared (set to null), so there is nothing to recover.
+    if (requests == null) {
+      return;
+    }
+    // Add resource request back to Scheduler. Silently skipped when the
+    // container's current application attempt is not known.
+    SchedulerApplicationAttempt schedulerAttempt 
+        = getCurrentAttemptForContainer(rmContainer.getContainerId());
+    if (schedulerAttempt != null) {
+      schedulerAttempt.recoverResourceRequests(requests);
+    }
+  }
+
   public SchedulerNode getSchedulerNode(NodeId nodeId) {
     return nodes.get(nodeId);
   }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Mon Jul 21 21:44:50 2014
@@ -127,9 +127,10 @@ public class AppSchedulingInfo {
    * by the application.
    *
    * @param requests resources to be acquired
+   * @param recoverPreemptedRequest recover Resource Request on preemption
    */
   synchronized public void updateResourceRequests(
-      List<ResourceRequest> requests) {
+      List<ResourceRequest> requests, boolean recoverPreemptedRequest) {
     QueueMetrics metrics = queue.getMetrics();
     
     // Update resource requests
@@ -163,8 +164,13 @@ public class AppSchedulingInfo {
         asks = new HashMap<String, ResourceRequest>();
         this.requests.put(priority, asks);
         this.priorities.add(priority);
-      } else if (updatePendingResources) {
-        lastRequest = asks.get(resourceName);
+      }
+      lastRequest = asks.get(resourceName);
+
+      if (recoverPreemptedRequest && lastRequest != null) {
+        // Increment the number of containers to 1, as it is recovering a
+        // single container.
+        request.setNumContainers(lastRequest.getNumContainers() + 1);
       }
 
       asks.put(resourceName, request);
@@ -254,14 +260,16 @@ public class AppSchedulingInfo {
    * @param container
    *          the containers allocated.
    */
-  synchronized public void allocate(NodeType type, SchedulerNode node,
-      Priority priority, ResourceRequest request, Container container) {
+  synchronized public List<ResourceRequest> allocate(NodeType type,
+      SchedulerNode node, Priority priority, ResourceRequest request,
+      Container container) {
+    List<ResourceRequest> resourceRequests = new ArrayList<ResourceRequest>();
     if (type == NodeType.NODE_LOCAL) {
-      allocateNodeLocal(node, priority, request, container);
+      allocateNodeLocal(node, priority, request, container, resourceRequests);
     } else if (type == NodeType.RACK_LOCAL) {
-      allocateRackLocal(node, priority, request, container);
+      allocateRackLocal(node, priority, request, container, resourceRequests);
     } else {
-      allocateOffSwitch(node, priority, request, container);
+      allocateOffSwitch(node, priority, request, container, resourceRequests);
     }
     QueueMetrics metrics = queue.getMetrics();
     if (pending) {
@@ -279,6 +287,7 @@ public class AppSchedulingInfo {
           + " resource=" + request.getCapability());
     }
     metrics.allocateResources(user, 1, request.getCapability(), true);
+    return resourceRequests;
   }
 
   /**
@@ -288,9 +297,9 @@ public class AppSchedulingInfo {
    * @param allocatedContainers
    *          resources allocated to the application
    */
-  synchronized private void allocateNodeLocal( 
-      SchedulerNode node, Priority priority, 
-      ResourceRequest nodeLocalRequest, Container container) {
+  synchronized private void allocateNodeLocal(SchedulerNode node,
+      Priority priority, ResourceRequest nodeLocalRequest, Container container,
+      List<ResourceRequest> resourceRequests) {
     // Update future requirements
     nodeLocalRequest.setNumContainers(nodeLocalRequest.getNumContainers() - 1);
     if (nodeLocalRequest.getNumContainers() == 0) {
@@ -304,7 +313,14 @@ public class AppSchedulingInfo {
       this.requests.get(priority).remove(node.getRackName());
     }
 
-    decrementOutstanding(requests.get(priority).get(ResourceRequest.ANY));
+    ResourceRequest offRackRequest = requests.get(priority).get(
+        ResourceRequest.ANY);
+    decrementOutstanding(offRackRequest);
+
+    // Update cloned NodeLocal, RackLocal and OffRack requests for recovery
+    resourceRequests.add(cloneResourceRequest(nodeLocalRequest));
+    resourceRequests.add(cloneResourceRequest(rackLocalRequest));
+    resourceRequests.add(cloneResourceRequest(offRackRequest));
   }
 
   /**
@@ -314,16 +330,22 @@ public class AppSchedulingInfo {
    * @param allocatedContainers
    *          resources allocated to the application
    */
-  synchronized private void allocateRackLocal(
-      SchedulerNode node, Priority priority,
-      ResourceRequest rackLocalRequest, Container container) {
+  synchronized private void allocateRackLocal(SchedulerNode node,
+      Priority priority, ResourceRequest rackLocalRequest, Container container,
+      List<ResourceRequest> resourceRequests) {
     // Update future requirements
     rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers() - 1);
     if (rackLocalRequest.getNumContainers() == 0) {
       this.requests.get(priority).remove(node.getRackName());
     }
 
-    decrementOutstanding(requests.get(priority).get(ResourceRequest.ANY));
+    ResourceRequest offRackRequest = requests.get(priority).get(
+        ResourceRequest.ANY);
+    decrementOutstanding(offRackRequest);
+
+    // Record single-container clones of the RackLocal and OffRack requests
+    // so they can be re-added if this container is preempted before the AM
+    // pulls it.
+    resourceRequests.add(cloneResourceRequest(rackLocalRequest));
+    resourceRequests.add(cloneResourceRequest(offRackRequest));
   }
 
   /**
@@ -333,11 +355,13 @@ public class AppSchedulingInfo {
    * @param allocatedContainers
    *          resources allocated to the application
    */
-  synchronized private void allocateOffSwitch(
-      SchedulerNode node, Priority priority,
-      ResourceRequest offSwitchRequest, Container container) {
+  synchronized private void allocateOffSwitch(SchedulerNode node,
+      Priority priority, ResourceRequest offSwitchRequest, Container container,
+      List<ResourceRequest> resourceRequests) {
     // Update future requirements
     decrementOutstanding(offSwitchRequest);
+    // Record a single-container clone of the OffRack request for recovery;
+    // unlike node- and rack-local allocation, only this one request is kept.
+    resourceRequests.add(cloneResourceRequest(offSwitchRequest));
   }
 
   synchronized private void decrementOutstanding(
@@ -436,4 +460,11 @@ public class AppSchedulingInfo {
     metrics.allocateResources(user, 1, rmContainer.getAllocatedResource(),
       false);
   }
+  
+  /**
+   * Creates a copy of a resource request with the same priority, resource
+   * name, capability and relax-locality flag, but with the container count
+   * fixed at 1. Each clone therefore stands for exactly one container, which
+   * is what is re-added when a single preempted container is recovered.
+   *
+   * @param request the request to copy; only read, never modified
+   * @return a new ResourceRequest for a single container
+   */
+  public ResourceRequest cloneResourceRequest(ResourceRequest request) {
+    ResourceRequest newRequest = ResourceRequest.newInstance(
+        request.getPriority(), request.getResourceName(),
+        request.getCapability(), 1, request.getRelaxLocality());
+    return newRequest;
+  }
 }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java Mon Jul 21 21:44:50 2014
@@ -241,7 +241,14 @@ public class SchedulerApplicationAttempt
   public synchronized void updateResourceRequests(
       List<ResourceRequest> requests) {
     if (!isStopped) {
-      appSchedulingInfo.updateResourceRequests(requests);
+      // false: a normal request update, not recovery of preempted requests
+      appSchedulingInfo.updateResourceRequests(requests, false);
+    }
+  }
+  
+  /**
+   * Re-adds resource requests belonging to a container that was preempted
+   * before the AM pulled it. When a request already exists for the same
+   * priority and resource name, its container count is increased by one
+   * instead of being overwritten. Does nothing once the attempt is stopped.
+   *
+   * @param requests the requests to recover
+   */
+  public synchronized void recoverResourceRequests(
+      List<ResourceRequest> requests) {
+    if (!isStopped) {
+      appSchedulingInfo.updateResourceRequests(requests, true);
     }
   }
   



Mime
View raw message