hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjs...@apache.org
Subject [09/50] [abbrv] hadoop git commit: YARN-3094. Reset timer for liveness monitors after RM recovery. Contributed by Jun Gong
Date Wed, 11 Feb 2015 19:48:27 GMT
YARN-3094. Reset timer for liveness monitors after RM recovery. Contributed by Jun Gong


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0af6a99a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0af6a99a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0af6a99a

Branch: refs/heads/YARN-2928
Commit: 0af6a99a3fcfa4b47d3bcba5e5cc5fe7b312a152
Parents: 7d73202
Author: Jian He <jianhe@apache.org>
Authored: Mon Feb 9 13:47:08 2015 -0800
Committer: Jian He <jianhe@apache.org>
Committed: Mon Feb 9 13:47:08 2015 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 +
 .../yarn/util/AbstractLivelinessMonitor.java    |  8 ++
 .../server/resourcemanager/ResourceManager.java |  2 +
 .../rmapp/attempt/AMLivelinessMonitor.java      |  6 ++
 .../rmapp/attempt/TestAMLivelinessMonitor.java  | 81 ++++++++++++++++++++
 5 files changed, 100 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0af6a99a/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index cc2f34b..578a8cc 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -516,6 +516,9 @@ Release 2.7.0 - UNRELEASED
     YARN-3143. RM Apps REST API can return NPE or entries missing id and other
     fields (jlowe)
 
+    YARN-3094. Reset timer for liveness monitors after RM recovery. (Jun Gong
+    via jianhe)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0af6a99a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java
index c182531..4f587b3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java
@@ -59,6 +59,7 @@ public abstract class AbstractLivelinessMonitor<O> extends AbstractService {
   @Override
   protected void serviceStart() throws Exception {
     assert !stopped : "starting when already stopped";
+    resetTimer();
     checkerThread = new Thread(new PingChecker());
     checkerThread.setName("Ping Checker");
     checkerThread.start();
@@ -99,6 +100,13 @@ public abstract class AbstractLivelinessMonitor<O> extends AbstractService {
     running.remove(ob);
   }
 
+  public synchronized void resetTimer() {
+    long time = clock.getTime();
+    for (O ob : running.keySet()) {
+      running.put(ob, time);
+    }
+  }
+
   private class PingChecker implements Runnable {
 
     @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0af6a99a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 4f242e93..a93372a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -564,12 +564,14 @@ public class ResourceManager extends CompositeService implements Recoverable {
 
       if(recoveryEnabled) {
         try {
+          LOG.info("Recovery started");
           rmStore.checkVersion();
           if (rmContext.isWorkPreservingRecoveryEnabled()) {
             rmContext.setEpoch(rmStore.getAndIncrementEpoch());
           }
           RMState state = rmStore.loadState();
           recover(state);
+          LOG.info("Recovery ended");
         } catch (Exception e) {
           // the Exception from loadState() needs to be handled for
           // HA and we need to give up master status if we got fenced

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0af6a99a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/AMLivelinessMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/AMLivelinessMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/AMLivelinessMonitor.java
index 2c1f7f1..76331bf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/AMLivelinessMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/AMLivelinessMonitor.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
+import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 
 public class AMLivelinessMonitor extends AbstractLivelinessMonitor<ApplicationAttemptId> {
@@ -35,6 +36,11 @@ public class AMLivelinessMonitor extends AbstractLivelinessMonitor<ApplicationAt
     this.dispatcher = d.getEventHandler();
   }
 
+  public AMLivelinessMonitor(Dispatcher d, Clock clock) {
+    super("AMLivelinessMonitor", clock);
+    this.dispatcher = d.getEventHandler();
+  }
+
   public void serviceInit(Configuration conf) throws Exception {
     super.serviceInit(conf);
     int expireIntvl = conf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0af6a99a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestAMLivelinessMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestAMLivelinessMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestAMLivelinessMonitor.java
new file mode 100644
index 0000000..e0e6aee
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestAMLivelinessMonitor.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.util.ControlledClock;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+
+public class TestAMLivelinessMonitor {
+
+  @Test(timeout = 10000)
+  public void testResetTimer() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    UserGroupInformation.setConfiguration(conf);
+    conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
+    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
+    conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 6000);
+    final ControlledClock clock = new ControlledClock(new SystemClock());
+    clock.setTime(0);
+    MemoryRMStateStore memStore = new MemoryRMStateStore() {
+      @Override
+      public synchronized RMState loadState() throws Exception {
+        clock.setTime(8000);
+        return super.loadState();
+      }
+    };
+    memStore.init(conf);
+    final ApplicationAttemptId attemptId = mock(ApplicationAttemptId.class);
+    final Dispatcher dispatcher = mock(Dispatcher.class);
+    final boolean[] expired = new boolean[]{false};
+    final AMLivelinessMonitor monitor = new AMLivelinessMonitor(
+        dispatcher, clock) {
+      @Override
+      protected void expire(ApplicationAttemptId id) {
+        Assert.assertEquals(id, attemptId);
+        expired[0] = true;
+      }
+    };
+    monitor.register(attemptId);
+    MockRM rm = new MockRM(conf, memStore) {
+      @Override
+      protected AMLivelinessMonitor createAMLivelinessMonitor() {
+        return monitor;
+      }
+    };
+    rm.start();
+    // make sure that monitor has started
+    while (monitor.getServiceState() != Service.STATE.STARTED) {
+      Thread.sleep(100);
+    }
+    // expired[0] would be set to true without resetTimer
+    Assert.assertFalse(expired[0]);
+    rm.stop();
+  }
+}


Mime
View raw message