Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C035BCC0D for ; Thu, 7 Jun 2012 21:26:04 +0000 (UTC) Received: (qmail 9503 invoked by uid 500); 7 Jun 2012 21:26:04 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 9449 invoked by uid 500); 7 Jun 2012 21:26:04 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 9442 invoked by uid 99); 7 Jun 2012 21:26:04 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 07 Jun 2012 21:26:04 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 07 Jun 2012 21:25:59 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 1C75023889E0; Thu, 7 Jun 2012 21:25:39 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1347804 [1/2] - in /hadoop/common/branches/branch-2/hadoop-common-project: ./ hadoop-auth/ hadoop-common/ hadoop-common/dev-support/ hadoop-common/src/main/bin/ hadoop-common/src/main/docs/ hadoop-common/src/main/java/ hadoop-common/src/ma... 
Date: Thu, 07 Jun 2012 21:25:37 -0000 To: common-commits@hadoop.apache.org From: todd@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120607212539.1C75023889E0@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: todd Date: Thu Jun 7 21:25:34 2012 New Revision: 1347804 URL: http://svn.apache.org/viewvc?rev=1347804&view=rev Log: Merge HDFS-3042 (automatic failover) to branch-2 from trunk Added: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt - copied unchanged from r1342112, hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAZKUtil.java - copied unchanged from r1342112, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAZKUtil.java hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFCProtocol.java - copied unchanged from r1342112, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFCProtocol.java hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFCRpcServer.java - copied unchanged from r1342112, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFCRpcServer.java hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolClientSideTranslatorPB.java - copied unchanged from r1342112, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolClientSideTranslatorPB.java hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolPB.java - copied unchanged from r1342112, 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolPB.java hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolServerSideTranslatorPB.java - copied unchanged from r1342112, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolServerSideTranslatorPB.java hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/proto/ZKFCProtocol.proto - copied unchanged from r1342112, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/proto/ZKFCProtocol.proto hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java - copied unchanged from r1342112, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummySharedResource.java - copied unchanged from r1342112, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummySharedResource.java hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java - copied unchanged from r1342112, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHAZKUtil.java - copied unchanged from r1342112, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHAZKUtil.java hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java - copied unchanged from r1342112, 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ZKFCTestUtil.java - copied unchanged from r1342112, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ZKFCTestUtil.java Modified: hadoop/common/branches/branch-2/hadoop-common-project/ (props changed) hadoop/common/branches/branch-2/hadoop-common-project/hadoop-auth/ (props changed) hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/ (props changed) hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt (props changed) hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/bin/hadoop-daemon.sh hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/docs/ (props changed) hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/ (props changed) hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverController.java hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocolHelper.java 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolClientSideTranslatorPB.java hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolServerSideTranslatorPB.java hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-env.sh hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-policy.xml hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/proto/HAServiceProtocol.proto hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/core/ (props changed) hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestFailoverController.java 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java Propchange: hadoop/common/branches/branch-2/hadoop-common-project/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-common-project:r1342112 Propchange: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-auth/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-common-project/hadoop-auth:r1342112 Propchange: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common:r1342112 Propchange: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt ------------------------------------------------------------------------------ Merged /hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/CHANGES.txt:r1306184-1342109 Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt:r1342112 Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml?rev=1347804&r1=1347803&r2=1347804&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml (original) +++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml Thu Jun 7 21:25:34 2012 @@ -290,5 +290,9 @@ + + + + Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/bin/hadoop-daemon.sh URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/bin/hadoop-daemon.sh?rev=1347804&r1=1347803&r2=1347804&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/bin/hadoop-daemon.sh (original) +++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/bin/hadoop-daemon.sh Thu Jun 7 21:25:34 2012 @@ -141,7 +141,7 @@ case $startStop in echo starting $command, logging to $log cd "$HADOOP_PREFIX" case $command in - namenode|secondarynamenode|datanode|dfs|dfsadmin|fsck|balancer) + namenode|secondarynamenode|datanode|dfs|dfsadmin|fsck|balancer|zkfc) if [ -z "$HADOOP_HDFS_HOME" ]; then hdfsScript="$HADOOP_PREFIX"/bin/hdfs else Propchange: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/docs/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/docs:r1342112 Merged /hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/docs:r1306184-1342109 Propchange: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/ ------------------------------------------------------------------------------ Merged /hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java:r1306184-1342109 Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java:r1342112 Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java?rev=1347804&r1=1347803&r2=1347804&view=diff ============================================================================== --- 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java (original) +++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java Thu Jun 7 21:25:34 2012 @@ -117,6 +117,8 @@ public class CommonConfigurationKeys ext "security.refresh.user.mappings.protocol.acl"; public static final String SECURITY_HA_SERVICE_PROTOCOL_ACL = "security.ha.service.protocol.acl"; + public static final String + SECURITY_ZKFC_PROTOCOL_ACL = "security.zkfc.protocol.acl"; public static final String HADOOP_SECURITY_TOKEN_SERVICE_USE_IP = "hadoop.security.token.service.use_ip"; Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java?rev=1347804&r1=1347803&r2=1347804&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java (original) +++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java Thu Jun 7 21:25:34 2012 @@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.ha.HAZKUtil.ZKAuthInfo; import org.apache.hadoop.util.StringUtils; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.KeeperException; @@ -81,9 +82,15 @@ public class ActiveStandbyElector implem */ public interface ActiveStandbyElectorCallback { /** - * This method is called when the app 
becomes the active leader + * This method is called when the app becomes the active leader. + * If the service fails to become active, it should throw + * ServiceFailedException. This will cause the elector to + * sleep for a short period, then re-join the election. + * + * Callback implementations are expected to manage their own + * timeouts (e.g. when making an RPC to a remote node). */ - void becomeActive(); + void becomeActive() throws ServiceFailedException; /** * This method is called when the app becomes a standby @@ -134,7 +141,8 @@ public class ActiveStandbyElector implem public static final Log LOG = LogFactory.getLog(ActiveStandbyElector.class); - private static final int NUM_RETRIES = 3; + static int NUM_RETRIES = 3; + private static final int SLEEP_AFTER_FAILURE_TO_BECOME_ACTIVE = 1000; private static enum ConnectionState { DISCONNECTED, CONNECTED, TERMINATED @@ -154,6 +162,7 @@ public class ActiveStandbyElector implem private final String zkHostPort; private final int zkSessionTimeout; private final List zkAcl; + private final List zkAuthInfo; private byte[] appData; private final String zkLockFilePath; private final String zkBreadCrumbPath; @@ -185,6 +194,8 @@ public class ActiveStandbyElector implem * znode under which to create the lock * @param acl * ZooKeeper ACL's + * @param authInfo a list of authentication credentials to add to the + * ZK connection * @param app * reference to callback interface object * @throws IOException @@ -192,6 +203,7 @@ public class ActiveStandbyElector implem */ public ActiveStandbyElector(String zookeeperHostPorts, int zookeeperSessionTimeout, String parentZnodeName, List acl, + List authInfo, ActiveStandbyElectorCallback app) throws IOException, HadoopIllegalArgumentException { if (app == null || acl == null || parentZnodeName == null @@ -201,6 +213,7 @@ public class ActiveStandbyElector implem zkHostPort = zookeeperHostPorts; zkSessionTimeout = zookeeperSessionTimeout; zkAcl = acl; + zkAuthInfo = authInfo; 
appClient = app; znodeWorkingDir = parentZnodeName; zkLockFilePath = znodeWorkingDir + "/" + LOCK_FILENAME; @@ -227,8 +240,6 @@ public class ActiveStandbyElector implem public synchronized void joinElection(byte[] data) throws HadoopIllegalArgumentException { - LOG.debug("Attempting active election"); - if (data == null) { throw new HadoopIllegalArgumentException("data cannot be null"); } @@ -236,6 +247,7 @@ public class ActiveStandbyElector implem appData = new byte[data.length]; System.arraycopy(data, 0, appData, 0, data.length); + LOG.debug("Attempting active election for " + this); joinElectionInternal(); } @@ -259,6 +271,9 @@ public class ActiveStandbyElector implem */ public synchronized void ensureParentZNode() throws IOException, InterruptedException { + Preconditions.checkState(!wantToBeInElection, + "ensureParentZNode() may not be called while in the election"); + String pathParts[] = znodeWorkingDir.split("/"); Preconditions.checkArgument(pathParts.length >= 1 && "".equals(pathParts[0]), @@ -292,6 +307,9 @@ public class ActiveStandbyElector implem */ public synchronized void clearParentZNode() throws IOException, InterruptedException { + Preconditions.checkState(!wantToBeInElection, + "clearParentZNode() may not be called while in the election"); + try { LOG.info("Recursively deleting " + znodeWorkingDir + " from ZK..."); @@ -360,7 +378,7 @@ public class ActiveStandbyElector implem createConnection(); } Stat stat = new Stat(); - return zkClient.getData(zkLockFilePath, false, stat); + return getDataWithRetries(zkLockFilePath, false, stat); } catch(KeeperException e) { Code code = e.code(); if (isNodeDoesNotExist(code)) { @@ -380,13 +398,17 @@ public class ActiveStandbyElector implem String name) { if (isStaleClient(ctx)) return; LOG.debug("CreateNode result: " + rc + " for path: " + path - + " connectionState: " + zkConnectionState); + + " connectionState: " + zkConnectionState + + " for " + this); Code code = Code.get(rc); if (isSuccess(code)) { // we 
successfully created the znode. we are the leader. start monitoring - becomeActive(); - monitorActiveStatus(); + if (becomeActive()) { + monitorActiveStatus(); + } else { + reJoinElectionAfterFailureToBecomeActive(); + } return; } @@ -433,8 +455,13 @@ public class ActiveStandbyElector implem public synchronized void processResult(int rc, String path, Object ctx, Stat stat) { if (isStaleClient(ctx)) return; + + assert wantToBeInElection : + "Got a StatNode result after quitting election"; + LOG.debug("StatNode result: " + rc + " for path: " + path - + " connectionState: " + zkConnectionState); + + " connectionState: " + zkConnectionState + " for " + this); + Code code = Code.get(rc); if (isSuccess(code)) { @@ -442,7 +469,9 @@ public class ActiveStandbyElector implem // creation was retried if (stat.getEphemeralOwner() == zkClient.getSessionId()) { // we own the lock znode. so we are the leader - becomeActive(); + if (!becomeActive()) { + reJoinElectionAfterFailureToBecomeActive(); + } } else { // we dont own the lock znode. so we are a standby. becomeStandby(); @@ -470,20 +499,37 @@ public class ActiveStandbyElector implem } errorMessage = errorMessage + ". Not retrying further znode monitoring connection errors."; + } else if (isSessionExpired(code)) { + // This isn't fatal - the client Watcher will re-join the election + LOG.warn("Lock monitoring failed because session was lost"); + return; } fatalError(errorMessage); } /** - * interface implementation of Zookeeper watch events (connection and node) + * We failed to become active. Re-join the election, but + * sleep for a few seconds after terminating our existing + * session, so that other nodes have a chance to become active. + * The failure to become active is already logged inside + * becomeActive(). 
+ */ + private void reJoinElectionAfterFailureToBecomeActive() { + reJoinElection(SLEEP_AFTER_FAILURE_TO_BECOME_ACTIVE); + } + + /** + * interface implementation of Zookeeper watch events (connection and node), + * proxied by {@link WatcherWithClientRef}. */ synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) { Event.EventType eventType = event.getType(); if (isStaleClient(zk)) return; LOG.debug("Watcher event type: " + eventType + " with state:" + event.getState() + " for path:" + event.getPath() - + " connectionState: " + zkConnectionState); + + " connectionState: " + zkConnectionState + + " for " + this); if (eventType == Event.EventType.None) { // the connection state has changed @@ -494,7 +540,8 @@ public class ActiveStandbyElector implem // be undone ConnectionState prevConnectionState = zkConnectionState; zkConnectionState = ConnectionState.CONNECTED; - if (prevConnectionState == ConnectionState.DISCONNECTED) { + if (prevConnectionState == ConnectionState.DISCONNECTED && + wantToBeInElection) { monitorActiveStatus(); } break; @@ -511,7 +558,7 @@ public class ActiveStandbyElector implem // call listener to reconnect LOG.info("Session expired. 
Entering neutral mode and rejoining..."); enterNeutralMode(); - reJoinElection(); + reJoinElection(0); break; default: fatalError("Unexpected Zookeeper watch event state: " @@ -559,16 +606,21 @@ public class ActiveStandbyElector implem protected synchronized ZooKeeper getNewZooKeeper() throws IOException { ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, null); zk.register(new WatcherWithClientRef(zk)); + for (ZKAuthInfo auth : zkAuthInfo) { + zk.addAuthInfo(auth.getScheme(), auth.getAuth()); + } return zk; } private void fatalError(String errorMessage) { + LOG.fatal(errorMessage); reset(); appClient.notifyFatalError(errorMessage); } private void monitorActiveStatus() { - LOG.debug("Monitoring active leader"); + assert wantToBeInElection; + LOG.debug("Monitoring active leader for " + this); statRetryCount = 0; monitorLockNodeAsync(); } @@ -586,7 +638,7 @@ public class ActiveStandbyElector implem createLockNodeAsync(); } - private void reJoinElection() { + private void reJoinElection(int sleepTime) { LOG.info("Trying to re-establish ZK session"); // Some of the test cases rely on expiring the ZK sessions and @@ -599,12 +651,30 @@ public class ActiveStandbyElector implem sessionReestablishLockForTests.lock(); try { terminateConnection(); + sleepFor(sleepTime); + joinElectionInternal(); } finally { sessionReestablishLockForTests.unlock(); } } - + + /** + * Sleep for the given number of milliseconds. + * This is non-static, and separated out, so that unit tests + * can override the behavior not to sleep. 
+ */ + @VisibleForTesting + protected void sleepFor(int sleepMs) { + if (sleepMs > 0) { + try { + Thread.sleep(sleepMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + @VisibleForTesting void preventSessionReestablishmentForTests() { sessionReestablishLockForTests.lock(); @@ -616,8 +686,12 @@ public class ActiveStandbyElector implem } @VisibleForTesting - long getZKSessionIdForTests() { - return zkClient.getSessionId(); + synchronized long getZKSessionIdForTests() { + if (zkClient != null) { + return zkClient.getSessionId(); + } else { + return -1; + } } @VisibleForTesting @@ -629,17 +703,13 @@ public class ActiveStandbyElector implem int connectionRetryCount = 0; boolean success = false; while(!success && connectionRetryCount < NUM_RETRIES) { - LOG.debug("Establishing zookeeper connection"); + LOG.debug("Establishing zookeeper connection for " + this); try { createConnection(); success = true; } catch(IOException e) { LOG.warn(e); - try { - Thread.sleep(5000); - } catch(InterruptedException e1) { - LOG.warn(e1); - } + sleepFor(5000); } ++connectionRetryCount; } @@ -647,14 +717,24 @@ public class ActiveStandbyElector implem } private void createConnection() throws IOException { + if (zkClient != null) { + try { + zkClient.close(); + } catch (InterruptedException e) { + throw new IOException("Interrupted while closing ZK", + e); + } + zkClient = null; + } zkClient = getNewZooKeeper(); + LOG.debug("Created new connection for " + this); } - private void terminateConnection() { + void terminateConnection() { if (zkClient == null) { return; } - LOG.debug("Terminating ZK connection"); + LOG.debug("Terminating ZK connection for " + this); ZooKeeper tempZk = zkClient; zkClient = null; try { @@ -670,20 +750,24 @@ public class ActiveStandbyElector implem terminateConnection(); } - private void becomeActive() { + private boolean becomeActive() { assert wantToBeInElection; - if (state != State.ACTIVE) { - try { - Stat oldBreadcrumbStat 
= fenceOldActive(); - writeBreadCrumbNode(oldBreadcrumbStat); - } catch (Exception e) { - LOG.warn("Exception handling the winning of election", e); - reJoinElection(); - return; - } - LOG.debug("Becoming active"); - state = State.ACTIVE; + if (state == State.ACTIVE) { + // already active + return true; + } + try { + Stat oldBreadcrumbStat = fenceOldActive(); + writeBreadCrumbNode(oldBreadcrumbStat); + + LOG.debug("Becoming active for " + this); appClient.becomeActive(); + state = State.ACTIVE; + return true; + } catch (Exception e) { + LOG.warn("Exception handling the winning of election", e); + // Caller will handle quitting and rejoining the election. + return false; } } @@ -779,7 +863,7 @@ public class ActiveStandbyElector implem private void becomeStandby() { if (state != State.STANDBY) { - LOG.debug("Becoming standby"); + LOG.debug("Becoming standby for " + this); state = State.STANDBY; appClient.becomeStandby(); } @@ -787,7 +871,7 @@ public class ActiveStandbyElector implem private void enterNeutralMode() { if (state != State.NEUTRAL) { - LOG.debug("Entering neutral mode"); + LOG.debug("Entering neutral mode for " + this); state = State.NEUTRAL; appClient.enterNeutralMode(); } @@ -814,6 +898,15 @@ public class ActiveStandbyElector implem }); } + private byte[] getDataWithRetries(final String path, final boolean watch, + final Stat stat) throws InterruptedException, KeeperException { + return zkDoWithRetries(new ZKAction() { + public byte[] run() throws KeeperException, InterruptedException { + return zkClient.getData(path, watch, stat); + } + }); + } + private Stat setDataWithRetries(final String path, final byte[] data, final int version) throws InterruptedException, KeeperException { return zkDoWithRetries(new ZKAction() { @@ -884,8 +977,14 @@ public class ActiveStandbyElector implem @Override public void process(WatchedEvent event) { - ActiveStandbyElector.this.processWatchEvent( - zk, event); + try { + ActiveStandbyElector.this.processWatchEvent( + zk, 
event); + } catch (Throwable t) { + fatalError( + "Failed to process watcher event " + event + ": " + + StringUtils.stringifyException(t)); + } } } @@ -913,5 +1012,13 @@ public class ActiveStandbyElector implem } return false; } + + @Override + public String toString() { + return "elector id=" + System.identityHashCode(this) + + " appData=" + + ((appData == null) ? "null" : StringUtils.byteToHexString(appData)) + + " cb=" + appClient; + } } Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverController.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverController.java?rev=1347804&r1=1347803&r2=1347804&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverController.java (original) +++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverController.java Thu Jun 7 21:25:34 2012 @@ -27,6 +27,8 @@ import org.apache.hadoop.classification. 
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; +import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; +import org.apache.hadoop.ha.HAServiceProtocol.RequestSource; import org.apache.hadoop.ipc.RPC; import com.google.common.base.Preconditions; @@ -48,9 +50,12 @@ public class FailoverController { private final Configuration conf; + private final RequestSource requestSource; - public FailoverController(Configuration conf) { + public FailoverController(Configuration conf, + RequestSource source) { this.conf = conf; + this.requestSource = source; this.gracefulFenceTimeout = getGracefulFenceTimeout(conf); this.rpcTimeoutToNewActive = getRpcTimeoutToNewActive(conf); @@ -100,7 +105,7 @@ public class FailoverController { toSvcStatus = toSvc.getServiceStatus(); } catch (IOException e) { String msg = "Unable to get service state for " + target; - LOG.error(msg, e); + LOG.error(msg + ": " + e.getLocalizedMessage()); throw new FailoverFailedException(msg, e); } @@ -122,7 +127,7 @@ public class FailoverController { } try { - HAServiceProtocolHelper.monitorHealth(toSvc); + HAServiceProtocolHelper.monitorHealth(toSvc, createReqInfo()); } catch (HealthCheckFailedException hce) { throw new FailoverFailedException( "Can't failover to an unhealthy service", hce); @@ -132,7 +137,10 @@ public class FailoverController { } } - + private StateChangeRequestInfo createReqInfo() { + return new StateChangeRequestInfo(requestSource); + } + /** * Try to get the HA state of the node at the given address. 
This * function is guaranteed to be "quick" -- ie it has a short timeout @@ -143,7 +151,7 @@ public class FailoverController { HAServiceProtocol proxy = null; try { proxy = svc.getProxy(conf, gracefulFenceTimeout); - proxy.transitionToStandby(); + proxy.transitionToStandby(createReqInfo()); return true; } catch (ServiceFailedException sfe) { LOG.warn("Unable to gracefully make " + svc + " standby (" + @@ -198,7 +206,8 @@ public class FailoverController { Throwable cause = null; try { HAServiceProtocolHelper.transitionToActive( - toSvc.getProxy(conf, rpcTimeoutToNewActive)); + toSvc.getProxy(conf, rpcTimeoutToNewActive), + createReqInfo()); } catch (ServiceFailedException sfe) { LOG.error("Unable to make " + toSvc + " active (" + sfe.getMessage() + "). Failing back."); Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java?rev=1347804&r1=1347803&r2=1347804&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java (original) +++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java Thu Jun 7 21:25:34 2012 @@ -19,11 +19,11 @@ package org.apache.hadoop.ha; import java.io.IOException; import java.io.PrintStream; +import java.util.Arrays; import java.util.Map; import org.apache.commons.cli.Options; import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.ParseException; import org.apache.commons.logging.Log; @@ -33,9 +33,12 @@ import org.apache.hadoop.classification. 
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; +import org.apache.hadoop.ha.HAServiceProtocol.RequestSource; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; /** @@ -49,6 +52,13 @@ public abstract class HAAdmin extends Co private static final String FORCEFENCE = "forcefence"; private static final String FORCEACTIVE = "forceactive"; + + /** + * Undocumented flag which allows an administrator to use manual failover + * state transitions even when auto-failover is enabled. This is an unsafe + * operation, which is why it is not documented in the usage below. + */ + private static final String FORCEMANUAL = "forcemanual"; private static final Log LOG = LogFactory.getLog(HAAdmin.class); private int rpcTimeoutForChecks = -1; @@ -79,6 +89,7 @@ public abstract class HAAdmin extends Co /** Output stream for errors, for use in tests */ protected PrintStream errOut = System.err; PrintStream out = System.out; + private RequestSource requestSource = RequestSource.REQUEST_BY_USER; protected abstract HAServiceTarget resolveTarget(String string); @@ -106,63 +117,83 @@ public abstract class HAAdmin extends Co errOut.println("Usage: HAAdmin [" + cmd + " " + usage.args + "]"); } - private int transitionToActive(final String[] argv) + private int transitionToActive(final CommandLine cmd) throws IOException, ServiceFailedException { - if (argv.length != 2) { + String[] argv = cmd.getArgs(); + if (argv.length != 1) { errOut.println("transitionToActive: incorrect number of arguments"); printUsage(errOut, "-transitionToActive"); return -1; } - - HAServiceProtocol proto = resolveTarget(argv[1]).getProxy( + HAServiceTarget target = resolveTarget(argv[0]); + if (!checkManualStateManagementOK(target)) { + return 
-1; + } + HAServiceProtocol proto = target.getProxy( getConf(), 0); - HAServiceProtocolHelper.transitionToActive(proto); + HAServiceProtocolHelper.transitionToActive(proto, createReqInfo()); return 0; } - private int transitionToStandby(final String[] argv) + private int transitionToStandby(final CommandLine cmd) throws IOException, ServiceFailedException { - if (argv.length != 2) { + String[] argv = cmd.getArgs(); + if (argv.length != 1) { errOut.println("transitionToStandby: incorrect number of arguments"); printUsage(errOut, "-transitionToStandby"); return -1; } - HAServiceProtocol proto = resolveTarget(argv[1]).getProxy( + HAServiceTarget target = resolveTarget(argv[0]); + if (!checkManualStateManagementOK(target)) { + return -1; + } + HAServiceProtocol proto = target.getProxy( getConf(), 0); - HAServiceProtocolHelper.transitionToStandby(proto); + HAServiceProtocolHelper.transitionToStandby(proto, createReqInfo()); return 0; } + /** + * Ensure that we are allowed to manually manage the HA state of the target + * service. If automatic failover is configured, then the automatic + * failover controllers should be doing state management, and it is generally + * an error to use the HAAdmin command line to do so. 
+ * + * @param target the target to check + * @return true if manual state management is allowed + */ + private boolean checkManualStateManagementOK(HAServiceTarget target) { + if (target.isAutoFailoverEnabled()) { + if (requestSource != RequestSource.REQUEST_BY_USER_FORCED) { + errOut.println( + "Automatic failover is enabled for " + target + "\n" + + "Refusing to manually manage HA state, since it may cause\n" + + "a split-brain scenario or other incorrect state.\n" + + "If you are very sure you know what you are doing, please \n" + + "specify the " + FORCEMANUAL + " flag."); + return false; + } else { + LOG.warn("Proceeding with manual HA state management even though\n" + + "automatic failover is enabled for " + target); + return true; + } + } + return true; + } - private int failover(final String[] argv) - throws IOException, ServiceFailedException { - boolean forceFence = false; - boolean forceActive = false; - - Options failoverOpts = new Options(); - // "-failover" isn't really an option but we need to add - // it to appease CommandLineParser - failoverOpts.addOption("failover", false, "failover"); - failoverOpts.addOption(FORCEFENCE, false, "force fencing"); - failoverOpts.addOption(FORCEACTIVE, false, "force failover"); + private StateChangeRequestInfo createReqInfo() { + return new StateChangeRequestInfo(requestSource); + } - CommandLineParser parser = new GnuParser(); - CommandLine cmd; + private int failover(CommandLine cmd) + throws IOException, ServiceFailedException { + boolean forceFence = cmd.hasOption(FORCEFENCE); + boolean forceActive = cmd.hasOption(FORCEACTIVE); - try { - cmd = parser.parse(failoverOpts, argv); - forceFence = cmd.hasOption(FORCEFENCE); - forceActive = cmd.hasOption(FORCEACTIVE); - } catch (ParseException pe) { - errOut.println("failover: incorrect arguments"); - printUsage(errOut, "-failover"); - return -1; - } - int numOpts = cmd.getOptions() == null ? 
0 : cmd.getOptions().length; final String[] args = cmd.getArgs(); - if (numOpts > 2 || args.length != 2) { + if (numOpts > 3 || args.length != 2) { errOut.println("failover: incorrect arguments"); printUsage(errOut, "-failover"); return -1; @@ -171,7 +202,30 @@ public abstract class HAAdmin extends Co HAServiceTarget fromNode = resolveTarget(args[0]); HAServiceTarget toNode = resolveTarget(args[1]); - FailoverController fc = new FailoverController(getConf()); + // Check that auto-failover is consistently configured for both nodes. + Preconditions.checkState( + fromNode.isAutoFailoverEnabled() == + toNode.isAutoFailoverEnabled(), + "Inconsistent auto-failover configs between %s and %s!", + fromNode, toNode); + + if (fromNode.isAutoFailoverEnabled()) { + if (forceFence || forceActive) { + // -forceActive doesn't make sense with auto-HA, since, if the node + // is not healthy, then its ZKFC will immediately quit the election + // again the next time a health check runs. + // + // -forceFence doesn't seem to have any real use cases with auto-HA + // so it isn't implemented. + errOut.println(FORCEFENCE + " and " + FORCEACTIVE + " flags not " + + "supported with auto-failover enabled."); + return -1; + } + return gracefulFailoverThroughZKFCs(toNode); + } + + FailoverController fc = new FailoverController(getConf(), + requestSource); try { fc.failover(fromNode, toNode, forceFence, forceActive); @@ -182,19 +236,44 @@ public abstract class HAAdmin extends Co } return 0; } + + + /** + * Initiate a graceful failover by talking to the target node's ZKFC. + * This sends an RPC to the ZKFC, which coordinates the failover. 
+ * + * @param toNode the node to fail to + * @return status code (0 for success) + * @throws IOException if failover does not succeed + */ + private int gracefulFailoverThroughZKFCs(HAServiceTarget toNode) + throws IOException { + + int timeout = FailoverController.getRpcTimeoutToNewActive(getConf()); + ZKFCProtocol proxy = toNode.getZKFCProxy(getConf(), timeout); + try { + proxy.gracefulFailover(); + out.println("Failover to " + toNode + " successful"); + } catch (ServiceFailedException sfe) { + errOut.println("Failover failed: " + sfe.getLocalizedMessage()); + return -1; + } + + return 0; + } - private int checkHealth(final String[] argv) + private int checkHealth(final CommandLine cmd) throws IOException, ServiceFailedException { - if (argv.length != 2) { + String[] argv = cmd.getArgs(); + if (argv.length != 1) { errOut.println("checkHealth: incorrect number of arguments"); printUsage(errOut, "-checkHealth"); return -1; } - - HAServiceProtocol proto = resolveTarget(argv[1]).getProxy( + HAServiceProtocol proto = resolveTarget(argv[0]).getProxy( getConf(), rpcTimeoutForChecks); try { - HAServiceProtocolHelper.monitorHealth(proto); + HAServiceProtocolHelper.monitorHealth(proto, createReqInfo()); } catch (HealthCheckFailedException e) { errOut.println("Health check failed: " + e.getLocalizedMessage()); return -1; @@ -202,15 +281,16 @@ public abstract class HAAdmin extends Co return 0; } - private int getServiceState(final String[] argv) + private int getServiceState(final CommandLine cmd) throws IOException, ServiceFailedException { - if (argv.length != 2) { + String[] argv = cmd.getArgs(); + if (argv.length != 1) { errOut.println("getServiceState: incorrect number of arguments"); printUsage(errOut, "-getServiceState"); return -1; } - HAServiceProtocol proto = resolveTarget(argv[1]).getProxy( + HAServiceProtocol proto = resolveTarget(argv[0]).getProxy( getConf(), rpcTimeoutForChecks); out.println(proto.getServiceStatus().getState()); return 0; @@ -263,26 +343,101 
@@ public abstract class HAAdmin extends Co printUsage(errOut); return -1; } + + if (!USAGE.containsKey(cmd)) { + errOut.println(cmd.substring(1) + ": Unknown command"); + printUsage(errOut); + return -1; + } + + Options opts = new Options(); + + // Add command-specific options + if ("-failover".equals(cmd)) { + addFailoverCliOpts(opts); + } + // Mutative commands take FORCEMANUAL option + if ("-transitionToActive".equals(cmd) || + "-transitionToStandby".equals(cmd) || + "-failover".equals(cmd)) { + opts.addOption(FORCEMANUAL, false, + "force manual control even if auto-failover is enabled"); + } + + CommandLine cmdLine = parseOpts(cmd, opts, argv); + if (cmdLine == null) { + // error already printed + return -1; + } + + if (cmdLine.hasOption(FORCEMANUAL)) { + if (!confirmForceManual()) { + LOG.fatal("Aborted"); + return -1; + } + // Instruct the NNs to honor this request even if they're + // configured for manual failover. + requestSource = RequestSource.REQUEST_BY_USER_FORCED; + } if ("-transitionToActive".equals(cmd)) { - return transitionToActive(argv); + return transitionToActive(cmdLine); } else if ("-transitionToStandby".equals(cmd)) { - return transitionToStandby(argv); + return transitionToStandby(cmdLine); } else if ("-failover".equals(cmd)) { - return failover(argv); + return failover(cmdLine); } else if ("-getServiceState".equals(cmd)) { - return getServiceState(argv); + return getServiceState(cmdLine); } else if ("-checkHealth".equals(cmd)) { - return checkHealth(argv); + return checkHealth(cmdLine); } else if ("-help".equals(cmd)) { return help(argv); } else { - errOut.println(cmd.substring(1) + ": Unknown command"); - printUsage(errOut); - return -1; + // we already checked command validity above, so getting here + // would be a coding error + throw new AssertionError("Should not get here, command: " + cmd); } } + private boolean confirmForceManual() throws IOException { + return ToolRunner.confirmPrompt( + "You have specified the " + FORCEMANUAL + " 
flag. This flag is " + + "dangerous, as it can induce a split-brain scenario that WILL " + + "CORRUPT your HDFS namespace, possibly irrecoverably.\n" + + "\n" + + "It is recommended not to use this flag, but instead to shut down the " + + "cluster and disable automatic failover if you prefer to manually " + + "manage your HA state.\n" + + "\n" + + "You may abort safely by answering 'n' or hitting ^C now.\n" + + "\n" + + "Are you sure you want to continue?"); + } + + /** + * Add CLI options which are specific to the failover command and no + * others. + */ + private void addFailoverCliOpts(Options failoverOpts) { + failoverOpts.addOption(FORCEFENCE, false, "force fencing"); + failoverOpts.addOption(FORCEACTIVE, false, "force failover"); + // Don't add FORCEMANUAL, since that's added separately for all commands + // that change state. + } + + private CommandLine parseOpts(String cmdName, Options opts, String[] argv) { + try { + // Strip off the first arg, since that's just the command name + argv = Arrays.copyOfRange(argv, 1, argv.length); + return new GnuParser().parse(opts, argv); + } catch (ParseException pe) { + errOut.println(cmdName.substring(1) + + ": incorrect arguments"); + printUsage(errOut, cmdName); + return null; + } + } + private int help(String[] argv) { if (argv.length != 2) { printUsage(errOut, "-help"); Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java?rev=1347804&r1=1347803&r2=1347804&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java (original) +++ 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java Thu Jun 7 21:25:34 2012 @@ -60,6 +60,31 @@ public interface HAServiceProtocol { return name; } } + + public static enum RequestSource { + REQUEST_BY_USER, + REQUEST_BY_USER_FORCED, + REQUEST_BY_ZKFC; + } + + /** + * Information describing the source for a request to change state. + * This is used to differentiate requests from automatic vs CLI + * failover controllers, and in the future may include epoch + * information. + */ + public static class StateChangeRequestInfo { + private final RequestSource source; + + public StateChangeRequestInfo(RequestSource source) { + super(); + this.source = source; + } + + public RequestSource getSource() { + return source; + } + } /** * Monitor the health of service. This periodically called by the HA @@ -95,7 +120,8 @@ public interface HAServiceProtocol { * @throws IOException * if other errors happen */ - public void transitionToActive() throws ServiceFailedException, + public void transitionToActive(StateChangeRequestInfo reqInfo) + throws ServiceFailedException, AccessControlException, IOException; @@ -110,7 +136,8 @@ public interface HAServiceProtocol { * @throws IOException * if other errors happen */ - public void transitionToStandby() throws ServiceFailedException, + public void transitionToStandby(StateChangeRequestInfo reqInfo) + throws ServiceFailedException, AccessControlException, IOException; Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocolHelper.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocolHelper.java?rev=1347804&r1=1347803&r2=1347804&view=diff ============================================================================== --- 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocolHelper.java (original) +++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocolHelper.java Thu Jun 7 21:25:34 2012 @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; import org.apache.hadoop.ipc.RemoteException; /** @@ -30,7 +31,8 @@ import org.apache.hadoop.ipc.RemoteExcep @InterfaceAudience.Public @InterfaceStability.Evolving public class HAServiceProtocolHelper { - public static void monitorHealth(HAServiceProtocol svc) + public static void monitorHealth(HAServiceProtocol svc, + StateChangeRequestInfo reqInfo) throws IOException { try { svc.monitorHealth(); @@ -39,19 +41,21 @@ public class HAServiceProtocolHelper { } } - public static void transitionToActive(HAServiceProtocol svc) + public static void transitionToActive(HAServiceProtocol svc, + StateChangeRequestInfo reqInfo) throws IOException { try { - svc.transitionToActive(); + svc.transitionToActive(reqInfo); } catch (RemoteException e) { throw e.unwrapRemoteException(ServiceFailedException.class); } } - public static void transitionToStandby(HAServiceProtocol svc) + public static void transitionToStandby(HAServiceProtocol svc, + StateChangeRequestInfo reqInfo) throws IOException { try { - svc.transitionToStandby(); + svc.transitionToStandby(reqInfo); } catch (RemoteException e) { throw e.unwrapRemoteException(ServiceFailedException.class); } Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java?rev=1347804&r1=1347803&r2=1347804&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java (original) +++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java Thu Jun 7 21:25:34 2012 @@ -28,6 +28,7 @@ import org.apache.hadoop.classification. import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB; +import org.apache.hadoop.ha.protocolPB.ZKFCProtocolClientSideTranslatorPB; import org.apache.hadoop.net.NetUtils; import com.google.common.collect.Maps; @@ -49,6 +50,11 @@ public abstract class HAServiceTarget { public abstract InetSocketAddress getAddress(); /** + * @return the IPC address of the ZKFC on the target node + */ + public abstract InetSocketAddress getZKFCAddress(); + + /** * @return a Fencer implementation configured for this target node */ public abstract NodeFencer getFencer(); @@ -76,6 +82,20 @@ public abstract class HAServiceTarget { confCopy, factory, timeoutMs); } + /** + * @return a proxy to the ZKFC which is associated with this HA service. 
+ */ + public ZKFCProtocol getZKFCProxy(Configuration conf, int timeoutMs) + throws IOException { + Configuration confCopy = new Configuration(conf); + // Lower the timeout so we quickly fail to connect + confCopy.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1); + SocketFactory factory = NetUtils.getDefaultSocketFactory(confCopy); + return new ZKFCProtocolClientSideTranslatorPB( + getZKFCAddress(), + confCopy, factory, timeoutMs); + } + public final Map getFencingParameters() { Map ret = Maps.newHashMap(); addFencingParameters(ret); @@ -99,4 +119,11 @@ public abstract class HAServiceTarget { ret.put(HOST_SUBST_KEY, getAddress().getHostName()); ret.put(PORT_SUBST_KEY, String.valueOf(getAddress().getPort())); } + + /** + * @return true if auto failover should be considered enabled + */ + public boolean isAutoFailoverEnabled() { + return false; + } } Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java?rev=1347804&r1=1347803&r2=1347804&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java (original) +++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java Thu Jun 7 21:25:34 2012 @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -43,7 +44,8 @@ import com.google.common.base.Preconditi * Classes which need callbacks should implement the {@link 
Callback} * interface. */ -class HealthMonitor { +@InterfaceAudience.Private +public class HealthMonitor { private static final Log LOG = LogFactory.getLog( HealthMonitor.class); @@ -75,7 +77,8 @@ class HealthMonitor { private HAServiceStatus lastServiceState = new HAServiceStatus( HAServiceState.INITIALIZING); - enum State { + @InterfaceAudience.Private + public enum State { /** * The health monitor is still starting up. */ Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java?rev=1347804&r1=1347803&r2=1347804&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java (original) +++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java Thu Jun 7 21:25:34 2012 @@ -18,79 +18,143 @@ package org.apache.hadoop.ha; import java.io.IOException; +import java.net.InetSocketAddress; import java.security.PrivilegedAction; +import java.security.PrivilegedExceptionAction; +import java.util.Collections; import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException; import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback; +import 
org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; +import org.apache.hadoop.ha.HAServiceProtocol.RequestSource; +import org.apache.hadoop.ha.HAZKUtil.ZKAuthInfo; import org.apache.hadoop.ha.HealthMonitor.State; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authorize.PolicyProvider; +import org.apache.hadoop.util.StringUtils; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.hadoop.util.ToolRunner; import org.apache.zookeeper.data.ACL; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.util.concurrent.ThreadFactoryBuilder; @InterfaceAudience.LimitedPrivate("HDFS") -public abstract class ZKFailoverController implements Tool { +public abstract class ZKFailoverController { static final Log LOG = LogFactory.getLog(ZKFailoverController.class); - // TODO: this should be namespace-scoped public static final String ZK_QUORUM_KEY = "ha.zookeeper.quorum"; private static final String ZK_SESSION_TIMEOUT_KEY = "ha.zookeeper.session-timeout.ms"; private static final int ZK_SESSION_TIMEOUT_DEFAULT = 5*1000; private static final String ZK_PARENT_ZNODE_KEY = "ha.zookeeper.parent-znode"; + public static final String ZK_ACL_KEY = "ha.zookeeper.acl"; + private static final String ZK_ACL_DEFAULT = "world:anyone:rwcda"; + public static final String ZK_AUTH_KEY = "ha.zookeeper.auth"; static final String ZK_PARENT_ZNODE_DEFAULT = "/hadoop-ha"; + /** + * All of the conf keys used by the ZKFC. This is used in order to allow + * them to be overridden on a per-nameservice or per-namenode basis. 
+ */ + protected static final String[] ZKFC_CONF_KEYS = new String[] { + ZK_QUORUM_KEY, + ZK_SESSION_TIMEOUT_KEY, + ZK_PARENT_ZNODE_KEY, + ZK_ACL_KEY, + ZK_AUTH_KEY + }; + + /** Unable to format the parent znode in ZK */ static final int ERR_CODE_FORMAT_DENIED = 2; /** The parent znode doesn't exist in ZK */ static final int ERR_CODE_NO_PARENT_ZNODE = 3; /** Fencing is not properly configured */ static final int ERR_CODE_NO_FENCER = 4; + /** Automatic failover is not enabled */ + static final int ERR_CODE_AUTO_FAILOVER_NOT_ENABLED = 5; + /** Cannot connect to ZooKeeper */ + static final int ERR_CODE_NO_ZK = 6; - private Configuration conf; + protected Configuration conf; + private String zkQuorum; + protected final HAServiceTarget localTarget; private HealthMonitor healthMonitor; private ActiveStandbyElector elector; - - private HAServiceTarget localTarget; - - private String parentZnode; + protected ZKFCRpcServer rpcServer; private State lastHealthState = State.INITIALIZING; /** Set if a fatal error occurs */ private String fatalError = null; - @Override - public void setConf(Configuration conf) { + /** + * A future nanotime before which the ZKFC will not join the election. + * This is used during graceful failover. 
+ */ + private long delayJoiningUntilNanotime = 0; + + /** Executor on which {@link #scheduleRecheck(long)} schedules events */ + private ScheduledExecutorService delayExecutor = + Executors.newScheduledThreadPool(1, + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("ZKFC Delay timer #%d") + .build()); + + private ActiveAttemptRecord lastActiveAttemptRecord; + private Object activeAttemptRecordLock = new Object(); + + protected ZKFailoverController(Configuration conf, HAServiceTarget localTarget) { + this.localTarget = localTarget; this.conf = conf; - localTarget = getLocalTarget(); } protected abstract byte[] targetToData(HAServiceTarget target); - protected abstract HAServiceTarget getLocalTarget(); protected abstract HAServiceTarget dataToTarget(byte[] data); + protected abstract void loginAsFCUser() throws IOException; + protected abstract void checkRpcAdminAccess() + throws AccessControlException, IOException; + protected abstract InetSocketAddress getRpcAddressToBindTo(); + protected abstract PolicyProvider getPolicyProvider(); + /** + * Return the name of a znode inside the configured parent znode in which + * the ZKFC will do all of its work. This is so that multiple federated + * nameservices can run on the same ZK quorum without having to manually + * configure them to separate subdirectories. + */ + protected abstract String getScopeInsideParentNode(); - @Override - public Configuration getConf() { - return conf; + public HAServiceTarget getLocalTarget() { + return localTarget; } - - @Override + public int run(final String[] args) throws Exception { - // TODO: need to hook DFS here to find the NN keytab info, etc, - // similar to what DFSHAAdmin does. Annoying that this is in common. + if (!localTarget.isAutoFailoverEnabled()) { + LOG.fatal("Automatic failover is not enabled for " + localTarget + "." 
+ + " Please ensure that automatic failover is enabled in the " + + "configuration before running the ZK failover controller."); + return ERR_CODE_AUTO_FAILOVER_NOT_ENABLED; + } + loginAsFCUser(); try { return SecurityUtil.doAsLoginUserOrFatal(new PrivilegedAction() { @Override @@ -99,6 +163,10 @@ public abstract class ZKFailoverControll return doRun(args); } catch (Exception t) { throw new RuntimeException(t); + } finally { + if (elector != null) { + elector.terminateConnection(); + } } } }); @@ -107,6 +175,7 @@ public abstract class ZKFailoverControll } } + private int doRun(String[] args) throws HadoopIllegalArgumentException, IOException, InterruptedException { initZK(); @@ -129,11 +198,23 @@ public abstract class ZKFailoverControll } } - if (!elector.parentZNodeExists()) { - LOG.fatal("Unable to start failover controller. " + - "Parent znode does not exist.\n" + - "Run with -formatZK flag to initialize ZooKeeper."); - return ERR_CODE_NO_PARENT_ZNODE; + try { + if (!elector.parentZNodeExists()) { + LOG.fatal("Unable to start failover controller. " + + "Parent znode does not exist.\n" + + "Run with -formatZK flag to initialize ZooKeeper."); + return ERR_CODE_NO_PARENT_ZNODE; + } + } catch (IOException ioe) { + if (ioe.getCause() instanceof KeeperException.ConnectionLossException) { + LOG.fatal("Unable to start failover controller. Unable to connect " + + "to ZooKeeper quorum at " + zkQuorum + ". 
Please check the " + + "configured value for " + ZK_QUORUM_KEY + " and ensure that " + + "ZooKeeper is running."); + return ERR_CODE_NO_ZK; + } else { + throw ioe; + } } try { @@ -145,8 +226,18 @@ public abstract class ZKFailoverControll return ERR_CODE_NO_FENCER; } + initRPC(); initHM(); - mainLoop(); + startRPC(); + try { + mainLoop(); + } finally { + rpcServer.stopAndJoin(); + + elector.quitElection(true); + healthMonitor.shutdown(); + healthMonitor.join(); + } return 0; } @@ -181,6 +272,7 @@ public abstract class ZKFailoverControll } private boolean confirmFormat() { + String parentZnode = getParentZnode(); System.err.println( "===============================================\n" + "The configured parent znode " + parentZnode + " already exists.\n" + @@ -206,16 +298,40 @@ public abstract class ZKFailoverControll healthMonitor.addCallback(new HealthCallbacks()); healthMonitor.start(); } + + protected void initRPC() throws IOException { + InetSocketAddress bindAddr = getRpcAddressToBindTo(); + rpcServer = new ZKFCRpcServer(conf, bindAddr, this, getPolicyProvider()); + } + + protected void startRPC() throws IOException { + rpcServer.start(); + } + private void initZK() throws HadoopIllegalArgumentException, IOException { - String zkQuorum = conf.get(ZK_QUORUM_KEY); + zkQuorum = conf.get(ZK_QUORUM_KEY); int zkTimeout = conf.getInt(ZK_SESSION_TIMEOUT_KEY, ZK_SESSION_TIMEOUT_DEFAULT); - parentZnode = conf.get(ZK_PARENT_ZNODE_KEY, - ZK_PARENT_ZNODE_DEFAULT); - // TODO: need ZK ACL support in config, also maybe auth! - List zkAcls = Ids.OPEN_ACL_UNSAFE; + // Parse ACLs from configuration. + String zkAclConf = conf.get(ZK_ACL_KEY, ZK_ACL_DEFAULT); + zkAclConf = HAZKUtil.resolveConfIndirection(zkAclConf); + List zkAcls = HAZKUtil.parseACLs(zkAclConf); + if (zkAcls.isEmpty()) { + zkAcls = Ids.CREATOR_ALL_ACL; + } + + // Parse authentication from configuration. 
+ String zkAuthConf = conf.get(ZK_AUTH_KEY); + zkAuthConf = HAZKUtil.resolveConfIndirection(zkAuthConf); + List zkAuths; + if (zkAuthConf != null) { + zkAuths = HAZKUtil.parseAuth(zkAuthConf); + } else { + zkAuths = Collections.emptyList(); + } + // Sanity check configuration. Preconditions.checkArgument(zkQuorum != null, "Missing required configuration '%s' for ZooKeeper quorum", ZK_QUORUM_KEY); @@ -224,9 +340,19 @@ public abstract class ZKFailoverControll elector = new ActiveStandbyElector(zkQuorum, - zkTimeout, parentZnode, zkAcls, new ElectorCallbacks()); + zkTimeout, getParentZnode(), zkAcls, zkAuths, + new ElectorCallbacks()); } + private String getParentZnode() { + String znode = conf.get(ZK_PARENT_ZNODE_KEY, + ZK_PARENT_ZNODE_DEFAULT); + if (!znode.endsWith("/")) { + znode += "/"; + } + return znode + getScopeInsideParentNode(); + } + private synchronized void mainLoop() throws InterruptedException { while (fatalError == null) { wait(); @@ -242,16 +368,30 @@ public abstract class ZKFailoverControll notifyAll(); } - private synchronized void becomeActive() { + private synchronized void becomeActive() throws ServiceFailedException { LOG.info("Trying to make " + localTarget + " active..."); try { HAServiceProtocolHelper.transitionToActive(localTarget.getProxy( - conf, FailoverController.getRpcTimeoutToNewActive(conf))); - LOG.info("Successfully transitioned " + localTarget + - " to active state"); + conf, FailoverController.getRpcTimeoutToNewActive(conf)), + createReqInfo()); + String msg = "Successfully transitioned " + localTarget + + " to active state"; + LOG.info(msg); + recordActiveAttempt(new ActiveAttemptRecord(true, msg)); + } catch (Throwable t) { - LOG.fatal("Couldn't make " + localTarget + " active", t); - elector.quitElection(true); + String msg = "Couldn't make " + localTarget + " active"; + LOG.fatal(msg, t); + + recordActiveAttempt(new ActiveAttemptRecord(false, msg + "\n" + + StringUtils.stringifyException(t))); + + if (t instanceof 
ServiceFailedException) { + throw (ServiceFailedException)t; + } else { + throw new ServiceFailedException("Couldn't transition to active", + t); + } /* * TODO: * we need to make sure that if we get fenced and then quickly restarted, @@ -264,12 +404,79 @@ public abstract class ZKFailoverControll } } + /** + * Store the results of the last attempt to become active. + * This is used so that, during manually initiated failover, + * we can report back the results of the attempt to become active + * to the initiator of the failover. + */ + private void recordActiveAttempt( + ActiveAttemptRecord record) { + synchronized (activeAttemptRecordLock) { + lastActiveAttemptRecord = record; + activeAttemptRecordLock.notifyAll(); + } + } + + /** + * Wait until one of the following events: + *
<ul>
+ * <li>Another thread publishes the results of an attempt to become active
+ * using {@link #recordActiveAttempt(ActiveAttemptRecord)}</li>
+ * <li>The node enters bad health status</li>
+ * <li>The specified timeout elapses</li>
+ * </ul>
+ * + * @param timeoutMillis number of millis to wait + * @return the published record, or null if the timeout elapses or the + * service becomes unhealthy + * @throws InterruptedException if the thread is interrupted. + */ + private ActiveAttemptRecord waitForActiveAttempt(int timeoutMillis) + throws InterruptedException { + long st = System.nanoTime(); + long waitUntil = st + TimeUnit.NANOSECONDS.convert( + timeoutMillis, TimeUnit.MILLISECONDS); + + do { + // periodically check health state, because entering an + // unhealthy state could prevent us from ever attempting to + // become active. We can detect this and respond to the user + // immediately. + synchronized (this) { + if (lastHealthState != State.SERVICE_HEALTHY) { + // early out if service became unhealthy + return null; + } + } + + synchronized (activeAttemptRecordLock) { + if ((lastActiveAttemptRecord != null && + lastActiveAttemptRecord.nanoTime >= st)) { + return lastActiveAttemptRecord; + } + // Only wait 1sec so that we periodically recheck the health state + // above. + activeAttemptRecordLock.wait(1000); + } + } while (System.nanoTime() < waitUntil); + + // Timeout elapsed. + LOG.warn(timeoutMillis + "ms timeout elapsed waiting for an attempt " + + "to become active"); + return null; + } + + private StateChangeRequestInfo createReqInfo() { + return new StateChangeRequestInfo(RequestSource.REQUEST_BY_ZKFC); + } + private synchronized void becomeStandby() { LOG.info("ZK Election indicated that " + localTarget + " should become standby"); try { int timeout = FailoverController.getGracefulFenceTimeout(conf); - localTarget.getProxy(conf, timeout).transitionToStandby(); + localTarget.getProxy(conf, timeout).transitionToStandby(createReqInfo()); LOG.info("Successfully transitioned " + localTarget + " to standby state"); } catch (Exception e) { @@ -279,27 +486,336 @@ public abstract class ZKFailoverControll // at the same time. 
} } + + + private synchronized void fenceOldActive(byte[] data) { + HAServiceTarget target = dataToTarget(data); + + try { + doFence(target); + } catch (Throwable t) { + recordActiveAttempt(new ActiveAttemptRecord(false, "Unable to fence old active: " + StringUtils.stringifyException(t))); + Throwables.propagate(t); + } + } + + private void doFence(HAServiceTarget target) { + LOG.info("Should fence: " + target); + boolean gracefulWorked = new FailoverController(conf, + RequestSource.REQUEST_BY_ZKFC).tryGracefulFence(target); + if (gracefulWorked) { + // It's possible that it's in standby but just about to go into active, + // no? Is there some race here? + LOG.info("Successfully transitioned " + target + " to standby " + + "state without fencing"); + return; + } + + try { + target.checkFencingConfigured(); + } catch (BadFencingConfigurationException e) { + LOG.error("Couldn't fence old active " + target, e); + recordActiveAttempt(new ActiveAttemptRecord(false, "Unable to fence old active")); + throw new RuntimeException(e); + } + + if (!target.getFencer().fence(target)) { + throw new RuntimeException("Unable to fence " + target); + } + } + + + /** + * Request from graceful failover to cede active role. Causes + * this ZKFC to transition its local node to standby, then quit + * the election for the specified period of time, after which it + * will rejoin iff it is healthy. 
+ */ + void cedeActive(final int millisToCede) + throws AccessControlException, ServiceFailedException, IOException { + try { + UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + doCedeActive(millisToCede); + return null; + } + }); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + private void doCedeActive(int millisToCede) + throws AccessControlException, ServiceFailedException, IOException { + int timeout = FailoverController.getGracefulFenceTimeout(conf); + + // Lock elector to maintain lock ordering of elector -> ZKFC + synchronized (elector) { + synchronized (this) { + if (millisToCede <= 0) { + delayJoiningUntilNanotime = 0; + recheckElectability(); + return; + } + + LOG.info("Requested by " + UserGroupInformation.getCurrentUser() + + " at " + Server.getRemoteAddress() + " to cede active role."); + boolean needFence = false; + try { + localTarget.getProxy(conf, timeout).transitionToStandby(createReqInfo()); + LOG.info("Successfully ensured local node is in standby mode"); + } catch (IOException ioe) { + LOG.warn("Unable to transition local node to standby: " + + ioe.getLocalizedMessage()); + LOG.warn("Quitting election but indicating that fencing is " + + "necessary"); + needFence = true; + } + delayJoiningUntilNanotime = System.nanoTime() + + TimeUnit.MILLISECONDS.toNanos(millisToCede); + elector.quitElection(needFence); + } + } + recheckElectability(); + } + + /** + * Coordinate a graceful failover to this node. 
+ * @throws ServiceFailedException if the node fails to become active + * @throws IOException some other error occurs + */ + void gracefulFailoverToYou() throws ServiceFailedException, IOException { + try { + UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + doGracefulFailover(); + return null; + } + + }); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + /** + * Coordinate a graceful failover. This proceeds in several phases: + * 1) Pre-flight checks: ensure that the local node is healthy, and + * thus a candidate for failover. + * 2) Determine the current active node. If it is the local node, no + * need to failover - return success. + * 3) Ask that node to yield from the election for a number of seconds. + * 4) Allow the normal election path to run in other threads. Wait until + * we either become unhealthy or we see an election attempt recorded by + * the normal code path. + * 5) Allow the old active to rejoin the election, so a future + * failback is possible. + */ + private void doGracefulFailover() + throws ServiceFailedException, IOException, InterruptedException { + int timeout = FailoverController.getGracefulFenceTimeout(conf) * 2; + + // Phase 1: pre-flight checks + checkEligibleForFailover(); + + // Phase 2: determine old/current active node. Check that we're not + // ourselves active, etc. + HAServiceTarget oldActive = getCurrentActive(); + if (oldActive == null) { + // No node is currently active. So, if we aren't already + // active ourselves by means of a normal election, then there's + // probably something preventing us from becoming active. + throw new ServiceFailedException( + "No other node is currently active."); + } + + if (oldActive.getAddress().equals(localTarget.getAddress())) { + LOG.info("Local node " + localTarget + " is already active. " + + "No need to failover. 
Returning success."); + return; + } + + // Phase 3: ask the old active to yield from the election. + LOG.info("Asking " + oldActive + " to cede its active state for " + + timeout + "ms"); + ZKFCProtocol oldZkfc = oldActive.getZKFCProxy(conf, timeout); + oldZkfc.cedeActive(timeout); + + // Phase 4: wait for the normal election to make the local node + // active. + ActiveAttemptRecord attempt = waitForActiveAttempt(timeout + 60000); + + if (attempt == null) { + // We didn't even make an attempt to become active. + synchronized(this) { + if (lastHealthState != State.SERVICE_HEALTHY) { + throw new ServiceFailedException("Unable to become active. " + + "Service became unhealthy while trying to failover."); + } + } + + throw new ServiceFailedException("Unable to become active. " + + "Local node did not get an opportunity to do so from ZooKeeper, " + + "or the local node took too long to transition to active."); + } + + // Phase 5. At this point, we made some attempt to become active. So we + // can tell the old active to rejoin if it wants. This allows a quick + // fail-back if we immediately crash. + oldZkfc.cedeActive(-1); + + if (attempt.succeeded) { + LOG.info("Successfully became active. " + attempt.status); + } else { + // Propagate failure + String msg = "Failed to become active. " + attempt.status; + throw new ServiceFailedException(msg); + } + } + + /** + * Ensure that the local node is in a healthy state, and thus + * eligible for graceful failover. + * @throws ServiceFailedException if the node is unhealthy + */ + private synchronized void checkEligibleForFailover() + throws ServiceFailedException { + // Check health + if (this.getLastHealthState() != State.SERVICE_HEALTHY) { + throw new ServiceFailedException( + localTarget + " is not currently healthy. " + + "Cannot be failover target"); + } + } + + /** + * @return an {@link HAServiceTarget} for the current active node + * in the cluster, or null if no node is active. 
+ * @throws IOException if a ZK-related issue occurs + * @throws InterruptedException if thread is interrupted + */ + private HAServiceTarget getCurrentActive() + throws IOException, InterruptedException { + synchronized (elector) { + synchronized (this) { + byte[] activeData; + try { + activeData = elector.getActiveData(); + } catch (ActiveNotFoundException e) { + return null; + } catch (KeeperException ke) { + throw new IOException( + "Unexpected ZooKeeper issue fetching active node info", ke); + } + + HAServiceTarget oldActive = dataToTarget(activeData); + return oldActive; + } + } + } + + /** + * Check the current state of the service, and join the election + * if it should be in the election. + */ + private void recheckElectability() { + // Maintain lock ordering of elector -> ZKFC + synchronized (elector) { + synchronized (this) { + boolean healthy = lastHealthState == State.SERVICE_HEALTHY; + + long remainingDelay = delayJoiningUntilNanotime - System.nanoTime(); + if (remainingDelay > 0) { + if (healthy) { + LOG.info("Would have joined master election, but this node is " + + "prohibited from doing so for " + + TimeUnit.NANOSECONDS.toMillis(remainingDelay) + " more ms"); + } + scheduleRecheck(remainingDelay); + return; + } + + switch (lastHealthState) { + case SERVICE_HEALTHY: + elector.joinElection(targetToData(localTarget)); + break; + + case INITIALIZING: + LOG.info("Ensuring that " + localTarget + " does not " + + "participate in active master election"); + elector.quitElection(false); + break; + + case SERVICE_UNHEALTHY: + case SERVICE_NOT_RESPONDING: + LOG.info("Quitting master election for " + localTarget + + " and marking that fencing is necessary"); + elector.quitElection(true); + break; + + case HEALTH_MONITOR_FAILED: + fatalError("Health monitor failed!"); + break; + + default: + throw new IllegalArgumentException("Unhandled state:" + lastHealthState); + } + } + } + } + + /** + * Schedule a call to {@link #recheckElectability()} in the future. 
+ */ + private void scheduleRecheck(long whenNanos) { + delayExecutor.schedule( + new Runnable() { + @Override + public void run() { + try { + recheckElectability(); + } catch (Throwable t) { + fatalError("Failed to recheck electability: " + + StringUtils.stringifyException(t)); + } + } + }, + whenNanos, TimeUnit.NANOSECONDS); + } /** * @return the last health state passed to the FC * by the HealthMonitor. */ @VisibleForTesting - State getLastHealthState() { + synchronized State getLastHealthState() { return lastHealthState; } + + private synchronized void setLastHealthState(HealthMonitor.State newState) { + LOG.info("Local service " + localTarget + + " entered state: " + newState); + lastHealthState = newState; + } @VisibleForTesting ActiveStandbyElector getElectorForTests() { return elector; } + + @VisibleForTesting + ZKFCRpcServer getRpcServerForTests() { + return rpcServer; + } /** * Callbacks from elector */ class ElectorCallbacks implements ActiveStandbyElectorCallback { @Override - public void becomeActive() { + public void becomeActive() throws ServiceFailedException { ZKFailoverController.this.becomeActive(); } @@ -319,31 +835,13 @@ public abstract class ZKFailoverControll @Override public void fenceOldActive(byte[] data) { - HAServiceTarget target = dataToTarget(data); - - LOG.info("Should fence: " + target); - boolean gracefulWorked = new FailoverController(conf) - .tryGracefulFence(target); - if (gracefulWorked) { - // It's possible that it's in standby but just about to go into active, - // no? Is there some race here? - LOG.info("Successfully transitioned " + target + " to standby " + - "state without fencing"); - return; - } - - try { - target.checkFencingConfigured(); - } catch (BadFencingConfigurationException e) { - LOG.error("Couldn't fence old active " + target, e); - // TODO: see below todo - throw new RuntimeException(e); - } - - if (!target.getFencer().fence(target)) { - // TODO: this will end up in some kind of tight loop, - // won't it? 
We need some kind of backoff - throw new RuntimeException("Unable to fence " + target); + ZKFailoverController.this.fenceOldActive(data); + } + + @Override + public String toString() { + synchronized (ZKFailoverController.this) { + return "Elector callbacks for " + localTarget; } } } @@ -354,36 +852,21 @@ public abstract class ZKFailoverControll class HealthCallbacks implements HealthMonitor.Callback { @Override public void enteredState(HealthMonitor.State newState) { - LOG.info("Local service " + localTarget + - " entered state: " + newState); - switch (newState) { - case SERVICE_HEALTHY: - LOG.info("Joining master election for " + localTarget); - elector.joinElection(targetToData(localTarget)); - break; - - case INITIALIZING: - LOG.info("Ensuring that " + localTarget + " does not " + - "participate in active master election"); - elector.quitElection(false); - break; - - case SERVICE_UNHEALTHY: - case SERVICE_NOT_RESPONDING: - LOG.info("Quitting master election for " + localTarget + - " and marking that fencing is necessary"); - elector.quitElection(true); - break; - - case HEALTH_MONITOR_FAILED: - fatalError("Health monitor failed!"); - break; - - default: - throw new IllegalArgumentException("Unhandled state:" + newState); - } - - lastHealthState = newState; + setLastHealthState(newState); + recheckElectability(); } } + + private static class ActiveAttemptRecord { + private final boolean succeeded; + private final String status; + private final long nanoTime; + + public ActiveAttemptRecord(boolean succeeded, String status) { + this.succeeded = succeeded; + this.status = status; + this.nanoTime = System.nanoTime(); + } + } + }