hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject svn commit: r1612742 [2/3] - in /hadoop/common/branches/MR-2841/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ ...
Date Wed, 23 Jul 2014 01:47:41 GMT
Modified: hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java Wed Jul 23 01:47:28 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.no
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
@@ -28,6 +29,8 @@ import java.util.Map;
 
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.ServiceStateException;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -37,12 +40,17 @@ import org.apache.hadoop.yarn.conf.YarnC
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokenState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
+import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -97,6 +105,36 @@ public class TestNMLeveldbStateStoreServ
     assertTrue(stateStore.canRecover());
     verifyEmptyState();
   }
+  
+  /**
+   * Exercises the schema-version check of the state store in three steps:
+   * the version loaded before this test stores anything, a stored version
+   * that differs only in its minor number, and a stored version with a
+   * higher major number.
+   *
+   * NOTE(review): restartStateStore() is defined outside this hunk; the
+   * comments below assume it closes and re-initializes the store on the same
+   * underlying database - confirm against the helper.
+   */
+  @Test
+  public void testCheckVersion() throws IOException {
+    // default version: the loaded version is expected to equal the store's
+    // current version
+    NMDBSchemaVersion defaultVersion = stateStore.getCurrentVersion();
+    Assert.assertEquals(defaultVersion, stateStore.loadVersion());
+
+    // compatible version: same major number, minor number raised by 2
+    NMDBSchemaVersion compatibleVersion =
+        NMDBSchemaVersion.newInstance(defaultVersion.getMajorVersion(),
+          defaultVersion.getMinorVersion() + 2);
+    stateStore.storeVersion(compatibleVersion);
+    Assert.assertEquals(compatibleVersion, stateStore.loadVersion());
+    restartStateStore();
+    // after the restart the store is expected to report the current version
+    // again, i.e. the compatible version has been overwritten
+    Assert.assertEquals(defaultVersion, stateStore.loadVersion());
+
+    // incompatible version: major number raised by 1
+    NMDBSchemaVersion incompatibleVersion =
+      NMDBSchemaVersion.newInstance(defaultVersion.getMajorVersion() + 1,
+          defaultVersion.getMinorVersion());
+    stateStore.storeVersion(incompatibleVersion);
+    try {
+      // restarting on top of the incompatible version must be rejected
+      restartStateStore();
+      Assert.fail("Incompatible version, should expect fail here.");
+    } catch (ServiceStateException e) {
+      Assert.assertTrue("Exception message mismatch", 
+        e.getMessage().contains("Incompatible version for NM state:"));
+    }
+  }
 
   @Test
   public void testStartResourceLocalization() throws IOException {
@@ -460,4 +498,80 @@ public class TestNMLeveldbStateStoreServ
     state = stateStore.loadDeletionServiceState();
     assertTrue(state.getTasks().isEmpty());
   }
+
+  /**
+   * Round-trips NM token state through the state store. After each group of
+   * store/remove calls the store is restarted and the state is loaded again,
+   * so every assertion checks what was recovered rather than what is cached.
+   * Covered in order: empty state, current master key, previous master key,
+   * per-attempt application keys, and finally a mix of add, update and
+   * remove operations.
+   *
+   * NOTE(review): restartStateStore() is defined outside this hunk -
+   * presumably it reopens the same database; confirm against the helper.
+   */
+  @Test
+  public void testNMTokenStorage() throws IOException {
+    // test empty when no state
+    RecoveredNMTokenState state = stateStore.loadNMTokenState();
+    assertNull(state.getCurrentMasterKey());
+    assertNull(state.getPreviousMasterKey());
+    assertTrue(state.getApplicationMasterKeys().isEmpty());
+
+    // store a master key and verify recovered
+    NMTokenSecretManagerForTest secretMgr = new NMTokenSecretManagerForTest();
+    MasterKey currentKey = secretMgr.generateKey();
+    stateStore.storeNMTokenCurrentMasterKey(currentKey);
+    restartStateStore();
+    state = stateStore.loadNMTokenState();
+    assertEquals(currentKey, state.getCurrentMasterKey());
+    assertNull(state.getPreviousMasterKey());
+    assertTrue(state.getApplicationMasterKeys().isEmpty());
+
+    // store a previous key and verify recovered
+    MasterKey prevKey = secretMgr.generateKey();
+    stateStore.storeNMTokenPreviousMasterKey(prevKey);
+    restartStateStore();
+    state = stateStore.loadNMTokenState();
+    assertEquals(currentKey, state.getCurrentMasterKey());
+    assertEquals(prevKey, state.getPreviousMasterKey());
+    assertTrue(state.getApplicationMasterKeys().isEmpty());
+
+    // store a few application keys and verify recovered
+    ApplicationAttemptId attempt1 = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(1, 1), 1);
+    MasterKey attemptKey1 = secretMgr.generateKey();
+    stateStore.storeNMTokenApplicationMasterKey(attempt1, attemptKey1);
+    ApplicationAttemptId attempt2 = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(2, 3), 4);
+    MasterKey attemptKey2 = secretMgr.generateKey();
+    stateStore.storeNMTokenApplicationMasterKey(attempt2, attemptKey2);
+    restartStateStore();
+    state = stateStore.loadNMTokenState();
+    assertEquals(currentKey, state.getCurrentMasterKey());
+    assertEquals(prevKey, state.getPreviousMasterKey());
+    Map<ApplicationAttemptId, MasterKey> loadedAppKeys =
+        state.getApplicationMasterKeys();
+    assertEquals(2, loadedAppKeys.size());
+    assertEquals(attemptKey1, loadedAppKeys.get(attempt1));
+    assertEquals(attemptKey2, loadedAppKeys.get(attempt2));
+
+    // add/update/remove keys and verify recovered
+    // add: a third attempt with its own key
+    ApplicationAttemptId attempt3 = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(5, 6), 7);
+    MasterKey attemptKey3 = secretMgr.generateKey();
+    stateStore.storeNMTokenApplicationMasterKey(attempt3, attemptKey3);
+    // remove: the first attempt's key
+    stateStore.removeNMTokenApplicationMasterKey(attempt1);
+    // update: store a different key under the already-stored second attempt
+    attemptKey2 = prevKey;
+    stateStore.storeNMTokenApplicationMasterKey(attempt2, attemptKey2);
+    // roll the master keys: current becomes previous, a new key becomes current
+    prevKey = currentKey;
+    stateStore.storeNMTokenPreviousMasterKey(prevKey);
+    currentKey = secretMgr.generateKey();
+    stateStore.storeNMTokenCurrentMasterKey(currentKey);
+    restartStateStore();
+    state = stateStore.loadNMTokenState();
+    assertEquals(currentKey, state.getCurrentMasterKey());
+    assertEquals(prevKey, state.getPreviousMasterKey());
+    loadedAppKeys = state.getApplicationMasterKeys();
+    assertEquals(2, loadedAppKeys.size());
+    assertNull(loadedAppKeys.get(attempt1));
+    assertEquals(attemptKey2, loadedAppKeys.get(attempt2));
+    assertEquals(attemptKey3, loadedAppKeys.get(attempt3));
+  }
+
+  /**
+   * Test-only secret manager whose single purpose is to hand out fresh
+   * MasterKey instances for the storage tests in this class.
+   *
+   * NOTE(review): presumably createNewMasterKey() is not directly callable
+   * from the test, which is why it is wrapped here - confirm against
+   * BaseNMTokenSecretManager.
+   */
+  private static class NMTokenSecretManagerForTest extends
+      BaseNMTokenSecretManager {
+    /** Returns the MasterKey of a newly created master key. */
+    public MasterKey generateKey() {
+      return createNewMasterKey().getMasterKey();
+    }
+  }
 }

Modified: hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml (original)
+++ hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml Wed Jul 23 01:47:28 2014
@@ -194,6 +194,21 @@
 
     <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minikdc</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-auth</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+      <version>${project.version}</version>
+    </dependency>
+
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
       <groupId>com.sun.jersey.jersey-test-framework</groupId>
       <artifactId>jersey-test-framework-grizzly2</artifactId>
       <scope>test</scope>

Modified: hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Wed Jul 23 01:47:28 2014
@@ -1035,8 +1035,8 @@ public class ResourceManager extends Com
     StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG);
     try {
       Configuration conf = new YarnConfiguration();
-      // If -format, then delete RMStateStore; else startup normally
-      if (argv.length == 1 && argv[0].equals("-format")) {
+      // If -format-state-store, then delete RMStateStore; else startup normally
+      if (argv.length == 1 && argv[0].equals("-format-state-store")) {
         deleteRMStateStore(conf);
       } else {
         ResourceManager resourceManager = new ResourceManager();

Modified: hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Wed Jul 23 01:47:28 2014
@@ -205,12 +205,6 @@ public class RMAppImpl implements RMApp,
     .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, 
         RMAppEventType.APP_RUNNING_ON_NODE,
         new AppRunningOnNodeTransition())
-    // ACCECPTED state can once again receive APP_ACCEPTED event, because on
-    // recovery the app returns ACCEPTED state and the app once again go
-    // through the scheduler and triggers one more APP_ACCEPTED event at
-    // ACCEPTED state.
-    .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
-        RMAppEventType.APP_ACCEPTED)
 
      // Transitions from RUNNING state
     .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
@@ -789,8 +783,18 @@ public class RMAppImpl implements RMApp,
         return app.recoveredFinalState;
       }
 
-      // Notify scheduler about the app on recovery
-      new AddApplicationToSchedulerTransition().transition(app, event);
+      // No existent attempts means the attempt associated with this app was not
+      // started or started but not yet saved.
+      if (app.attempts.isEmpty()) {
+        app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId,
+          app.submissionContext.getQueue(), app.user));
+        return RMAppState.SUBMITTED;
+      }
+
+      // Add application to scheduler synchronously to guarantee scheduler
+      // knows applications before AM or NM re-registers.
+      app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId,
+        app.submissionContext.getQueue(), app.user, true));
 
       // recover attempts
       app.recoverAppAttempts();
@@ -805,12 +809,6 @@ public class RMAppImpl implements RMApp,
         return RMAppState.ACCEPTED;
       }
 
-      // No existent attempts means the attempt associated with this app was not
-      // started or started but not yet saved.
-      if (app.attempts.isEmpty()) {
-        return RMAppState.SUBMITTED;
-      }
-
       // YARN-1507 is saving the application state after the application is
       // accepted. So after YARN-1507, an app is saved meaning it is accepted.
       // Thus we return ACCECPTED state on recovery.

Modified: hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Wed Jul 23 01:47:28 2014
@@ -774,11 +774,9 @@ public class RMAppAttemptImpl implements
       }
 
       // create AMRMToken
-      AMRMTokenIdentifier id =
-          new AMRMTokenIdentifier(appAttempt.applicationAttemptId);
       appAttempt.amrmToken =
-          new Token<AMRMTokenIdentifier>(id,
-            appAttempt.rmContext.getAMRMTokenSecretManager());
+          appAttempt.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
+            appAttempt.applicationAttemptId);
 
       // Add the applicationAttempt to the scheduler and inform the scheduler
       // whether to transfer the state from previous attempt.
@@ -926,8 +924,10 @@ public class RMAppAttemptImpl implements
           appAttempt.masterService
               .registerAppAttempt(appAttempt.applicationAttemptId);
 
-          appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
-            appAttempt.getAppAttemptId(), false, false));
+          // Add attempt to scheduler synchronously to guarantee scheduler
+          // knows attempts before AM or NM re-registers.
+          appAttempt.scheduler.handle(new AppAttemptAddedSchedulerEvent(
+            appAttempt.getAppAttemptId(), false, true));
         }
 
         /*
@@ -1294,12 +1294,20 @@ public class RMAppAttemptImpl implements
   private String getAMContainerCrashedDiagnostics(
       RMAppAttemptContainerFinishedEvent finishEvent) {
     ContainerStatus status = finishEvent.getContainerStatus();
-    String diagnostics =
-        "AM Container for " + finishEvent.getApplicationAttemptId()
-        + " exited with " + " exitCode: " + status.getExitStatus() + ". "
-        + "Check application tracking page: " + this.getTrackingUrl()
-        + " . Then, click on links to logs of each attempt for detailed output. ";
-    return diagnostics;
+    StringBuilder diagnosticsBuilder = new StringBuilder();
+    diagnosticsBuilder.append("AM Container for ").append(
+      finishEvent.getApplicationAttemptId()).append(
+      " exited with ").append(" exitCode: ").append(status.getExitStatus()).
+      append("\n");
+    if (this.getTrackingUrl() != null) {
+      diagnosticsBuilder.append("For more detailed output,").append(
+        " check application tracking page:").append(
+        this.getTrackingUrl()).append(
+        "Then, click on links to logs of each attempt.\n");
+    }
+    diagnosticsBuilder.append("Diagnostics: ").append(status.getDiagnostics())
+        .append("Failing this attempt");
+    return diagnosticsBuilder.toString();
   }
 
   private static class FinalTransition extends BaseFinalTransition {

Modified: hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java Wed Jul 23 01:47:28 2014
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
 
+import java.util.List;
+
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -26,6 +28,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
 
 /**
@@ -73,5 +76,7 @@ public interface RMContainer extends Eve
   ContainerReport createContainerReport();
   
   boolean isAMContainer();
+  
+  List<ResourceRequest> getResourceRequests();
 
 }

Modified: hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java Wed Jul 23 01:47:28 2014
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
 
 import java.util.EnumSet;
+import java.util.List;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -35,6 +36,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -158,6 +160,7 @@ public class RMContainerImpl implements 
   private long finishTime;
   private ContainerStatus finishedStatus;
   private boolean isAMContainer;
+  private List<ResourceRequest> resourceRequests;
 
   public RMContainerImpl(Container container,
       ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
@@ -180,7 +183,8 @@ public class RMContainerImpl implements 
     this.eventHandler = rmContext.getDispatcher().getEventHandler();
     this.containerAllocationExpirer = rmContext.getContainerAllocationExpirer();
     this.isAMContainer = false;
-    
+    this.resourceRequests = null;
+
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     this.readLock = lock.readLock();
     this.writeLock = lock.writeLock();
@@ -311,6 +315,25 @@ public class RMContainerImpl implements 
       readLock.unlock();
     }
   }
+  
+  /**
+   * Returns the resource requests currently stored on this container, read
+   * under the read lock.
+   *
+   * @return the stored requests, or {@code null} when none are stored (the
+   *         field starts out as {@code null} and is reset to {@code null} by
+   *         {@code setResourceRequests(null)})
+   */
+  @Override
+  public List<ResourceRequest> getResourceRequests() {
+    // Acquire before entering try: if lock() itself fails, the finally block
+    // must not try to release a lock this thread never obtained.
+    readLock.lock();
+    try {
+      return resourceRequests;
+    } finally {
+      readLock.unlock();
+    }
+  }
+  
+  /**
+   * Replaces the resource requests stored on this container, under the write
+   * lock.
+   *
+   * @param requests the requests to store; {@code null} clears the stored
+   *          requests
+   */
+  public void setResourceRequests(List<ResourceRequest> requests) {
+    // Acquire before entering try: if lock() itself fails, the finally block
+    // must not try to release a lock this thread never obtained.
+    writeLock.lock();
+    try {
+      this.resourceRequests = requests;
+    } finally {
+      writeLock.unlock();
+    }
+  }
 
   @Override
   public String toString() {
@@ -432,6 +455,9 @@ public class RMContainerImpl implements 
 
     @Override
     public void transition(RMContainerImpl container, RMContainerEvent event) {
+      // Clear ResourceRequest stored in RMContainer
+      container.setResourceRequests(null);
+      
       // Register with containerAllocationExpirer.
       container.containerAllocationExpirer.register(container.getContainerId());
 

Modified: hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java Wed Jul 23 01:47:28 2014
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -122,6 +123,23 @@ public abstract class AbstractYarnSchedu
     return maximumAllocation;
   }
 
+  /**
+   * Handles the report that a container has been launched on a node. The
+   * launch is forwarded to the attempt that currently owns the container; if
+   * no such attempt is known, the node is asked to clean the container up.
+   *
+   * @param containerId id of the launched container
+   * @param node scheduler node on which the container was launched
+   */
+  protected void containerLaunchedOnNode(ContainerId containerId,
+                                         SchedulerNode node) {
+    // Look up the current attempt that owns the launched container
+    SchedulerApplicationAttempt application = getCurrentAttemptForContainer
+        (containerId);
+    if (application == null) {
+      // No attempt is known for this container: log it and dispatch a
+      // clean-container event for the node instead of tracking the launch.
+      LOG.info("Unknown application "
+          + containerId.getApplicationAttemptId().getApplicationId()
+          + " launched container " + containerId + " on node: " + node);
+      this.rmContext.getDispatcher().getEventHandler()
+        .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
+      return;
+    }
+
+    application.containerLaunchedOnNode(containerId, node.getNodeID());
+  }
+
   public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
     SchedulerApplication<T> app =
         applications.get(applicationAttemptId.getApplicationId());
@@ -275,6 +293,27 @@ public abstract class AbstractYarnSchedu
     return rmContainer;
   }
 
+  /**
+   * Hands the resource requests stored on an RMContainer back to the
+   * scheduler attempt that currently owns the container. Intended for the
+   * case where a container is preempted before the AM has pulled it; once
+   * the AM has pulled the container, the RMContainer no longer holds any
+   * requests and this method does nothing.
+   *
+   * @param rmContainer the container whose stored resource requests should
+   *          be restored
+   */
+  protected void recoverResourceRequestForContainer(RMContainer rmContainer) {
+    List<ResourceRequest> requests = rmContainer.getResourceRequests();
+
+    // A null list means there is nothing to restore; per the original note
+    // this is the case once the container has moved to ACQUIRED.
+    if (requests == null) {
+      return;
+    }
+    // Add resource request back to Scheduler, provided the owning attempt is
+    // still known.
+    SchedulerApplicationAttempt schedulerAttempt 
+        = getCurrentAttemptForContainer(rmContainer.getContainerId());
+    if (schedulerAttempt != null) {
+      schedulerAttempt.recoverResourceRequests(requests);
+    }
+  }
+
   public SchedulerNode getSchedulerNode(NodeId nodeId) {
     return nodes.get(nodeId);
   }

Modified: hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Wed Jul 23 01:47:28 2014
@@ -127,9 +127,10 @@ public class AppSchedulingInfo {
    * by the application.
    *
    * @param requests resources to be acquired
+   * @param recoverPreemptedRequest recover Resource Request on preemption
    */
   synchronized public void updateResourceRequests(
-      List<ResourceRequest> requests) {
+      List<ResourceRequest> requests, boolean recoverPreemptedRequest) {
     QueueMetrics metrics = queue.getMetrics();
     
     // Update resource requests
@@ -163,8 +164,13 @@ public class AppSchedulingInfo {
         asks = new HashMap<String, ResourceRequest>();
         this.requests.put(priority, asks);
         this.priorities.add(priority);
-      } else if (updatePendingResources) {
-        lastRequest = asks.get(resourceName);
+      }
+      lastRequest = asks.get(resourceName);
+
+      if (recoverPreemptedRequest && lastRequest != null) {
+        // Increment the number of containers by 1, as it is recovering a
+        // single container.
+        request.setNumContainers(lastRequest.getNumContainers() + 1);
       }
 
       asks.put(resourceName, request);
@@ -254,14 +260,16 @@ public class AppSchedulingInfo {
    * @param container
    *          the containers allocated.
    */
-  synchronized public void allocate(NodeType type, SchedulerNode node,
-      Priority priority, ResourceRequest request, Container container) {
+  synchronized public List<ResourceRequest> allocate(NodeType type,
+      SchedulerNode node, Priority priority, ResourceRequest request,
+      Container container) {
+    List<ResourceRequest> resourceRequests = new ArrayList<ResourceRequest>();
     if (type == NodeType.NODE_LOCAL) {
-      allocateNodeLocal(node, priority, request, container);
+      allocateNodeLocal(node, priority, request, container, resourceRequests);
     } else if (type == NodeType.RACK_LOCAL) {
-      allocateRackLocal(node, priority, request, container);
+      allocateRackLocal(node, priority, request, container, resourceRequests);
     } else {
-      allocateOffSwitch(node, priority, request, container);
+      allocateOffSwitch(node, priority, request, container, resourceRequests);
     }
     QueueMetrics metrics = queue.getMetrics();
     if (pending) {
@@ -279,6 +287,7 @@ public class AppSchedulingInfo {
           + " resource=" + request.getCapability());
     }
     metrics.allocateResources(user, 1, request.getCapability(), true);
+    return resourceRequests;
   }
 
   /**
@@ -288,9 +297,9 @@ public class AppSchedulingInfo {
    * @param allocatedContainers
    *          resources allocated to the application
    */
-  synchronized private void allocateNodeLocal( 
-      SchedulerNode node, Priority priority, 
-      ResourceRequest nodeLocalRequest, Container container) {
+  synchronized private void allocateNodeLocal(SchedulerNode node,
+      Priority priority, ResourceRequest nodeLocalRequest, Container container,
+      List<ResourceRequest> resourceRequests) {
     // Update future requirements
     nodeLocalRequest.setNumContainers(nodeLocalRequest.getNumContainers() - 1);
     if (nodeLocalRequest.getNumContainers() == 0) {
@@ -304,7 +313,14 @@ public class AppSchedulingInfo {
       this.requests.get(priority).remove(node.getRackName());
     }
 
-    decrementOutstanding(requests.get(priority).get(ResourceRequest.ANY));
+    ResourceRequest offRackRequest = requests.get(priority).get(
+        ResourceRequest.ANY);
+    decrementOutstanding(offRackRequest);
+
+    // Update cloned NodeLocal, RackLocal and OffRack requests for recovery
+    resourceRequests.add(cloneResourceRequest(nodeLocalRequest));
+    resourceRequests.add(cloneResourceRequest(rackLocalRequest));
+    resourceRequests.add(cloneResourceRequest(offRackRequest));
   }
 
   /**
@@ -314,16 +330,22 @@ public class AppSchedulingInfo {
    * @param allocatedContainers
    *          resources allocated to the application
    */
-  synchronized private void allocateRackLocal(
-      SchedulerNode node, Priority priority,
-      ResourceRequest rackLocalRequest, Container container) {
+  synchronized private void allocateRackLocal(SchedulerNode node,
+      Priority priority, ResourceRequest rackLocalRequest, Container container,
+      List<ResourceRequest> resourceRequests) {
     // Update future requirements
     rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers() - 1);
     if (rackLocalRequest.getNumContainers() == 0) {
       this.requests.get(priority).remove(node.getRackName());
     }
 
-    decrementOutstanding(requests.get(priority).get(ResourceRequest.ANY));
+    ResourceRequest offRackRequest = requests.get(priority).get(
+        ResourceRequest.ANY);
+    decrementOutstanding(offRackRequest);
+
+    // Update cloned RackLocal and OffRack requests for recovery
+    resourceRequests.add(cloneResourceRequest(rackLocalRequest));
+    resourceRequests.add(cloneResourceRequest(offRackRequest));
   }
 
   /**
@@ -333,11 +355,13 @@ public class AppSchedulingInfo {
    * @param allocatedContainers
    *          resources allocated to the application
    */
-  synchronized private void allocateOffSwitch(
-      SchedulerNode node, Priority priority,
-      ResourceRequest offSwitchRequest, Container container) {
+  synchronized private void allocateOffSwitch(SchedulerNode node,
+      Priority priority, ResourceRequest offSwitchRequest, Container container,
+      List<ResourceRequest> resourceRequests) {
     // Update future requirements
     decrementOutstanding(offSwitchRequest);
+    // Update cloned OffRack request for recovery
+    resourceRequests.add(cloneResourceRequest(offSwitchRequest));
   }
 
   synchronized private void decrementOutstanding(
@@ -436,4 +460,11 @@ public class AppSchedulingInfo {
     metrics.allocateResources(user, 1, rmContainer.getAllocatedResource(),
       false);
   }
+  
+  public ResourceRequest cloneResourceRequest(ResourceRequest request) {
+    ResourceRequest newRequest = ResourceRequest.newInstance(
+        request.getPriority(), request.getResourceName(),
+        request.getCapability(), 1, request.getRelaxLocality());
+    return newRequest;
+  }
 }

Modified: hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java Wed Jul 23 01:47:28 2014
@@ -241,7 +241,14 @@ public class SchedulerApplicationAttempt
   public synchronized void updateResourceRequests(
       List<ResourceRequest> requests) {
     if (!isStopped) {
-      appSchedulingInfo.updateResourceRequests(requests);
+      appSchedulingInfo.updateResourceRequests(requests, false);
+    }
+  }
+  
+  public synchronized void recoverResourceRequests(
+      List<ResourceRequest> requests) {
+    if (!isStopped) {
+      appSchedulingInfo.updateResourceRequests(requests, true);
     }
   }
   

Modified: hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Wed Jul 23 01:47:28 2014
@@ -69,7 +69,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@@ -521,7 +520,7 @@ public class CapacityScheduler extends
   }
 
   private synchronized void addApplication(ApplicationId applicationId,
-      String queueName, String user) {
+      String queueName, String user, boolean isAppRecovering) {
     // santiy checks.
     CSQueue queue = getQueue(queueName);
     if (queue == null) {
@@ -553,14 +552,20 @@ public class CapacityScheduler extends
     applications.put(applicationId, application);
     LOG.info("Accepted application " + applicationId + " from user: " + user
         + ", in queue: " + queueName);
-    rmContext.getDispatcher().getEventHandler()
+    if (isAppRecovering) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
+      }
+    } else {
+      rmContext.getDispatcher().getEventHandler()
         .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+    }
   }
 
   private synchronized void addApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
       boolean transferStateFromPreviousAttempt,
-      boolean shouldNotifyAttemptAdded) {
+      boolean isAttemptRecovering) {
     SchedulerApplication<FiCaSchedulerApp> application =
         applications.get(applicationAttemptId.getApplicationId());
     CSQueue queue = (CSQueue) application.getQueue();
@@ -578,14 +583,15 @@ public class CapacityScheduler extends
     LOG.info("Added Application Attempt " + applicationAttemptId
         + " to scheduler from user " + application.getUser() + " in queue "
         + queue.getQueueName());
-    if (shouldNotifyAttemptAdded) {
-      rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppAttemptEvent(applicationAttemptId,
-              RMAppAttemptEventType.ATTEMPT_ADDED));
-    } else {
+    if (isAttemptRecovering) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Skipping notifying ATTEMPT_ADDED");
+        LOG.debug(applicationAttemptId
+            + " is recovering. Skipping notifying ATTEMPT_ADDED");
       }
+    } else {
+      rmContext.getDispatcher().getEventHandler().handle(
+        new RMAppAttemptEvent(applicationAttemptId,
+            RMAppAttemptEventType.ATTEMPT_ADDED));
     }
   }
 
@@ -859,21 +865,6 @@ public class CapacityScheduler extends
   
   }
 
-  private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
-    // Get the application for the finished container
-    FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
-    if (application == null) {
-      LOG.info("Unknown application "
-          + containerId.getApplicationAttemptId().getApplicationId()
-          + " launched container " + containerId + " on node: " + node);
-      this.rmContext.getDispatcher().getEventHandler()
-        .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
-      return;
-    }
-    
-    application.containerLaunchedOnNode(containerId, node.getNodeID());
-  }
-
   @Override
   public void handle(SchedulerEvent event) {
     switch(event.getType()) {
@@ -905,7 +896,8 @@ public class CapacityScheduler extends
     {
       AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
       addApplication(appAddedEvent.getApplicationId(),
-        appAddedEvent.getQueue(), appAddedEvent.getUser());
+        appAddedEvent.getQueue(), appAddedEvent.getUser(),
+        appAddedEvent.getIsAppRecovering());
     }
     break;
     case APP_REMOVED:
@@ -921,7 +913,7 @@ public class CapacityScheduler extends
           (AppAttemptAddedSchedulerEvent) event;
       addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
         appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
-        appAttemptAddedEvent.getShouldNotifyAttemptAdded());
+        appAttemptAddedEvent.getIsAttemptRecovering());
     }
     break;
     case APP_ATTEMPT_REMOVED:
@@ -1089,6 +1081,7 @@ public class CapacityScheduler extends
     if (LOG.isDebugEnabled()) {
       LOG.debug("KILL_CONTAINER: container" + cont.toString());
     }
+    recoverResourceRequestForContainer(cont);
     completedContainer(cont, SchedulerUtils.createPreemptedContainerStatus(
       cont.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER),
       RMContainerEventType.KILL);

Modified: hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java Wed Jul 23 01:47:28 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
 
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -77,6 +78,9 @@ public class FiCaSchedulerApp extends Sc
     if (null == liveContainers.remove(rmContainer.getContainerId())) {
       return false;
     }
+    
+    // Remove from the list of newly allocated containers if found
+    newlyAllocatedContainers.remove(rmContainer);
 
     Container container = rmContainer.getContainer();
     ContainerId containerId = container.getId();
@@ -129,8 +133,12 @@ public class FiCaSchedulerApp extends Sc
     liveContainers.put(container.getId(), rmContainer);    
 
     // Update consumption and track allocations
-    appSchedulingInfo.allocate(type, node, priority, request, container);
+    List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
+        type, node, priority, request, container);
     Resources.addTo(currentConsumption, container.getResource());
+    
+    // Update resource requests related to "request" and store in RMContainer 
+    ((RMContainerImpl)rmContainer).setResourceRequests(resourceRequestList);
 
     // Inform the container
     rmContainer.handle(

Modified: hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java Wed Jul 23 01:47:28 2014
@@ -25,13 +25,20 @@ public class AppAddedSchedulerEvent exte
   private final ApplicationId applicationId;
   private final String queue;
   private final String user;
+  private final boolean isAppRecovering;
 
   public AppAddedSchedulerEvent(
       ApplicationId applicationId, String queue, String user) {
+    this(applicationId, queue, user, false);
+  }
+
+  public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
+      String user, boolean isAppRecovering) {
     super(SchedulerEventType.APP_ADDED);
     this.applicationId = applicationId;
     this.queue = queue;
     this.user = user;
+    this.isAppRecovering = isAppRecovering;
   }
 
   public ApplicationId getApplicationId() {
@@ -46,4 +53,7 @@ public class AppAddedSchedulerEvent exte
     return user;
   }
 
+  public boolean getIsAppRecovering() {
+    return isAppRecovering;
+  }
 }

Modified: hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java Wed Jul 23 01:47:28 2014
@@ -24,22 +24,22 @@ public class AppAttemptAddedSchedulerEve
 
   private final ApplicationAttemptId applicationAttemptId;
   private final boolean transferStateFromPreviousAttempt;
-  private final boolean shouldNotifyAttemptAdded;
+  private final boolean isAttemptRecovering;
 
   public AppAttemptAddedSchedulerEvent(
       ApplicationAttemptId applicationAttemptId,
       boolean transferStateFromPreviousAttempt) {
-    this(applicationAttemptId, transferStateFromPreviousAttempt, true);
+    this(applicationAttemptId, transferStateFromPreviousAttempt, false);
   }
 
   public AppAttemptAddedSchedulerEvent(
       ApplicationAttemptId applicationAttemptId,
       boolean transferStateFromPreviousAttempt,
-      boolean shouldNotifyAttemptAdded) {
+      boolean isAttemptRecovering) {
     super(SchedulerEventType.APP_ATTEMPT_ADDED);
     this.applicationAttemptId = applicationAttemptId;
     this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt;
-    this.shouldNotifyAttemptAdded = shouldNotifyAttemptAdded;
+    this.isAttemptRecovering = isAttemptRecovering;
   }
 
   public ApplicationAttemptId getApplicationAttemptId() {
@@ -50,7 +50,7 @@ public class AppAttemptAddedSchedulerEve
     return transferStateFromPreviousAttempt;
   }
 
-  public boolean getShouldNotifyAttemptAdded() {
-    return shouldNotifyAttemptAdded;
+  public boolean getIsAttemptRecovering() {
+    return isAttemptRecovering;
   }
 }

Modified: hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java Wed Jul 23 01:47:28 2014
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -82,6 +83,9 @@ public class FSSchedulerApp extends Sche
     Container container = rmContainer.getContainer();
     ContainerId containerId = container.getId();
     
+    // Remove from the list of newly allocated containers if found
+    newlyAllocatedContainers.remove(rmContainer);
+    
     // Inform the container
     rmContainer.handle(
         new RMContainerFinishedEvent(
@@ -281,9 +285,13 @@ public class FSSchedulerApp extends Sche
     liveContainers.put(container.getId(), rmContainer);    
 
     // Update consumption and track allocations
-    appSchedulingInfo.allocate(type, node, priority, request, container);
+    List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
+        type, node, priority, request, container);
     Resources.addTo(currentConsumption, container.getResource());
 
+    // Update resource requests related to "request" and store in RMContainer
+    ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
+
     // Inform the container
     rmContainer.handle(
         new RMContainerEvent(container.getId(), RMContainerEventType.START));

Modified: hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Wed Jul 23 01:47:28 2014
@@ -422,7 +422,7 @@ public class FairScheduler extends
     }
   }
   
-  private void warnOrKillContainer(RMContainer container) {
+  protected void warnOrKillContainer(RMContainer container) {
     ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
     FSSchedulerApp app = getSchedulerApp(appAttemptId);
     FSLeafQueue queue = app.getQueue();
@@ -440,6 +440,7 @@ public class FairScheduler extends
           SchedulerUtils.createPreemptedContainerStatus(
             container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
 
+        recoverResourceRequestForContainer(container);
         // TODO: Not sure if this ever actually adds this to the list of cleanup
         // containers on the RMNode (see SchedulerNode.releaseContainer()).
         completedContainer(container, status, RMContainerEventType.KILL);
@@ -565,7 +566,7 @@ public class FairScheduler extends
    * configured limits, but the app will not be marked as runnable.
    */
   protected synchronized void addApplication(ApplicationId applicationId,
-      String queueName, String user) {
+      String queueName, String user, boolean isAppRecovering) {
     if (queueName == null || queueName.isEmpty()) {
       String message = "Reject application " + applicationId +
               " submitted by user " + user + " with an empty queue name.";
@@ -602,8 +603,14 @@ public class FairScheduler extends
     LOG.info("Accepted application " + applicationId + " from user: " + user
         + ", in queue: " + queueName + ", currently num of applications: "
         + applications.size());
-    rmContext.getDispatcher().getEventHandler()
+    if (isAppRecovering) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
+      }
+    } else {
+      rmContext.getDispatcher().getEventHandler()
         .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+    }
   }
 
   /**
@@ -612,7 +619,7 @@ public class FairScheduler extends
   protected synchronized void addApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
       boolean transferStateFromPreviousAttempt,
-      boolean shouldNotifyAttemptAdded) {
+      boolean isAttemptRecovering) {
     SchedulerApplication<FSSchedulerApp> application =
         applications.get(applicationAttemptId.getApplicationId());
     String user = application.getUser();
@@ -641,14 +648,15 @@ public class FairScheduler extends
     LOG.info("Added Application Attempt " + applicationAttemptId
         + " to scheduler from user: " + user);
 
-    if (shouldNotifyAttemptAdded) {
-      rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppAttemptEvent(applicationAttemptId,
-              RMAppAttemptEventType.ATTEMPT_ADDED));
-    } else {
+    if (isAttemptRecovering) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Skipping notifying ATTEMPT_ADDED");
+        LOG.debug(applicationAttemptId
+            + " is recovering. Skipping notifying ATTEMPT_ADDED");
       }
+    } else {
+      rmContext.getDispatcher().getEventHandler().handle(
+        new RMAppAttemptEvent(applicationAttemptId,
+            RMAppAttemptEventType.ATTEMPT_ADDED));
     }
   }
 
@@ -921,22 +929,6 @@ public class FairScheduler extends
   }
 
   /**
-   * Process a container which has launched on a node, as reported by the node.
-   */
-  private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) {
-    // Get the application for the finished container
-    FSSchedulerApp application = getCurrentAttemptForContainer(containerId);
-    if (application == null) {
-      LOG.info("Unknown application "
-          + containerId.getApplicationAttemptId().getApplicationId()
-          + " launched container " + containerId + " on node: " + node);
-      return;
-    }
-
-    application.containerLaunchedOnNode(containerId, node.getNodeID());
-  }
-
-  /**
    * Process a heartbeat update from a node.
    */
   private synchronized void nodeUpdate(RMNode nm) {
@@ -978,37 +970,27 @@ public class FairScheduler extends
     }
   }
 
-  private void continuousScheduling() {
-    while (true) {
-      List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
-      // Sort the nodes by space available on them, so that we offer
-      // containers on emptier nodes first, facilitating an even spread. This
-      // requires holding the scheduler lock, so that the space available on a
-      // node doesn't change during the sort.
-      synchronized (this) {
-        Collections.sort(nodeIdList, nodeAvailableResourceComparator);
-      }
-
-      // iterate all nodes
-      for (NodeId nodeId : nodeIdList) {
-        if (nodes.containsKey(nodeId)) {
-          FSSchedulerNode node = getFSSchedulerNode(nodeId);
-          try {
-            if (Resources.fitsIn(minimumAllocation,
-                    node.getAvailableResource())) {
-              attemptScheduling(node);
-            }
-          } catch (Throwable ex) {
-            LOG.warn("Error while attempting scheduling for node " + node +
-                    ": " + ex.toString(), ex);
-          }
-        }
-      }
+  void continuousSchedulingAttempt() {
+    List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
+    // Sort the nodes by space available on them, so that we offer
+    // containers on emptier nodes first, facilitating an even spread. This
+    // requires holding the scheduler lock, so that the space available on a
+    // node doesn't change during the sort.
+    synchronized (this) {
+      Collections.sort(nodeIdList, nodeAvailableResourceComparator);
+    }
+
+    // iterate all nodes
+    for (NodeId nodeId : nodeIdList) {
+      FSSchedulerNode node = getFSSchedulerNode(nodeId);
       try {
-        Thread.sleep(getContinuousSchedulingSleepMs());
-      } catch (InterruptedException e) {
-        LOG.warn("Error while doing sleep in continuous scheduling: " +
-                e.toString(), e);
+        if (node != null && Resources.fitsIn(minimumAllocation,
+            node.getAvailableResource())) {
+          attemptScheduling(node);
+        }
+      } catch (Throwable ex) {
+        LOG.error("Error while attempting scheduling for node " + node +
+            ": " + ex.toString(), ex);
       }
     }
   }
@@ -1018,6 +1000,12 @@ public class FairScheduler extends
 
     @Override
     public int compare(NodeId n1, NodeId n2) {
+      if (!nodes.containsKey(n1)) {
+        return 1;
+      }
+      if (!nodes.containsKey(n2)) {
+        return -1;
+      }
       return RESOURCE_CALCULATOR.compare(clusterResource,
               nodes.get(n2).getAvailableResource(),
               nodes.get(n1).getAvailableResource());
@@ -1135,7 +1123,8 @@ public class FairScheduler extends
       }
       AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
       addApplication(appAddedEvent.getApplicationId(),
-        appAddedEvent.getQueue(), appAddedEvent.getUser());
+        appAddedEvent.getQueue(), appAddedEvent.getUser(),
+        appAddedEvent.getIsAppRecovering());
       break;
     case APP_REMOVED:
       if (!(event instanceof AppRemovedSchedulerEvent)) {
@@ -1153,7 +1142,7 @@ public class FairScheduler extends
           (AppAttemptAddedSchedulerEvent) event;
       addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
         appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
-        appAttemptAddedEvent.getShouldNotifyAttemptAdded());
+        appAttemptAddedEvent.getIsAttemptRecovering());
       break;
     case APP_ATTEMPT_REMOVED:
       if (!(event instanceof AppAttemptRemovedSchedulerEvent)) {
@@ -1241,7 +1230,16 @@ public class FairScheduler extends
           new Runnable() {
             @Override
             public void run() {
-              continuousScheduling();
+              while (!Thread.currentThread().isInterrupted()) {
+                try {
+                  continuousSchedulingAttempt();
+                  Thread.sleep(getContinuousSchedulingSleepMs());
+                } catch (InterruptedException e) {
+                  LOG.error("Continuous scheduling thread interrupted. Exiting. ",
+                      e);
+                  return;
+                }
+              }
             }
           }
       );

Modified: hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java Wed Jul 23 01:47:28 2014
@@ -65,6 +65,7 @@ public class FairSharePolicy extends Sch
   private static class FairShareComparator implements Comparator<Schedulable>,
       Serializable {
     private static final long serialVersionUID = 5564969375856699313L;
+    private static final Resource ONE = Resources.createResource(1);
 
     @Override
     public int compare(Schedulable s1, Schedulable s2) {
@@ -78,11 +79,10 @@ public class FairSharePolicy extends Sch
           s1.getResourceUsage(), minShare1);
       boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
           s2.getResourceUsage(), minShare2);
-      Resource one = Resources.createResource(1);
       minShareRatio1 = (double) s1.getResourceUsage().getMemory()
-          / Resources.max(RESOURCE_CALCULATOR, null, minShare1, one).getMemory();
+          / Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE).getMemory();
       minShareRatio2 = (double) s2.getResourceUsage().getMemory()
-          / Resources.max(RESOURCE_CALCULATOR, null, minShare2, one).getMemory();
+          / Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE).getMemory();
       useToWeightRatio1 = s1.getResourceUsage().getMemory() /
           s1.getWeights().getWeight(ResourceType.MEMORY);
       useToWeightRatio2 = s2.getResourceUsage().getMemory() /

Modified: hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Wed Jul 23 01:47:28 2014
@@ -66,7 +66,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
@@ -356,22 +355,28 @@ public class FifoScheduler extends
 
   @VisibleForTesting
   public synchronized void addApplication(ApplicationId applicationId,
-      String queue, String user) {
+      String queue, String user, boolean isAppRecovering) {
     SchedulerApplication<FiCaSchedulerApp> application =
         new SchedulerApplication<FiCaSchedulerApp>(DEFAULT_QUEUE, user);
     applications.put(applicationId, application);
     metrics.submitApp(user);
     LOG.info("Accepted application " + applicationId + " from user: " + user
         + ", currently num of applications: " + applications.size());
-    rmContext.getDispatcher().getEventHandler()
+    if (isAppRecovering) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
+      }
+    } else {
+      rmContext.getDispatcher().getEventHandler()
         .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+    }
   }
 
   @VisibleForTesting
   public synchronized void
       addApplicationAttempt(ApplicationAttemptId appAttemptId,
           boolean transferStateFromPreviousAttempt,
-          boolean shouldNotifyAttemptAdded) {
+          boolean isAttemptRecovering) {
     SchedulerApplication<FiCaSchedulerApp> application =
         applications.get(appAttemptId.getApplicationId());
     String user = application.getUser();
@@ -389,14 +394,15 @@ public class FifoScheduler extends
     metrics.submitAppAttempt(user);
     LOG.info("Added Application Attempt " + appAttemptId
         + " to scheduler from user " + application.getUser());
-    if (shouldNotifyAttemptAdded) {
-      rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppAttemptEvent(appAttemptId,
-              RMAppAttemptEventType.ATTEMPT_ADDED));
-    } else {
+    if (isAttemptRecovering) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Skipping notifying ATTEMPT_ADDED");
+        LOG.debug(appAttemptId
+            + " is recovering. Skipping notifying ATTEMPT_ADDED");
       }
+    } else {
+      rmContext.getDispatcher().getEventHandler().handle(
+        new RMAppAttemptEvent(appAttemptId,
+            RMAppAttemptEventType.ATTEMPT_ADDED));
     }
   }
 
@@ -772,7 +778,8 @@ public class FifoScheduler extends
     {
       AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
       addApplication(appAddedEvent.getApplicationId(),
-        appAddedEvent.getQueue(), appAddedEvent.getUser());
+        appAddedEvent.getQueue(), appAddedEvent.getUser(),
+        appAddedEvent.getIsAppRecovering());
     }
     break;
     case APP_REMOVED:
@@ -788,7 +795,7 @@ public class FifoScheduler extends
           (AppAttemptAddedSchedulerEvent) event;
       addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
         appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
-        appAttemptAddedEvent.getShouldNotifyAttemptAdded());
+        appAttemptAddedEvent.getIsAttemptRecovering());
     }
     break;
     case APP_ATTEMPT_REMOVED:
@@ -823,23 +830,6 @@ public class FifoScheduler extends
     }
   }
 
-  private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
-    // Get the application for the finished container
-    FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
-    if (application == null) {
-      LOG.info("Unknown application "
-          + containerId.getApplicationAttemptId().getApplicationId()
-          + " launched container " + containerId + " on node: " + node);
-      // Some unknown container sneaked into the system. Kill it.
-      this.rmContext.getDispatcher().getEventHandler()
-        .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
-
-      return;
-    }
-    
-    application.containerLaunchedOnNode(containerId, node.getNodeID());
-  }
-
   @Lock(FifoScheduler.class)
   private synchronized void containerCompleted(RMContainer rmContainer,
       ContainerStatus containerStatus, RMContainerEventType event) {

Modified: hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java Wed Jul 23 01:47:28 2014
@@ -19,22 +19,28 @@
 package org.apache.hadoop.yarn.server.resourcemanager.security;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import java.security.SecureRandom;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
-
-import javax.crypto.SecretKey;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
+
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * AMRM-tokens are per ApplicationAttempt. If users redistribute their
@@ -49,40 +55,66 @@ public class AMRMTokenSecretManager exte
   private static final Log LOG = LogFactory
     .getLog(AMRMTokenSecretManager.class);
 
-  private SecretKey masterKey;
+  private int serialNo = new SecureRandom().nextInt();
+  private MasterKeyData nextMasterKey;
+  private MasterKeyData currentMasterKey;
+
+  private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+  private final Lock readLock = readWriteLock.readLock();
+  private final Lock writeLock = readWriteLock.writeLock();
+
   private final Timer timer;
   private final long rollingInterval;
+  private final long activationDelay;
 
-  private final Map<ApplicationAttemptId, byte[]> passwords =
-      new HashMap<ApplicationAttemptId, byte[]>();
+  private final Set<ApplicationAttemptId> appAttemptSet =
+      new HashSet<ApplicationAttemptId>();
 
   /**
    * Create an {@link AMRMTokenSecretManager}
    */
   public AMRMTokenSecretManager(Configuration conf) {
-    rollMasterKey();
     this.timer = new Timer();
     this.rollingInterval =
         conf
           .getLong(
             YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
             YarnConfiguration.DEFAULT_RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS) * 1000;
+    // Adding delay = 1.5 * expiry interval makes sure that all active AMs get
+    // the updated shared-key.
+    this.activationDelay =
+        (long) (conf.getLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
+            YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS) * 1.5);
+    LOG.info("AMRMTokenKeyRollingInterval: " + this.rollingInterval
+        + "ms and AMRMTokenKeyActivationDelay: " + this.activationDelay + " ms");
+    if (rollingInterval <= activationDelay * 2) {
+      throw new IllegalArgumentException(
+          YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS
+              + " should be more than 2 X "
+              + YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS);
+    }
   }
 
   public void start() {
-    this.timer.scheduleAtFixedRate(new MasterKeyRoller(), 0, rollingInterval);
+    if (this.currentMasterKey == null) {
+      this.currentMasterKey = createNewMasterKey();
+    }
+    this.timer.scheduleAtFixedRate(new MasterKeyRoller(), rollingInterval,
+      rollingInterval);
   }
 
   public void stop() {
     this.timer.cancel();
   }
 
-  public synchronized void applicationMasterFinished(
-      ApplicationAttemptId appAttemptId) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Application finished, removing password for " + appAttemptId);
+  public void applicationMasterFinished(ApplicationAttemptId appAttemptId) {
+    this.writeLock.lock();
+    try {
+      LOG.info("Application finished, removing password for " + appAttemptId);
+      this.appAttemptSet.remove(appAttemptId);
+    } finally {
+      this.writeLock.unlock();
     }
-    this.passwords.remove(appAttemptId);
   }
 
   private class MasterKeyRoller extends TimerTask {
@@ -93,49 +125,89 @@ public class AMRMTokenSecretManager exte
   }
 
   @Private
-  public synchronized void setMasterKey(SecretKey masterKey) {
-    this.masterKey = masterKey;
+  void rollMasterKey() {
+    this.writeLock.lock();
+    try {
+      LOG.info("Rolling master-key for amrm-tokens");
+      this.nextMasterKey = createNewMasterKey();
+      this.timer.schedule(new NextKeyActivator(), this.activationDelay);
+    } finally {
+      this.writeLock.unlock();
+    }
   }
 
-  @Private
-  public synchronized SecretKey getMasterKey() {
-    return this.masterKey;
+  private class NextKeyActivator extends TimerTask {
+    @Override
+    public void run() {
+      activateNextMasterKey();
+    }
+  }
+
+  public void activateNextMasterKey() {
+    this.writeLock.lock();
+    try {
+      LOG.info("Activating next master key with id: "
+          + this.nextMasterKey.getMasterKey().getKeyId());
+      this.currentMasterKey = this.nextMasterKey;
+      this.nextMasterKey = null;
+    } finally {
+      this.writeLock.unlock();
+    }
   }
 
   @Private
-  synchronized void rollMasterKey() {
-    LOG.info("Rolling master-key for amrm-tokens");
-    this.masterKey = generateSecret();
+  @VisibleForTesting
+  public MasterKeyData createNewMasterKey() {
+    this.writeLock.lock();
+    try {
+      return new MasterKeyData(serialNo++, generateSecret());
+    } finally {
+      this.writeLock.unlock();
+    }
   }
 
-  /**
-   * Create a password for a given {@link AMRMTokenIdentifier}. Used to
-   * send to the AppicationAttempt which can give it back during authentication.
-   */
-  @Override
-  public synchronized byte[] createPassword(
-      AMRMTokenIdentifier identifier) {
-    ApplicationAttemptId applicationAttemptId =
-        identifier.getApplicationAttemptId();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Creating password for " + applicationAttemptId);
-    }
-    byte[] password = createPassword(identifier.getBytes(), masterKey);
-    this.passwords.put(applicationAttemptId, password);
-    return password;
+  public Token<AMRMTokenIdentifier> createAndGetAMRMToken(
+      ApplicationAttemptId appAttemptId) {
+    this.writeLock.lock();
+    try {
+      LOG.info("Create AMRMToken for ApplicationAttempt: " + appAttemptId);
+      AMRMTokenIdentifier identifier =
+          new AMRMTokenIdentifier(appAttemptId, getMasterKey().getMasterKey()
+            .getKeyId());
+      byte[] password = this.createPassword(identifier);
+      appAttemptSet.add(appAttemptId);
+      return new Token<AMRMTokenIdentifier>(identifier.getBytes(), password,
+        identifier.getKind(), new Text());
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  // If nextMasterKey is not Null, then return nextMasterKey
+  // otherwise return currentMasterKey
+  @VisibleForTesting
+  public MasterKeyData getMasterKey() {
+    this.readLock.lock();
+    try {
+      return nextMasterKey == null ? currentMasterKey : nextMasterKey;
+    } finally {
+      this.readLock.unlock();
+    }
   }
 
   /**
    * Populate persisted password of AMRMToken back to AMRMTokenSecretManager.
    */
-  public synchronized void
-      addPersistedPassword(Token<AMRMTokenIdentifier> token) throws IOException {
-    AMRMTokenIdentifier identifier = token.decodeIdentifier();
-    if (LOG.isDebugEnabled()) {
+  public void addPersistedPassword(Token<AMRMTokenIdentifier> token)
+      throws IOException {
+    this.writeLock.lock();
+    try {
+      AMRMTokenIdentifier identifier = token.decodeIdentifier();
       LOG.debug("Adding password for " + identifier.getApplicationAttemptId());
+      appAttemptSet.add(identifier.getApplicationAttemptId());
+    } finally {
+      this.writeLock.unlock();
     }
-    this.passwords.put(identifier.getApplicationAttemptId(),
-      token.getPassword());
   }
 
   /**
@@ -143,19 +215,35 @@ public class AMRMTokenSecretManager exte
    * Used by RPC layer to validate a remote {@link AMRMTokenIdentifier}.
    */
   @Override
-  public synchronized byte[] retrievePassword(
-      AMRMTokenIdentifier identifier) throws InvalidToken {
-    ApplicationAttemptId applicationAttemptId =
-        identifier.getApplicationAttemptId();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Trying to retrieve password for " + applicationAttemptId);
-    }
-    byte[] password = this.passwords.get(applicationAttemptId);
-    if (password == null) {
-      throw new InvalidToken("Password not found for ApplicationAttempt "
-          + applicationAttemptId);
+  public byte[] retrievePassword(AMRMTokenIdentifier identifier)
+      throws InvalidToken {
+    this.readLock.lock();
+    try {
+      ApplicationAttemptId applicationAttemptId =
+          identifier.getApplicationAttemptId();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Trying to retrieve password for " + applicationAttemptId);
+      }
+      if (!appAttemptSet.contains(applicationAttemptId)) {
+        throw new InvalidToken("Password not found for ApplicationAttempt "
+            + applicationAttemptId);
+      }
+      if (identifier.getKeyId() == this.currentMasterKey.getMasterKey()
+        .getKeyId()) {
+        return createPassword(identifier.getBytes(),
+          this.currentMasterKey.getSecretKey());
+      } else if (nextMasterKey != null
+          && identifier.getKeyId() == this.nextMasterKey.getMasterKey()
+            .getKeyId()) {
+        return createPassword(identifier.getBytes(),
+          this.nextMasterKey.getSecretKey());
+      }
+      throw new InvalidToken("Given AMRMToken for application : "
+          + applicationAttemptId.toString()
+          + " seems to have been generated illegally.");
+    } finally {
+      this.readLock.unlock();
     }
-    return password;
   }
 
   /**
@@ -167,4 +255,40 @@ public class AMRMTokenSecretManager exte
     return new AMRMTokenIdentifier();
   }
 
+  @Private
+  @VisibleForTesting
+  public MasterKeyData getCurrnetMasterKeyData() {
+    this.readLock.lock();
+    try {
+      return this.currentMasterKey;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Private
+  @VisibleForTesting
+  public MasterKeyData getNextMasterKeyData() {
+    this.readLock.lock();
+    try {
+      return this.nextMasterKey;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Override
+  @Private
+  protected byte[] createPassword(AMRMTokenIdentifier identifier) {
+    this.readLock.lock();
+    try {
+      ApplicationAttemptId applicationAttemptId =
+          identifier.getApplicationAttemptId();
+      LOG.info("Creating password for " + applicationAttemptId);
+      return createPassword(identifier.getBytes(), getMasterKey()
+        .getSecretKey());
+    } finally {
+      this.readLock.unlock();
+    }
+  }
 }

Modified: hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java Wed Jul 23 01:47:28 2014
@@ -29,8 +29,10 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -193,4 +195,14 @@ public class RMDelegationTokenSecretMana
       addPersistedDelegationToken(entry.getKey(), entry.getValue());
     }
   }
+
+  public long getRenewDate(RMDelegationTokenIdentifier ident)
+      throws InvalidToken {
+    DelegationTokenInformation info = currentTokens.get(ident);
+    if (info == null) {
+      throw new InvalidToken("token (" + ident.toString()
+          + ") can't be found in cache");
+    }
+    return info.getRenewDate();
+  }
 }



Mime
View raw message