hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject [49/50] [abbrv] hadoop git commit: YARN-4438. Implement RM leader election with curator. Contributed by Jian He
Date Thu, 07 Jan 2016 22:42:06 GMT
YARN-4438. Implement RM leader election with curator. Contributed by Jian He


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/89022f8d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/89022f8d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/89022f8d

Branch: refs/heads/HDFS-1312
Commit: 89022f8d4bac0e9d0b848fd91e9c4d700fe1cdbe
Parents: 52b7757
Author: Xuan <xgong@apache.org>
Authored: Thu Jan 7 14:33:06 2016 -0800
Committer: Xuan <xgong@apache.org>
Committed: Thu Jan 7 14:33:06 2016 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   2 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |   5 +
 .../yarn/conf/TestYarnConfigurationFields.java  |   1 +
 .../server/resourcemanager/AdminService.java    |  49 +++-
 .../resourcemanager/LeaderElectorService.java   | 144 ++++++++++
 .../yarn/server/resourcemanager/RMContext.java  |   4 +
 .../server/resourcemanager/RMContextImpl.java   |  11 +
 .../server/resourcemanager/ResourceManager.java |  24 +-
 .../TestLeaderElectorService.java               | 269 +++++++++++++++++++
 .../yarn/server/resourcemanager/TestRMHA.java   |   8 +-
 10 files changed, 496 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/89022f8d/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 5273614..00d31d8 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -86,6 +86,8 @@ Release 2.9.0 - UNRELEASED
 
     YARN-4524. Cleanup AppSchedulingInfo. (Karthik Kambatla via wangda)
 
+    YARN-4438. Implement RM leader election with curator. (Jian He via xgong)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/89022f8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 9a1eb54..37c81ec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -545,6 +545,11 @@ public class YarnConfiguration extends Configuration {
   public static final String RM_HA_FC_ELECTOR_ZK_RETRIES_KEY = RM_HA_PREFIX
       + "failover-controller.active-standby-elector.zk.retries";
 
+  @Private
+  public static final String CURATOR_LEADER_ELECTOR =
+      RM_HA_PREFIX + "curator-leader-elector.enabled";
+  public static final boolean DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED = false;
+
   ////////////////////////////////
   // RM state store configs
   ////////////////////////////////

http://git-wip-us.apache.org/repos/asf/hadoop/blob/89022f8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index 41c3d87..0e508ed 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -90,6 +90,7 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
         .add(YarnConfiguration.DEFAULT_SHARED_CACHE_CHECKSUM_ALGO_IMPL);
     configurationPropsToSkipCompare
         .add(YarnConfiguration.DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE);
+    configurationPropsToSkipCompare.add(YarnConfiguration.CURATOR_LEADER_ELECTOR);
 
     // Ignore all YARN Application Timeline Service (version 1) properties
     configurationPrefixToSkipCompare.add("yarn.timeline-service.");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/89022f8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
index 353e72d..fcce722 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
@@ -106,6 +106,7 @@ public class AdminService extends CompositeService implements
   private String rmId;
 
   private boolean autoFailoverEnabled;
+  private boolean curatorEnabled;
   private EmbeddedElectorService embeddedElector;
 
   private Server server;
@@ -132,13 +133,16 @@ public class AdminService extends CompositeService implements
   @Override
   public void serviceInit(Configuration conf) throws Exception {
     if (rmContext.isHAEnabled()) {
+      curatorEnabled = conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
+          YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
       autoFailoverEnabled = HAUtil.isAutomaticFailoverEnabled(conf);
-      if (autoFailoverEnabled) {
+      if (autoFailoverEnabled && !curatorEnabled) {
         if (HAUtil.isAutomaticFailoverEmbedded(conf)) {
           embeddedElector = createEmbeddedElectorService();
           addIfService(embeddedElector);
         }
       }
+
     }
 
     masterServiceBindAddress = conf.getSocketAddr(
@@ -319,7 +323,7 @@ public class AdminService extends CompositeService implements
       rm.transitionToActive();
     } catch (Exception e) {
       RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive",
-          "", "RMHAProtocolService",
+          "", "RM",
           "Exception transitioning to active");
       throw new ServiceFailedException(
           "Error when transitioning to Active mode", e);
@@ -338,7 +342,7 @@ public class AdminService extends CompositeService implements
           "Error on refreshAll during transistion to Active", e);
     }
     RMAuditLogger.logSuccess(user.getShortUserName(), "transitionToActive",
-        "RMHAProtocolService");
+        "RM");
   }
 
   @Override
@@ -356,10 +360,10 @@ public class AdminService extends CompositeService implements
     try {
       rm.transitionToStandby(true);
       RMAuditLogger.logSuccess(user.getShortUserName(),
-          "transitionToStandby", "RMHAProtocolService");
+          "transitionToStandby", "RM");
     } catch (Exception e) {
       RMAuditLogger.logFailure(user.getShortUserName(), "transitionToStandby",
-          "", "RMHAProtocolService",
+          "", "RM",
           "Exception transitioning to standby");
       throw new ServiceFailedException(
           "Error when transitioning to Standby mode", e);
@@ -369,15 +373,28 @@ public class AdminService extends CompositeService implements
   @Override
   public synchronized HAServiceStatus getServiceStatus() throws IOException {
     checkAccess("getServiceState");
-    HAServiceState haState = rmContext.getHAServiceState();
-    HAServiceStatus ret = new HAServiceStatus(haState);
-    if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) {
-      ret.setReadyToBecomeActive();
+    if (curatorEnabled) {
+      HAServiceStatus state;
+      if (rmContext.getLeaderElectorService().hasLeaderShip()) {
+        state = new HAServiceStatus(HAServiceState.ACTIVE);
+      } else {
+        state = new HAServiceStatus(HAServiceState.STANDBY);
+      }
+      // set empty string to avoid NPE at
+      // HAServiceProtocolServerSideTranslatorPB#getServiceStatus
+      state.setNotReadyToBecomeActive("");
+      return state;
     } else {
-      ret.setNotReadyToBecomeActive("State is " + haState);
+      HAServiceState haState = rmContext.getHAServiceState();
+      HAServiceStatus ret = new HAServiceStatus(haState);
+      if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) {
+        ret.setReadyToBecomeActive();
+      } else {
+        ret.setNotReadyToBecomeActive("State is " + haState);
+      }
+      return ret;
     }
-    return ret;
-  } 
+  }
 
   @Override
   public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
@@ -837,6 +854,12 @@ public class AdminService extends CompositeService implements
     } else if (!autoFailoverEnabled) {
       return "Auto Failover is not enabled.";
     }
-    return this.embeddedElector.getHAZookeeperConnectionState();
+    if (curatorEnabled) {
+      return "Connected to zookeeper : " + rmContext
+          .getLeaderElectorService().getCuratorClient().getZookeeperClient()
+          .isConnected();
+    } else {
+      return this.embeddedElector.getHAZookeeperConnectionState();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/89022f8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java
new file mode 100644
index 0000000..3766676
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import java.io.IOException;
+
+/**
+ * Elects the active ResourceManager through a Curator {@link LeaderLatch}
+ * on the ZooKeeper ensemble named by {@link YarnConfiguration#RM_ZK_ADDRESS}.
+ * Gaining the latch transitions this RM to active via the admin service;
+ * losing it transitions the RM to standby.
+ */
+public class LeaderElectorService extends AbstractService implements
+    LeaderLatchListener {
+  public static final Log LOG = LogFactory.getLog(LeaderElectorService.class);
+
+  /** Pause before re-entering the election after giving up the latch. */
+  private static final long REJOIN_DELAY_MS = 1000;
+
+  // Replaced by reJoinElection() (e.g. from the RM's failure handling) and
+  // read by hasLeaderShip() (from admin RPC calls), hence volatile.
+  private volatile LeaderLatch leaderLatch;
+  private CuratorFramework curator;
+  private final RMContext rmContext;
+  private String latchPath;
+  private String rmId;
+
+  public LeaderElectorService(RMContext rmContext) {
+    super(LeaderElectorService.class.getName());
+    this.rmContext = rmContext;
+  }
+
+  /**
+   * Connects a Curator client to ZooKeeper and joins the election under
+   * {@code <zk-base-path>/<cluster-id>}, using the RM HA id as participant id.
+   *
+   * @throws NullPointerException if {@link YarnConfiguration#RM_ZK_ADDRESS}
+   *         is not set
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
+    Preconditions.checkNotNull(zkHostPort,
+        YarnConfiguration.RM_ZK_ADDRESS + " is not set");
+
+    rmId = HAUtil.getRMHAId(conf);
+    String clusterId = YarnConfiguration.getClusterId(conf);
+
+    int zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
+        YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
+    int maxRetryNum = conf.getInt(YarnConfiguration.RM_ZK_NUM_RETRIES,
+        YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES);
+
+    String zkBasePath = conf.get(
+        YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
+        YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
+    latchPath = zkBasePath + "/" + clusterId;
+
+    // Apply the configured RM ZK timeout as the session timeout; without
+    // sessionTimeoutMs() Curator silently falls back to its own default.
+    curator = CuratorFrameworkFactory.builder().connectString(zkHostPort)
+        .sessionTimeoutMs(zkSessionTimeout)
+        .retryPolicy(new RetryNTimes(maxRetryNum, zkSessionTimeout)).build();
+    curator.start();
+    initAndStartLeaderLatch();
+    super.serviceInit(conf);
+  }
+
+  private void initAndStartLeaderLatch() throws Exception {
+    leaderLatch = new LeaderLatch(curator, latchPath, rmId);
+    leaderLatch.addListener(this);
+    leaderLatch.start();
+  }
+
+  /**
+   * Leaves the election and closes the Curator client created in
+   * {@link #serviceInit(Configuration)}.
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    try {
+      closeLeaderLatch();
+    } finally {
+      // The client owns a ZooKeeper session and background threads; close it
+      // even if closing the latch fails so they do not outlive the service.
+      if (curator != null) {
+        curator.close();
+      }
+    }
+    super.serviceStop();
+  }
+
+  /** @return whether this RM currently holds the leader latch. */
+  public boolean hasLeaderShip() {
+    return leaderLatch.hasLeadership();
+  }
+
+  /**
+   * Latch callback on gaining leadership: transitions this RM to active. If
+   * that fails, transitions back to standby and re-joins the election so
+   * another RM (or this one, later) can take over.
+   */
+  @Override
+  public void isLeader() {
+    LOG.info(rmId + " is elected leader, transitioning to active");
+    try {
+      rmContext.getRMAdminService().transitionToActive(
+          new HAServiceProtocol.StateChangeRequestInfo(
+              HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
+    } catch (Exception e) {
+      LOG.info(rmId + " failed to transition to active, giving up leadership",
+          e);
+      notLeader();
+      reJoinElection();
+    }
+  }
+
+  /**
+   * Closes the current latch, waits {@link #REJOIN_DELAY_MS} ms and starts a
+   * new latch. Failures are logged, not thrown.
+   */
+  public void reJoinElection() {
+    try {
+      closeLeaderLatch();
+      Thread.sleep(REJOIN_DELAY_MS);
+      initAndStartLeaderLatch();
+    } catch (InterruptedException e) {
+      // Restore the flag so callers can still see the interrupt.
+      Thread.currentThread().interrupt();
+      LOG.info("Interrupted while re-joining election.", e);
+    } catch (Exception e) {
+      LOG.info("Fail to re-join election.", e);
+    }
+  }
+
+  private void closeLeaderLatch() throws IOException {
+    if (leaderLatch != null) {
+      leaderLatch.close();
+    }
+  }
+
+  /** Latch callback on losing leadership: transitions this RM to standby. */
+  @Override
+  public void notLeader() {
+    LOG.info(rmId + " relinquish leadership");
+    try {
+      rmContext.getRMAdminService().transitionToStandby(
+          new HAServiceProtocol.StateChangeRequestInfo(
+              HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
+    } catch (Exception e) {
+      LOG.info(rmId + " did not transition to standby successfully.", e);
+    }
+  }
+
+  /**
+   * @return the Curator client used for the election; {@code null} before
+   *         {@link #serviceInit(Configuration)}. Not test-only: the RM admin
+   *         service uses it to report the ZooKeeper connection state.
+   */
+  public CuratorFramework getCuratorClient() {
+    return this.curator;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/89022f8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index 9802a37..f50da3b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -135,4 +135,8 @@ public interface RMContext {
   PlacementManager getQueuePlacementManager();
   
   void setQueuePlacementManager(PlacementManager placementMgr);
+
+  void setLeaderElectorService(LeaderElectorService elector);
+
+  LeaderElectorService getLeaderElectorService();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/89022f8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index ed9942b..ec2aeb7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -72,6 +72,7 @@ public class RMContextImpl implements RMContext {
 
   private RMApplicationHistoryWriter rmApplicationHistoryWriter;
   private SystemMetricsPublisher systemMetricsPublisher;
+  private LeaderElectorService elector;
 
   /**
    * Default constructor. To be used in conjunction with setter methods for
@@ -134,6 +135,16 @@ public class RMContextImpl implements RMContext {
   }
 
   @Override
+  public void setLeaderElectorService(LeaderElectorService elector) {
+    this.elector = elector;
+  }
+
+  @Override
+  public LeaderElectorService getLeaderElectorService() {
+    return this.elector;
+  }
+
+  @Override
   public RMStateStore getStateStore() {
     return activeServiceContext.getStateStore();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/89022f8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index aada69f..3b23ad8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -157,6 +157,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
   private AppReportFetcher fetcher = null;
   protected ResourceTrackerService resourceTracker;
   private JvmPauseMonitor pauseMonitor;
+  private boolean curatorEnabled = false;
 
   @VisibleForTesting
   protected String webAppAddress;
@@ -228,6 +229,13 @@ public class ResourceManager extends CompositeService implements Recoverable {
     this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf));
     if (this.rmContext.isHAEnabled()) {
       HAUtil.verifyAndSetConfiguration(this.conf);
+      curatorEnabled = conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
+          YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
+      if (curatorEnabled) {
+        LeaderElectorService elector = new LeaderElectorService(rmContext);
+        addService(elector);
+        rmContext.setLeaderElectorService(elector);
+      }
     }
     
     // Set UGI and do login
@@ -759,7 +767,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
         // Transition to standby and reinit active services
         LOG.info("Transitioning RM to Standby mode");
         transitionToStandby(true);
-        adminService.resetLeaderElection();
+        if (curatorEnabled) {
+          rmContext.getLeaderElectorService().reJoinElection();
+        } else {
+          adminService.resetLeaderElection();
+        }
         return;
       } catch (Exception e) {
         LOG.fatal("Failed to transition RM to Standby mode.");
@@ -996,7 +1008,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
    * instance of {@link RMActiveServices} and initializes it.
    * @throws Exception
    */
-  protected void createAndInitActiveServices() throws Exception {
+  protected void createAndInitActiveServices() {
     activeServices = new RMActiveServices(this);
     activeServices.init(conf);
   }
@@ -1016,14 +1028,14 @@ public class ResourceManager extends CompositeService implements Recoverable {
    * Helper method to stop {@link #activeServices}.
    * @throws Exception
    */
-  void stopActiveServices() throws Exception {
+  void stopActiveServices() {
     if (activeServices != null) {
       activeServices.stop();
       activeServices = null;
     }
   }
 
-  void reinitialize(boolean initialize) throws Exception {
+  void reinitialize(boolean initialize) {
     ClusterMetrics.destroy();
     QueueMetrics.clearQueueMetrics();
     if (initialize) {
@@ -1042,7 +1054,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
       LOG.info("Already in active state");
       return;
     }
-
     LOG.info("Transitioning to active state");
 
     this.rmLoginUGI.doAs(new PrivilegedExceptionAction<Void>() {
@@ -1083,7 +1094,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
   @Override
   protected void serviceStart() throws Exception {
     if (this.rmContext.isHAEnabled()) {
-      transitionToStandby(true);
+      transitionToStandby(false);
     } else {
       transitionToActive();
     }
@@ -1338,4 +1349,5 @@ public class ResourceManager extends CompositeService implements Recoverable {
     out.println("                            "
         + "[-remove-application-from-state-store <appId>]" + "\n");
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/89022f8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLeaderElectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLeaderElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLeaderElectorService.java
new file mode 100644
index 0000000..bb10041
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLeaderElectorService.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import com.google.common.base.Supplier;
+import org.apache.curator.CuratorZookeeperClient;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.KillSession;
+import org.apache.curator.test.TestingCluster;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Tests Curator-based RM leader election ({@link LeaderElectorService})
+ * against an embedded three-node ZooKeeper {@link TestingCluster}.
+ */
+public class TestLeaderElectorService {
+
+  private static final String RM1_ADDRESS = "1.1.1.1:1";
+  private static final String RM1_NODE_ID = "rm1";
+
+  private static final String RM2_ADDRESS = "0.0.0.0:0";
+  private static final String RM2_NODE_ID = "rm2";
+
+  Configuration conf;
+  // Assigned from a separate thread in testRMFailToTransitionToActive;
+  // volatile makes that write visible to the test thread.
+  volatile MockRM rm1;
+  MockRM rm2;
+  TestingCluster zkCluster;
+
+  @Before
+  public void setUp() throws Exception {
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.INFO);
+    conf = new Configuration();
+    conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR, true);
+    conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, true);
+
+    conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
+    conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
+
+    for (String confKey : YarnConfiguration.getServiceAddressConfKeys(conf)) {
+      conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS);
+      conf.set(HAUtil.addSuffix(confKey, RM2_NODE_ID), RM2_ADDRESS);
+    }
+    zkCluster = new TestingCluster(3);
+    conf.set(YarnConfiguration.RM_ZK_ADDRESS, zkCluster.getConnectString());
+    zkCluster.start();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      if (rm1 != null) {
+        rm1.stop();
+      }
+      if (rm2 != null) {
+        rm2.stop();
+      }
+    } finally {
+      // Stop and release the ZooKeeper ensemble started in setUp(); without
+      // this every test leaves its three ZK servers running.
+      if (zkCluster != null) {
+        zkCluster.close();
+      }
+    }
+  }
+
+  // 1. rm1 active
+  // 2. rm2 standby
+  // 3. stop rm1
+  // 4. rm2 become active
+  @Test (timeout = 20000)
+  public void testRMShutDownCauseFailover() throws Exception {
+    rm1 = startRM("rm1", HAServiceState.ACTIVE);
+    rm2 = startRM("rm2", HAServiceState.STANDBY);
+
+    // wait for some time to make sure rm2 will not become active;
+    Thread.sleep(5000);
+    waitFor(rm2, HAServiceState.STANDBY);
+
+    rm1.stop();
+    // rm2 should become active;
+    waitFor(rm2, HAServiceState.ACTIVE);
+  }
+
+  // 1. rm1 active
+  // 2. rm2 standby
+  // 3. submit a job to rm1 which triggers state-store failure.
+  // 4. rm2 becomes active
+  @Test
+  public void testStateStoreFailureCauseFailover() throws Exception {
+
+    conf.set(YarnConfiguration.RM_HA_ID, "rm1");
+    MemoryRMStateStore memStore = new MemoryRMStateStore() {
+      @Override
+      public synchronized void storeApplicationStateInternal(ApplicationId
+          appId, ApplicationStateData appState) throws Exception {
+        throw new Exception("store app failure.");
+      }
+    };
+    memStore.init(conf);
+    rm1 = new MockRM(conf, memStore);
+    rm1.init(conf);
+    rm1.start();
+
+    waitFor(rm1, HAServiceState.ACTIVE);
+
+    rm2 = startRM("rm2", HAServiceState.STANDBY);
+
+    // submit an app which will trigger state-store failure.
+    rm1.submitApp(200, "app1", "user1", null, "default", false);
+    waitFor(rm1, HAServiceState.STANDBY);
+
+    // rm2 should become active;
+    waitFor(rm2, HAServiceState.ACTIVE);
+
+    rm2.stop();
+    // rm1 will become active again
+    waitFor(rm1, HAServiceState.ACTIVE);
+  }
+
+  // 1. rm1 active
+  // 2. restart zk cluster
+  // 3. rm1 will first relinquish leadership and re-acquire leadership
+  @Test
+  public void testZKClusterDown() throws Exception {
+    rm1 = startRM("rm1", HAServiceState.ACTIVE);
+
+    // stop zk cluster
+    zkCluster.stop();
+    waitFor(rm1, HAServiceState.STANDBY);
+
+    Collection<InstanceSpec> instanceSpecs = zkCluster.getInstances();
+    zkCluster = new TestingCluster(instanceSpecs);
+    zkCluster.start();
+    // rm becomes active again
+    waitFor(rm1, HAServiceState.ACTIVE);
+  }
+
+  // 1. rm1 active
+  // 2. kill the zk session between the rm and zk cluster.
+  // 3. rm1 will first relinquish leadership and re-acquire leadership
+  @Test
+  public void testExpireCurrentZKSession() throws Exception {
+
+    rm1 = startRM("rm1", HAServiceState.ACTIVE);
+
+    LeaderElectorService service = rm1.getRMContext().getLeaderElectorService();
+    CuratorZookeeperClient client =
+        service.getCuratorClient().getZookeeperClient();
+    // this will expire current curator client session. curator will re-establish
+    // the session. RM will first relinquish leadership and re-acquire leadership
+    KillSession
+        .kill(client.getZooKeeper(), client.getCurrentConnectionString());
+
+    waitFor(rm1, HAServiceState.ACTIVE);
+  }
+
+  // 1. rm1 fail to become active.
+  // 2. rm1 will rejoin leader election and retry the leadership
+  @Test
+  public void testRMFailToTransitionToActive() throws Exception {
+    conf.set(YarnConfiguration.RM_HA_ID, "rm1");
+    final AtomicBoolean throwException = new AtomicBoolean(true);
+    Thread launchRM = new Thread() {
+      @Override
+      public void run() {
+        rm1 = new MockRM(conf) {
+          @Override
+          synchronized void transitionToActive() throws Exception {
+            if (throwException.get()) {
+              throw new Exception("Fail to transition to active");
+            } else {
+              super.transitionToActive();
+            }
+          }
+        };
+        rm1.init(conf);
+        rm1.start();
+      }
+    };
+    launchRM.start();
+    // wait some time, rm will keep retry the leadership;
+    Thread.sleep(5000);
+    throwException.set(false);
+    waitFor(rm1, HAServiceState.ACTIVE);
+  }
+
+  // 1. rm1 active
+  // 2. rm2 standby
+  // 3. kill the current connected zk instance
+  // 4. either rm1 or rm2 will become active.
+  @Test
+  public void testKillZKInstance() throws Exception {
+    rm1 = startRM("rm1", HAServiceState.ACTIVE);
+    rm2 = startRM("rm2", HAServiceState.STANDBY);
+
+    ZooKeeper zkClient =
+        rm1.getRMContext().getLeaderElectorService().getCuratorClient()
+            .getZookeeperClient().getZooKeeper();
+    InstanceSpec connectionInstance = zkCluster.findConnectionInstance(zkClient);
+    zkCluster.killServer(connectionInstance);
+
+    // wait for rm1 or rm2 to be active by randomness
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override public Boolean get() {
+        try {
+          HAServiceState rm1State =
+              rm1.getAdminService().getServiceStatus().getState();
+          HAServiceState rm2State =
+              rm2.getAdminService().getServiceStatus().getState();
+          return (rm1State.equals(HAServiceState.ACTIVE) && rm2State
+              .equals(HAServiceState.STANDBY)) || (
+              rm1State.equals(HAServiceState.STANDBY) && rm2State
+                  .equals(HAServiceState.ACTIVE));
+        } catch (IOException ignored) {
+          // Status not available yet; poll again.
+        }
+        return false;
+      }
+    }, 2000, 15000);
+  }
+
+  private MockRM startRM(String rmId, HAServiceState state) throws Exception{
+    YarnConfiguration yarnConf = new YarnConfiguration(conf);
+    yarnConf.set(YarnConfiguration.RM_HA_ID, rmId);
+    MockRM rm = new MockRM(yarnConf);
+    rm.init(yarnConf);
+    rm.start();
+    waitFor(rm, state);
+    return rm;
+  }
+
+  private void waitFor(final MockRM rm,
+      final HAServiceState state)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override public Boolean get() {
+        try {
+          return rm.getAdminService().getServiceStatus().getState()
+              .equals(state);
+        } catch (IOException ignored) {
+          // Status not available yet; poll again.
+        }
+        return false;
+      }
+    }, 2000, 15000);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/89022f8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
index 62cfe84..70bba15 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
@@ -471,8 +471,12 @@ public class TestRMHA {
     memStore.init(conf);
     rm = new MockRM(conf, memStore) {
       @Override
-      void stopActiveServices() throws Exception {
-        Thread.sleep(10000);
+      void stopActiveServices() {
+        try {
+          Thread.sleep(10000);
+        } catch (Exception e) {
+          throw new RuntimeException (e);
+        }
         super.stopActiveServices();
       }
     };


Mime
View raw message