hadoop-yarn-commits mailing list archives

From: su...@apache.org
Subject: svn commit: r1613514 [4/5] - in /hadoop/common/branches/YARN-1051/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/appl...
Date: Fri, 25 Jul 2014 20:33:29 GMT
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java Fri Jul 25 20:33:09 2014
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.security.AccessControlException;
 import java.nio.ByteBuffer;
+import java.security.Principal;
 import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
 import java.util.Collection;
@@ -36,6 +37,7 @@ import java.util.concurrent.ConcurrentMa
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.PUT;
@@ -57,6 +59,8 @@ import org.apache.hadoop.io.DataOutputBu
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -67,6 +71,13 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -85,6 +96,7 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
@@ -109,6 +121,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CredentialsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.DelegationToken;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FifoSchedulerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LocalResourceInfo;
@@ -118,6 +131,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.webapp.BadRequestException;
 import org.apache.hadoop.yarn.webapp.NotFoundException;
@@ -139,6 +153,9 @@ public class RMWebServices {
   private final Configuration conf;
   private @Context HttpServletResponse response;
 
+  public final static String DELEGATION_TOKEN_HEADER =
+      "Hadoop-YARN-RM-Delegation-Token";
+
   @Inject
   public RMWebServices(final ResourceManager rm, Configuration conf) {
     this.rm = rm;
@@ -147,11 +164,7 @@ public class RMWebServices {
 
   protected Boolean hasAccess(RMApp app, HttpServletRequest hsr) {
     // Check for the authorization.
-    String remoteUser = hsr.getRemoteUser();
-    UserGroupInformation callerUGI = null;
-    if (remoteUser != null) {
-      callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
-    }
+    UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
     if (callerUGI != null
         && !(this.rm.getApplicationACLsManager().checkAccess(callerUGI,
               ApplicationAccessType.VIEW_APP, app.getUser(),
@@ -626,7 +639,7 @@ public class RMWebServices {
   public AppState getAppState(@Context HttpServletRequest hsr,
       @PathParam("appid") String appId) throws AuthorizationException {
     init();
-    UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr);
+    UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
     String userName = "";
     if (callerUGI != null) {
       userName = callerUGI.getUserName();
@@ -661,7 +674,7 @@ public class RMWebServices {
       IOException {
 
     init();
-    UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr);
+    UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
     if (callerUGI == null) {
       String msg = "Unable to obtain user name, user not authenticated";
       throw new AuthorizationException(msg);
@@ -771,9 +784,14 @@ public class RMWebServices {
   }
 
   private UserGroupInformation getCallerUserGroupInformation(
-      HttpServletRequest hsr) {
+      HttpServletRequest hsr, boolean usePrincipal) {
 
     String remoteUser = hsr.getRemoteUser();
+    if (usePrincipal) {
+      Principal princ = hsr.getUserPrincipal();
+      remoteUser = princ == null ? null : princ.getName();
+    }
+
     UserGroupInformation callerUGI = null;
     if (remoteUser != null) {
       callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
@@ -799,7 +817,7 @@ public class RMWebServices {
   public Response createNewApplication(@Context HttpServletRequest hsr)
       throws AuthorizationException, IOException, InterruptedException {
     init();
-    UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr);
+    UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
     if (callerUGI == null) {
       throw new AuthorizationException("Unable to obtain user name, "
           + "user not authenticated");
@@ -835,7 +853,7 @@ public class RMWebServices {
       IOException, InterruptedException {
 
     init();
-    UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr);
+    UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
     if (callerUGI == null) {
       throw new AuthorizationException("Unable to obtain user name, "
           + "user not authenticated");
@@ -887,8 +905,8 @@ public class RMWebServices {
       throw new YarnRuntimeException(msg, e);
     }
     NewApplication appId =
-        new NewApplication(resp.getApplicationId().toString(), new ResourceInfo(
-          resp.getMaximumResourceCapability()));
+        new NewApplication(resp.getApplicationId().toString(),
+          new ResourceInfo(resp.getMaximumResourceCapability()));
     return appId;
   }
 
@@ -962,7 +980,8 @@ public class RMWebServices {
    * @throws IOException
    */
   protected ContainerLaunchContext createContainerLaunchContext(
-      ApplicationSubmissionContextInfo newApp) throws BadRequestException, IOException {
+      ApplicationSubmissionContextInfo newApp) throws BadRequestException,
+      IOException {
 
     // create container launch context
 
@@ -1033,4 +1052,238 @@ public class RMWebServices {
     }
     return ret;
   }
+
+  private UserGroupInformation createKerberosUserGroupInformation(
+      HttpServletRequest hsr) throws AuthorizationException, YarnException {
+
+    UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
+    if (callerUGI == null) {
+      String msg = "Unable to obtain user name, user not authenticated";
+      throw new AuthorizationException(msg);
+    }
+
+    String authType = hsr.getAuthType();
+    if (!KerberosAuthenticationHandler.TYPE.equals(authType)) {
+      String msg =
+          "Delegation token operations can only be carried out on a "
+              + "Kerberos authenticated channel";
+      throw new YarnException(msg);
+    }
+
+    callerUGI.setAuthenticationMethod(AuthenticationMethod.KERBEROS);
+    return callerUGI;
+  }
+
+  @POST
+  @Path("/delegation-token")
+  @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public Response postDelegationToken(DelegationToken tokenData,
+      @Context HttpServletRequest hsr) throws AuthorizationException,
+      IOException, InterruptedException, Exception {
+
+    init();
+    UserGroupInformation callerUGI;
+    try {
+      callerUGI = createKerberosUserGroupInformation(hsr);
+    } catch (YarnException ye) {
+      return Response.status(Status.FORBIDDEN).entity(ye.getMessage()).build();
+    }
+    return createDelegationToken(tokenData, hsr, callerUGI);
+  }
+
+  @POST
+  @Path("/delegation-token/expiration")
+  @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public Response
+      postDelegationTokenExpiration(@Context HttpServletRequest hsr)
+          throws AuthorizationException, IOException, InterruptedException,
+          Exception {
+
+    init();
+    UserGroupInformation callerUGI;
+    try {
+      callerUGI = createKerberosUserGroupInformation(hsr);
+    } catch (YarnException ye) {
+      return Response.status(Status.FORBIDDEN).entity(ye.getMessage()).build();
+    }
+
+    DelegationToken requestToken = new DelegationToken();
+    requestToken.setToken(extractToken(hsr).encodeToUrlString());
+    return renewDelegationToken(requestToken, hsr, callerUGI);
+  }
+
+  private Response createDelegationToken(DelegationToken tokenData,
+      HttpServletRequest hsr, UserGroupInformation callerUGI)
+      throws AuthorizationException, IOException, InterruptedException,
+      Exception {
+
+    final String renewer = tokenData.getRenewer();
+    GetDelegationTokenResponse resp;
+    try {
+      resp =
+          callerUGI
+            .doAs(new PrivilegedExceptionAction<GetDelegationTokenResponse>() {
+              @Override
+              public GetDelegationTokenResponse run() throws IOException,
+                  YarnException {
+                GetDelegationTokenRequest createReq =
+                    GetDelegationTokenRequest.newInstance(renewer);
+                return rm.getClientRMService().getDelegationToken(createReq);
+              }
+            });
+    } catch (Exception e) {
+      LOG.info("Create delegation token request failed", e);
+      throw e;
+    }
+
+    Token<RMDelegationTokenIdentifier> tk =
+        new Token<RMDelegationTokenIdentifier>(resp.getRMDelegationToken()
+          .getIdentifier().array(), resp.getRMDelegationToken().getPassword()
+          .array(), new Text(resp.getRMDelegationToken().getKind()), new Text(
+          resp.getRMDelegationToken().getService()));
+    RMDelegationTokenIdentifier identifier = tk.decodeIdentifier();
+    long currentExpiration =
+        rm.getRMContext().getRMDelegationTokenSecretManager()
+          .getRenewDate(identifier);
+    DelegationToken respToken =
+        new DelegationToken(tk.encodeToUrlString(), renewer, identifier
+          .getOwner().toString(), tk.getKind().toString(), currentExpiration,
+          identifier.getMaxDate());
+    return Response.status(Status.OK).entity(respToken).build();
+  }
+
+  private Response renewDelegationToken(DelegationToken tokenData,
+      HttpServletRequest hsr, UserGroupInformation callerUGI)
+      throws AuthorizationException, IOException, InterruptedException,
+      Exception {
+
+    Token<RMDelegationTokenIdentifier> token =
+        extractToken(tokenData.getToken());
+
+    org.apache.hadoop.yarn.api.records.Token dToken =
+        BuilderUtils.newDelegationToken(token.getIdentifier(), token.getKind()
+          .toString(), token.getPassword(), token.getService().toString());
+    final RenewDelegationTokenRequest req =
+        RenewDelegationTokenRequest.newInstance(dToken);
+
+    RenewDelegationTokenResponse resp;
+    try {
+      resp =
+          callerUGI
+            .doAs(new PrivilegedExceptionAction<RenewDelegationTokenResponse>() {
+              @Override
+              public RenewDelegationTokenResponse run() throws IOException,
+                  YarnException {
+                return rm.getClientRMService().renewDelegationToken(req);
+              }
+            });
+    } catch (UndeclaredThrowableException ue) {
+      if (ue.getCause() instanceof YarnException) {
+        if (ue.getCause().getCause() instanceof InvalidToken) {
+          throw new BadRequestException(ue.getCause().getCause().getMessage());
+        } else if (ue.getCause().getCause() instanceof org.apache.hadoop.security.AccessControlException) {
+          return Response.status(Status.FORBIDDEN)
+            .entity(ue.getCause().getCause().getMessage()).build();
+        }
+        LOG.info("Renew delegation token request failed", ue);
+        throw ue;
+      }
+      LOG.info("Renew delegation token request failed", ue);
+      throw ue;
+    } catch (Exception e) {
+      LOG.info("Renew delegation token request failed", e);
+      throw e;
+    }
+    long renewTime = resp.getNextExpirationTime();
+
+    DelegationToken respToken = new DelegationToken();
+    respToken.setNextExpirationTime(renewTime);
+    return Response.status(Status.OK).entity(respToken).build();
+  }
+
+  // For cancelling tokens, the encoded token is passed as a header
+  // There are two reasons for this -
+  // 1. Passing a request body as part of a DELETE request is not
+  // allowed by Jetty
+  // 2. Passing the encoded token as part of the url is not ideal
+  // since urls tend to get logged and anyone with access to
+  // the logs can extract tokens which are meant to be secret
+  @DELETE
+  @Path("/delegation-token")
+  @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public Response cancelDelegationToken(@Context HttpServletRequest hsr)
+      throws AuthorizationException, IOException, InterruptedException,
+      Exception {
+
+    init();
+    UserGroupInformation callerUGI;
+    try {
+      callerUGI = createKerberosUserGroupInformation(hsr);
+    } catch (YarnException ye) {
+      return Response.status(Status.FORBIDDEN).entity(ye.getMessage()).build();
+    }
+
+    Token<RMDelegationTokenIdentifier> token = extractToken(hsr);
+
+    org.apache.hadoop.yarn.api.records.Token dToken =
+        BuilderUtils.newDelegationToken(token.getIdentifier(), token.getKind()
+          .toString(), token.getPassword(), token.getService().toString());
+    final CancelDelegationTokenRequest req =
+        CancelDelegationTokenRequest.newInstance(dToken);
+
+    try {
+      callerUGI
+        .doAs(new PrivilegedExceptionAction<CancelDelegationTokenResponse>() {
+          @Override
+          public CancelDelegationTokenResponse run() throws IOException,
+              YarnException {
+            return rm.getClientRMService().cancelDelegationToken(req);
+          }
+        });
+    } catch (UndeclaredThrowableException ue) {
+      if (ue.getCause() instanceof YarnException) {
+        if (ue.getCause().getCause() instanceof InvalidToken) {
+          throw new BadRequestException(ue.getCause().getCause().getMessage());
+        } else if (ue.getCause().getCause() instanceof org.apache.hadoop.security.AccessControlException) {
+          return Response.status(Status.FORBIDDEN)
+            .entity(ue.getCause().getCause().getMessage()).build();
+        }
+        LOG.info("Cancel delegation token request failed", ue);
+        throw ue;
+      }
+      LOG.info("Cancel delegation token request failed", ue);
+      throw ue;
+    } catch (Exception e) {
+      LOG.info("Cancel delegation token request failed", e);
+      throw e;
+    }
+
+    return Response.status(Status.OK).build();
+  }
+
+  private Token<RMDelegationTokenIdentifier> extractToken(
+      HttpServletRequest request) {
+    String encodedToken = request.getHeader(DELEGATION_TOKEN_HEADER);
+    if (encodedToken == null) {
+      String msg =
+          "Header '" + DELEGATION_TOKEN_HEADER
+              + "' containing encoded token not found";
+      throw new BadRequestException(msg);
+    }
+    return extractToken(encodedToken);
+  }
+
+  private Token<RMDelegationTokenIdentifier> extractToken(String encodedToken) {
+    Token<RMDelegationTokenIdentifier> token =
+        new Token<RMDelegationTokenIdentifier>();
+    try {
+      token.decodeFromUrlString(encodedToken);
+    } catch (Exception ie) {
+      String msg = "Could not decode encoded token";
+      throw new BadRequestException(msg);
+    }
+    return token;
+  }
 }
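
The RMWebServices changes above add REST endpoints for RM delegation tokens: POST /delegation-token creates a token for the renewer named in the request body, POST /delegation-token/expiration renews the token carried in the Hadoop-YARN-RM-Delegation-Token header, and DELETE /delegation-token cancels it (the token rides in that header because Jetty disallows DELETE bodies and URLs tend to get logged). A minimal client-side sketch of the create/cancel flow follows; it is not part of this commit, the RM address and the ws/v1/cluster prefix are assumptions, the "renewer" field name follows the DelegationToken DAO used above, and the SPNEGO/Kerberos negotiation these endpoints require (for example via org.apache.hadoop.security.authentication.client.AuthenticatedURL) is omitted for brevity.

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class RMDelegationTokenRestClient {

      // Assumed RM web address; replace with the real ResourceManager host:port.
      private static final String RM = "http://rm.example.com:8088/ws/v1/cluster";

      /** Ask the RM for a new delegation token on behalf of the given renewer. */
      static String createToken(String renewer) throws Exception {
        HttpURLConnection conn =
            (HttpURLConnection) new URL(RM + "/delegation-token").openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        OutputStream os = conn.getOutputStream();
        os.write(("{\"renewer\":\"" + renewer + "\"}").getBytes("UTF-8"));
        os.close();
        BufferedReader in = new BufferedReader(
            new InputStreamReader(conn.getInputStream(), "UTF-8"));
        try {
          return in.readLine(); // JSON body containing the encoded token
        } finally {
          in.close();
        }
      }

      /** Cancel a token; the encoded token travels in a header, not the URL or body. */
      static int cancelToken(String encodedToken) throws Exception {
        HttpURLConnection conn =
            (HttpURLConnection) new URL(RM + "/delegation-token").openConnection();
        conn.setRequestMethod("DELETE");
        conn.setRequestProperty("Hadoop-YARN-RM-Delegation-Token", encodedToken);
        return conn.getResponseCode(); // 200 on success, 400/403 on failure
      }
    }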

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java Fri Jul 25 20:33:09 2014
@@ -232,20 +232,7 @@ public class TestApplicationCleanup {
     containerStatuses.put(app.getApplicationId(), containerStatusList);
 
     NodeHeartbeatResponse resp = nm1.nodeHeartbeat(containerStatuses, true);
-    dispatcher.await();
-    List<ContainerId> contsToClean = resp.getContainersToCleanup();
-    int cleanedConts = contsToClean.size();
-    waitCount = 0;
-    while (cleanedConts < 1 && waitCount++ < 200) {
-      LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts);
-      Thread.sleep(100);
-      resp = nm1.nodeHeartbeat(true);
-      dispatcher.await();
-      contsToClean = resp.getContainersToCleanup();
-      cleanedConts += contsToClean.size();
-    }
-    LOG.info("Got cleanup for " + contsToClean.get(0));
-    Assert.assertEquals(1, cleanedConts);
+    waitForContainerCleanup(dispatcher, nm1, resp);
 
     // Now to test the case when RM already gave cleanup, and NM suddenly
     // realizes that the container is running.
@@ -258,26 +245,36 @@ public class TestApplicationCleanup {
     containerStatuses.put(app.getApplicationId(), containerStatusList);
 
     resp = nm1.nodeHeartbeat(containerStatuses, true);
-    dispatcher.await();
-    contsToClean = resp.getContainersToCleanup();
-    cleanedConts = contsToClean.size();
     // The cleanup list won't be instantaneous as it is given out by scheduler
     // and not RMNodeImpl.
-    waitCount = 0;
-    while (cleanedConts < 1 && waitCount++ < 200) {
-      LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts);
-      Thread.sleep(100);
-      resp = nm1.nodeHeartbeat(true);
+    waitForContainerCleanup(dispatcher, nm1, resp);
+
+    rm.stop();
+  }
+
+  protected void waitForContainerCleanup(DrainDispatcher dispatcher, MockNM nm,
+      NodeHeartbeatResponse resp) throws Exception {
+    int waitCount = 0, cleanedConts = 0;
+    List<ContainerId> contsToClean;
+    do {
       dispatcher.await();
       contsToClean = resp.getContainersToCleanup();
       cleanedConts += contsToClean.size();
+      if (cleanedConts >= 1) {
+        break;
+      }
+      Thread.sleep(100);
+      resp = nm.nodeHeartbeat(true);
+    } while(waitCount++ < 200);
+
+    if (contsToClean.isEmpty()) {
+      LOG.error("Failed to get any containers to cleanup");
+    } else {
+      LOG.info("Got cleanup for " + contsToClean.get(0));
     }
-    LOG.info("Got cleanup for " + contsToClean.get(0));
     Assert.assertEquals(1, cleanedConts);
-
-    rm.stop();
   }
-  
+
   private void waitForAppCleanupMessageRecved(MockNM nm, ApplicationId appId)
       throws Exception {
     while (true) {
@@ -400,6 +397,58 @@ public class TestApplicationCleanup {
     rm2.stop();
   }
 
+  @SuppressWarnings("resource")
+  @Test (timeout = 60000)
+  public void testContainerCleanupWhenRMRestartedAppNotRegistered() throws
+      Exception {
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // start RM
+    final DrainDispatcher dispatcher = new DrainDispatcher();
+    MockRM rm1 = new MockRM(conf, memStore) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    // create app and launch the AM
+    RMApp app0 = rm1.submitApp(200);
+    MockAM am0 = launchAM(app0, rm1, nm1);
+    nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.RUNNING);
+    rm1.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
+
+    // start new RM
+    final DrainDispatcher dispatcher2 = new DrainDispatcher();
+    MockRM rm2 = new MockRM(conf, memStore) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher2;
+      }
+    };
+    rm2.start();
+
+    // nm1 register to rm2, and do a heartbeat
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    nm1.registerNode(Arrays.asList(app0.getApplicationId()));
+    rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
+
+    // Add unknown container for application unknown to scheduler
+    NodeHeartbeatResponse response = nm1.nodeHeartbeat(am0
+        .getApplicationAttemptId(), 2, ContainerState.RUNNING);
+
+    waitForContainerCleanup(dispatcher2, nm1, response);
+
+    rm1.stop();
+    rm2.stop();
+  }
+
   public static void main(String[] args) throws Exception {
     TestApplicationCleanup t = new TestApplicationCleanup();
     t.testAppCleanup();

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Fri Jul 25 20:33:09 2014
@@ -228,7 +228,7 @@ public class TestFifoScheduler {
     scheduler.handle(new NodeAddedSchedulerEvent(node));
 
     ApplicationId appId = ApplicationId.newInstance(0, 1);
-    scheduler.addApplication(appId, "queue1", "user1");
+    scheduler.addApplication(appId, "queue1", "user1", false);
 
     NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
     try {
@@ -238,7 +238,7 @@ public class TestFifoScheduler {
     }
 
     ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 1);
-    scheduler.addApplicationAttempt(attId, false, true);
+    scheduler.addApplicationAttempt(attId, false, false);
 
     rm.stop();
   }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Fri Jul 25 20:33:09 2014
@@ -1250,10 +1250,11 @@ public class TestRMRestart {
             .getEncoded());
 
     // assert AMRMTokenSecretManager also knows about the AMRMToken password
-    Token<AMRMTokenIdentifier> amrmToken = loadedAttempt1.getAMRMToken();
-    Assert.assertArrayEquals(amrmToken.getPassword(),
-      rm2.getRMContext().getAMRMTokenSecretManager().retrievePassword(
-        amrmToken.decodeIdentifier()));
+    // TODO: fix this on YARN-2211
+//    Token<AMRMTokenIdentifier> amrmToken = loadedAttempt1.getAMRMToken();
+//    Assert.assertArrayEquals(amrmToken.getPassword(),
+//      rm2.getRMContext().getAMRMTokenSecretManager().retrievePassword(
+//        amrmToken.decodeIdentifier()));
     rm1.stop();
     rm2.stop();
   }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java Fri Jul 25 20:33:09 2014
@@ -29,11 +29,13 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -44,6 +46,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -164,6 +167,11 @@ public class TestWorkPreservingRMRestart
 
     // Wait for RM to settle down on recovering containers;
     waitForNumContainersToRecover(2, rm2, am1.getApplicationAttemptId());
+    Set<ContainerId> launchedContainers =
+        ((RMNodeImpl) rm2.getRMContext().getRMNodes().get(nm1.getNodeId()))
+          .getLaunchedContainers();
+    assertTrue(launchedContainers.contains(amContainer.getContainerId()));
+    assertTrue(launchedContainers.contains(runningContainer.getContainerId()));
 
     // check RMContainers are re-recreated and the container state is correct.
     rm2.waitForState(nm1, amContainer.getContainerId(),
@@ -602,6 +610,36 @@ public class TestWorkPreservingRMRestart
         attempt0.getMasterContainer().getId()).isAMContainer());
   }
 
+  @Test (timeout = 20000)
+  public void testRecoverSchedulerAppAndAttemptSynchronously() throws Exception {
+    // start RM
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    // create app and launch the AM
+    RMApp app0 = rm1.submitApp(200);
+    MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1);
+
+    rm2 = new MockRM(conf, memStore);
+    rm2.start();
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    // scheduler app/attempt is immediately available after RM is re-started.
+    Assert.assertNotNull(rm2.getResourceScheduler().getSchedulerAppInfo(
+      am0.getApplicationAttemptId()));
+
+    // getTransferredContainers should not throw NPE.
+    ((AbstractYarnScheduler) rm2.getResourceScheduler())
+      .getTransferredContainers(am0.getApplicationAttemptId());
+
+    List<NMContainerStatus> containers = createNMContainerStatusForApp(am0);
+    nm1.registerNode(containers, null);
+    waitForNumContainersToRecover(2, rm2, am0.getApplicationAttemptId());
+  }
 
   private void asserteMetrics(QueueMetrics qm, int appsSubmitted,
       int appsPending, int appsRunning, int appsCompleted,

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java Fri Jul 25 20:33:09 2014
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertTru
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.spy;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -34,7 +35,6 @@ import java.util.Map;
 import javax.crypto.SecretKey;
 
 import org.junit.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
 public class RMStateStoreTestBase extends ClientBaseWithFixes{
@@ -175,8 +176,11 @@ public class RMStateStoreTestBase extend
     TestDispatcher dispatcher = new TestDispatcher();
     store.setRMDispatcher(dispatcher);
 
-    AMRMTokenSecretManager appTokenMgr =
-        new AMRMTokenSecretManager(conf);
+    AMRMTokenSecretManager appTokenMgr = spy(
+        new AMRMTokenSecretManager(conf));
+    MasterKeyData masterKeyData = appTokenMgr.createNewMasterKey();
+    when(appTokenMgr.getMasterKey()).thenReturn(masterKeyData);
+
     ClientToAMTokenSecretManagerInRM clientToAMTokenMgr =
         new ClientToAMTokenSecretManagerInRM();
 
@@ -455,10 +459,8 @@ public class RMStateStoreTestBase extend
   private Token<AMRMTokenIdentifier> generateAMRMToken(
       ApplicationAttemptId attemptId,
       AMRMTokenSecretManager appTokenMgr) {
-    AMRMTokenIdentifier appTokenId =
-        new AMRMTokenIdentifier(attemptId);
     Token<AMRMTokenIdentifier> appToken =
-        new Token<AMRMTokenIdentifier>(appTokenId, appTokenMgr);
+        appTokenMgr.createAndGetAMRMToken(attemptId);
     appToken.setService(new Text("appToken service"));
     return appToken;
   }
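
The change above is needed because generateAMRMToken now goes through appTokenMgr.createAndGetAMRMToken(attemptId), which needs the secret manager's current master key; the test therefore wraps the real AMRMTokenSecretManager in a Mockito spy and stubs getMasterKey() to return a key created up front, while every other method keeps its real behaviour. A minimal, self-contained sketch of that spy-and-stub pattern is below; the List example is purely illustrative and not from the patch.

    import static org.mockito.Mockito.spy;
    import static org.mockito.Mockito.when;

    import java.util.ArrayList;
    import java.util.List;

    public class SpyPatternSketch {
      public static void main(String[] args) {
        List<String> real = new ArrayList<String>();
        real.add("one");

        // A spy delegates to the real object unless a method is stubbed.
        List<String> spied = spy(real);
        when(spied.size()).thenReturn(42); // only size() is overridden

        System.out.println(spied.get(0)); // "one" -> real behaviour
        System.out.println(spied.size()); // 42    -> stubbed value
      }
    }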

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java Fri Jul 25 20:33:09 2014
@@ -97,6 +97,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
@@ -224,6 +225,8 @@ public class TestRMAppAttemptTransitions
     amLivelinessMonitor = mock(AMLivelinessMonitor.class);
     amFinishingMonitor = mock(AMLivelinessMonitor.class);
     writer = mock(RMApplicationHistoryWriter.class);
+    MasterKeyData masterKeyData = amRMTokenManager.createNewMasterKey();
+    when(amRMTokenManager.getMasterKey()).thenReturn(masterKeyData);
     rmContext =
         new RMContextImpl(rmDispatcher,
           containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
@@ -820,7 +823,9 @@ public class TestRMAppAttemptTransitions
       applicationAttempt.getAppAttemptState());
     verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
     verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
-    verifyAMCrashAtAllocatedDiagnosticInfo(applicationAttempt.getDiagnostics());
+    boolean shouldCheckURL = (applicationAttempt.getTrackingUrl() != null);
+    verifyAMCrashAtAllocatedDiagnosticInfo(applicationAttempt.getDiagnostics(),
+      exitCode, shouldCheckURL);
   }
   
   @Test
@@ -1238,11 +1243,18 @@ public class TestRMAppAttemptTransitions
     verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
   }
 
-  private void verifyAMCrashAtAllocatedDiagnosticInfo(String diagnostics) {
-    assertTrue("Diagnostic information does not contain application proxy URL",
-      diagnostics.contains(applicationAttempt.getWebProxyBase()));
+  private void verifyAMCrashAtAllocatedDiagnosticInfo(String diagnostics,
+        int exitCode, boolean shouldCheckURL) {
     assertTrue("Diagnostic information does not point the logs to the users",
       diagnostics.contains("logs"));
+    assertTrue("Diagnostic information does not contain application attempt id",
+      diagnostics.contains(applicationAttempt.getAppAttemptId().toString()));
+    assertTrue("Diagnostic information does not contain application exit code",
+      diagnostics.contains("exitCode: " + exitCode));
+    if (shouldCheckURL) {
+      assertTrue("Diagnostic information does not contain application proxy URL",
+      diagnostics.contains(applicationAttempt.getWebProxyBase()));
+    }
   }
 
   private void verifyTokenCount(ApplicationAttemptId appAttemptId, int count) {

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java Fri Jul 25 20:33:09 2014
@@ -26,6 +26,9 @@ import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -36,17 +39,24 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
@@ -204,4 +214,36 @@ public class TestRMContainerImpl {
     assertEquals(RMContainerState.RUNNING, rmContainer.getState());
     verify(writer, never()).containerFinished(any(RMContainer.class));
   }
+  
+  @Test
+  public void testExistenceOfResourceRequestInRMContainer() throws Exception {
+    Configuration conf = new Configuration();
+    MockRM rm1 = new MockRM(conf);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("unknownhost:1234", 8000);
+    RMApp app1 = rm1.submitApp(1024);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    ResourceScheduler scheduler = rm1.getResourceScheduler();
+
+    // request a container.
+    am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
+    ContainerId containerId2 = ContainerId.newInstance(
+        am1.getApplicationAttemptId(), 2);
+    rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
+
+    // Verify whether list of ResourceRequest is present in RMContainer
+    // while moving to ALLOCATED state
+    Assert.assertNotNull(scheduler.getRMContainer(containerId2)
+        .getResourceRequests());
+
+    // Allocate container
+    am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>())
+        .getAllocatedContainers();
+    rm1.waitForState(nm1, containerId2, RMContainerState.ACQUIRED);
+
+    // After RMContainer moving to ACQUIRED state, list of ResourceRequest will
+    // be empty
+    Assert.assertNull(scheduler.getRMContainer(containerId2)
+        .getResourceRequests());
+  }
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Fri Jul 25 20:33:09 2014
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -79,6 +80,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -87,6 +90,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
@@ -947,4 +951,67 @@ public class TestCapacityScheduler {
 
     rm1.stop();
   }
+  
+  @Test(timeout = 30000)
+  public void testRecoverRequestAfterPreemption() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    MockRM rm1 = new MockRM(conf);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 8000);
+    RMApp app1 = rm1.submitApp(1024);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+    // request a container.
+    am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
+    ContainerId containerId1 = ContainerId.newInstance(
+        am1.getApplicationAttemptId(), 2);
+    rm1.waitForState(nm1, containerId1, RMContainerState.ALLOCATED);
+
+    RMContainer rmContainer = cs.getRMContainer(containerId1);
+    List<ResourceRequest> requests = rmContainer.getResourceRequests();
+    FiCaSchedulerApp app = cs.getApplicationAttempt(am1
+        .getApplicationAttemptId());
+
+    FiCaSchedulerNode node = cs.getNode(rmContainer.getAllocatedNode());
+    for (ResourceRequest request : requests) {
+      // Skip the OffRack and RackLocal resource requests.
+      if (request.getResourceName().equals(node.getRackName())
+          || request.getResourceName().equals(ResourceRequest.ANY)) {
+        continue;
+      }
+
+      // Already the node local resource request is cleared from RM after
+      // allocation.
+      Assert.assertNull(app.getResourceRequest(request.getPriority(),
+          request.getResourceName()));
+    }
+
+    // Call killContainer to preempt the container
+    cs.killContainer(rmContainer);
+
+    Assert.assertEquals(3, requests.size());
+    for (ResourceRequest request : requests) {
+      // Resource request must have added back in RM after preempt event
+      // handling.
+      Assert.assertEquals(
+          1,
+          app.getResourceRequest(request.getPriority(),
+              request.getResourceName()).getNumContainers());
+    }
+
+    // New container will be allocated and will move to ALLOCATED state
+    ContainerId containerId2 = ContainerId.newInstance(
+        am1.getApplicationAttemptId(), 3);
+    rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
+
+    // allocate container
+    List<Container> containers = am1.allocate(new ArrayList<ResourceRequest>(),
+        new ArrayList<ContainerId>()).getAllocatedContainers();
+
+    // Now with updated ResourceRequest, a container is allocated for AM.
+    Assert.assertTrue(containers.size() == 1);
+  }
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java Fri Jul 25 20:33:09 2014
@@ -147,11 +147,11 @@ public class FairSchedulerTestBase {
       int memory, int vcores, String queueId, String userId, int numContainers,
       int priority) {
     ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
-    scheduler.addApplication(id.getApplicationId(), queueId, userId);
+    scheduler.addApplication(id.getApplicationId(), queueId, userId, false);
     // This conditional is for testAclSubmitApplication where app is rejected
     // and no app is added.
     if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) {
-      scheduler.addApplicationAttempt(id, false, true);
+      scheduler.addApplicationAttempt(id, false, false);
     }
     List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
     ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
@@ -167,6 +167,27 @@ public class FairSchedulerTestBase {
         .put(id.getApplicationId(), rmApp);
     return id;
   }
+  
+  protected ApplicationAttemptId createSchedulingRequest(String queueId,
+      String userId, List<ResourceRequest> ask) {
+    ApplicationAttemptId id = createAppAttemptId(this.APP_ID++,
+        this.ATTEMPT_ID++);
+    scheduler.addApplication(id.getApplicationId(), queueId, userId, false);
+    // This conditional is for testAclSubmitApplication where app is rejected
+    // and no app is added.
+    if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) {
+      scheduler.addApplicationAttempt(id, false, false);
+    }
+    scheduler.allocate(id, ask, new ArrayList<ContainerId>(), null, null);
+    RMApp rmApp = mock(RMApp.class);
+    RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class);
+    when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt);
+    when(rmAppAttempt.getRMAppAttemptMetrics()).thenReturn(
+        new RMAppAttemptMetrics(id));
+    resourceManager.getRMContext().getRMApps()
+        .put(id.getApplicationId(), rmApp);
+    return id;
+  }
 
   protected void createSchedulingRequestExistingApplication(
        int memory, int priority, ApplicationAttemptId attId) {

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Fri Jul 25 20:33:09 2014
@@ -53,10 +53,13 @@ import org.apache.hadoop.yarn.MockApps;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
@@ -77,11 +80,13 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@@ -788,14 +793,14 @@ public class TestFairScheduler extends F
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     ApplicationAttemptId id11 = createAppAttemptId(1, 1);
-    scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1");
-    scheduler.addApplicationAttempt(id11, false, true);
+    scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1", false);
+    scheduler.addApplicationAttempt(id11, false, false);
     ApplicationAttemptId id21 = createAppAttemptId(2, 1);
-    scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1");
-    scheduler.addApplicationAttempt(id21, false, true);
+    scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1", false);
+    scheduler.addApplicationAttempt(id21, false, false);
     ApplicationAttemptId id22 = createAppAttemptId(2, 2);
-    scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1");
-    scheduler.addApplicationAttempt(id22, false, true);
+    scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1", false);
+    scheduler.addApplicationAttempt(id22, false, false);
 
     int minReqSize = 
         FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB;
@@ -1216,6 +1221,79 @@ public class TestFairScheduler extends F
         scheduler.getSchedulerApp(app4).getPreemptionContainers().isEmpty());
   }
 
+  @Test
+  public void testPreemptionIsNotDelayedToNextRound() throws Exception {
+    conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000);
+    conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000);
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
+
+    MockClock clock = new MockClock();
+    scheduler.setClock(clock);
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"queueA\">");
+    out.println("<weight>8</weight>");
+    out.println("<queue name=\"queueA1\" />");
+    out.println("<queue name=\"queueA2\" />");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<weight>2</weight>");
+    out.println("</queue>");
+    out.print("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>");
+    out.println("</allocations>");
+    out.close();
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Add a node of 8G
+    RMNode node1 = MockNodes.newNodeInfo(1,
+        Resources.createResource(8 * 1024, 8), 1, "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    // Run apps in queueA.queueA1 and queueB
+    ApplicationAttemptId app1 = createSchedulingRequest(1 * 1024, 1,
+        "queueA.queueA1", "user1", 7, 1);
+    // createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app1);
+    ApplicationAttemptId app2 = createSchedulingRequest(1 * 1024, 1, "queueB",
+        "user2", 1, 1);
+
+    scheduler.update();
+
+    NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+    for (int i = 0; i < 8; i++) {
+      scheduler.handle(nodeUpdate1);
+    }
+
+    // Verify that the apps got the containers they requested
+    assertEquals(7, scheduler.getSchedulerApp(app1).getLiveContainers().size());
+    assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size());
+
+    // Now submit an app in queueA.queueA2
+    ApplicationAttemptId app3 = createSchedulingRequest(1 * 1024, 1,
+        "queueA.queueA2", "user3", 7, 1);
+    scheduler.update();
+
+    // Let 11 sec pass
+    clock.tick(11);
+
+    scheduler.update();
+    Resource toPreempt = scheduler.resToPreempt(scheduler.getQueueManager()
+        .getLeafQueue("queueA.queueA2", false), clock.getTime());
+    assertEquals(2980, toPreempt.getMemory());
+
+    // Verify that the 3 containers required by queueA2 are preempted in the same
+    // round
+    scheduler.preemptResources(toPreempt);
+    assertEquals(3, scheduler.getSchedulerApp(app1).getPreemptionContainers()
+        .size());
+  }
+
   @Test (timeout = 5000)
   /**
    * Tests the timing of decision to preempt tasks.
@@ -1556,8 +1634,8 @@ public class TestFairScheduler extends F
     scheduler.handle(nodeEvent2);
     
     ApplicationAttemptId appId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
-    scheduler.addApplication(appId.getApplicationId(), "queue1", "user1");
-    scheduler.addApplicationAttempt(appId, false, true);
+    scheduler.addApplication(appId.getApplicationId(), "queue1", "user1", false);
+    scheduler.addApplicationAttempt(appId, false, false);
     
     // 1 request with 2 nodes on the same rack. another request with 1 node on
     // a different rack
@@ -1838,7 +1916,7 @@ public class TestFairScheduler extends F
 
     ApplicationAttemptId attId =
         ApplicationAttemptId.newInstance(applicationId, this.ATTEMPT_ID++);
-    scheduler.addApplication(attId.getApplicationId(), queue, user);
+    scheduler.addApplication(attId.getApplicationId(), queue, user, false);
 
     numTries = 0;
     while (application.getFinishTime() == 0 && numTries < MAX_TRIES) {
@@ -2715,8 +2793,8 @@ public class TestFairScheduler extends F
     // send application request
     ApplicationAttemptId appAttemptId =
             createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
-    fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11");
-    fs.addApplicationAttempt(appAttemptId, false, true);
+    fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11", false);
+    fs.addApplicationAttempt(appAttemptId, false, false);
     List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
     ResourceRequest request =
             createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);
@@ -2758,7 +2836,43 @@ public class TestFairScheduler extends F
     Assert.assertEquals(2, nodes.size());
   }
 
-  
+  @Test
+  public void testContinuousSchedulingWithNodeRemoved() throws Exception {
+    // Continuous scheduling is disabled; one continuous-scheduling attempt is invoked manually below
+    scheduler.init(conf);
+    scheduler.start();
+    Assert.assertTrue("Continuous scheduling should be disabled.",
+        !scheduler.isContinuousSchedulingEnabled());
+
+    // Add two nodes
+    RMNode node1 =
+        MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
+            "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+    RMNode node2 =
+        MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2,
+            "127.0.0.2");
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    scheduler.handle(nodeEvent2);
+    Assert.assertEquals("We should have two alive nodes.",
+        2, scheduler.getNumClusterNodes());
+
+    // Remove one node
+    NodeRemovedSchedulerEvent removeNode1 = new NodeRemovedSchedulerEvent(node1);
+    scheduler.handle(removeNode1);
+    Assert.assertEquals("We should only have one alive node.",
+        1, scheduler.getNumClusterNodes());
+
+    // Invoke the continuous scheduling once
+    try {
+      scheduler.continuousSchedulingAttempt();
+    } catch (Exception e) {
+      fail("Exception happened when doing continuous scheduling. " +
+        e.toString());
+    }
+  }
+
   @Test
   public void testDontAllowUndeclaredPools() throws Exception{
     conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false);
@@ -2831,6 +2945,87 @@ public class TestFairScheduler extends F
       }
     }
   }
+    
+  @Test(timeout=5000)
+  public void testRecoverRequestAfterPreemption() throws Exception {
+    conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10);
+    
+    MockClock clock = new MockClock();
+    scheduler.setClock(clock);
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    Priority priority = Priority.newInstance(20);
+    String host = "127.0.0.1";
+    int GB = 1024;
+
+    // Create a node and raise a NodeAdded event
+    RMNode node = MockNodes.newNodeInfo(1,
+        Resources.createResource(16 * 1024, 4), 0, host);
+    NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
+    scheduler.handle(nodeEvent);
+
+    // Create 3 container requests and place them in the ask list
+    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
+    ResourceRequest nodeLocalRequest = createResourceRequest(GB, 1, host,
+        priority.getPriority(), 1, true);
+    ResourceRequest rackLocalRequest = createResourceRequest(GB, 1,
+        node.getRackName(), priority.getPriority(), 1, true);
+    ResourceRequest offRackRequest = createResourceRequest(GB, 1,
+        ResourceRequest.ANY, priority.getPriority(), 1, true);
+    ask.add(nodeLocalRequest);
+    ask.add(rackLocalRequest);
+    ask.add(offRackRequest);
+
+    // Create Request and update
+    ApplicationAttemptId appAttemptId = createSchedulingRequest("queueA",
+        "user1", ask);
+    scheduler.update();
+
+    // Sufficient node check-ins to fully schedule containers
+    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
+    scheduler.handle(nodeUpdate);
+
+    assertEquals(1, scheduler.getSchedulerApp(appAttemptId).getLiveContainers()
+        .size());
+    FSSchedulerApp app = scheduler.getSchedulerApp(appAttemptId);
+
+    // ResourceRequest will be empty once NodeUpdate is completed
+    Assert.assertNull(app.getResourceRequest(priority, host));
+
+    ContainerId containerId1 = ContainerId.newInstance(appAttemptId, 1);
+    RMContainer rmContainer = app.getRMContainer(containerId1);
+
+    // Create a preempt event and register for preemption
+    scheduler.warnOrKillContainer(rmContainer);
+    
+    // Wait for a few clock ticks
+    clock.tick(5);
+    
+    // preempt now
+    scheduler.warnOrKillContainer(rmContainer);
+
+    List<ResourceRequest> requests = rmContainer.getResourceRequests();
+    // Once recovered, resource request will be present again in app
+    Assert.assertEquals(3, requests.size());
+    for (ResourceRequest request : requests) {
+      Assert.assertEquals(1,
+          app.getResourceRequest(priority, request.getResourceName())
+              .getNumContainers());
+    }
+
+    // Send node heartbeat
+    scheduler.update();
+    scheduler.handle(nodeUpdate);
+
+    List<Container> containers = scheduler.allocate(appAttemptId,
+        Collections.<ResourceRequest> emptyList(),
+        Collections.<ContainerId> emptyList(), null, null).getContainers();
+
+    // Now with updated ResourceRequest, a container is allocated for AM.
+    Assert.assertTrue(containers.size() == 1);
+  }
   
   @SuppressWarnings("resource")
   @Test

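Every addApplication() call in this test gains a fourth boolean argument, and the last argument of addApplicationAttempt() flips from true to false. A minimal sketch of the updated call shape, reusing the createAppAttemptId() helper from this class; the meaning attributed to the flags in the comments is an assumption for illustration, since the scheduler-side signature is not part of this diff:

    // Sketch only; the boolean meanings in the comments are assumptions.
    ApplicationAttemptId attemptId = createAppAttemptId(1, 1);   // existing test helper
    scheduler.addApplication(
        attemptId.getApplicationId(),  // application being added
        "root.queue1",                 // target queue
        "user1",                       // submitting user
        false);                        // new flag; assumed to mean "not recovered after an RM restart"
    scheduler.addApplicationAttempt(
        attemptId,
        false,                         // unchanged second flag
        false);                        // third flag now passed as false throughout these tests
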
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java Fri Jul 25 20:33:09 2014
@@ -94,7 +94,7 @@ public class TestFairSchedulerPreemption
     scheduler = (FairScheduler)resourceManager.getResourceScheduler();
 
     scheduler.setClock(clock);
-    scheduler.UPDATE_INTERVAL = 60 * 1000;
+    scheduler.updateInterval = 60 * 1000;
   }
 
   private void registerNodeAndSubmitApp(

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java Fri Jul 25 20:33:09 2014
@@ -23,13 +23,12 @@ import java.security.PrivilegedAction;
 import java.util.Arrays;
 import java.util.Collection;
 
-import javax.crypto.SecretKey;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -41,7 +40,9 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
@@ -50,6 +51,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
@@ -65,6 +67,8 @@ public class TestAMRMTokens {
 
   private final Configuration conf;
   private static final int maxWaitAttempts = 50;
+  private static final int rolling_interval_sec = 13;
+  private static final long am_expire_ms = 4000;
 
   @Parameters
   public static Collection<Object[]> configs() {
@@ -201,15 +205,22 @@ public class TestAMRMTokens {
   @Test
   public void testMasterKeyRollOver() throws Exception {
 
+    conf.setLong(
+      YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
+      rolling_interval_sec);
+    conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, am_expire_ms);
     MyContainerManager containerManager = new MyContainerManager();
     final MockRMWithAMS rm =
         new MockRMWithAMS(conf, containerManager);
     rm.start();
-
+    Long startTime = System.currentTimeMillis();
     final Configuration conf = rm.getConfig();
     final YarnRPC rpc = YarnRPC.create(conf);
     ApplicationMasterProtocol rmClient = null;
-
+    AMRMTokenSecretManager appTokenSecretManager =
+        rm.getRMContext().getAMRMTokenSecretManager();
+    MasterKeyData oldKey = appTokenSecretManager.getMasterKey();
+    Assert.assertNotNull(oldKey);
     try {
       MockNM nm1 = rm.registerNode("localhost:1234", 5120);
 
@@ -218,7 +229,7 @@ public class TestAMRMTokens {
       nm1.nodeHeartbeat(true);
 
       int waitCount = 0;
-      while (containerManager.containerTokens == null && waitCount++ < 20) {
+      while (containerManager.containerTokens == null && waitCount++ < maxWaitAttempts) {
         LOG.info("Waiting for AM Launch to happen..");
         Thread.sleep(1000);
       }
@@ -250,21 +261,65 @@ public class TestAMRMTokens {
       Assert.assertTrue(
           rmClient.allocate(allocateRequest).getAMCommand() == null);
 
-      // Simulate a master-key-roll-over
-      AMRMTokenSecretManager appTokenSecretManager =
-          rm.getRMContext().getAMRMTokenSecretManager();
-      SecretKey oldKey = appTokenSecretManager.getMasterKey();
-      appTokenSecretManager.rollMasterKey();
-      SecretKey newKey = appTokenSecretManager.getMasterKey();
+      // Wait long enough to make sure the roll-over happens.
+      // In the meantime, the old AMRMToken should continue to work.
+      while(System.currentTimeMillis() - startTime < rolling_interval_sec*1000) {
+        rmClient.allocate(allocateRequest);
+        Thread.sleep(500);
+      }
+
+      MasterKeyData newKey = appTokenSecretManager.getMasterKey();
+      Assert.assertNotNull(newKey);
       Assert.assertFalse("Master key should have changed!",
         oldKey.equals(newKey));
 
+      // Another allocate call with old AMRMToken. Should continue to work.
+      rpc.stopProxy(rmClient, conf); // To avoid using cached client
+      rmClient = createRMClient(rm, conf, rpc, currentUser);
+      Assert
+        .assertTrue(rmClient.allocate(allocateRequest).getAMCommand() == null);
+
+      waitCount = 0;
+      while(waitCount++ <= maxWaitAttempts) {
+        if (appTokenSecretManager.getCurrnetMasterKeyData() != oldKey) {
+          break;
+        }
+        try {
+          rmClient.allocate(allocateRequest);
+        } catch (Exception ex) {
+          break;
+        }
+        Thread.sleep(200);
+      }
+      // activate the nextMasterKey and replace the currentMasterKey
+      Assert.assertTrue(appTokenSecretManager.getCurrnetMasterKeyData().equals(newKey));
+      Assert.assertTrue(appTokenSecretManager.getMasterKey().equals(newKey));
+      Assert.assertTrue(appTokenSecretManager.getNextMasterKeyData() == null);
+
+      // Create a new Token
+      Token<AMRMTokenIdentifier> newToken =
+          appTokenSecretManager.createAndGetAMRMToken(applicationAttemptId);
+      SecurityUtil.setTokenService(newToken, rmBindAddress);
+      currentUser.addToken(newToken);
       // Another allocate call. Should continue to work.
       rpc.stopProxy(rmClient, conf); // To avoid using cached client
       rmClient = createRMClient(rm, conf, rpc, currentUser);
       allocateRequest = Records.newRecord(AllocateRequest.class);
-      Assert.assertTrue(
-          rmClient.allocate(allocateRequest).getAMCommand() == null);
+      Assert
+        .assertTrue(rmClient.allocate(allocateRequest).getAMCommand() == null);
+
+      // Should not work by using the old AMRMToken.
+      rpc.stopProxy(rmClient, conf); // To avoid using cached client
+      try {
+        currentUser.addToken(amRMToken);
+        rmClient = createRMClient(rm, conf, rpc, currentUser);
+        allocateRequest = Records.newRecord(AllocateRequest.class);
+        Assert
+          .assertTrue(rmClient.allocate(allocateRequest).getAMCommand() == null);
+        Assert.fail("The old Token should not work");
+      } catch (Exception ex) {
+        // expect exception
+      }
     } finally {
       rm.stop();
       if (rmClient != null) {

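The rewritten testMasterKeyRollOver no longer rolls the master key by hand; it configures a short rolling interval and waits for the RM to roll and then activate the new key on its own. A compact sketch of that wait, written as a helper that could live in this test class. It reuses only names visible in the diff, including the misspelled getCurrnetMasterKeyData(); the assumption that plain polling suffices is flagged in the comments, since the test above also keeps issuing allocate() calls while it waits:

    // Sketch: poll until the currently active AMRM master key changes.
    // Assumption: the roll/activation timers fire on their own; the test above
    // additionally calls allocate() in its loop to keep exercising the old token.
    private static void waitForMasterKeyRollOver(MockRMWithAMS rm, long timeoutMs)
        throws InterruptedException {
      AMRMTokenSecretManager sm = rm.getRMContext().getAMRMTokenSecretManager();
      MasterKeyData oldKey = sm.getMasterKey();
      long deadline = System.currentTimeMillis() + timeoutMs;
      while (sm.getCurrnetMasterKeyData() == oldKey
          && System.currentTimeMillis() < deadline) {
        Thread.sleep(200);
      }
    }
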
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java Fri Jul 25 20:33:09 2014
@@ -24,6 +24,7 @@ import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -673,7 +674,40 @@ public class TestDelegationTokenRenewer 
       Thread.sleep(200);
     }
   }
-  
+
+  @Test(timeout=20000)
+  public void testDTRonAppSubmission()
+      throws IOException, InterruptedException, BrokenBarrierException {
+    final Credentials credsx = new Credentials();
+    final Token<?> tokenx = mock(Token.class);
+    credsx.addToken(new Text("token"), tokenx);
+    doReturn(true).when(tokenx).isManaged();
+    doThrow(new IOException("boom"))
+        .when(tokenx).renew(any(Configuration.class));
+    // fire up the renewer
+    final DelegationTokenRenewer dtr =
+         createNewDelegationTokenRenewer(conf, counter);
+    RMContext mockContext = mock(RMContext.class);
+    ClientRMService mockClientRMService = mock(ClientRMService.class);
+    when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
+    InetSocketAddress sockAddr =
+        InetSocketAddress.createUnresolved("localhost", 1234);
+    when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
+    dtr.setRMContext(mockContext);
+    when(mockContext.getDelegationTokenRenewer()).thenReturn(dtr);
+    dtr.init(conf);
+    dtr.start();
+
+    try {
+      dtr.addApplicationSync(mock(ApplicationId.class), credsx, false);
+      fail("Catch IOException on app submission");
+    } catch (IOException e){
+      Assert.assertTrue(e.getMessage().contains(tokenx.toString()));
+      Assert.assertTrue(e.getCause().toString().contains("boom"));
+    }
+
+  }
+
   @Test(timeout=20000)                                                         
   public void testConcurrentAddApplication()                                  
       throws IOException, InterruptedException, BrokenBarrierException {       

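The new testDTRonAppSubmission exercises the synchronous path: addApplicationSync() renews the submitted tokens immediately, so a token whose renew() fails surfaces as an IOException to the submitter instead of being deferred to the renewal thread. The Mockito stubbing it depends on, isolated as a fragment that uses the statically imported helpers already present in this file (it must sit in a method declaring IOException and InterruptedException, as the test does, because renew() is invoked on the mock while stubbing):

    // Stub a managed token whose renew() always fails; mirrors the test above.
    Token<?> badToken = mock(Token.class);
    doReturn(true).when(badToken).isManaged();
    doThrow(new IOException("boom"))
        .when(badToken).renew(any(Configuration.class));
    Credentials creds = new Credentials();
    creds.addToken(new Text("token"), badToken);
    // Submitting these credentials now fails fast, e.g.:
    //   dtr.addApplicationSync(mock(ApplicationId.class), creds, false);  // throws IOException
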
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm Fri Jul 25 20:33:09 2014
@@ -205,6 +205,12 @@ Properties that can be placed in yarn-si
       instead. Defaults to true. If a queue placement policy is given in the
       allocations file, this property is ignored.
 
+ * <<<yarn.scheduler.fair.update-interval-ms>>>
+ 
+    * The interval at which to lock the scheduler and recalculate fair shares,
+      recalculate demand, and check whether anything is due for preemption.
+      Defaults to 500 ms. 
+
 Allocation file format
 
   The allocation file must be in XML format. The format contains five types of

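The new yarn.scheduler.fair.update-interval-ms entry documents the knob behind the updateInterval change in TestFairSchedulerPreemption above. A minimal illustration of setting it programmatically, for instance in a test; administrators would normally put the same key in yarn-site.xml, and 500 ms is the documented default:

    // Sketch: tighten the interval at which the FairScheduler locks itself to
    // recalculate fair shares and demand and to check for preemption.
    Configuration conf = new YarnConfiguration();
    conf.setLong("yarn.scheduler.fair.update-interval-ms", 100L);

Lowering the value makes share recalculation and preemption decisions react faster, at the cost of locking the scheduler more often.
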

