hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject [1/3] hadoop git commit: Y-5709. Add an interface EmbeddedElector that is extended by both Curator-based and ActiveStandbyElector-based implementations.
Date Wed, 07 Dec 2016 19:05:50 GMT
Repository: hadoop
Updated Branches:
  refs/heads/yarn-5709 [created] f05f45f10


Y-5709. Add an interface EmbeddedElector that is extended by both Curator-based and ActiveStandbyElector-based implementations.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4a6ed7e1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4a6ed7e1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4a6ed7e1

Branch: refs/heads/yarn-5709
Commit: 4a6ed7e11b4183e03f08a90acf1a57199edf4319
Parents: a7288da
Author: Karthik Kambatla <kasha@cloudera.com>
Authored: Tue Dec 6 15:32:32 2016 -0800
Committer: Karthik Kambatla <kasha@apache.org>
Committed: Tue Dec 6 18:57:41 2016 -0800

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |  14 +-
 ...ActiveStandbyElectorBasedElectorService.java | 265 +++++++++++++++++++
 .../server/resourcemanager/AdminService.java    |  67 ++---
 .../CuratorBasedElectorService.java             | 135 ++++++++++
 .../server/resourcemanager/EmbeddedElector.java |  36 +++
 .../resourcemanager/EmbeddedElectorService.java | 260 ------------------
 .../resourcemanager/LeaderElectorService.java   | 129 ---------
 .../yarn/server/resourcemanager/RMContext.java  |   4 +-
 .../server/resourcemanager/RMContextImpl.java   |   6 +-
 .../server/resourcemanager/ResourceManager.java |  34 ++-
 .../yarn/server/resourcemanager/MockRM.java     |  12 +-
 .../TestLeaderElectorService.java               |  10 +-
 .../resourcemanager/TestRMEmbeddedElector.java  |  49 ++--
 13 files changed, 531 insertions(+), 490 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6ed7e1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index fce78c9..055505e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.ActiveStandbyElector;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.StringUtils;
@@ -695,9 +696,20 @@ public class YarnConfiguration extends Configuration {
   public static final String RM_HA_FC_ELECTOR_ZK_RETRIES_KEY = RM_HA_PREFIX
       + "failover-controller.active-standby-elector.zk.retries";
 
-  @Private
+
+  /**
+   * Whether to use curator-based elector for leader election.
+   *
+   * @deprecated Eventually, we want to default to the curator-based
+   * implementation and remove the {@link ActiveStandbyElector} based
+   * implementation. We should remove this config then.
+   */
+  @Unstable
+  @Deprecated
   public static final String CURATOR_LEADER_ELECTOR =
       RM_HA_PREFIX + "curator-leader-elector.enabled";
+  @Private
+  @Unstable
   public static final boolean DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED = false;
 
   ////////////////////////////////

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6ed7e1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java
new file mode 100644
index 0000000..6be9828
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ha.ActiveStandbyElector;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.ServiceFailedException;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ZKUtil;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.ACL;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class ActiveStandbyElectorBasedElectorService extends AbstractService
+    implements EmbeddedElector,
+    ActiveStandbyElector.ActiveStandbyElectorCallback {
+  private static final Log LOG =
+      LogFactory.getLog(ActiveStandbyElectorBasedElectorService.class.getName());
+  private static final HAServiceProtocol.StateChangeRequestInfo req =
+      new HAServiceProtocol.StateChangeRequestInfo(
+          HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC);
+
+  private RMContext rmContext;
+
+  private byte[] localActiveNodeInfo;
+  private ActiveStandbyElector elector;
+  private long zkSessionTimeout;
+  private Timer zkDisconnectTimer;
+  @VisibleForTesting
+  final Object zkDisconnectLock = new Object();
+
+  ActiveStandbyElectorBasedElectorService(RMContext rmContext) {
+    super(ActiveStandbyElectorBasedElectorService.class.getName());
+    this.rmContext = rmContext;
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf)
+      throws Exception {
+    conf = conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf);
+
+    String zkQuorum = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
+    if (zkQuorum == null) {
+     throw new YarnRuntimeException("Embedded automatic failover " +
+          "is enabled, but " + YarnConfiguration.RM_ZK_ADDRESS +
+          " is not set");
+    }
+
+    String rmId = HAUtil.getRMHAId(conf);
+    String clusterId = YarnConfiguration.getClusterId(conf);
+    localActiveNodeInfo = createActiveNodeInfo(clusterId, rmId);
+
+    String zkBasePath = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
+        YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
+    String electionZNode = zkBasePath + "/" + clusterId;
+
+    zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS,
+        YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
+
+    List<ACL> zkAcls = RMZKUtils.getZKAcls(conf);
+    List<ZKUtil.ZKAuthInfo> zkAuths = RMZKUtils.getZKAuths(conf);
+
+    int maxRetryNum =
+        conf.getInt(YarnConfiguration.RM_HA_FC_ELECTOR_ZK_RETRIES_KEY, conf
+          .getInt(CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY,
+            CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT));
+    elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout,
+        electionZNode, zkAcls, zkAuths, this, maxRetryNum, false);
+
+    elector.ensureParentZNode();
+    if (!isParentZnodeSafe(clusterId)) {
+      notifyFatalError(electionZNode + " znode has invalid data! "+
+          "Might need formatting!");
+    }
+
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    elector.joinElection(localActiveNodeInfo);
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    /**
+     * When error occurs in serviceInit(), serviceStop() can be called.
+     * We need null check for the case.
+     */
+    if (elector != null) {
+      elector.quitElection(false);
+      elector.terminateConnection();
+    }
+    super.serviceStop();
+  }
+
+  @Override
+  public void becomeActive() throws ServiceFailedException {
+    cancelDisconnectTimer();
+
+    try {
+      rmContext.getRMAdminService().transitionToActive(req);
+    } catch (Exception e) {
+      throw new ServiceFailedException("RM could not transition to Active", e);
+    }
+  }
+
+  @Override
+  public void becomeStandby() {
+    cancelDisconnectTimer();
+
+    try {
+      rmContext.getRMAdminService().transitionToStandby(req);
+    } catch (Exception e) {
+      LOG.error("RM could not transition to Standby", e);
+    }
+  }
+
+  /**
+   * Stop the disconnect timer.  Any running tasks will be allowed to complete.
+   */
+  private void cancelDisconnectTimer() {
+    synchronized (zkDisconnectLock) {
+      if (zkDisconnectTimer != null) {
+        zkDisconnectTimer.cancel();
+        zkDisconnectTimer = null;
+      }
+    }
+  }
+
+  /**
+   * When the ZK client loses contact with ZK, this method will be called to
+   * allow the RM to react. Because the loss of connection can be noticed
+   * before the session timeout happens, it is undesirable to transition
+   * immediately. Instead the method starts a timer that will wait
+   * {@link YarnConfiguration#RM_ZK_TIMEOUT_MS} milliseconds before
+   * initiating the transition into standby state.
+   */
+  @Override
+  public void enterNeutralMode() {
+    LOG.warn("Lost contact with Zookeeper. Transitioning to standby in "
+        + zkSessionTimeout + " ms if connection is not reestablished.");
+
+    // If we've just become disconnected, start a timer.  When the time's up,
+    // we'll transition to standby.
+    synchronized (zkDisconnectLock) {
+      if (zkDisconnectTimer == null) {
+        zkDisconnectTimer = new Timer("Zookeeper disconnect timer");
+        zkDisconnectTimer.schedule(new TimerTask() {
+          @Override
+          public void run() {
+            synchronized (zkDisconnectLock) {
+              // Only run if the timer hasn't been cancelled
+              if (zkDisconnectTimer != null) {
+                becomeStandby();
+              }
+            }
+          }
+        }, zkSessionTimeout);
+      }
+    }
+  }
+
+  @SuppressWarnings(value = "unchecked")
+  @Override
+  public void notifyFatalError(String errorMessage) {
+    rmContext.getDispatcher().getEventHandler().handle(
+        new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED, errorMessage));
+  }
+
+  @Override
+  public void fenceOldActive(byte[] oldActiveData) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Request to fence old active being ignored, " +
+          "as embedded leader election doesn't support fencing");
+    }
+  }
+
+  private static byte[] createActiveNodeInfo(String clusterId, String rmId)
+      throws IOException {
+    return YarnServerResourceManagerServiceProtos.ActiveRMInfoProto
+        .newBuilder()
+        .setClusterId(clusterId)
+        .setRmId(rmId)
+        .build()
+        .toByteArray();
+  }
+
+  private boolean isParentZnodeSafe(String clusterId)
+      throws InterruptedException, IOException, KeeperException {
+    byte[] data;
+    try {
+      data = elector.getActiveData();
+    } catch (ActiveStandbyElector.ActiveNotFoundException e) {
+      // no active found, parent znode is safe
+      return true;
+    }
+
+    YarnServerResourceManagerServiceProtos.ActiveRMInfoProto proto;
+    try {
+      proto = YarnServerResourceManagerServiceProtos.ActiveRMInfoProto
+          .parseFrom(data);
+    } catch (InvalidProtocolBufferException e) {
+      LOG.error("Invalid data in ZK: " + StringUtils.byteToHexString(data));
+      return false;
+    }
+
+    // Check if the passed proto corresponds to an RM in the same cluster
+    if (!proto.getClusterId().equals(clusterId)) {
+      LOG.error("Mismatched cluster! The other RM seems " +
+          "to be from a different cluster. Current cluster = " + clusterId +
+          ", Other RM's cluster = " + proto.getClusterId());
+      return false;
+    }
+    return true;
+  }
+
+  // EmbeddedElector methods
+
+  @Override
+  public void rejoinElection() {
+    elector.quitElection(false);
+    elector.joinElection(localActiveNodeInfo);
+  }
+
+  @Override
+  public String getZookeeperConnectionState() {
+    return elector.getHAZookeeperConnectionState();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6ed7e1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
index c060659..fda4d9d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
@@ -29,7 +29,6 @@ import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.ha.HAServiceProtocol;
@@ -108,8 +107,6 @@ public class AdminService extends CompositeService implements
   private String rmId;
 
   private boolean autoFailoverEnabled;
-  private boolean curatorEnabled;
-  private EmbeddedElectorService embeddedElector;
 
   private Server server;
 
@@ -134,18 +131,8 @@ public class AdminService extends CompositeService implements
 
   @Override
   public void serviceInit(Configuration conf) throws Exception {
-    if (rmContext.isHAEnabled()) {
-      curatorEnabled = conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
-          YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
-      autoFailoverEnabled = HAUtil.isAutomaticFailoverEnabled(conf);
-      if (autoFailoverEnabled && !curatorEnabled) {
-        if (HAUtil.isAutomaticFailoverEmbedded(conf)) {
-          embeddedElector = createEmbeddedElectorService();
-          addIfService(embeddedElector);
-        }
-      }
-
-    }
+    autoFailoverEnabled =
+        rmContext.isHAEnabled() && HAUtil.isAutomaticFailoverEnabled(conf);
 
     masterServiceBindAddress = conf.getSocketAddr(
         YarnConfiguration.RM_BIND_HOST,
@@ -228,17 +215,6 @@ public class AdminService extends CompositeService implements
     }
   }
 
-  protected EmbeddedElectorService createEmbeddedElectorService() {
-    return new EmbeddedElectorService(rmContext);
-  }
-
-  @InterfaceAudience.Private
-  void resetLeaderElection() {
-    if (embeddedElector != null) {
-      embeddedElector.resetLeaderElection();
-    }
-  }
-
   private UserGroupInformation checkAccess(String method) throws IOException {
     return RMServerUtils.verifyAdminAccess(authorizer, method, LOG);
   }
@@ -375,30 +351,24 @@ public class AdminService extends CompositeService implements
     }
   }
 
+  /**
+   * Return the HA status of this RM. This includes the current state and
+   * whether the RM is ready to become active.
+   *
+   * @return {@link HAServiceStatus} of the current RM
+   * @throws IOException
+   */
   @Override
   public synchronized HAServiceStatus getServiceStatus() throws IOException {
     checkAccess("getServiceState");
-    if (curatorEnabled) {
-      HAServiceStatus state;
-      if (rmContext.getLeaderElectorService().hasLeaderShip()) {
-        state = new HAServiceStatus(HAServiceState.ACTIVE);
-      } else {
-        state = new HAServiceStatus(HAServiceState.STANDBY);
-      }
-      // set empty string to avoid NPE at
-      // HAServiceProtocolServerSideTranslatorPB#getServiceStatus
-      state.setNotReadyToBecomeActive("");
-      return state;
+    HAServiceState haState = rmContext.getHAServiceState();
+    HAServiceStatus ret = new HAServiceStatus(haState);
+    if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) {
+      ret.setReadyToBecomeActive();
     } else {
-      HAServiceState haState = rmContext.getHAServiceState();
-      HAServiceStatus ret = new HAServiceStatus(haState);
-      if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) {
-        ret.setReadyToBecomeActive();
-      } else {
-        ret.setNotReadyToBecomeActive("State is " + haState);
-      }
-      return ret;
+      ret.setNotReadyToBecomeActive("State is " + haState);
     }
+    return ret;
   }
 
   @Override
@@ -932,13 +902,8 @@ public class AdminService extends CompositeService implements
       return "ResourceManager HA is not enabled.";
     } else if (!autoFailoverEnabled) {
       return "Auto Failover is not enabled.";
-    }
-    if (curatorEnabled) {
-      return "Connected to zookeeper : " + rmContext
-          .getLeaderElectorService().getCuratorClient().getZookeeperClient()
-          .isConnected();
     } else {
-      return this.embeddedElector.getHAZookeeperConnectionState();
+      return rmContext.getLeaderElectorService().getZookeeperConnectionState();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6ed7e1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java
new file mode 100644
index 0000000..020c9e3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import java.io.IOException;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class CuratorBasedElectorService extends AbstractService
+    implements EmbeddedElector, LeaderLatchListener {
+  public static final Log LOG = LogFactory.getLog(CuratorBasedElectorService.class);
+  private LeaderLatch leaderLatch;
+  private CuratorFramework curator;
+  private RMContext rmContext;
+  private String latchPath;
+  private String rmId;
+  private ResourceManager rm;
+
+  public CuratorBasedElectorService(RMContext rmContext, ResourceManager rm) {
+    super(CuratorBasedElectorService.class.getName());
+    this.rmContext = rmContext;
+    this.rm = rm;
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    rmId = HAUtil.getRMHAId(conf);
+    String clusterId = YarnConfiguration.getClusterId(conf);
+    String zkBasePath = conf.get(
+        YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
+        YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
+    latchPath = zkBasePath + "/" + clusterId;
+    curator = rm.getCurator();
+    initAndStartLeaderLatch();
+    super.serviceInit(conf);
+  }
+
+  private void initAndStartLeaderLatch() throws Exception {
+    leaderLatch = new LeaderLatch(curator, latchPath, rmId);
+    leaderLatch.addListener(this);
+    leaderLatch.start();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    closeLeaderLatch();
+    super.serviceStop();
+  }
+
+  @Override
+  public void rejoinElection() {
+    try {
+      closeLeaderLatch();
+      Thread.sleep(1000);
+      initAndStartLeaderLatch();
+    } catch (Exception e) {
+      LOG.info("Fail to re-join election.", e);
+    }
+  }
+
+  @Override
+  public String getZookeeperConnectionState() {
+    return "Connected to zookeeper : " +
+        curator.getZookeeperClient().isConnected();
+  }
+
+  @Override
+  public void isLeader() {
+    LOG.info(rmId + " is elected leader, transitioning to active");
+    try {
+      rmContext.getRMAdminService().transitionToActive(
+          new HAServiceProtocol.StateChangeRequestInfo(
+              HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
+    } catch (Exception e) {
+      LOG.info(rmId + " failed to transition to active, giving up leadership",
+          e);
+      notLeader();
+      rejoinElection();
+    }
+  }
+
+  private void closeLeaderLatch() throws IOException {
+    if (leaderLatch != null) {
+      leaderLatch.close();
+    }
+  }
+
+  @Override
+  public void notLeader() {
+    LOG.info(rmId + " relinquish leadership");
+    try {
+      rmContext.getRMAdminService().transitionToStandby(
+          new HAServiceProtocol.StateChangeRequestInfo(
+              HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
+    } catch (Exception e) {
+      LOG.info(rmId + " did not transition to standby successfully.", e);
+    }
+  }
+
+  // only for testing
+  @VisibleForTesting
+  public CuratorFramework getCuratorClient() {
+    return this.curator;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6ed7e1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElector.java
new file mode 100644
index 0000000..3035aef
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElector.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.service.Service;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface EmbeddedElector extends Service{
+  /**
+   * Leave and rejoin leader election.
+   */
+  void rejoinElection();
+
+  /**
+   * Get information about the elector's connection to Zookeeper.
+   */
+  String getZookeeperConnectionState();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6ed7e1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java
deleted file mode 100644
index 88d2e10..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.resourcemanager;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.ha.ActiveStandbyElector;
-import org.apache.hadoop.ha.HAServiceProtocol;
-import org.apache.hadoop.ha.ServiceFailedException;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ZKUtil;
-import org.apache.hadoop.yarn.conf.HAUtil;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.ACL;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class EmbeddedElectorService extends AbstractService
-    implements ActiveStandbyElector.ActiveStandbyElectorCallback {
-  private static final Log LOG =
-      LogFactory.getLog(EmbeddedElectorService.class.getName());
-  private static final HAServiceProtocol.StateChangeRequestInfo req =
-      new HAServiceProtocol.StateChangeRequestInfo(
-          HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC);
-
-  private RMContext rmContext;
-
-  private byte[] localActiveNodeInfo;
-  private ActiveStandbyElector elector;
-  private long zkSessionTimeout;
-  private Timer zkDisconnectTimer;
-  @VisibleForTesting
-  final Object zkDisconnectLock = new Object();
-
-  EmbeddedElectorService(RMContext rmContext) {
-    super(EmbeddedElectorService.class.getName());
-    this.rmContext = rmContext;
-  }
-
-  @Override
-  protected void serviceInit(Configuration conf)
-      throws Exception {
-    conf = conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf);
-
-    String zkQuorum = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
-    if (zkQuorum == null) {
-     throw new YarnRuntimeException("Embedded automatic failover " +
-          "is enabled, but " + YarnConfiguration.RM_ZK_ADDRESS +
-          " is not set");
-    }
-
-    String rmId = HAUtil.getRMHAId(conf);
-    String clusterId = YarnConfiguration.getClusterId(conf);
-    localActiveNodeInfo = createActiveNodeInfo(clusterId, rmId);
-
-    String zkBasePath = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
-        YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
-    String electionZNode = zkBasePath + "/" + clusterId;
-
-    zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS,
-        YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
-
-    List<ACL> zkAcls = RMZKUtils.getZKAcls(conf);
-    List<ZKUtil.ZKAuthInfo> zkAuths = RMZKUtils.getZKAuths(conf);
-
-    int maxRetryNum =
-        conf.getInt(YarnConfiguration.RM_HA_FC_ELECTOR_ZK_RETRIES_KEY, conf
-          .getInt(CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY,
-            CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT));
-    elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout,
-        electionZNode, zkAcls, zkAuths, this, maxRetryNum, false);
-
-    elector.ensureParentZNode();
-    if (!isParentZnodeSafe(clusterId)) {
-      notifyFatalError(electionZNode + " znode has invalid data! "+
-          "Might need formatting!");
-    }
-
-    super.serviceInit(conf);
-  }
-
-  @Override
-  protected void serviceStart() throws Exception {
-    elector.joinElection(localActiveNodeInfo);
-    super.serviceStart();
-  }
-
-  @Override
-  protected void serviceStop() throws Exception {
-    /**
-     * When error occurs in serviceInit(), serviceStop() can be called.
-     * We need null check for the case.
-     */
-    if (elector != null) {
-      elector.quitElection(false);
-      elector.terminateConnection();
-    }
-    super.serviceStop();
-  }
-
-  @Override
-  public void becomeActive() throws ServiceFailedException {
-    cancelDisconnectTimer();
-
-    try {
-      rmContext.getRMAdminService().transitionToActive(req);
-    } catch (Exception e) {
-      throw new ServiceFailedException("RM could not transition to Active", e);
-    }
-  }
-
-  @Override
-  public void becomeStandby() {
-    cancelDisconnectTimer();
-
-    try {
-      rmContext.getRMAdminService().transitionToStandby(req);
-    } catch (Exception e) {
-      LOG.error("RM could not transition to Standby", e);
-    }
-  }
-
-  /**
-   * Stop the disconnect timer.  Any running tasks will be allowed to complete.
-   */
-  private void cancelDisconnectTimer() {
-    synchronized (zkDisconnectLock) {
-      if (zkDisconnectTimer != null) {
-        zkDisconnectTimer.cancel();
-        zkDisconnectTimer = null;
-      }
-    }
-  }
-
-  /**
-   * When the ZK client loses contact with ZK, this method will be called to
-   * allow the RM to react. Because the loss of connection can be noticed
-   * before the session timeout happens, it is undesirable to transition
-   * immediately. Instead the method starts a timer that will wait
-   * {@link YarnConfiguration#RM_ZK_TIMEOUT_MS} milliseconds before
-   * initiating the transition into standby state.
-   */
-  @Override
-  public void enterNeutralMode() {
-    LOG.warn("Lost contact with Zookeeper. Transitioning to standby in "
-        + zkSessionTimeout + " ms if connection is not reestablished.");
-
-    // If we've just become disconnected, start a timer.  When the time's up,
-    // we'll transition to standby.
-    synchronized (zkDisconnectLock) {
-      if (zkDisconnectTimer == null) {
-        zkDisconnectTimer = new Timer("Zookeeper disconnect timer");
-        zkDisconnectTimer.schedule(new TimerTask() {
-          @Override
-          public void run() {
-            synchronized (zkDisconnectLock) {
-              // Only run if the timer hasn't been cancelled
-              if (zkDisconnectTimer != null) {
-                becomeStandby();
-              }
-            }
-          }
-        }, zkSessionTimeout);
-      }
-    }
-  }
-
-  @SuppressWarnings(value = "unchecked")
-  @Override
-  public void notifyFatalError(String errorMessage) {
-    rmContext.getDispatcher().getEventHandler().handle(
-        new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED, errorMessage));
-  }
-
-  @Override
-  public void fenceOldActive(byte[] oldActiveData) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Request to fence old active being ignored, " +
-          "as embedded leader election doesn't support fencing");
-    }
-  }
-
-  private static byte[] createActiveNodeInfo(String clusterId, String rmId)
-      throws IOException {
-    return YarnServerResourceManagerServiceProtos.ActiveRMInfoProto
-        .newBuilder()
-        .setClusterId(clusterId)
-        .setRmId(rmId)
-        .build()
-        .toByteArray();
-  }
-
-  private boolean isParentZnodeSafe(String clusterId)
-      throws InterruptedException, IOException, KeeperException {
-    byte[] data;
-    try {
-      data = elector.getActiveData();
-    } catch (ActiveStandbyElector.ActiveNotFoundException e) {
-      // no active found, parent znode is safe
-      return true;
-    }
-
-    YarnServerResourceManagerServiceProtos.ActiveRMInfoProto proto;
-    try {
-      proto = YarnServerResourceManagerServiceProtos.ActiveRMInfoProto
-          .parseFrom(data);
-    } catch (InvalidProtocolBufferException e) {
-      LOG.error("Invalid data in ZK: " + StringUtils.byteToHexString(data));
-      return false;
-    }
-
-    // Check if the passed proto corresponds to an RM in the same cluster
-    if (!proto.getClusterId().equals(clusterId)) {
-      LOG.error("Mismatched cluster! The other RM seems " +
-          "to be from a different cluster. Current cluster = " + clusterId +
-          "Other RM's cluster = " + proto.getClusterId());
-      return false;
-    }
-    return true;
-  }
-
-  public void resetLeaderElection() {
-    elector.quitElection(false);
-    elector.joinElection(localActiveNodeInfo);
-  }
-
-  public String getHAZookeeperConnectionState() {
-    return elector.getHAZookeeperConnectionState();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6ed7e1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java
deleted file mode 100644
index 8c1a6eb..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.leader.LeaderLatch;
-import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ha.HAServiceProtocol;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.conf.HAUtil;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-
-import java.io.IOException;
-
-
-public class LeaderElectorService extends AbstractService implements
-    LeaderLatchListener {
-  public static final Log LOG = LogFactory.getLog(LeaderElectorService.class);
-  private LeaderLatch leaderLatch;
-  private CuratorFramework curator;
-  private RMContext rmContext;
-  private String latchPath;
-  private String rmId;
-  private ResourceManager rm;
-
-  public LeaderElectorService(RMContext rmContext, ResourceManager rm) {
-    super(LeaderElectorService.class.getName());
-    this.rmContext = rmContext;
-    this.rm = rm;
-  }
-
-  @Override
-  protected void serviceInit(Configuration conf) throws Exception {
-    rmId = HAUtil.getRMHAId(conf);
-    String clusterId = YarnConfiguration.getClusterId(conf);
-    String zkBasePath = conf.get(
-        YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
-        YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
-    latchPath = zkBasePath + "/" + clusterId;
-    curator = rm.getCurator();
-    initAndStartLeaderLatch();
-    super.serviceInit(conf);
-  }
-
-  private void initAndStartLeaderLatch() throws Exception {
-    leaderLatch = new LeaderLatch(curator, latchPath, rmId);
-    leaderLatch.addListener(this);
-    leaderLatch.start();
-  }
-
-  @Override
-  protected void serviceStop() throws Exception {
-    closeLeaderLatch();
-    super.serviceStop();
-  }
-
-  public boolean hasLeaderShip() {
-    return leaderLatch.hasLeadership();
-  }
-
-
-  @Override
-  public void isLeader() {
-    LOG.info(rmId + "is elected leader, transitioning to active");
-    try {
-      rmContext.getRMAdminService().transitionToActive(
-          new HAServiceProtocol.StateChangeRequestInfo(
-              HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
-    } catch (Exception e) {
-      LOG.info(rmId + " failed to transition to active, giving up leadership",
-          e);
-      notLeader();
-      reJoinElection();
-    }
-  }
-
-  public void reJoinElection() {
-    try {
-      closeLeaderLatch();
-      Thread.sleep(1000);
-      initAndStartLeaderLatch();
-    } catch (Exception e) {
-      LOG.info("Fail to re-join election.", e);
-    }
-  }
-
-  private void closeLeaderLatch() throws IOException {
-    if (leaderLatch != null) {
-      leaderLatch.close();
-    }
-  }
-  @Override
-  public void notLeader() {
-    LOG.info(rmId + " relinquish leadership");
-    try {
-      rmContext.getRMAdminService().transitionToStandby(
-          new HAServiceProtocol.StateChangeRequestInfo(
-              HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
-    } catch (Exception e) {
-      LOG.info(rmId + " did not transition to standby successfully.");
-    }
-  }
-
-  // only for testing
-  @VisibleForTesting
-  public CuratorFramework getCuratorClient() {
-    return this.curator;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6ed7e1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index c9d185f..30bf5ca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -145,9 +145,9 @@ public interface RMContext {
   
   void setQueuePlacementManager(PlacementManager placementMgr);
 
-  void setLeaderElectorService(LeaderElectorService elector);
+  void setLeaderElectorService(EmbeddedElector elector);
 
-  LeaderElectorService getLeaderElectorService();
+  EmbeddedElector getLeaderElectorService();
 
   QueueLimitCalculator getNodeManagerQueueLimitCalculator();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6ed7e1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index 3f17ac6..a98f09a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -76,7 +76,7 @@ public class RMContextImpl implements RMContext {
 
   private RMApplicationHistoryWriter rmApplicationHistoryWriter;
   private SystemMetricsPublisher systemMetricsPublisher;
-  private LeaderElectorService elector;
+  private EmbeddedElector elector;
 
   private QueueLimitCalculator queueLimitCalculator;
 
@@ -143,12 +143,12 @@ public class RMContextImpl implements RMContext {
   }
 
   @Override
-  public void setLeaderElectorService(LeaderElectorService elector) {
+  public void setLeaderElectorService(EmbeddedElector elector) {
     this.elector = elector;
   }
 
   @Override
-  public LeaderElectorService getLeaderElectorService() {
+  public EmbeddedElector getLeaderElectorService() {
     return this.elector;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6ed7e1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 8ddbc20..192bbca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -272,16 +272,17 @@ public class ResourceManager extends CompositeService implements Recoverable {
     this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf));
     if (this.rmContext.isHAEnabled()) {
       HAUtil.verifyAndSetConfiguration(this.conf);
-      curatorEnabled = conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
-          YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
-      if (curatorEnabled) {
-        this.curator = createAndStartCurator(conf);
-        LeaderElectorService elector = new LeaderElectorService(rmContext, this);
+
+      // If the RM is configured to use an embedded leader elector,
+      // initialize the leader elector.
+      if (HAUtil.isAutomaticFailoverEnabled(conf) &&
+          HAUtil.isAutomaticFailoverEmbedded(conf)) {
+        EmbeddedElector elector = createEmbeddedElector();
         addService(elector);
         rmContext.setLeaderElectorService(elector);
       }
     }
-    
+
     // Set UGI and do login
     // If security is enabled, use login user
     // If security is not enabled, use current user
@@ -331,6 +332,20 @@ public class ResourceManager extends CompositeService implements Recoverable {
     super.serviceInit(this.conf);
   }
 
+  protected EmbeddedElector createEmbeddedElector() throws IOException {
+    EmbeddedElector elector;
+    curatorEnabled =
+        conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
+            YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
+    if (curatorEnabled) {
+      this.curator = createAndStartCurator(conf);
+      elector = new CuratorBasedElectorService(rmContext, this);
+    } else {
+      elector = new ActiveStandbyElectorBasedElectorService(rmContext);
+    }
+    return elector;
+  }
+
   public CuratorFramework createAndStartCurator(Configuration conf)
       throws IOException {
     String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
@@ -802,12 +817,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
         // Transition to standby and reinit active services
         LOG.info("Transitioning RM to Standby mode");
         transitionToStandby(true);
-        if (curatorEnabled) {
-          rmContext.getLeaderElectorService().reJoinElection();
-        } else {
-          adminService.resetLeaderElection();
-        }
-        return;
+        rmContext.getLeaderElectorService().rejoinElection();
       } catch (Exception e) {
         LOG.fatal("Failed to transition RM to Standby mode.");
         ExitUtil.terminate(1, e);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6ed7e1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index ea573e2..1b354af 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -109,6 +109,8 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.junit.Assert;
 
+import static org.mockito.Mockito.mock;
+
 @SuppressWarnings("unchecked")
 public class MockRM extends ResourceManager {
 
@@ -193,6 +195,11 @@ public class MockRM extends ResourceManager {
   }
 
   @Override
+  protected EmbeddedElector createEmbeddedElector() {
+    return mock(EmbeddedElector.class);
+  }
+
+  @Override
   protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
     return new EventHandler<SchedulerEvent>() {
       @Override
@@ -984,11 +991,6 @@ public class MockRM extends ResourceManager {
       protected void stopServer() {
         // don't do anything
       }
-
-      @Override
-      protected EmbeddedElectorService createEmbeddedElectorService() {
-        return null;
-      }
     };
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6ed7e1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLeaderElectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLeaderElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLeaderElectorService.java
index bb10041..e58d77e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLeaderElectorService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLeaderElectorService.java
@@ -167,7 +167,8 @@ public class TestLeaderElectorService {
 
     rm1 = startRM("rm1", HAServiceState.ACTIVE);
 
-    LeaderElectorService service = rm1.getRMContext().getLeaderElectorService();
+    CuratorBasedElectorService service = (CuratorBasedElectorService)
+        rm1.getRMContext().getLeaderElectorService();
     CuratorZookeeperClient client =
         service.getCuratorClient().getZookeeperClient();
     // this will expire current curator client session. curator will re-establish
@@ -217,9 +218,12 @@ public class TestLeaderElectorService {
     rm1 = startRM("rm1", HAServiceState.ACTIVE);
     rm2 = startRM("rm2", HAServiceState.STANDBY);
 
+    CuratorBasedElectorService service = (CuratorBasedElectorService)
+        rm1.getRMContext().getLeaderElectorService();
+
     ZooKeeper zkClient =
-        rm1.getRMContext().getLeaderElectorService().getCuratorClient()
-            .getZookeeperClient().getZooKeeper();
+        service.getCuratorClient().getZookeeperClient().getZooKeeper();
+
     InstanceSpec connectionInstance = zkCluster.findConnectionInstance(zkClient);
     zkCluster.killServer(connectionInstance);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6ed7e1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java
index bfd0b4e..1fe9bbe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java
@@ -127,7 +127,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
     myConf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 50);
     when(rc.getRMAdminService()).thenReturn(as);
 
-    EmbeddedElectorService ees = new EmbeddedElectorService(rc);
+    ActiveStandbyElectorBasedElectorService
+        ees = new ActiveStandbyElectorBasedElectorService(rc);
     ees.init(myConf);
 
     ees.enterNeutralMode();
@@ -164,7 +165,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
    * @throws InterruptedException if interrupted
    */
   private void testCallbackSynchronizationActive(AdminService as,
-      EmbeddedElectorService ees) throws IOException, InterruptedException {
+      ActiveStandbyElectorBasedElectorService ees)
+      throws IOException, InterruptedException {
     ees.becomeActive();
 
     Thread.sleep(100);
@@ -183,7 +185,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
    * @throws InterruptedException if interrupted
    */
   private void testCallbackSynchronizationStandby(AdminService as,
-      EmbeddedElectorService ees) throws IOException, InterruptedException {
+      ActiveStandbyElectorBasedElectorService ees)
+      throws IOException, InterruptedException {
     ees.becomeStandby();
 
     Thread.sleep(100);
@@ -201,7 +204,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
    * @throws InterruptedException if interrupted
    */
   private void testCallbackSynchronizationNeutral(AdminService as,
-      EmbeddedElectorService ees) throws IOException, InterruptedException {
+      ActiveStandbyElectorBasedElectorService ees)
+      throws IOException, InterruptedException {
     ees.enterNeutralMode();
 
     Thread.sleep(100);
@@ -220,7 +224,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
    * @throws InterruptedException if interrupted
    */
   private void testCallbackSynchronizationTimingActive(AdminService as,
-      EmbeddedElectorService ees) throws IOException, InterruptedException {
+      ActiveStandbyElectorBasedElectorService ees)
+      throws IOException, InterruptedException {
     synchronized (ees.zkDisconnectLock) {
       // Sleep while holding the lock so that the timer thread can't do
       // anything when it runs.  Sleep until we're pretty sure the timer thread
@@ -250,7 +255,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
    * @throws InterruptedException if interrupted
    */
   private void testCallbackSynchronizationTimingStandby(AdminService as,
-      EmbeddedElectorService ees) throws IOException, InterruptedException {
+      ActiveStandbyElectorBasedElectorService ees)
+      throws IOException, InterruptedException {
     synchronized (ees.zkDisconnectLock) {
       // Sleep while holding the lock so that the timer thread can't do
       // anything when it runs.  Sleep until we're pretty sure the timer thread
@@ -283,25 +289,20 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
     }
 
     @Override
-    protected AdminService createAdminService() {
-      return new AdminService(MockRMWithElector.this, getRMContext()) {
+    protected EmbeddedElector createEmbeddedElector() {
+      return new ActiveStandbyElectorBasedElectorService(getRMContext()) {
         @Override
-        protected EmbeddedElectorService createEmbeddedElectorService() {
-          return new EmbeddedElectorService(getRMContext()) {
-            @Override
-            public void becomeActive() throws
-                ServiceFailedException {
-              try {
-                callbackCalled.set(true);
-                TestRMEmbeddedElector.LOG.info("Callback called. Sleeping now");
-                Thread.sleep(delayMs);
-                TestRMEmbeddedElector.LOG.info("Sleep done");
-              } catch (InterruptedException e) {
-                e.printStackTrace();
-              }
-              super.becomeActive();
-            }
-          };
+        public void becomeActive() throws
+            ServiceFailedException {
+          try {
+            callbackCalled.set(true);
+            TestRMEmbeddedElector.LOG.info("Callback called. Sleeping now");
+            Thread.sleep(delayMs);
+            TestRMEmbeddedElector.LOG.info("Sleep done");
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          }
+          super.becomeActive();
         }
       };
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message