hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From su...@apache.org
Subject svn commit: r1609878 [5/7] - in /hadoop/common/branches/YARN-1051/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ex...
Date Sat, 12 Jul 2014 02:25:04 GMT
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java Sat Jul 12 02:24:40 2014
@@ -44,16 +44,17 @@ import java.util.concurrent.ConcurrentHa
 import java.util.concurrent.CyclicBarrier;
 
 import org.junit.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.authentication.util.KerberosName;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.MockApps;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
@@ -138,6 +139,10 @@ public class TestClientRMService {
   
   private final static String QUEUE_1 = "Q-1";
   private final static String QUEUE_2 = "Q-2";
+  private final static String kerberosRule = "RULE:[1:$1@$0](.*@EXAMPLE.COM)s/@.*//\nDEFAULT";
+  static {
+    KerberosName.setRules(kerberosRule);
+  }
   
   @BeforeClass
   public static void setupSecretManager() throws IOException {
@@ -479,6 +484,17 @@ public class TestClientRMService {
       UserGroupInformation.createRemoteUser("owner");
   private static final UserGroupInformation other =
       UserGroupInformation.createRemoteUser("other");
+  private static final UserGroupInformation tester =
+      UserGroupInformation.createRemoteUser("tester");
+  private static final String testerPrincipal = "tester@EXAMPLE.COM";
+  private static final String ownerPrincipal = "owner@EXAMPLE.COM";
+  private static final String otherPrincipal = "other@EXAMPLE.COM";
+  private static final UserGroupInformation testerKerb =
+      UserGroupInformation.createRemoteUser(testerPrincipal);
+  private static final UserGroupInformation ownerKerb =
+      UserGroupInformation.createRemoteUser(ownerPrincipal);
+  private static final UserGroupInformation otherKerb =
+      UserGroupInformation.createRemoteUser(otherPrincipal);
   
   @Test
   public void testTokenRenewalByOwner() throws Exception {
@@ -546,6 +562,147 @@ public class TestClientRMService {
     rmService.renewDelegationToken(request);
   }
 
+  @Test
+  public void testTokenCancellationByOwner() throws Exception {
+    // two tests required - one with a kerberos name
+    // and with a short name
+    RMContext rmContext = mock(RMContext.class);
+    final ClientRMService rmService =
+        new ClientRMService(rmContext, null, null, null, null, dtsm);
+    testerKerb.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        checkTokenCancellation(rmService, testerKerb, other);
+        return null;
+      }
+    });
+    owner.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        checkTokenCancellation(owner, other);
+        return null;
+      }
+    });
+  }
+
+  @Test
+  public void testTokenCancellationByRenewer() throws Exception {
+    // two tests required - one with a kerberos name
+    // and with a short name
+    RMContext rmContext = mock(RMContext.class);
+    final ClientRMService rmService =
+        new ClientRMService(rmContext, null, null, null, null, dtsm);
+    testerKerb.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        checkTokenCancellation(rmService, owner, testerKerb);
+        return null;
+      }
+    });
+    other.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        checkTokenCancellation(owner, other);
+        return null;
+      }
+    });
+  }
+
+  @Test
+  public void testTokenCancellationByWrongUser() {
+    // two sets to test -
+    // 1. try to cancel tokens of short and kerberos users as a kerberos UGI
+    // 2. try to cancel tokens of short and kerberos users as a simple auth UGI
+
+    RMContext rmContext = mock(RMContext.class);
+    final ClientRMService rmService =
+        new ClientRMService(rmContext, null, null, null, null, dtsm);
+    UserGroupInformation[] kerbTestOwners =
+        { owner, other, tester, ownerKerb, otherKerb };
+    UserGroupInformation[] kerbTestRenewers =
+        { owner, other, ownerKerb, otherKerb };
+    for (final UserGroupInformation tokOwner : kerbTestOwners) {
+      for (final UserGroupInformation tokRenewer : kerbTestRenewers) {
+        try {
+          testerKerb.doAs(new PrivilegedExceptionAction<Void>() {
+            @Override
+            public Void run() throws Exception {
+              try {
+                checkTokenCancellation(rmService, tokOwner, tokRenewer);
+                Assert.fail("We should not reach here; token owner = "
+                    + tokOwner.getUserName() + ", renewer = "
+                    + tokRenewer.getUserName());
+                return null;
+              } catch (YarnException e) {
+                Assert.assertTrue(e.getMessage().contains(
+                  testerKerb.getUserName()
+                      + " is not authorized to cancel the token"));
+                return null;
+              }
+            }
+          });
+        } catch (Exception e) {
+          Assert.fail("Unexpected exception; " + e.getMessage());
+        }
+      }
+    }
+
+    UserGroupInformation[] simpleTestOwners =
+        { owner, other, ownerKerb, otherKerb, testerKerb };
+    UserGroupInformation[] simpleTestRenewers =
+        { owner, other, ownerKerb, otherKerb };
+    for (final UserGroupInformation tokOwner : simpleTestOwners) {
+      for (final UserGroupInformation tokRenewer : simpleTestRenewers) {
+        try {
+          tester.doAs(new PrivilegedExceptionAction<Void>() {
+            @Override
+            public Void run() throws Exception {
+              try {
+                checkTokenCancellation(tokOwner, tokRenewer);
+                Assert.fail("We should not reach here; token owner = "
+                    + tokOwner.getUserName() + ", renewer = "
+                    + tokRenewer.getUserName());
+                return null;
+              } catch (YarnException ex) {
+                Assert.assertTrue(ex.getMessage().contains(
+                  tester.getUserName()
+                      + " is not authorized to cancel the token"));
+                return null;
+              }
+            }
+          });
+        } catch (Exception e) {
+          Assert.fail("Unexpected exception; " + e.getMessage());
+        }
+      }
+    }
+  }
+
+  private void checkTokenCancellation(UserGroupInformation owner,
+      UserGroupInformation renewer) throws IOException, YarnException {
+    RMContext rmContext = mock(RMContext.class);
+    final ClientRMService rmService =
+        new ClientRMService(rmContext, null, null, null, null, dtsm);
+    checkTokenCancellation(rmService, owner, renewer);
+  }
+
+  private void checkTokenCancellation(ClientRMService rmService,
+      UserGroupInformation owner, UserGroupInformation renewer)
+      throws IOException, YarnException {
+    RMDelegationTokenIdentifier tokenIdentifier =
+        new RMDelegationTokenIdentifier(new Text(owner.getUserName()),
+          new Text(renewer.getUserName()), null);
+    Token<?> token =
+        new Token<RMDelegationTokenIdentifier>(tokenIdentifier, dtsm);
+    org.apache.hadoop.yarn.api.records.Token dToken =
+        BuilderUtils.newDelegationToken(token.getIdentifier(), token.getKind()
+          .toString(), token.getPassword(), token.getService().toString());
+    CancelDelegationTokenRequest request =
+        Records.newRecord(CancelDelegationTokenRequest.class);
+    request.setDelegationToken(dToken);
+    rmService.cancelDelegationToken(request);
+  }
+
   @Test (timeout = 30000)
   @SuppressWarnings ("rawtypes")
   public void testAppSubmit() throws Exception {

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Sat Jul 12 02:24:40 2014
@@ -238,7 +238,7 @@ public class TestFifoScheduler {
     }
 
     ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 1);
-    scheduler.addApplicationAttempt(attId, false);
+    scheduler.addApplicationAttempt(attId, false, true);
 
     rm.stop();
   }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java Sat Jul 12 02:24:40 2014
@@ -28,6 +28,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.junit.After;
 import org.junit.Assert;
 
 import org.apache.commons.logging.Log;
@@ -78,7 +81,14 @@ public class TestRM {
 
   // Milliseconds to sleep for when waiting for something to happen
   private final static int WAIT_SLEEP_MS = 100;
-  
+
+  @After
+  public void tearDown() {
+    ClusterMetrics.destroy();
+    QueueMetrics.clearQueueMetrics();
+    DefaultMetricsSystem.shutdown();
+  }
+
   @Test
   public void testGetNewAppId() throws Exception {
     Logger rootLogger = LogManager.getRootLogger();

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java Sat Jul 12 02:24:40 2014
@@ -96,10 +96,7 @@ public class TestRMHA {
     configuration.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
     ClusterMetrics.destroy();
     QueueMetrics.clearQueueMetrics();
-    MetricsSystem ms = DefaultMetricsSystem.instance();
-    if (ms.getSource("ClusterMetrics") != null) {
-      DefaultMetricsSystem.shutdown();
-    }
+    DefaultMetricsSystem.shutdown();
   }
 
   private void checkMonitorHealth() throws IOException {

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Sat Jul 12 02:24:40 2014
@@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -292,7 +293,7 @@ public class TestRMRestart {
     AllocateResponse allocResponse = am1.allocate(
         new ArrayList<ResourceRequest>(),
         new ArrayList<ContainerId>());
-    Assert.assertTrue(allocResponse.getAMCommand() == AMCommand.AM_RESYNC);
+    Assert.assertEquals(AMCommand.AM_SHUTDOWN, allocResponse.getAMCommand());
     
     // NM should be rebooted on heartbeat, even first heartbeat for nm2
     NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
@@ -1647,7 +1648,7 @@ public class TestRMRestart {
     rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
     MockAM am1 = rm1.sendAMLaunched(attempt1.getAppAttemptId());
     am1.registerAppAttempt();
-    am1.allocate("127.0.0.1" , 1000, 1, new ArrayList<ContainerId>()); 
+    am1.allocate("127.0.0.1" , 1000, 1, new ArrayList<ContainerId>());
     nm1.nodeHeartbeat(true);
     List<Container> conts = am1.allocate(new ArrayList<ResourceRequest>(),
         new ArrayList<ContainerId>()).getAllocatedContainers();
@@ -1851,7 +1852,8 @@ public class TestRMRestart {
     ContainerId containerId = ContainerId.newInstance(appAttemptId, id);
     NMContainerStatus containerReport =
         NMContainerStatus.newInstance(containerId, containerState,
-          Resource.newInstance(1024, 1), "recover container", 0);
+          Resource.newInstance(1024, 1), "recover container", 0,
+          Priority.newInstance(0), 0);
     return containerReport;
   }
 

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java Sat Jul 12 02:24:40 2014
@@ -18,11 +18,14 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 
@@ -33,12 +36,12 @@ import org.apache.hadoop.metrics2.lib.De
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -57,17 +60,10 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.YarnVersionInfo;
-
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-
 public class TestResourceTrackerService {
 
   private final static File TEMP_DIR = new File(System.getProperty(
@@ -493,7 +489,7 @@ public class TestResourceTrackerService 
           ContainerId.newInstance(
             ApplicationAttemptId.newInstance(app.getApplicationId(), 2), 1),
           ContainerState.COMPLETE, Resource.newInstance(1024, 1),
-          "Dummy Completed", 0);
+          "Dummy Completed", 0, Priority.newInstance(10), 1234);
     rm.getResourceTrackerService().handleNMContainerStatus(report);
     verify(handler, never()).handle((Event) any());
 
@@ -504,7 +500,7 @@ public class TestResourceTrackerService 
     report = NMContainerStatus.newInstance(
           ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0),
           ContainerState.COMPLETE, Resource.newInstance(1024, 1),
-          "Dummy Completed", 0);
+          "Dummy Completed", 0, Priority.newInstance(10), 1234);
     rm.getResourceTrackerService().handleNMContainerStatus(report);
     verify(handler, never()).handle((Event)any());
 
@@ -516,7 +512,7 @@ public class TestResourceTrackerService 
           ContainerId.newInstance(
             ApplicationAttemptId.newInstance(app.getApplicationId(), 2), 1),
           ContainerState.COMPLETE, Resource.newInstance(1024, 1),
-          "Dummy Completed", 0);
+          "Dummy Completed", 0, Priority.newInstance(10), 1234);
     try {
       rm.getResourceTrackerService().handleNMContainerStatus(report);
     } catch (Exception e) {
@@ -531,7 +527,7 @@ public class TestResourceTrackerService 
     report = NMContainerStatus.newInstance(
       ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0),
       ContainerState.COMPLETE, Resource.newInstance(1024, 1),
-      "Dummy Completed", 0);
+      "Dummy Completed", 0, Priority.newInstance(10), 1234);
     try {
       rm.getResourceTrackerService().handleNMContainerStatus(report);
     } catch (Exception e) {

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java Sat Jul 12 02:24:40 2014
@@ -62,6 +62,7 @@ import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -218,7 +219,7 @@ public class TestWorkPreservingRMRestart
     assertEquals(availableResources, schedulerAttempt.getHeadroom());
 
     // *********** check appSchedulingInfo state ***********
-    assertEquals(4, schedulerAttempt.getNewContainerId());
+    assertEquals((1 << 22) + 1, schedulerAttempt.getNewContainerId());
   }
 
   private void checkCSQueue(MockRM rm,
@@ -535,6 +536,73 @@ public class TestWorkPreservingRMRestart
     assertNull(scheduler.getRMContainer(completedContainer.getContainerId()));
   }
 
+  @Test (timeout = 600000)
+  public void testAppReregisterOnRMWorkPreservingRestart() throws Exception {
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // start RM
+    rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    // create app and launch the AM
+    RMApp app0 = rm1.submitApp(200);
+    MockAM am0 = MockRM.launchAM(app0, rm1, nm1);
+
+    // start new RM
+    rm2 = new MockRM(conf, memStore);
+    rm2.start();
+    rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
+    rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED);
+
+    am0.setAMRMProtocol(rm2.getApplicationMasterService());
+    am0.registerAppAttempt(false);
+
+    rm2.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
+    rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
+  }
+  
+  @Test (timeout = 30000)
+  public void testAMContainerStatusWithRMRestart() throws Exception {  
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    RMApp app1_1 = rm1.submitApp(1024);
+    MockAM am1_1 = MockRM.launchAndRegisterAM(app1_1, rm1, nm1);
+    
+    RMAppAttempt attempt0 = app1_1.getCurrentAppAttempt();
+    AbstractYarnScheduler scheduler =
+        ((AbstractYarnScheduler) rm1.getResourceScheduler());
+    
+    Assert.assertTrue(scheduler.getRMContainer(
+        attempt0.getMasterContainer().getId()).isAMContainer());
+
+    // Re-start RM
+    rm2 = new MockRM(conf, memStore);
+    rm2.start();
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+
+    List<NMContainerStatus> am1_1Containers =
+        createNMContainerStatusForApp(am1_1);
+    nm1.registerNode(am1_1Containers, null);
+
+    // Wait for RM to settle down on recovering containers;
+    waitForNumContainersToRecover(2, rm2, am1_1.getApplicationAttemptId());
+
+    scheduler = ((AbstractYarnScheduler) rm2.getResourceScheduler());
+    Assert.assertTrue(scheduler.getRMContainer(
+        attempt0.getMasterContainer().getId()).isAMContainer());
+  }
+
+
   private void asserteMetrics(QueueMetrics qm, int appsSubmitted,
       int appsPending, int appsRunning, int appsCompleted,
       int allocatedContainers, int availableMB, int availableVirtualCores,

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java Sat Jul 12 02:24:40 2014
@@ -165,7 +165,7 @@ public class TestRMApplicationHistoryWri
     when(container.getAllocatedResource()).thenReturn(
       Resource.newInstance(-1, -1));
     when(container.getAllocatedPriority()).thenReturn(Priority.UNDEFINED);
-    when(container.getStartTime()).thenReturn(0L);
+    when(container.getCreationTime()).thenReturn(0L);
     when(container.getFinishTime()).thenReturn(1L);
     when(container.getDiagnosticsInfo()).thenReturn("test diagnostics info");
     when(container.getLogURL()).thenReturn("test log url");
@@ -281,7 +281,7 @@ public class TestRMApplicationHistoryWri
     Assert.assertEquals(Resource.newInstance(-1, -1),
       containerHD.getAllocatedResource());
     Assert.assertEquals(Priority.UNDEFINED, containerHD.getPriority());
-    Assert.assertEquals(0L, container.getStartTime());
+    Assert.assertEquals(0L, container.getCreationTime());
 
     writer.containerFinished(container);
     for (int i = 0; i < MAX_RETRIES; ++i) {
@@ -420,7 +420,7 @@ public class TestRMApplicationHistoryWri
     int waitCount = 0;
     int allocatedSize = allocated.size();
     while (allocatedSize < request && waitCount++ < 200) {
-      Thread.sleep(100);
+      Thread.sleep(300);
       allocated =
           am.allocate(new ArrayList<ResourceRequest>(),
             new ArrayList<ContainerId>()).getAllocatedContainers();

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java Sat Jul 12 02:24:40 2014
@@ -32,10 +32,12 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -165,6 +167,11 @@ public abstract class MockAsm extends Mo
     public Set<NodeId> getRanNodes() {
       throw new UnsupportedOperationException("Not supported yet.");
     }
+
+    @Override
+    public RMAppMetrics getRMAppMetrics() {
+      return new RMAppMetrics(Resource.newInstance(0, 0), 0, 0);
+    }
   }
 
   public static RMApp newApplication(int i) {

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java Sat Jul 12 02:24:40 2014
@@ -19,41 +19,47 @@
 package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
-
-import org.junit.Assert;
+import java.util.Map;
 
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
 import org.junit.Test;
 
-/**
- * Test to restart the AM on failure.
- *
- */
 public class TestAMRestart {
 
-  @Test
+  @Test(timeout = 30000)
   public void testAMRestartWithExistingContainers() throws Exception {
     YarnConfiguration conf = new YarnConfiguration();
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
@@ -123,9 +129,9 @@ public class TestAMRestart {
         ContainerId.newInstance(am1.getApplicationAttemptId(), 6);
     nm1.nodeHeartbeat(true);
     SchedulerApplicationAttempt schedulerAttempt =
-        ((CapacityScheduler) rm1.getResourceScheduler())
+        ((AbstractYarnScheduler) rm1.getResourceScheduler())
           .getCurrentAttemptForContainer(containerId6);
-    while (schedulerAttempt.getReservedContainers().size() == 0) {
+    while (schedulerAttempt.getReservedContainers().isEmpty()) {
       System.out.println("Waiting for container " + containerId6
           + " to be reserved.");
       nm1.nodeHeartbeat(true);
@@ -219,7 +225,7 @@ public class TestAMRestart {
 
     // record the scheduler attempt for testing.
     SchedulerApplicationAttempt schedulerNewAttempt =
-        ((CapacityScheduler) rm1.getResourceScheduler())
+        ((AbstractYarnScheduler) rm1.getResourceScheduler())
           .getCurrentAttemptForContainer(containerId2);
     // finish this application
     MockRM.finishAMAndVerifyAppState(app1, rm1, nm1, am2);
@@ -245,7 +251,7 @@ public class TestAMRestart {
     }
   }
 
-  @Test
+  @Test(timeout = 30000)
   public void testNMTokensRebindOnAMRestart() throws Exception {
     YarnConfiguration conf = new YarnConfiguration();
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3);
@@ -345,4 +351,234 @@ public class TestAMRestart {
     Assert.assertTrue(transferredTokens.containsAll(expectedNMTokens));
     rm1.stop();
   }
+
+  // AM container failures caused by preemption (container killed through the
+  // scheduler), an NM disk failure (DISKS_FAILED) or an unhealthy NM (ABORTED)
+  // should not be counted towards the AM max retry count. With max attempts
+  // set to 1, the app keeps getting new attempts until the fifth attempt
+  // fails normally; that failure counts and the app ends in FAILED.
+  @Test(timeout = 100000)
+  public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+      ResourceScheduler.class);
+    // explicitly set max-am-retry count as 1.
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    RMApp app1 = rm1.submitApp(200);
+    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    CapacityScheduler scheduler =
+        (CapacityScheduler) rm1.getResourceScheduler();
+    // The test treats container id 1 of each attempt as that attempt's AM
+    // container.
+    ContainerId amContainer =
+        ContainerId.newInstance(am1.getApplicationAttemptId(), 1);
+    // Preempt the first attempt's AM container.
+    scheduler.killContainer(scheduler.getRMContainer(amContainer));
+
+    am1.waitForState(RMAppAttemptState.FAILED);
+    Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry());
+    rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+    // Stored application state; later assertions read each attempt's AM
+    // container exit status from it.
+    ApplicationState appState =
+        memStore.getState().getApplicationState().get(app1.getApplicationId());
+    // AM should be restarted even though max-am-attempt is 1.
+    MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
+    Assert.assertTrue(((RMAppAttemptImpl) attempt2).mayBeLastAttempt());
+
+    // Preempt the second attempt.
+    ContainerId amContainer2 =
+        ContainerId.newInstance(am2.getApplicationAttemptId(), 1);
+    scheduler.killContainer(scheduler.getRMContainer(amContainer2));
+
+    am2.waitForState(RMAppAttemptState.FAILED);
+    Assert.assertTrue(! attempt2.shouldCountTowardsMaxAttemptRetry());
+    rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+    MockAM am3 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    RMAppAttempt attempt3 = app1.getCurrentAppAttempt();
+    Assert.assertTrue(((RMAppAttemptImpl) attempt3).mayBeLastAttempt());
+
+    // Mimic an NM disk failure: nm1 reports the third attempt's AM container
+    // as COMPLETE with exit status DISKS_FAILED.
+    ContainerStatus containerStatus = Records.newRecord(ContainerStatus.class);
+    containerStatus.setContainerId(attempt3.getMasterContainer().getId());
+    containerStatus.setDiagnostics("mimic NM disk_failure");
+    containerStatus.setState(ContainerState.COMPLETE);
+    containerStatus.setExitStatus(ContainerExitStatus.DISKS_FAILED);
+    Map<ApplicationId, List<ContainerStatus>> conts =
+        new HashMap<ApplicationId, List<ContainerStatus>>();
+    conts.put(app1.getApplicationId(),
+      Collections.singletonList(containerStatus));
+    nm1.nodeHeartbeat(conts, true);
+
+    am3.waitForState(RMAppAttemptState.FAILED);
+    Assert.assertTrue(! attempt3.shouldCountTowardsMaxAttemptRetry());
+    Assert.assertEquals(ContainerExitStatus.DISKS_FAILED,
+      appState.getAttempt(am3.getApplicationAttemptId())
+        .getAMContainerExitStatus());
+
+    rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+    MockAM am4 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    RMAppAttempt attempt4 = app1.getCurrentAppAttempt();
+    Assert.assertTrue(((RMAppAttemptImpl) attempt4).mayBeLastAttempt());
+
+    // create second NM, and register to rm1
+    MockNM nm2 =
+        new MockNM("127.0.0.1:2234", 8000, rm1.getResourceTrackerService());
+    nm2.registerNode();
+    // nm1 heartbeats to report unhealthy
+    // This will mimic ContainerExitStatus.ABORTED
+    nm1.nodeHeartbeat(false);
+    am4.waitForState(RMAppAttemptState.FAILED);
+    Assert.assertTrue(! attempt4.shouldCountTowardsMaxAttemptRetry());
+    Assert.assertEquals(ContainerExitStatus.ABORTED,
+      appState.getAttempt(am4.getApplicationAttemptId())
+        .getAMContainerExitStatus());
+    // launch next AM in nm2
+    nm2.nodeHeartbeat(true);
+    MockAM am5 =
+        rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 5, nm2);
+    RMAppAttempt attempt5 = app1.getCurrentAppAttempt();
+    Assert.assertTrue(((RMAppAttemptImpl) attempt5).mayBeLastAttempt());
+    // Fail the AM normally; unlike the failures above, this one is expected
+    // to count towards the max attempt retry.
+    nm2
+      .nodeHeartbeat(am5.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    am5.waitForState(RMAppAttemptState.FAILED);
+    Assert.assertTrue(attempt5.shouldCountTowardsMaxAttemptRetry());
+
+    // AM should not be restarted.
+    rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED);
+    Assert.assertEquals(5, app1.getAppAttempts().size());
+    rm1.stop();
+  }
+
+  // Test an RM restart after the AM container is preempted: the new RM should
+  // not count the AM preemption failure towards the max-retry-count and should
+  // be able to re-launch the AM. Also checks the AM container exit status
+  // persisted for each attempt (PREEMPTED for the first, INVALID for the
+  // second, which finishes normally).
+  @Test(timeout = 20000)
+  public void testPreemptedAMRestartOnRMRestart() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+      ResourceScheduler.class);
+    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    // explicitly set max-am-retry count as 1.
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    RMApp app1 = rm1.submitApp(200);
+    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    CapacityScheduler scheduler =
+        (CapacityScheduler) rm1.getResourceScheduler();
+    ContainerId amContainer =
+        ContainerId.newInstance(am1.getApplicationAttemptId(), 1);
+
+    // Forcibly preempt the am container;
+    scheduler.killContainer(scheduler.getRMContainer(amContainer));
+
+    am1.waitForState(RMAppAttemptState.FAILED);
+    Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry());
+    rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+
+    // state store has 1 attempt stored.
+    ApplicationState appState =
+        memStore.getState().getApplicationState().get(app1.getApplicationId());
+    Assert.assertEquals(1, appState.getAttemptCount());
+    // attempt stored has the preempted container exit status.
+    Assert.assertEquals(ContainerExitStatus.PREEMPTED,
+      appState.getAttempt(am1.getApplicationAttemptId())
+        .getAMContainerExitStatus());
+    // Restart rm.
+    // NOTE(review): nm1 re-registers with rm2 before rm2.start() is called;
+    // confirm this ordering is intentional.
+    MockRM rm2 = new MockRM(conf, memStore);
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    nm1.registerNode();
+    rm2.start();
+
+    // Restarted RM should re-launch the am.
+    MockAM am2 =
+        rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1);
+    MockRM.finishAMAndVerifyAppState(app1, rm2, nm1, am2);
+    RMAppAttempt attempt2 =
+        rm2.getRMContext().getRMApps().get(app1.getApplicationId())
+          .getCurrentAppAttempt();
+    Assert.assertTrue(attempt2.shouldCountTowardsMaxAttemptRetry());
+    Assert.assertEquals(ContainerExitStatus.INVALID,
+      appState.getAttempt(am2.getApplicationAttemptId())
+        .getAMContainerExitStatus());
+    rm1.stop();
+    rm2.stop();
+  }
+
+  // Test a regular RM restart/failover: the AM container failure that the NM
+  // reports on re-registration (KILLED_BY_RESOURCEMANAGER) should not be
+  // counted towards the max-retry-count, and the new RM should be able to
+  // re-launch the AM even though max attempts is 1.
+  @Test(timeout = 50000)
+  public void testRMRestartOrFailoverNotCountedForAMFailures()
+      throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+      ResourceScheduler.class);
+    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    // explicitly set max-am-retry count as 1.
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    RMApp app1 = rm1.submitApp(200);
+    // Launch the first AM; with max-am-attempt 1 it may be the last attempt.
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+    Assert.assertTrue(((RMAppAttemptImpl) attempt1).mayBeLastAttempt());
+
+    // Restart rm.
+    MockRM rm2 = new MockRM(conf, memStore);
+    rm2.start();
+    ApplicationState appState =
+        memStore.getState().getApplicationState().get(app1.getApplicationId());
+    // Re-register the NM with rm2, reporting the first AM container as
+    // COMPLETE with exit status KILLED_BY_RESOURCEMANAGER.
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    NMContainerStatus status = Records.newRecord(NMContainerStatus.class);
+    status
+      .setContainerExitStatus(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER);
+    status.setContainerId(attempt1.getMasterContainer().getId());
+    status.setContainerState(ContainerState.COMPLETE);
+    status.setDiagnostics("");
+    nm1.registerNode(Collections.singletonList(status), null);
+
+    rm2.waitForState(attempt1.getAppAttemptId(), RMAppAttemptState.FAILED);
+    Assert.assertEquals(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
+      appState.getAttempt(am1.getApplicationAttemptId())
+        .getAMContainerExitStatus());
+    // Will automatically start a new AppAttempt in rm2
+    rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+    MockAM am2 =
+        rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1);
+    MockRM.finishAMAndVerifyAppState(app1, rm2, nm1, am2);
+    // Current attempt of the recovered app; despite the name this is
+    // presumably the second attempt (the one am2 registered in).
+    RMAppAttempt attempt3 =
+        rm2.getRMContext().getRMApps().get(app1.getApplicationId())
+          .getCurrentAppAttempt();
+    Assert.assertTrue(attempt3.shouldCountTowardsMaxAttemptRetry());
+    Assert.assertEquals(ContainerExitStatus.INVALID,
+      appState.getAttempt(am2.getApplicationAttemptId())
+        .getAMContainerExitStatus());
+
+    rm1.stop();
+    rm2.stop();
+  }
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java Sat Jul 12 02:24:40 2014
@@ -80,6 +80,8 @@ public class TestProportionalCapacityPre
   static final long TS = 3141592653L;
 
   int appAlloc = 0;
+  boolean setAMContainer = false;
+  float setAMResourcePercent = 0.0f;
   Random rand = null;
   Clock mClock = null;
   Configuration conf = null;
@@ -466,7 +468,108 @@ public class TestProportionalCapacityPre
     
     fail("Failed to find SchedulingMonitor service, please check what happened");
   }
+  
+  /**
+   * With setAMContainer enabled, the first container of every mocked app is
+   * an AM container. Preemption is expected to skip those AM containers and
+   * take the apps' other containers instead.
+   */
+  @Test
+  public void testSkipAMContainer() {
+    int[][] qData = new int[][] {
+        //  /   A   B
+        { 100, 50, 50 }, // abs
+        { 100, 100, 100 }, // maxcap
+        { 100, 100, 0 }, // used
+        { 70, 20, 50 }, // pending
+        { 0, 0, 0 }, // reserved
+        { 5, 4, 1 }, // apps
+        { -1, 1, 1 }, // req granularity
+        { 2, 0, 0 }, // subqueues
+    };
+    setAMContainer = true;
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    
+    // By skipping AM Container, all other 24 containers of appD will be
+    // preempted
+    verify(mDisp, times(24)).handle(argThat(new IsPreemptionRequestFor(appD)));
+
+    // By skipping AM Container, all other 24 containers of appC will be
+    // preempted
+    verify(mDisp, times(24)).handle(argThat(new IsPreemptionRequestFor(appC)));
+
+    // Since AM containers of appC and appD are saved, 2 containers from appB
+    // have to be preempted.
+    verify(mDisp, times(2)).handle(argThat(new IsPreemptionRequestFor(appB)));
+    // NOTE(review): this reset is skipped if a verify() above fails, because
+    // it is not in a finally block.
+    setAMContainer = false;
+  }
+  
+  /**
+   * With setAMContainer enabled (first container of every mocked app is an AM
+   * container), this checks the case where skipping AM containers is not
+   * enough. All containers of appD and appC, AM containers included, are
+   * expected to be preempted. appB and appA lose only their non-AM
+   * containers.
+   */
+  @Test
+  public void testPreemptSkippedAMContainers() {
+    int[][] qData = new int[][] {
+        //  /   A   B
+        { 100, 10, 90 }, // abs
+        { 100, 100, 100 }, // maxcap
+        { 100, 100, 0 }, // used
+        { 70, 20, 90 }, // pending
+        { 0, 0, 0 }, // reserved
+        { 5, 4, 1 }, // apps
+        { -1, 5, 5 }, // req granularity
+        { 2, 0, 0 }, // subqueues
+    };
+    setAMContainer = true;
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    
+    // All 5 containers of appD will be preempted including AM container.
+    verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appD)));
 
+    // All 5 containers of appC will be preempted including AM container.
+    verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appC)));
+    
+    // By skipping AM Container, all other 4 containers of appB will be
+    // preempted
+    verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB)));
+
+    // By skipping AM Container, all other 4 containers of appA will be
+    // preempted
+    verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appA)));
+    setAMContainer = false;
+  }
+  
+  /**
+   * With setAMContainer enabled and setAMResourcePercent = 0.5, the mocked
+   * leaf queues report 0.5 from getMaxAMResourcePerQueuePercent(). Checks
+   * which AM containers get preempted when the AM resource limit is applied.
+   */
+  @Test
+  public void testAMResourcePercentForSkippedAMContainers() {
+    int[][] qData = new int[][] {
+        //  /   A   B
+        { 100, 10, 90 }, // abs
+        { 100, 100, 100 }, // maxcap
+        { 100, 100, 0 }, // used
+        { 70, 20, 90 }, // pending
+        { 0, 0, 0 }, // reserved
+        { 5, 4, 1 }, // apps
+        { -1, 5, 5 }, // req granularity
+        { 2, 0, 0 }, // subqueues
+    };
+    setAMContainer = true;
+    setAMResourcePercent = 0.5f;
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    
+    // AMResourcePercent is 50% of cluster and maxAMCapacity will be 5GB.
+    // Total used AM container size is 20GB, hence 2 AM containers have
+    // to be preempted as Queue Capacity is 10GB.
+    verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appD)));
+
+    // All 5 containers of appC, including its AM container, will be
+    // preempted
+    verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appC)));
+    
+    // By skipping AM Container, all other 4 containers of appB will be
+    // preempted
+    verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB)));
+
+    // By skipping AM Container, all other 4 containers of appA will be
+    // preempted
+    verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appA)));
+    // NOTE(review): only setAMContainer is reset here; setAMResourcePercent
+    // stays 0.5f. That is presumably harmless if the runner uses a fresh test
+    // instance per method (confirm).
+    setAMContainer = false;
+  }
+  
   static class IsPreemptionRequestFor
       extends ArgumentMatcher<ContainerPreemptEvent> {
     private final ApplicationAttemptId appAttId;
@@ -583,6 +686,9 @@ public class TestProportionalCapacityPre
       }
     }
     when(lq.getApplications()).thenReturn(qApps);
+    if(setAMResourcePercent != 0.0f){
+      when(lq.getMaxAMResourcePerQueuePercent()).thenReturn(setAMResourcePercent);
+    }
     p.getChildQueues().add(lq);
     return lq;
   }
@@ -607,7 +713,11 @@ public class TestProportionalCapacityPre
 
     List<RMContainer> cLive = new ArrayList<RMContainer>();
     for (int i = 0; i < used; i += gran) {
-      cLive.add(mockContainer(appAttId, cAlloc, unit, 1));
+      if(setAMContainer && i == 0){
+        cLive.add(mockContainer(appAttId, cAlloc, unit, 0));
+      }else{
+        cLive.add(mockContainer(appAttId, cAlloc, unit, 1));
+      }
       ++cAlloc;
     }
     when(app.getLiveContainers()).thenReturn(cLive);
@@ -623,6 +733,10 @@ public class TestProportionalCapacityPre
     RMContainer mC = mock(RMContainer.class);
     when(mC.getContainerId()).thenReturn(cId);
     when(mC.getContainer()).thenReturn(c);
+    when(mC.getApplicationAttemptId()).thenReturn(appAttId);
+    if(0 == priority){
+      when(mC.isAMContainer()).thenReturn(true);
+    }
     return mC;
   }
 

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java Sat Jul 12 02:24:40 2014
@@ -267,6 +267,7 @@ public class RMStateStoreTestBase extend
     // attempt1 is loaded correctly
     assertNotNull(attemptState);
     assertEquals(attemptId1, attemptState.getAttemptId());
+    assertEquals(-1000, attemptState.getAMContainerExitStatus());
     // attempt1 container is loaded correctly
     assertEquals(containerId1, attemptState.getMasterContainer().getId());
     // attempt1 applicationToken is loaded correctly
@@ -308,7 +309,7 @@ public class RMStateStoreTestBase extend
           oldAttemptState.getAppAttemptCredentials(),
           oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
           "myTrackingUrl", "attemptDiagnostics",
-          FinalApplicationStatus.SUCCEEDED);
+          FinalApplicationStatus.SUCCEEDED, 100);
     store.updateApplicationAttemptState(newAttemptState);
 
     // test updating the state of an app/attempt whose initial state was not
@@ -331,7 +332,7 @@ public class RMStateStoreTestBase extend
           oldAttemptState.getAppAttemptCredentials(),
           oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
           "myTrackingUrl", "attemptDiagnostics",
-          FinalApplicationStatus.SUCCEEDED);
+          FinalApplicationStatus.SUCCEEDED, 111);
     store.updateApplicationAttemptState(dummyAttempt);
 
     // let things settle down
@@ -370,6 +371,7 @@ public class RMStateStoreTestBase extend
     assertEquals(RMAppAttemptState.FINISHED, updatedAttemptState.getState());
     assertEquals("myTrackingUrl", updatedAttemptState.getFinalTrackingUrl());
     assertEquals("attemptDiagnostics", updatedAttemptState.getDiagnostics());
+    assertEquals(100, updatedAttemptState.getAMContainerExitStatus());
     assertEquals(FinalApplicationStatus.SUCCEEDED,
       updatedAttemptState.getFinalApplicationStatus());
 
@@ -493,21 +495,53 @@ public class RMStateStoreTestBase extend
       Assert.assertTrue(t instanceof RMStateVersionIncompatibleException);
     }
   }
+  
+  /**
+   * Verifies that successive getAndIncrementEpoch() calls on the helper's
+   * store return 0, 1 and 2. Each call returns the current epoch and then
+   * advances it.
+   *
+   * @param stateStoreHelper supplies the RMStateStore under test
+   */
+  public void testEpoch(RMStateStoreHelper stateStoreHelper)
+      throws Exception {
+    RMStateStore store = stateStoreHelper.getRMStateStore();
+    store.setRMDispatcher(new TestDispatcher());
+    
+    int firstTimeEpoch = store.getAndIncrementEpoch();
+    Assert.assertEquals(0, firstTimeEpoch);
+    
+    int secondTimeEpoch = store.getAndIncrementEpoch();
+    Assert.assertEquals(1, secondTimeEpoch);
+    
+    int thirdTimeEpoch = store.getAndIncrementEpoch();
+    Assert.assertEquals(2, thirdTimeEpoch);
+  }
 
+  /**
+   * Stores five apps via createAndStoreApps, then removes each one. After
+   * each removal it polls every 100 ms until the helper reports that the app
+   * no longer exists.
+   * NOTE(review): the polling loop has no bound of its own and relies on an
+   * outer test timeout, if any.
+   *
+   * @param stateStoreHelper supplies the RMStateStore under test
+   */
   public void testAppDeletion(RMStateStoreHelper stateStoreHelper)
       throws Exception {
     RMStateStore store = stateStoreHelper.getRMStateStore();
     store.setRMDispatcher(new TestDispatcher());
-    // create and store apps
+    ArrayList<RMApp> appList = createAndStoreApps(stateStoreHelper, store, 5);
+
+    for (RMApp app : appList) {
+      // remove the app
+      store.removeApplication(app);
+      // wait for app to be removed.
+      while (true) {
+        if (!stateStoreHelper.appExists(app)) {
+          break;
+        } else {
+          Thread.sleep(100);
+        }
+      }
+    }
+  }
+
+  private ArrayList<RMApp> createAndStoreApps(
+      RMStateStoreHelper stateStoreHelper, RMStateStore store, int numApps)
+      throws Exception {
     ArrayList<RMApp> appList = new ArrayList<RMApp>();
-    int NUM_APPS = 5;
-    for (int i = 0; i < NUM_APPS; i++) {
+    for (int i = 0; i < numApps; i++) {
       ApplicationId appId = ApplicationId.newInstance(1383183338, i);
       RMApp app = storeApp(store, appId, 123456789, 987654321);
       appList.add(app);
     }
 
-    Assert.assertEquals(NUM_APPS, appList.size());
+    Assert.assertEquals(numApps, appList.size());
     for (RMApp app : appList) {
       // wait for app to be stored.
       while (true) {
@@ -518,18 +552,17 @@ public class RMStateStoreTestBase extend
         }
       }
     }
+    return appList;
+  }
 
+  /**
+   * Stores five apps, calls deleteStore() and asserts that none of them
+   * exists afterwards.
+   * NOTE(review): unlike testAppDeletion, this does not call
+   * setRMDispatcher(). It also asserts immediately after deleteStore() with
+   * no polling, so it assumes deletion is synchronous (confirm).
+   *
+   * @param stateStoreHelper supplies the RMStateStore under test
+   */
+  public void testDeleteStore(RMStateStoreHelper stateStoreHelper)
+      throws Exception {
+    RMStateStore store = stateStoreHelper.getRMStateStore();
+    ArrayList<RMApp> appList = createAndStoreApps(stateStoreHelper, store, 5);
+    store.deleteStore();
+    // verify apps deleted
     for (RMApp app : appList) {
-      // remove the app
-      store.removeApplication(app);
-      // wait for app to be removed.
-      while (true) {
-        if (!stateStoreHelper.appExists(app)) {
-          break;
-        } else {
-          Thread.sleep(100);
-        }
-      }
+      Assert.assertFalse(stateStoreHelper.appExists(app));
     }
   }
 

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java Sat Jul 12 02:24:40 2014
@@ -158,7 +158,9 @@ public class TestFSRMStateStore extends 
           .getFileSystem(conf).exists(tempAppAttemptFile));
       testRMDTSecretManagerStateStore(fsTester);
       testCheckVersion(fsTester);
+      testEpoch(fsTester);
       testAppDeletion(fsTester);
+      testDeleteStore(fsTester);
     } finally {
       cluster.shutdown();
     }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java Sat Jul 12 02:24:40 2014
@@ -120,7 +120,9 @@ public class TestZKRMStateStore extends 
     testRMAppStateStore(zkTester);
     testRMDTSecretManagerStateStore(zkTester);
     testCheckVersion(zkTester);
+    testEpoch(zkTester);
     testAppDeletion(zkTester);
+    testDeleteStore(zkTester);
   }
 
   private Configuration createHARMConf(

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java Sat Jul 12 02:24:40 2014
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -238,4 +239,13 @@ public class MockRMApp implements RMApp 
   public Set<NodeId> getRanNodes() {
     return null;
   }
+  
+  /**
+   * Not supported by this mock.
+   * NOTE(review): this method has no @Override, so it may not be part of the
+   * RMApp interface; confirm whether it is still needed.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  public Resource getResourcePreempted() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  /**
+   * Not supported by this mock.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public RMAppMetrics getRMAppMetrics() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java Sat Jul 12 02:24:40 2014
@@ -86,6 +86,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@@ -600,6 +602,9 @@ public class TestRMAppAttemptTransitions
             any(List.class), 
             any(List.class))).
     thenReturn(allocation);
+    RMContainer rmContainer = mock(RMContainerImpl.class);
+    when(scheduler.getRMContainer(container.getId())).
+        thenReturn(rmContainer);
     
     applicationAttempt.handle(
         new RMAppAttemptContainerAllocatedEvent(
@@ -815,6 +820,7 @@ public class TestRMAppAttemptTransitions
       applicationAttempt.getAppAttemptState());
     verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
     verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
+    verifyAMCrashAtAllocatedDiagnosticInfo(applicationAttempt.getDiagnostics());
   }
   
   @Test
@@ -1232,6 +1238,13 @@ public class TestRMAppAttemptTransitions
     verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
   }
 
+  /**
+   * Asserts that an attempt's diagnostics string mentions the attempt's web
+   * proxy base and contains the word "logs".
+   *
+   * @param diagnostics the diagnostics reported by the attempt under test
+   */
+  private void verifyAMCrashAtAllocatedDiagnosticInfo(String diagnostics) {
+    String proxyBase = applicationAttempt.getWebProxyBase();
+    boolean hasProxyUrl = diagnostics.contains(proxyBase);
+    assertTrue("Diagnostic information does not contain application proxy URL",
+      hasProxyUrl);
+    boolean hasLogsHint = diagnostics.contains("logs");
+    assertTrue("Diagnostic information does not point the logs to the users",
+      hasLogsHint);
+  }
+
   private void verifyTokenCount(ApplicationAttemptId appAttemptId, int count) {
     verify(amRMTokenManager, times(count)).applicationMasterFinished(appAttemptId);
     if (UserGroupInformation.isSecurityEnabled()) {

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java Sat Jul 12 02:24:40 2014
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.*;
 
@@ -61,10 +62,15 @@ public class TestSchedulerApplicationAtt
     QueueMetrics newMetrics = newQueue.getMetrics();
 
     ApplicationAttemptId appAttId = createAppAttemptId(0, 0);
+    RMContext rmContext = mock(RMContext.class);
+    when(rmContext.getEpoch()).thenReturn(3);
     SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(appAttId,
-        user, oldQueue, oldQueue.getActiveUsersManager(), null);
+        user, oldQueue, oldQueue.getActiveUsersManager(), rmContext);
     oldMetrics.submitApp(user);
     
+    // confirm that containerId is calculated based on epoch.
+    assertEquals(app.getNewContainerId(), 0x00c00001);
+    
     // Resource request
     Resource requestedResource = Resource.newInstance(1536, 2);
     Priority requestedPriority = Priority.newInstance(2);

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Sat Jul 12 02:24:40 2014
@@ -25,17 +25,32 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedAction;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.LocalConfigurationProvider;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -46,13 +61,24 @@ import org.apache.hadoop.yarn.conf.YarnC
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.resourcemanager.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.Task;
+import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
+import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -685,5 +711,240 @@ public class TestCapacityScheduler {
       CapacityScheduler.schedule(cs);
     }
   }
+  
+  /**
+   * Brings up the AM for the application's current attempt. The steps are:
+   * send a heartbeat from {@code nm}, call MockRM#sendAMLaunched for the
+   * attempt, register the attempt, then wait until the app is RUNNING.
+   *
+   * @return the MockAM for the application's current attempt
+   */
+  private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
+      throws Exception {
+    RMAppAttempt currentAttempt = app.getCurrentAppAttempt();
+    nm.nodeHeartbeat(true);
+    ApplicationAttemptId attemptId = currentAttempt.getAppAttemptId();
+    MockAM launched = rm.sendAMLaunched(attemptId);
+    launched.registerAppAttempt();
+    rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
+    return launched;
+  }
+
+  /**
+   * Polls every 500 ms until the application-level and current-attempt-level
+   * preemption metrics all match the expected values. There is no internal
+   * deadline, so callers must bound the wait with a JUnit test timeout.
+   *
+   * @param app the application whose metrics are checked
+   * @param preempted expected total resource preempted across all attempts
+   * @param numAMPreempted expected number of AM containers preempted
+   * @param numTaskPreempted expected number of non-AM containers preempted
+   * @param currentAttemptPreempted expected resource preempted from the
+   *          current attempt
+   * @param currentAttemptAMPreempted expected "AM preempted" flag of the
+   *          current attempt
+   * @param numLatestAttemptTaskPreempted expected number of non-AM containers
+   *          preempted from the current attempt
+   */
+  private void waitForAppPreemptionInfo(RMApp app, Resource preempted,
+      int numAMPreempted, int numTaskPreempted,
+      Resource currentAttemptPreempted, boolean currentAttemptAMPreempted,
+      int numLatestAttemptTaskPreempted) throws InterruptedException {
+    while (true) {
+      RMAppMetrics appPM = app.getRMAppMetrics();
+      // Read the current attempt once per poll so every attempt-level check
+      // below refers to the same attempt, even if a new one is created
+      // concurrently.
+      RMAppAttemptMetrics attemptPM =
+          app.getCurrentAppAttempt().getRMAppAttemptMetrics();
+
+      if (appPM.getResourcePreempted().equals(preempted)
+          && appPM.getNumAMContainersPreempted() == numAMPreempted
+          && appPM.getNumNonAMContainersPreempted() == numTaskPreempted
+          && attemptPM.getResourcePreempted().equals(currentAttemptPreempted)
+          && attemptPM.getIsPreempted() == currentAttemptAMPreempted
+          && attemptPM.getNumNonAMContainersPreempted() ==
+             numLatestAttemptTaskPreempted) {
+        return;
+      }
+      Thread.sleep(500);
+    }
+  }
+
+  /**
+   * Polls every 500 ms until the application's current attempt is no longer
+   * the attempt identified by {@code previousAttemptId}.
+   *
+   * @param app the application being watched
+   * @param previousAttemptId ID of the attempt expected to be replaced
+   */
+  private void waitForNewAttemptCreated(RMApp app,
+      ApplicationAttemptId previousAttemptId) throws InterruptedException {
+    // Compare attempt IDs, not the RMAppAttempt object itself: an
+    // RMAppAttempt is never equal to an ApplicationAttemptId, so comparing
+    // them directly is always false and the loop would never wait.
+    while (app.getCurrentAppAttempt().getAppAttemptId()
+        .equals(previousAttemptId)) {
+      Thread.sleep(500);
+    }
+  }
+  
+  @Test(timeout = 30000)
+  public void testAllocateDoesNotBlockOnSchedulerLock() throws Exception {
+    final YarnConfiguration conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    MyContainerManager containerManager = new MyContainerManager();
+    final MockRMWithAMS rm =
+        new MockRMWithAMS(conf, containerManager);
+    rm.start();
+
+    MockNM nm1 = rm.registerNode("localhost:1234", 5120);
+
+    Map<ApplicationAccessType, String> acls =
+        new HashMap<ApplicationAccessType, String>(2);
+    acls.put(ApplicationAccessType.VIEW_APP, "*");
+    RMApp app = rm.submitApp(1024, "appname", "appuser", acls);
+
+    nm1.nodeHeartbeat(true);
+
+    RMAppAttempt attempt = app.getCurrentAppAttempt();
+    ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
+    int msecToWait = 10000;
+    int msecToSleep = 100;
+    while (attempt.getAppAttemptState() != RMAppAttemptState.LAUNCHED
+        && msecToWait > 0) {
+      LOG.info("Waiting for AppAttempt to reach LAUNCHED state. "
+          + "Current state is " + attempt.getAppAttemptState());
+      Thread.sleep(msecToSleep);
+      msecToWait -= msecToSleep;
+    }
+    Assert.assertEquals(attempt.getAppAttemptState(),
+        RMAppAttemptState.LAUNCHED);
+
+    // Create a client to the RM.
+    final YarnRPC rpc = YarnRPC.create(conf);
+
+    UserGroupInformation currentUser =
+        UserGroupInformation.createRemoteUser(applicationAttemptId.toString());
+    Credentials credentials = containerManager.getContainerCredentials();
+    final InetSocketAddress rmBindAddress =
+        rm.getApplicationMasterService().getBindAddress();
+    Token<? extends TokenIdentifier> amRMToken =
+        MockRMWithAMS.setupAndReturnAMRMToken(rmBindAddress,
+          credentials.getAllTokens());
+    currentUser.addToken(amRMToken);
+    ApplicationMasterProtocol client =
+        currentUser.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
+          @Override
+          public ApplicationMasterProtocol run() {
+            return (ApplicationMasterProtocol) rpc.getProxy(
+              ApplicationMasterProtocol.class, rmBindAddress, conf);
+          }
+        });
+
+    RegisterApplicationMasterRequest request =
+        RegisterApplicationMasterRequest.newInstance("localhost", 12345, "");
+    client.registerApplicationMaster(request);
+
+    // grab the scheduler lock from another thread
+    // and verify an allocate call in this thread doesn't block on it
+    final CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    final CyclicBarrier barrier = new CyclicBarrier(2);
+    Thread otherThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        synchronized(cs) {
+          try {
+            barrier.await();
+            barrier.await();
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          } catch (BrokenBarrierException e) {
+            e.printStackTrace();
+          }
+        }
+      }
+    });
+    otherThread.start();
+    barrier.await();
+    AllocateRequest allocateRequest =
+        AllocateRequest.newInstance(0, 0.0f, null, null, null);
+    client.allocate(allocateRequest);
+    barrier.await();
+    otherThread.join();
+
+    rm.stop();
+  }
+
+  /**
+   * Checks that CapacityScheduler#getNumClusterNodes follows node added and
+   * node removed events, including re-adding a node that was removed.
+   */
+  @Test
+  public void testNumClusterNodes() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    CapacityScheduler cs = new CapacityScheduler();
+    cs.setConf(conf);
+    RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
+        null, new RMContainerTokenSecretManager(conf),
+        new NMTokenSecretManagerInRM(conf),
+        new ClientToAMTokenSecretManagerInRM(), null);
+    cs.setRMContext(rmContext);
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(csConf);
+    cs.init(csConf);
+    cs.start();
+    // Stop the scheduler even when an assertion fails.
+    try {
+      assertEquals(0, cs.getNumClusterNodes());
+
+      RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1);
+      RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2);
+      cs.handle(new NodeAddedSchedulerEvent(n1));
+      cs.handle(new NodeAddedSchedulerEvent(n2));
+      assertEquals(2, cs.getNumClusterNodes());
 
+      cs.handle(new NodeRemovedSchedulerEvent(n1));
+      assertEquals(1, cs.getNumClusterNodes());
+      cs.handle(new NodeAddedSchedulerEvent(n1));
+      assertEquals(2, cs.getNumClusterNodes());
+      cs.handle(new NodeRemovedSchedulerEvent(n2));
+      cs.handle(new NodeRemovedSchedulerEvent(n1));
+      assertEquals(0, cs.getNumClusterNodes());
+    } finally {
+      cs.stop();
+    }
+  }
+
+  /**
+   * Treats containers killed through the scheduler as preempted and checks
+   * that app-level and attempt-level preemption metrics add up correctly in
+   * three stages:
+   * <ul>
+   *   <li>non-AM kills on the first attempt;</li>
+   *   <li>an AM kill, which leads to a new attempt;</li>
+   *   <li>further non-AM kills on the new attempt.</li>
+   * </ul>
+   */
+  @Test(timeout = 120000)
+  public void testPreemptionInfo() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    final int CONTAINER_MEMORY = 1024;
+    // start RM
+    MockRM rm1 = new MockRM(conf);
+    rm1.start();
+    // Stop the RM even when a wait or assertion fails.
+    try {
+      // get scheduler
+      CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+      // start NM
+      MockNM nm1 =
+          new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+      nm1.registerNode();
+
+      // create app and launch the AM
+      RMApp app0 = rm1.submitApp(CONTAINER_MEMORY);
+      MockAM am0 = launchAM(app0, rm1, nm1);
+
+      // get scheduler app
+      FiCaSchedulerApp schedulerAppAttempt =
+          cs.getSchedulerApplications().get(app0.getApplicationId())
+              .getCurrentAppAttempt();
+
+      // allocate some containers and launch them
+      List<Container> allocatedContainers =
+          am0.allocateAndWaitForContainers(3, CONTAINER_MEMORY, nm1);
+
+      // kill the 3 containers
+      for (Container c : allocatedContainers) {
+        cs.killContainer(schedulerAppAttempt.getRMContainer(c.getId()));
+      }
+
+      // check values
+      waitForAppPreemptionInfo(app0,
+          Resource.newInstance(CONTAINER_MEMORY * 3, 3), 0, 3,
+          Resource.newInstance(CONTAINER_MEMORY * 3, 3), false, 3);
+
+      // kill app0-attempt0 AM container
+      cs.killContainer(schedulerAppAttempt.getRMContainer(app0
+          .getCurrentAppAttempt().getMasterContainer().getId()));
+
+      // wait until app0 has moved on to a new attempt
+      waitForNewAttemptCreated(app0, am0.getApplicationAttemptId());
+
+      // check values: the AM kill is counted at app level, and the new
+      // attempt starts with no preemption recorded
+      waitForAppPreemptionInfo(app0,
+          Resource.newInstance(CONTAINER_MEMORY * 4, 4), 1, 3,
+          Resource.newInstance(0, 0), false, 0);
+
+      // launch app0-attempt1
+      MockAM am1 = launchAM(app0, rm1, nm1);
+      schedulerAppAttempt =
+          cs.getSchedulerApplications().get(app0.getApplicationId())
+              .getCurrentAppAttempt();
+
+      // allocate some containers and launch them
+      allocatedContainers =
+          am1.allocateAndWaitForContainers(3, CONTAINER_MEMORY, nm1);
+      for (Container c : allocatedContainers) {
+        cs.killContainer(schedulerAppAttempt.getRMContainer(c.getId()));
+      }
+
+      // check values
+      waitForAppPreemptionInfo(app0,
+          Resource.newInstance(CONTAINER_MEMORY * 7, 7), 1, 6,
+          Resource.newInstance(CONTAINER_MEMORY * 3, 3), false, 3);
+    } finally {
+      rm1.stop();
+    }
+  }
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java Sat Jul 12 02:24:40 2014
@@ -21,8 +21,6 @@ package org.apache.hadoop.yarn.server.re
 import java.util.ArrayList;
 import java.util.List;
 
-import org.junit.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -31,6 +29,7 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.Token;
@@ -48,6 +47,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -209,10 +209,11 @@ public class TestContainerAllocation {
 
         @Override
         public Token createContainerToken(ContainerId containerId,
-            NodeId nodeId, String appSubmitter, Resource capability) {
+            NodeId nodeId, String appSubmitter, Resource capability,
+            Priority priority, long createTime) {
           numRetries++;
           return super.createContainerToken(containerId, nodeId, appSubmitter,
-            capability);
+            capability, priority, createTime);
         }
       };
     }



Mime
View raw message