hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aengin...@apache.org
Subject [34/50] [abbrv] hadoop git commit: YARN-5611. Provide an API to update lifetime of an application. Contributed by Rohith Sharma K S
Date Fri, 11 Nov 2016 18:57:57 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc15c62/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/RMAppLifetimeMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/RMAppLifetimeMonitor.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/RMAppLifetimeMonitor.java
index e550c97..d194204 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/RMAppLifetimeMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/RMAppLifetimeMonitor.java
@@ -18,9 +18,8 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor;
 
-import java.util.EnumSet;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
@@ -33,7 +32,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
 import org.apache.hadoop.yarn.util.SystemClock;
 
@@ -47,12 +45,6 @@ public class RMAppLifetimeMonitor
   private static final Log LOG = LogFactory.getLog(RMAppLifetimeMonitor.class);
 
   private RMContext rmContext;
-  private Map<RMAppToMonitor, Long> monitoredApps =
-      new HashMap<RMAppToMonitor, Long>();
-
-  private static final EnumSet<RMAppState> COMPLETED_APP_STATES =
-      EnumSet.of(RMAppState.FINISHED, RMAppState.FINISHING, RMAppState.FAILED,
-          RMAppState.KILLED, RMAppState.FINAL_SAVING, RMAppState.KILLING);
 
   public RMAppLifetimeMonitor(RMContext rmContext) {
     super(RMAppLifetimeMonitor.class.getName(), SystemClock.getInstance());
@@ -61,14 +53,16 @@ public class RMAppLifetimeMonitor
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
-    long monitorInterval = conf.getLong(
-        YarnConfiguration.RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS,
-        YarnConfiguration.DEFAULT_RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS);
+    long monitorInterval =
+        conf.getLong(YarnConfiguration.RM_APPLICATION_MONITOR_INTERVAL_MS,
+            YarnConfiguration.DEFAULT_RM_APPLICATION_MONITOR_INTERVAL_MS);
     if (monitorInterval <= 0) {
       monitorInterval =
-          YarnConfiguration.DEFAULT_RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS;
+          YarnConfiguration.DEFAULT_RM_APPLICATION_MONITOR_INTERVAL_MS;
     }
     setMonitorInterval(monitorInterval);
+    setExpireInterval(0); // No need of expire interval for App.
+    setResetTimeOnStart(false); // do not reset expire time on restart
     LOG.info("Application lifelime monitor interval set to " + monitorInterval
         + " ms.");
     super.serviceInit(conf);
@@ -77,54 +71,42 @@ public class RMAppLifetimeMonitor
   @SuppressWarnings("unchecked")
   @Override
   protected synchronized void expire(RMAppToMonitor monitoredAppKey) {
-    Long remove = monitoredApps.remove(monitoredAppKey);
     ApplicationId appId = monitoredAppKey.getApplicationId();
     RMApp app = rmContext.getRMApps().get(appId);
     if (app == null) {
       return;
     }
-    // Don't trigger a KILL event if application is in completed states
-    if (!COMPLETED_APP_STATES.contains(app.getState())) {
-      String diagnostics =
-          "Application killed due to exceeding its lifetime period " + remove
-              + " milliseconds";
-      rmContext.getDispatcher().getEventHandler()
-          .handle(new RMAppEvent(appId, RMAppEventType.KILL, diagnostics));
-    } else {
-      LOG.info("Application " + appId
-          + " is about to complete. So not killing the application.");
-    }
+    String diagnostics =
+        "Application killed due to exceeding its lifetime period";
+    rmContext.getDispatcher().getEventHandler()
+        .handle(new RMAppEvent(appId, RMAppEventType.KILL, diagnostics));
   }
 
-  public synchronized void registerApp(ApplicationId appId,
-      ApplicationTimeoutType timeoutType, long monitorStartTime, long timeout) {
+  public void registerApp(ApplicationId appId,
+      ApplicationTimeoutType timeoutType, long expireTime) {
     RMAppToMonitor appToMonitor = new RMAppToMonitor(appId, timeoutType);
-    register(appToMonitor, monitorStartTime);
-    monitoredApps.putIfAbsent(appToMonitor, timeout);
-  }
-
-  @Override
-  protected synchronized long getExpireInterval(
-      RMAppToMonitor monitoredAppKey) {
-    return monitoredApps.get(monitoredAppKey);
+    register(appToMonitor, expireTime);
   }
 
-  public synchronized void unregisterApp(ApplicationId appId,
+  public void unregisterApp(ApplicationId appId,
       ApplicationTimeoutType timeoutType) {
-    RMAppToMonitor appToRemove = new RMAppToMonitor(appId, timeoutType);
-    unregister(appToRemove);
-    monitoredApps.remove(appToRemove);
+    RMAppToMonitor remove = new RMAppToMonitor(appId, timeoutType);
+    unregister(remove);
   }
 
-  public synchronized void unregisterApp(ApplicationId appId,
-      Set<ApplicationTimeoutType> types) {
-    for (ApplicationTimeoutType type : types) {
-      unregisterApp(appId, type);
+  public void unregisterApp(ApplicationId appId,
+      Set<ApplicationTimeoutType> timeoutTypes) {
+    for (ApplicationTimeoutType timeoutType : timeoutTypes) {
+      unregisterApp(appId, timeoutType);
     }
   }
 
-  public synchronized void updateApplicationTimeouts(ApplicationId appId,
+  public void updateApplicationTimeouts(ApplicationId appId,
       Map<ApplicationTimeoutType, Long> timeouts) {
-    // TODO in YARN-5611
+    for (Entry<ApplicationTimeoutType, Long> entry : timeouts.entrySet()) {
+      ApplicationTimeoutType timeoutType = entry.getKey();
+      RMAppToMonitor update = new RMAppToMonitor(appId, timeoutType);
+      register(update, entry.getValue());
+    }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc15c62/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 7e98f10..af51f3c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -2418,8 +2418,9 @@ public class CapacityScheduler extends
         ApplicationStateData.newInstance(rmApp.getSubmitTime(),
             rmApp.getStartTime(), rmApp.getApplicationSubmissionContext(),
             rmApp.getUser(), rmApp.getCallerContext());
+    appState.setApplicationTimeouts(rmApp.getApplicationTimeouts());
     rmContext.getStateStore().updateApplicationStateSynchronously(appState,
-        false);
+        false, null);
 
     // As we use iterator over a TreeSet for OrderingPolicy, once we change
     // priority then reinsert back to make order correct.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc15c62/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto
index 6e2398a..4693818 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto
@@ -69,6 +69,7 @@ message ApplicationStateDataProto {
     optional string diagnostics = 6 [default = "N/A"];
     optional int64 finish_time = 7;
     optional hadoop.common.RPCCallerContextProto caller_context = 8;
+    repeated ApplicationTimeoutMapProto application_timeouts = 9;
 }
 
 message ApplicationAttemptStateDataProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc15c62/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
index 19ee0b1..e5b166d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -229,6 +230,11 @@ public abstract class MockAsm extends MockApps {
     public CallerContext getCallerContext() {
       throw new UnsupportedOperationException("Not supported yet.");
     }
+
+    @Override
+    public Map<ApplicationTimeoutType, Long> getApplicationTimeouts() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
   }
 
   public static RMApp newApplication(int i) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc15c62/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
index 62a5c52..bbfa60f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -315,4 +316,9 @@ public class MockRMApp implements RMApp {
   public void setCollectorAddr(String collectorAddr) {
     throw new UnsupportedOperationException("Not supported yet.");
   }
+
+  @Override
+  public Map<ApplicationTimeoutType, Long> getApplicationTimeouts() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc15c62/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java
index 3f2db1d..e803a88 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
 
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -27,11 +28,14 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@@ -39,8 +43,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart;
 import org.apache.hadoop.yarn.server.resourcemanager.TestWorkPreservingRMRestart;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.util.Times;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -60,15 +66,11 @@ public class TestApplicationLifetimeMonitor {
     Logger rootLogger = LogManager.getRootLogger();
     rootLogger.setLevel(Level.DEBUG);
     UserGroupInformation.setConfiguration(conf);
-    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
-    conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
-        true);
-    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
-    conf.setLong(YarnConfiguration.RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS,
+    conf.setLong(YarnConfiguration.RM_APPLICATION_MONITOR_INTERVAL_MS,
         3000L);
   }
 
-  @Test(timeout = 90000)
+  @Test(timeout = 60000)
   public void testApplicationLifetimeMonitor() throws Exception {
     MockRM rm = null;
     try {
@@ -81,22 +83,64 @@ public class TestApplicationLifetimeMonitor {
           new HashMap<ApplicationTimeoutType, Long>();
       timeouts.put(ApplicationTimeoutType.LIFETIME, 10L);
       RMApp app1 = rm.submitApp(1024, appPriority, timeouts);
+
+      // 20L seconds
+      timeouts.put(ApplicationTimeoutType.LIFETIME, 20L);
+      RMApp app2 = rm.submitApp(1024, appPriority, timeouts);
+
       nm1.nodeHeartbeat(true);
       // Send launch Event
       MockAM am1 =
           rm.sendAMLaunched(app1.getCurrentAppAttempt().getAppAttemptId());
       am1.registerAppAttempt();
       rm.waitForState(app1.getApplicationId(), RMAppState.KILLED);
-      Assert.assertTrue("Applicaiton killed before lifetime value",
+      Assert.assertTrue("Application killed before lifetime value",
           (System.currentTimeMillis() - app1.getSubmitTime()) > 10000);
+
+      Map<ApplicationTimeoutType, String> updateTimeout =
+          new HashMap<ApplicationTimeoutType, String>();
+      long newLifetime = 10L;
+      // update 10L seconds more to timeout
+      updateTimeout.put(ApplicationTimeoutType.LIFETIME,
+          Times.formatISO8601(System.currentTimeMillis() + newLifetime * 1000));
+      UpdateApplicationTimeoutsRequest request =
+          UpdateApplicationTimeoutsRequest.newInstance(app2.getApplicationId(),
+              updateTimeout);
+
+      Map<ApplicationTimeoutType, Long> applicationTimeouts =
+          app2.getApplicationTimeouts();
+      // has old timeout time
+      long beforeUpdate =
+          applicationTimeouts.get(ApplicationTimeoutType.LIFETIME);
+
+      // update app2 lifetime to new time i.e now + timeout
+      rm.getRMContext().getClientRMService().updateApplicationTimeouts(request);
+
+      applicationTimeouts =
+          app2.getApplicationTimeouts();
+      long afterUpdate =
+          applicationTimeouts.get(ApplicationTimeoutType.LIFETIME);
+
+      Assert.assertTrue("Application lifetime value not updated",
+          afterUpdate > beforeUpdate);
+
+      rm.waitForState(app2.getApplicationId(), RMAppState.KILLED);
+      // verify for app killed with updated lifetime
+      Assert.assertTrue("Application killed before lifetime value",
+          app2.getFinishTime() > afterUpdate);
+
     } finally {
       stopRM(rm);
     }
   }
 
-  @SuppressWarnings("rawtypes")
   @Test(timeout = 180000)
   public void testApplicationLifetimeOnRMRestart() throws Exception {
+    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
+        true);
+    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+
     MemoryRMStateStore memStore = new MemoryRMStateStore();
     memStore.init(conf);
     MockRM rm1 = new MockRM(conf, memStore);
@@ -115,6 +159,12 @@ public class TestApplicationLifetimeMonitor {
 
     // Re-start RM
     MockRM rm2 = new MockRM(conf, memStore);
+
+    // make sure app has been unregistered with old RM else both will trigger
+    // Expire event
+    rm1.getRMContext().getRMAppLifetimeMonitor().unregisterApp(
+        app1.getApplicationId(), ApplicationTimeoutType.LIFETIME);
+
     rm2.start();
     nm1.setResourceTrackerService(rm2.getResourceTrackerService());
 
@@ -152,9 +202,87 @@ public class TestApplicationLifetimeMonitor {
 
     // wait for app life time and application to be in killed state.
     rm2.waitForState(recoveredApp1.getApplicationId(), RMAppState.KILLED);
-    Assert.assertTrue("Applicaiton killed before lifetime value",
-        (System.currentTimeMillis()
-            - recoveredApp1.getSubmitTime()) > appLifetime);
+    Assert.assertTrue("Application killed before lifetime value",
+        recoveredApp1.getFinishTime() > (recoveredApp1.getSubmitTime()
+            + appLifetime * 1000));
+  }
+
+  @Test(timeout = 60000)
+  public void testUpdateApplicationTimeoutForStateStoreUpdateFail()
+      throws Exception {
+    MockRM rm1 = null;
+    try {
+      conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+
+      MemoryRMStateStore memStore = new MemoryRMStateStore() {
+        private int count = 0;
+
+        @Override
+        public synchronized void updateApplicationStateInternal(
+            ApplicationId appId, ApplicationStateData appState)
+            throws Exception {
+          // fail only 1 time.
+          if (count++ == 0) {
+            throw new Exception("State-store update failed");
+          }
+          super.updateApplicationStateInternal(appId, appState);
+        }
+      };
+      memStore.init(conf);
+      rm1 = new MockRM(conf, memStore);
+      rm1.start();
+      MockNM nm1 =
+          new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
+      nm1.registerNode();
+      nm1.nodeHeartbeat(true);
+
+      long appLifetime = 30L;
+      Map<ApplicationTimeoutType, Long> timeouts =
+          new HashMap<ApplicationTimeoutType, Long>();
+      timeouts.put(ApplicationTimeoutType.LIFETIME, appLifetime);
+      RMApp app1 = rm1.submitApp(200, Priority.newInstance(0), timeouts);
+
+      Map<ApplicationTimeoutType, String> updateTimeout =
+          new HashMap<ApplicationTimeoutType, String>();
+      long newLifetime = 10L;
+      // request expiry at now + 10L seconds (original lifetime is 30L seconds)
+      updateTimeout.put(ApplicationTimeoutType.LIFETIME,
+          Times.formatISO8601(System.currentTimeMillis() + newLifetime * 1000));
+      UpdateApplicationTimeoutsRequest request =
+          UpdateApplicationTimeoutsRequest.newInstance(app1.getApplicationId(),
+              updateTimeout);
+
+      Map<ApplicationTimeoutType, Long> applicationTimeouts =
+          app1.getApplicationTimeouts();
+      // has old timeout time
+      long beforeUpdate =
+          applicationTimeouts.get(ApplicationTimeoutType.LIFETIME);
+
+      try {
+        // update app1 lifetime to new time i.e now + timeout
+        rm1.getRMContext().getClientRMService()
+            .updateApplicationTimeouts(request);
+        fail("Update application should fail.");
+      } catch (YarnException e) {
+        // expected
+        assertTrue("State-store exception does not containe appId",
+            e.getMessage().contains(app1.getApplicationId().toString()));
+      }
+
+      applicationTimeouts = app1.getApplicationTimeouts();
+      // has old timeout time
+      long afterUpdate =
+          applicationTimeouts.get(ApplicationTimeoutType.LIFETIME);
+
+      Assert.assertEquals("Application timeout is updated", beforeUpdate,
+          afterUpdate);
+      rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
+      // verify the app is killed only after its original, unchanged lifetime
+      Assert.assertTrue("Application killed before lifetime value",
+          app1.getFinishTime() > afterUpdate);
+    } finally {
+      stopRM(rm1);
+    }
   }
 
   private void stopRM(MockRM rm) {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message