hadoop-common-commits mailing list archives

From: t...@apache.org
Subject: svn commit: r1347804 [1/2] - in /hadoop/common/branches/branch-2/hadoop-common-project: ./ hadoop-auth/ hadoop-common/ hadoop-common/dev-support/ hadoop-common/src/main/bin/ hadoop-common/src/main/docs/ hadoop-common/src/main/java/ hadoop-common/src/ma...
Date: Thu, 07 Jun 2012 21:25:37 GMT
Author: todd
Date: Thu Jun  7 21:25:34 2012
New Revision: 1347804

URL: http://svn.apache.org/viewvc?rev=1347804&view=rev
Log:
Merge HDFS-3042 (automatic failover) to branch-2 from trunk

Added:
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt
      - copied unchanged from r1342112, hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAZKUtil.java
      - copied unchanged from r1342112, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAZKUtil.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFCProtocol.java
      - copied unchanged from r1342112, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFCProtocol.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFCRpcServer.java
      - copied unchanged from r1342112, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFCRpcServer.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolClientSideTranslatorPB.java
      - copied unchanged from r1342112, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolClientSideTranslatorPB.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolPB.java
      - copied unchanged from r1342112, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolPB.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolServerSideTranslatorPB.java
      - copied unchanged from r1342112, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolServerSideTranslatorPB.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/proto/ZKFCProtocol.proto
      - copied unchanged from r1342112, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/proto/ZKFCProtocol.proto
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java
      - copied unchanged from r1342112, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummySharedResource.java
      - copied unchanged from r1342112, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummySharedResource.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java
      - copied unchanged from r1342112, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHAZKUtil.java
      - copied unchanged from r1342112, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHAZKUtil.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java
      - copied unchanged from r1342112, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ZKFCTestUtil.java
      - copied unchanged from r1342112, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ZKFCTestUtil.java
Modified:
    hadoop/common/branches/branch-2/hadoop-common-project/   (props changed)
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-auth/   (props changed)
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/   (props changed)
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt   (props changed)
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/bin/hadoop-daemon.sh
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/docs/   (props changed)
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/   (props changed)
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverController.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocolHelper.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolClientSideTranslatorPB.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolServerSideTranslatorPB.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-env.sh
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-policy.xml
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/proto/HAServiceProtocol.proto
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/core/   (props changed)
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestFailoverController.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java

Propchange: hadoop/common/branches/branch-2/hadoop-common-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-common-project:r1342112

Propchange: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-auth/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-common-project/hadoop-auth:r1342112

Propchange: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common:r1342112

Propchange: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/CHANGES.txt:r1306184-1342109
  Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt:r1342112

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml?rev=1347804&r1=1347803&r2=1347804&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml Thu Jun  7 21:25:34 2012
@@ -290,5 +290,9 @@
       <!-- protobuf generated code -->
       <Class name="~org\.apache\.hadoop\.ha\.proto\.HAServiceProtocolProtos.*"/>
     </Match>
+    <Match>
+      <!-- protobuf generated code -->
+      <Class name="~org\.apache\.hadoop\.ha\.proto\.ZKFCProtocolProtos.*"/>
+    </Match>
 
  </FindBugsFilter>

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/bin/hadoop-daemon.sh
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/bin/hadoop-daemon.sh?rev=1347804&r1=1347803&r2=1347804&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/bin/hadoop-daemon.sh (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/bin/hadoop-daemon.sh Thu Jun  7 21:25:34 2012
@@ -141,7 +141,7 @@ case $startStop in
     echo starting $command, logging to $log
     cd "$HADOOP_PREFIX"
     case $command in
-      namenode|secondarynamenode|datanode|dfs|dfsadmin|fsck|balancer)
+      namenode|secondarynamenode|datanode|dfs|dfsadmin|fsck|balancer|zkfc)
         if [ -z "$HADOOP_HDFS_HOME" ]; then
           hdfsScript="$HADOOP_PREFIX"/bin/hdfs
         else

Propchange: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/docs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/docs:r1342112
  Merged /hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/docs:r1306184-1342109

Propchange: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java:r1306184-1342109
  Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java:r1342112

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java?rev=1347804&r1=1347803&r2=1347804&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java Thu Jun  7 21:25:34 2012
@@ -117,6 +117,8 @@ public class CommonConfigurationKeys ext
       "security.refresh.user.mappings.protocol.acl";
   public static final String 
   SECURITY_HA_SERVICE_PROTOCOL_ACL = "security.ha.service.protocol.acl";
+  public static final String 
+  SECURITY_ZKFC_PROTOCOL_ACL = "security.zkfc.protocol.acl";
   
   public static final String HADOOP_SECURITY_TOKEN_SERVICE_USE_IP =
       "hadoop.security.token.service.use_ip";

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java?rev=1347804&r1=1347803&r2=1347804&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java Thu Jun  7 21:25:34 2012
@@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ha.HAZKUtil.ZKAuthInfo;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.KeeperException;
@@ -81,9 +82,15 @@ public class ActiveStandbyElector implem
    */
   public interface ActiveStandbyElectorCallback {
     /**
-     * This method is called when the app becomes the active leader
+     * This method is called when the app becomes the active leader.
+     * If the service fails to become active, it should throw
+     * ServiceFailedException. This will cause the elector to
+     * sleep for a short period, then re-join the election.
+     * 
+     * Callback implementations are expected to manage their own
+     * timeouts (e.g. when making an RPC to a remote node).
      */
-    void becomeActive();
+    void becomeActive() throws ServiceFailedException;
 
     /**
      * This method is called when the app becomes a standby
@@ -134,7 +141,8 @@ public class ActiveStandbyElector implem
 
   public static final Log LOG = LogFactory.getLog(ActiveStandbyElector.class);
 
-  private static final int NUM_RETRIES = 3;
+  static int NUM_RETRIES = 3;
+  private static final int SLEEP_AFTER_FAILURE_TO_BECOME_ACTIVE = 1000;
 
   private static enum ConnectionState {
     DISCONNECTED, CONNECTED, TERMINATED
@@ -154,6 +162,7 @@ public class ActiveStandbyElector implem
   private final String zkHostPort;
   private final int zkSessionTimeout;
   private final List<ACL> zkAcl;
+  private final List<ZKAuthInfo> zkAuthInfo;
   private byte[] appData;
   private final String zkLockFilePath;
   private final String zkBreadCrumbPath;
@@ -185,6 +194,8 @@ public class ActiveStandbyElector implem
    *          znode under which to create the lock
    * @param acl
    *          ZooKeeper ACL's
+   * @param authInfo a list of authentication credentials to add to the
+   *                 ZK connection
    * @param app
    *          reference to callback interface object
    * @throws IOException
@@ -192,6 +203,7 @@ public class ActiveStandbyElector implem
    */
   public ActiveStandbyElector(String zookeeperHostPorts,
       int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl,
+      List<ZKAuthInfo> authInfo,
       ActiveStandbyElectorCallback app) throws IOException,
       HadoopIllegalArgumentException {
     if (app == null || acl == null || parentZnodeName == null
@@ -201,6 +213,7 @@ public class ActiveStandbyElector implem
     zkHostPort = zookeeperHostPorts;
     zkSessionTimeout = zookeeperSessionTimeout;
     zkAcl = acl;
+    zkAuthInfo = authInfo;
     appClient = app;
     znodeWorkingDir = parentZnodeName;
     zkLockFilePath = znodeWorkingDir + "/" + LOCK_FILENAME;
@@ -227,8 +240,6 @@ public class ActiveStandbyElector implem
   public synchronized void joinElection(byte[] data)
       throws HadoopIllegalArgumentException {
     
-    LOG.debug("Attempting active election");
-
     if (data == null) {
       throw new HadoopIllegalArgumentException("data cannot be null");
     }
@@ -236,6 +247,7 @@ public class ActiveStandbyElector implem
     appData = new byte[data.length];
     System.arraycopy(data, 0, appData, 0, data.length);
 
+    LOG.debug("Attempting active election for " + this);
     joinElectionInternal();
   }
   
@@ -259,6 +271,9 @@ public class ActiveStandbyElector implem
    */
   public synchronized void ensureParentZNode()
       throws IOException, InterruptedException {
+    Preconditions.checkState(!wantToBeInElection,
+        "ensureParentZNode() may not be called while in the election");
+
     String pathParts[] = znodeWorkingDir.split("/");
     Preconditions.checkArgument(pathParts.length >= 1 &&
         "".equals(pathParts[0]),
@@ -292,6 +307,9 @@ public class ActiveStandbyElector implem
    */
   public synchronized void clearParentZNode()
       throws IOException, InterruptedException {
+    Preconditions.checkState(!wantToBeInElection,
+        "clearParentZNode() may not be called while in the election");
+
     try {
       LOG.info("Recursively deleting " + znodeWorkingDir + " from ZK...");
 
@@ -360,7 +378,7 @@ public class ActiveStandbyElector implem
         createConnection();
       }
       Stat stat = new Stat();
-      return zkClient.getData(zkLockFilePath, false, stat);
+      return getDataWithRetries(zkLockFilePath, false, stat);
     } catch(KeeperException e) {
       Code code = e.code();
       if (isNodeDoesNotExist(code)) {
@@ -380,13 +398,17 @@ public class ActiveStandbyElector implem
       String name) {
     if (isStaleClient(ctx)) return;
     LOG.debug("CreateNode result: " + rc + " for path: " + path
-        + " connectionState: " + zkConnectionState);
+        + " connectionState: " + zkConnectionState +
+        "  for " + this);
 
     Code code = Code.get(rc);
     if (isSuccess(code)) {
       // we successfully created the znode. we are the leader. start monitoring
-      becomeActive();
-      monitorActiveStatus();
+      if (becomeActive()) {
+        monitorActiveStatus();
+      } else {
+        reJoinElectionAfterFailureToBecomeActive();
+      }
       return;
     }
 
@@ -433,8 +455,13 @@ public class ActiveStandbyElector implem
   public synchronized void processResult(int rc, String path, Object ctx,
       Stat stat) {
     if (isStaleClient(ctx)) return;
+    
+    assert wantToBeInElection :
+        "Got a StatNode result after quitting election";
+    
     LOG.debug("StatNode result: " + rc + " for path: " + path
-        + " connectionState: " + zkConnectionState);
+        + " connectionState: " + zkConnectionState + " for " + this);
+        
 
     Code code = Code.get(rc);
     if (isSuccess(code)) {
@@ -442,7 +469,9 @@ public class ActiveStandbyElector implem
       // creation was retried
       if (stat.getEphemeralOwner() == zkClient.getSessionId()) {
         // we own the lock znode. so we are the leader
-        becomeActive();
+        if (!becomeActive()) {
+          reJoinElectionAfterFailureToBecomeActive();
+        }
       } else {
         // we dont own the lock znode. so we are a standby.
         becomeStandby();
@@ -470,20 +499,37 @@ public class ActiveStandbyElector implem
       }
       errorMessage = errorMessage
           + ". Not retrying further znode monitoring connection errors.";
+    } else if (isSessionExpired(code)) {
+      // This isn't fatal - the client Watcher will re-join the election
+      LOG.warn("Lock monitoring failed because session was lost");
+      return;
     }
 
     fatalError(errorMessage);
   }
 
   /**
-   * interface implementation of Zookeeper watch events (connection and node)
+   * We failed to become active. Re-join the election, but
+   * sleep for a few seconds after terminating our existing
+   * session, so that other nodes have a chance to become active.
+   * The failure to become active is already logged inside
+   * becomeActive().
+   */
+  private void reJoinElectionAfterFailureToBecomeActive() {
+    reJoinElection(SLEEP_AFTER_FAILURE_TO_BECOME_ACTIVE);
+  }
+
+  /**
+   * interface implementation of Zookeeper watch events (connection and node),
+   * proxied by {@link WatcherWithClientRef}.
    */
   synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
     Event.EventType eventType = event.getType();
     if (isStaleClient(zk)) return;
     LOG.debug("Watcher event type: " + eventType + " with state:"
         + event.getState() + " for path:" + event.getPath()
-        + " connectionState: " + zkConnectionState);
+        + " connectionState: " + zkConnectionState
+        + " for " + this);
 
     if (eventType == Event.EventType.None) {
       // the connection state has changed
@@ -494,7 +540,8 @@ public class ActiveStandbyElector implem
         // be undone
         ConnectionState prevConnectionState = zkConnectionState;
         zkConnectionState = ConnectionState.CONNECTED;
-        if (prevConnectionState == ConnectionState.DISCONNECTED) {
+        if (prevConnectionState == ConnectionState.DISCONNECTED &&
+            wantToBeInElection) {
           monitorActiveStatus();
         }
         break;
@@ -511,7 +558,7 @@ public class ActiveStandbyElector implem
         // call listener to reconnect
         LOG.info("Session expired. Entering neutral mode and rejoining...");
         enterNeutralMode();
-        reJoinElection();
+        reJoinElection(0);
         break;
       default:
         fatalError("Unexpected Zookeeper watch event state: "
@@ -559,16 +606,21 @@ public class ActiveStandbyElector implem
   protected synchronized ZooKeeper getNewZooKeeper() throws IOException {
     ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, null);
     zk.register(new WatcherWithClientRef(zk));
+    for (ZKAuthInfo auth : zkAuthInfo) {
+      zk.addAuthInfo(auth.getScheme(), auth.getAuth());
+    }
     return zk;
   }
 
   private void fatalError(String errorMessage) {
+    LOG.fatal(errorMessage);
     reset();
     appClient.notifyFatalError(errorMessage);
   }
 
   private void monitorActiveStatus() {
-    LOG.debug("Monitoring active leader");
+    assert wantToBeInElection;
+    LOG.debug("Monitoring active leader for " + this);
     statRetryCount = 0;
     monitorLockNodeAsync();
   }
@@ -586,7 +638,7 @@ public class ActiveStandbyElector implem
     createLockNodeAsync();
   }
 
-  private void reJoinElection() {
+  private void reJoinElection(int sleepTime) {
     LOG.info("Trying to re-establish ZK session");
     
     // Some of the test cases rely on expiring the ZK sessions and
@@ -599,12 +651,30 @@ public class ActiveStandbyElector implem
     sessionReestablishLockForTests.lock();
     try {
       terminateConnection();
+      sleepFor(sleepTime);
+      
       joinElectionInternal();
     } finally {
       sessionReestablishLockForTests.unlock();
     }
   }
-  
+
+  /**
+   * Sleep for the given number of milliseconds.
+   * This is non-static, and separated out, so that unit tests
+   * can override the behavior not to sleep.
+   */
+  @VisibleForTesting
+  protected void sleepFor(int sleepMs) {
+    if (sleepMs > 0) {
+      try {
+        Thread.sleep(sleepMs);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
   @VisibleForTesting
   void preventSessionReestablishmentForTests() {
     sessionReestablishLockForTests.lock();
@@ -616,8 +686,12 @@ public class ActiveStandbyElector implem
   }
   
   @VisibleForTesting
-  long getZKSessionIdForTests() {
-    return zkClient.getSessionId();
+  synchronized long getZKSessionIdForTests() {
+    if (zkClient != null) {
+      return zkClient.getSessionId();
+    } else {
+      return -1;
+    }
   }
   
   @VisibleForTesting
@@ -629,17 +703,13 @@ public class ActiveStandbyElector implem
     int connectionRetryCount = 0;
     boolean success = false;
     while(!success && connectionRetryCount < NUM_RETRIES) {
-      LOG.debug("Establishing zookeeper connection");
+      LOG.debug("Establishing zookeeper connection for " + this);
       try {
         createConnection();
         success = true;
       } catch(IOException e) {
         LOG.warn(e);
-        try {
-          Thread.sleep(5000);
-        } catch(InterruptedException e1) {
-          LOG.warn(e1);
-        }
+        sleepFor(5000);
       }
       ++connectionRetryCount;
     }
@@ -647,14 +717,24 @@ public class ActiveStandbyElector implem
   }
 
   private void createConnection() throws IOException {
+    if (zkClient != null) {
+      try {
+        zkClient.close();
+      } catch (InterruptedException e) {
+        throw new IOException("Interrupted while closing ZK",
+            e);
+      }
+      zkClient = null;
+    }
     zkClient = getNewZooKeeper();
+    LOG.debug("Created new connection for " + this);
   }
   
-  private void terminateConnection() {
+  void terminateConnection() {
     if (zkClient == null) {
       return;
     }
-    LOG.debug("Terminating ZK connection");
+    LOG.debug("Terminating ZK connection for " + this);
     ZooKeeper tempZk = zkClient;
     zkClient = null;
     try {
@@ -670,20 +750,24 @@ public class ActiveStandbyElector implem
     terminateConnection();
   }
 
-  private void becomeActive() {
+  private boolean becomeActive() {
     assert wantToBeInElection;
-    if (state != State.ACTIVE) {
-      try {
-        Stat oldBreadcrumbStat = fenceOldActive();
-        writeBreadCrumbNode(oldBreadcrumbStat);
-      } catch (Exception e) {
-        LOG.warn("Exception handling the winning of election", e);
-        reJoinElection();
-        return;
-      }
-      LOG.debug("Becoming active");
-      state = State.ACTIVE;
+    if (state == State.ACTIVE) {
+      // already active
+      return true;
+    }
+    try {
+      Stat oldBreadcrumbStat = fenceOldActive();
+      writeBreadCrumbNode(oldBreadcrumbStat);
+      
+      LOG.debug("Becoming active for " + this);
       appClient.becomeActive();
+      state = State.ACTIVE;
+      return true;
+    } catch (Exception e) {
+      LOG.warn("Exception handling the winning of election", e);
+      // Caller will handle quitting and rejoining the election.
+      return false;
     }
   }
 
@@ -779,7 +863,7 @@ public class ActiveStandbyElector implem
 
   private void becomeStandby() {
     if (state != State.STANDBY) {
-      LOG.debug("Becoming standby");
+      LOG.debug("Becoming standby for " + this);
       state = State.STANDBY;
       appClient.becomeStandby();
     }
@@ -787,7 +871,7 @@ public class ActiveStandbyElector implem
 
   private void enterNeutralMode() {
     if (state != State.NEUTRAL) {
-      LOG.debug("Entering neutral mode");
+      LOG.debug("Entering neutral mode for " + this);
       state = State.NEUTRAL;
       appClient.enterNeutralMode();
     }
@@ -814,6 +898,15 @@ public class ActiveStandbyElector implem
     });
   }
 
+  private byte[] getDataWithRetries(final String path, final boolean watch,
+      final Stat stat) throws InterruptedException, KeeperException {
+    return zkDoWithRetries(new ZKAction<byte[]>() {
+      public byte[] run() throws KeeperException, InterruptedException {
+        return zkClient.getData(path, watch, stat);
+      }
+    });
+  }
+
   private Stat setDataWithRetries(final String path, final byte[] data,
       final int version) throws InterruptedException, KeeperException {
     return zkDoWithRetries(new ZKAction<Stat>() {
@@ -884,8 +977,14 @@ public class ActiveStandbyElector implem
 
     @Override
     public void process(WatchedEvent event) {
-      ActiveStandbyElector.this.processWatchEvent(
-          zk, event);
+      try {
+        ActiveStandbyElector.this.processWatchEvent(
+            zk, event);
+      } catch (Throwable t) {
+        fatalError(
+            "Failed to process watcher event " + event + ": " +
+            StringUtils.stringifyException(t));
+      }
     }
   }
 
@@ -913,5 +1012,13 @@ public class ActiveStandbyElector implem
     }
     return false;
   }
+  
+  @Override
+  public String toString() {
+    return "elector id=" + System.identityHashCode(this) +
+      " appData=" +
+      ((appData == null) ? "null" : StringUtils.byteToHexString(appData)) + 
+      " cb=" + appClient;
+  }
 
 }
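
For illustration only (not part of this commit): the elector now takes a list of ZK auth credentials, and a becomeActive() callback that throws ServiceFailedException makes it sleep briefly and re-join the election. A minimal wiring sketch against the new constructor; the quorum string, session timeout, and znode path are placeholder values, and the callback is assumed to be provided by the embedding service.

    import java.util.Collections;
    import java.util.List;
    import org.apache.hadoop.ha.ActiveStandbyElector;
    import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
    import org.apache.hadoop.ha.HAZKUtil.ZKAuthInfo;
    import org.apache.zookeeper.ZooDefs.Ids;

    public class ElectorWiringSketch {
      ActiveStandbyElector newElector(ActiveStandbyElectorCallback cb)
          throws Exception {
        // No digest credentials; ha.zookeeper.auth entries would go here.
        List<ZKAuthInfo> auths = Collections.emptyList();
        ActiveStandbyElector elector = new ActiveStandbyElector(
            "zk1:2181,zk2:2181,zk3:2181",   // ha.zookeeper.quorum (placeholder)
            5000,                           // session timeout, ms
            "/hadoop-ha/mycluster",         // parent znode (placeholder)
            Ids.OPEN_ACL_UNSAFE,            // same as the world:anyone:rwcda default
            auths, cb);
        elector.joinElection(new byte[] {0});  // appData identifying this node
        return elector;
      }
    }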

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverController.java?rev=1347804&r1=1347803&r2=1347804&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverController.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverController.java Thu Jun  7 21:25:34 2012
@@ -27,6 +27,8 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
+import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
 import org.apache.hadoop.ipc.RPC;
 
 import com.google.common.base.Preconditions;
@@ -48,9 +50,12 @@ public class FailoverController {
   
   private final Configuration conf;
 
+  private final RequestSource requestSource;
   
-  public FailoverController(Configuration conf) {
+  public FailoverController(Configuration conf,
+      RequestSource source) {
     this.conf = conf;
+    this.requestSource = source;
     
     this.gracefulFenceTimeout = getGracefulFenceTimeout(conf);
     this.rpcTimeoutToNewActive = getRpcTimeoutToNewActive(conf);
@@ -100,7 +105,7 @@ public class FailoverController {
       toSvcStatus = toSvc.getServiceStatus();
     } catch (IOException e) {
       String msg = "Unable to get service state for " + target;
-      LOG.error(msg, e);
+      LOG.error(msg + ": " + e.getLocalizedMessage());
       throw new FailoverFailedException(msg, e);
     }
 
@@ -122,7 +127,7 @@ public class FailoverController {
     }
 
     try {
-      HAServiceProtocolHelper.monitorHealth(toSvc);
+      HAServiceProtocolHelper.monitorHealth(toSvc, createReqInfo());
     } catch (HealthCheckFailedException hce) {
       throw new FailoverFailedException(
           "Can't failover to an unhealthy service", hce);
@@ -132,7 +137,10 @@ public class FailoverController {
     }
   }
   
-  
+  private StateChangeRequestInfo createReqInfo() {
+    return new StateChangeRequestInfo(requestSource);
+  }
+
   /**
    * Try to get the HA state of the node at the given address. This
    * function is guaranteed to be "quick" -- ie it has a short timeout
@@ -143,7 +151,7 @@ public class FailoverController {
     HAServiceProtocol proxy = null;
     try {
       proxy = svc.getProxy(conf, gracefulFenceTimeout);
-      proxy.transitionToStandby();
+      proxy.transitionToStandby(createReqInfo());
       return true;
     } catch (ServiceFailedException sfe) {
       LOG.warn("Unable to gracefully make " + svc + " standby (" +
@@ -198,7 +206,8 @@ public class FailoverController {
     Throwable cause = null;
     try {
       HAServiceProtocolHelper.transitionToActive(
-          toSvc.getProxy(conf, rpcTimeoutToNewActive));
+          toSvc.getProxy(conf, rpcTimeoutToNewActive),
+          createReqInfo());
     } catch (ServiceFailedException sfe) {
       LOG.error("Unable to make " + toSvc + " active (" +
           sfe.getMessage() + "). Failing back.");
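
For illustration only (not part of this commit): a FailoverController is now constructed with the RequestSource of the transition, and that source is attached to the StateChangeRequestInfo sent with each health-check and transition RPC. A minimal sketch of the manual (CLI-style) path; error handling is elided.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ha.FailoverController;
    import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
    import org.apache.hadoop.ha.HAServiceTarget;

    public class ManualFailoverSketch {
      void failover(Configuration conf, HAServiceTarget from, HAServiceTarget to)
          throws Exception {
        FailoverController fc =
            new FailoverController(conf, RequestSource.REQUEST_BY_USER);
        // Gracefully transition "from" to standby, then activate "to";
        // forceFence/forceActive correspond to the CLI flags of the same name.
        fc.failover(from, to, false /* forceFence */, false /* forceActive */);
      }
    }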

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java?rev=1347804&r1=1347803&r2=1347804&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java Thu Jun  7 21:25:34 2012
@@ -19,11 +19,11 @@ package org.apache.hadoop.ha;
 
 import java.io.IOException;
 import java.io.PrintStream;
+import java.util.Arrays;
 import java.util.Map;
 
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.logging.Log;
@@ -33,9 +33,12 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
+import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 
 /**
@@ -49,6 +52,13 @@ public abstract class HAAdmin extends Co
   
   private static final String FORCEFENCE  = "forcefence";
   private static final String FORCEACTIVE = "forceactive";
+  
+  /**
+   * Undocumented flag which allows an administrator to use manual failover
+   * state transitions even when auto-failover is enabled. This is an unsafe
+   * operation, which is why it is not documented in the usage below.
+   */
+  private static final String FORCEMANUAL = "forcemanual";
   private static final Log LOG = LogFactory.getLog(HAAdmin.class);
 
   private int rpcTimeoutForChecks = -1;
@@ -79,6 +89,7 @@ public abstract class HAAdmin extends Co
   /** Output stream for errors, for use in tests */
   protected PrintStream errOut = System.err;
   PrintStream out = System.out;
+  private RequestSource requestSource = RequestSource.REQUEST_BY_USER;
 
   protected abstract HAServiceTarget resolveTarget(String string);
 
@@ -106,63 +117,83 @@ public abstract class HAAdmin extends Co
     errOut.println("Usage: HAAdmin [" + cmd + " " + usage.args + "]");
   }
 
-  private int transitionToActive(final String[] argv)
+  private int transitionToActive(final CommandLine cmd)
       throws IOException, ServiceFailedException {
-    if (argv.length != 2) {
+    String[] argv = cmd.getArgs();
+    if (argv.length != 1) {
       errOut.println("transitionToActive: incorrect number of arguments");
       printUsage(errOut, "-transitionToActive");
       return -1;
     }
-    
-    HAServiceProtocol proto = resolveTarget(argv[1]).getProxy(
+    HAServiceTarget target = resolveTarget(argv[0]);
+    if (!checkManualStateManagementOK(target)) {
+      return -1;
+    }
+    HAServiceProtocol proto = target.getProxy(
         getConf(), 0);
-    HAServiceProtocolHelper.transitionToActive(proto);
+    HAServiceProtocolHelper.transitionToActive(proto, createReqInfo());
     return 0;
   }
 
-  private int transitionToStandby(final String[] argv)
+  private int transitionToStandby(final CommandLine cmd)
       throws IOException, ServiceFailedException {
-    if (argv.length != 2) {
+    String[] argv = cmd.getArgs();
+    if (argv.length != 1) {
       errOut.println("transitionToStandby: incorrect number of arguments");
       printUsage(errOut, "-transitionToStandby");
       return -1;
     }
     
-    HAServiceProtocol proto = resolveTarget(argv[1]).getProxy(
+    HAServiceTarget target = resolveTarget(argv[0]);
+    if (!checkManualStateManagementOK(target)) {
+      return -1;
+    }
+    HAServiceProtocol proto = target.getProxy(
         getConf(), 0);
-    HAServiceProtocolHelper.transitionToStandby(proto);
+    HAServiceProtocolHelper.transitionToStandby(proto, createReqInfo());
     return 0;
   }
+  /**
+   * Ensure that we are allowed to manually manage the HA state of the target
+   * service. If automatic failover is configured, then the automatic
+   * failover controllers should be doing state management, and it is generally
+   * an error to use the HAAdmin command line to do so.
+   * 
+   * @param target the target to check
+   * @return true if manual state management is allowed
+   */
+  private boolean checkManualStateManagementOK(HAServiceTarget target) {
+    if (target.isAutoFailoverEnabled()) {
+      if (requestSource != RequestSource.REQUEST_BY_USER_FORCED) {
+        errOut.println(
+            "Automatic failover is enabled for " + target + "\n" +
+            "Refusing to manually manage HA state, since it may cause\n" +
+            "a split-brain scenario or other incorrect state.\n" +
+            "If you are very sure you know what you are doing, please \n" +
+            "specify the " + FORCEMANUAL + " flag.");
+        return false;
+      } else {
+        LOG.warn("Proceeding with manual HA state management even though\n" +
+            "automatic failover is enabled for " + target);
+        return true;
+      }
+    }
+    return true;
+  }
 
-  private int failover(final String[] argv)
-      throws IOException, ServiceFailedException {
-    boolean forceFence = false;
-    boolean forceActive = false;
-
-    Options failoverOpts = new Options();
-    // "-failover" isn't really an option but we need to add
-    // it to appease CommandLineParser
-    failoverOpts.addOption("failover", false, "failover");
-    failoverOpts.addOption(FORCEFENCE, false, "force fencing");
-    failoverOpts.addOption(FORCEACTIVE, false, "force failover");
+  private StateChangeRequestInfo createReqInfo() {
+    return new StateChangeRequestInfo(requestSource);
+  }
 
-    CommandLineParser parser = new GnuParser();
-    CommandLine cmd;
+  private int failover(CommandLine cmd)
+      throws IOException, ServiceFailedException {
+    boolean forceFence = cmd.hasOption(FORCEFENCE);
+    boolean forceActive = cmd.hasOption(FORCEACTIVE);
 
-    try {
-      cmd = parser.parse(failoverOpts, argv);
-      forceFence = cmd.hasOption(FORCEFENCE);
-      forceActive = cmd.hasOption(FORCEACTIVE);
-    } catch (ParseException pe) {
-      errOut.println("failover: incorrect arguments");
-      printUsage(errOut, "-failover");
-      return -1;
-    }
-    
     int numOpts = cmd.getOptions() == null ? 0 : cmd.getOptions().length;
     final String[] args = cmd.getArgs();
 
-    if (numOpts > 2 || args.length != 2) {
+    if (numOpts > 3 || args.length != 2) {
       errOut.println("failover: incorrect arguments");
       printUsage(errOut, "-failover");
       return -1;
@@ -171,7 +202,30 @@ public abstract class HAAdmin extends Co
     HAServiceTarget fromNode = resolveTarget(args[0]);
     HAServiceTarget toNode = resolveTarget(args[1]);
     
-    FailoverController fc = new FailoverController(getConf());
+    // Check that auto-failover is consistently configured for both nodes.
+    Preconditions.checkState(
+        fromNode.isAutoFailoverEnabled() ==
+          toNode.isAutoFailoverEnabled(),
+          "Inconsistent auto-failover configs between %s and %s!",
+          fromNode, toNode);
+    
+    if (fromNode.isAutoFailoverEnabled()) {
+      if (forceFence || forceActive) {
+        // -forceActive doesn't make sense with auto-HA, since, if the node
+        // is not healthy, then its ZKFC will immediately quit the election
+        // again the next time a health check runs.
+        //
+        // -forceFence doesn't seem to have any real use cases with auto-HA
+        // so it isn't implemented.
+        errOut.println(FORCEFENCE + " and " + FORCEACTIVE + " flags not " +
+            "supported with auto-failover enabled.");
+        return -1;
+      }
+      return gracefulFailoverThroughZKFCs(toNode);
+    }
+    
+    FailoverController fc = new FailoverController(getConf(),
+        requestSource);
     
     try {
       fc.failover(fromNode, toNode, forceFence, forceActive); 
@@ -182,19 +236,44 @@ public abstract class HAAdmin extends Co
     }
     return 0;
   }
+  
+
+  /**
+   * Initiate a graceful failover by talking to the target node's ZKFC.
+   * This sends an RPC to the ZKFC, which coordinates the failover.
+   * 
+   * @param toNode the node to fail to
+   * @return status code (0 for success)
+   * @throws IOException if failover does not succeed
+   */
+  private int gracefulFailoverThroughZKFCs(HAServiceTarget toNode)
+      throws IOException {
+
+    int timeout = FailoverController.getRpcTimeoutToNewActive(getConf());
+    ZKFCProtocol proxy = toNode.getZKFCProxy(getConf(), timeout);
+    try {
+      proxy.gracefulFailover();
+      out.println("Failover to " + toNode + " successful");
+    } catch (ServiceFailedException sfe) {
+      errOut.println("Failover failed: " + sfe.getLocalizedMessage());
+      return -1;
+    }
+
+    return 0;
+  }
 
-  private int checkHealth(final String[] argv)
+  private int checkHealth(final CommandLine cmd)
       throws IOException, ServiceFailedException {
-    if (argv.length != 2) {
+    String[] argv = cmd.getArgs();
+    if (argv.length != 1) {
       errOut.println("checkHealth: incorrect number of arguments");
       printUsage(errOut, "-checkHealth");
       return -1;
     }
-    
-    HAServiceProtocol proto = resolveTarget(argv[1]).getProxy(
+    HAServiceProtocol proto = resolveTarget(argv[0]).getProxy(
         getConf(), rpcTimeoutForChecks);
     try {
-      HAServiceProtocolHelper.monitorHealth(proto);
+      HAServiceProtocolHelper.monitorHealth(proto, createReqInfo());
     } catch (HealthCheckFailedException e) {
       errOut.println("Health check failed: " + e.getLocalizedMessage());
       return -1;
@@ -202,15 +281,16 @@ public abstract class HAAdmin extends Co
     return 0;
   }
 
-  private int getServiceState(final String[] argv)
+  private int getServiceState(final CommandLine cmd)
       throws IOException, ServiceFailedException {
-    if (argv.length != 2) {
+    String[] argv = cmd.getArgs();
+    if (argv.length != 1) {
       errOut.println("getServiceState: incorrect number of arguments");
       printUsage(errOut, "-getServiceState");
       return -1;
     }
 
-    HAServiceProtocol proto = resolveTarget(argv[1]).getProxy(
+    HAServiceProtocol proto = resolveTarget(argv[0]).getProxy(
         getConf(), rpcTimeoutForChecks);
     out.println(proto.getServiceStatus().getState());
     return 0;
@@ -263,26 +343,101 @@ public abstract class HAAdmin extends Co
       printUsage(errOut);
       return -1;
     }
+    
+    if (!USAGE.containsKey(cmd)) {
+      errOut.println(cmd.substring(1) + ": Unknown command");
+      printUsage(errOut);
+      return -1;
+    }
+    
+    Options opts = new Options();
+
+    // Add command-specific options
+    if ("-failover".equals(cmd)) {
+      addFailoverCliOpts(opts);
+    }
+    // Mutative commands take FORCEMANUAL option
+    if ("-transitionToActive".equals(cmd) ||
+        "-transitionToStandby".equals(cmd) ||
+        "-failover".equals(cmd)) {
+      opts.addOption(FORCEMANUAL, false,
+          "force manual control even if auto-failover is enabled");
+    }
+         
+    CommandLine cmdLine = parseOpts(cmd, opts, argv);
+    if (cmdLine == null) {
+      // error already printed
+      return -1;
+    }
+    
+    if (cmdLine.hasOption(FORCEMANUAL)) {
+      if (!confirmForceManual()) {
+        LOG.fatal("Aborted");
+        return -1;
+      }
+      // Instruct the NNs to honor this request even if they're
+      // configured for manual failover.
+      requestSource = RequestSource.REQUEST_BY_USER_FORCED;
+    }
 
     if ("-transitionToActive".equals(cmd)) {
-      return transitionToActive(argv);
+      return transitionToActive(cmdLine);
     } else if ("-transitionToStandby".equals(cmd)) {
-      return transitionToStandby(argv);
+      return transitionToStandby(cmdLine);
     } else if ("-failover".equals(cmd)) {
-      return failover(argv);
+      return failover(cmdLine);
     } else if ("-getServiceState".equals(cmd)) {
-      return getServiceState(argv);
+      return getServiceState(cmdLine);
     } else if ("-checkHealth".equals(cmd)) {
-      return checkHealth(argv);
+      return checkHealth(cmdLine);
     } else if ("-help".equals(cmd)) {
       return help(argv);
     } else {
-      errOut.println(cmd.substring(1) + ": Unknown command");
-      printUsage(errOut);
-      return -1;
+      // we already checked command validity above, so getting here
+      // would be a coding error
+      throw new AssertionError("Should not get here, command: " + cmd);
     } 
   }
   
+  private boolean confirmForceManual() throws IOException {
+     return ToolRunner.confirmPrompt(
+        "You have specified the " + FORCEMANUAL + " flag. This flag is " +
+        "dangerous, as it can induce a split-brain scenario that WILL " +
+        "CORRUPT your HDFS namespace, possibly irrecoverably.\n" +
+        "\n" +
+        "It is recommended not to use this flag, but instead to shut down the " +
+        "cluster and disable automatic failover if you prefer to manually " +
+        "manage your HA state.\n" +
+        "\n" +
+        "You may abort safely by answering 'n' or hitting ^C now.\n" +
+        "\n" +
+        "Are you sure you want to continue?");
+  }
+
+  /**
+   * Add CLI options which are specific to the failover command and no
+   * others.
+   */
+  private void addFailoverCliOpts(Options failoverOpts) {
+    failoverOpts.addOption(FORCEFENCE, false, "force fencing");
+    failoverOpts.addOption(FORCEACTIVE, false, "force failover");
+    // Don't add FORCEMANUAL, since that's added separately for all commands
+    // that change state.
+  }
+  
+  private CommandLine parseOpts(String cmdName, Options opts, String[] argv) {
+    try {
+      // Strip off the first arg, since that's just the command name
+      argv = Arrays.copyOfRange(argv, 1, argv.length); 
+      return new GnuParser().parse(opts, argv);
+    } catch (ParseException pe) {
+      errOut.println(cmdName.substring(1) +
+          ": incorrect arguments");
+      printUsage(errOut, cmdName);
+      return null;
+    }
+  }
+  
   private int help(String[] argv) {
     if (argv.length != 2) {
       printUsage(errOut, "-help");
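
For illustration only (not part of this commit): when auto-failover is enabled, HAAdmin no longer performs the transition itself but asks the target node's ZKFC to coordinate it, as in gracefulFailoverThroughZKFCs() above. A reduced sketch; the timeout is a placeholder where HAAdmin uses FailoverController's configured RPC timeout.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ha.HAServiceTarget;
    import org.apache.hadoop.ha.ZKFCProtocol;

    public class GracefulFailoverSketch {
      void failoverTo(Configuration conf, HAServiceTarget toNode) throws Exception {
        ZKFCProtocol zkfc = toNode.getZKFCProxy(conf, 60000 /* placeholder ms */);
        // The target's ZKFC coordinates the handoff with the current active's
        // ZKFC, so no fencing flags are passed from the command line.
        zkfc.gracefulFailover();
      }
    }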

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java?rev=1347804&r1=1347803&r2=1347804&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java Thu Jun  7 21:25:34 2012
@@ -60,6 +60,31 @@ public interface HAServiceProtocol {
       return name;
     }
   }
+  
+  public static enum RequestSource {
+    REQUEST_BY_USER,
+    REQUEST_BY_USER_FORCED,
+    REQUEST_BY_ZKFC;
+  }
+  
+  /**
+   * Information describing the source for a request to change state.
+   * This is used to differentiate requests from automatic vs CLI
+   * failover controllers, and in the future may include epoch
+   * information.
+   */
+  public static class StateChangeRequestInfo {
+    private final RequestSource source;
+
+    public StateChangeRequestInfo(RequestSource source) {
+      super();
+      this.source = source;
+    }
+
+    public RequestSource getSource() {
+      return source;
+    }
+  }
 
   /**
    * Monitor the health of service. This periodically called by the HA
@@ -95,7 +120,8 @@ public interface HAServiceProtocol {
    * @throws IOException
    *           if other errors happen
    */
-  public void transitionToActive() throws ServiceFailedException,
+  public void transitionToActive(StateChangeRequestInfo reqInfo)
+                                   throws ServiceFailedException,
                                           AccessControlException,
                                           IOException;
 
@@ -110,7 +136,8 @@ public interface HAServiceProtocol {
    * @throws IOException
    *           if other errors happen
    */
-  public void transitionToStandby() throws ServiceFailedException,
+  public void transitionToStandby(StateChangeRequestInfo reqInfo)
+                                    throws ServiceFailedException,
                                            AccessControlException,
                                            IOException;
 

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocolHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocolHelper.java?rev=1347804&r1=1347803&r2=1347804&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocolHelper.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocolHelper.java Thu Jun  7 21:25:34 2012
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
 import org.apache.hadoop.ipc.RemoteException;
 
 /**
@@ -30,7 +31,8 @@ import org.apache.hadoop.ipc.RemoteExcep
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public class HAServiceProtocolHelper {
-  public static void monitorHealth(HAServiceProtocol svc)
+  public static void monitorHealth(HAServiceProtocol svc,
+      StateChangeRequestInfo reqInfo)
       throws IOException {
     try {
       svc.monitorHealth();
@@ -39,19 +41,21 @@ public class HAServiceProtocolHelper {
     }
   }
 
-  public static void transitionToActive(HAServiceProtocol svc)
+  public static void transitionToActive(HAServiceProtocol svc,
+      StateChangeRequestInfo reqInfo)
       throws IOException {
     try {
-      svc.transitionToActive();
+      svc.transitionToActive(reqInfo);
     } catch (RemoteException e) {
       throw e.unwrapRemoteException(ServiceFailedException.class);
     }
   }
 
-  public static void transitionToStandby(HAServiceProtocol svc)
+  public static void transitionToStandby(HAServiceProtocol svc,
+      StateChangeRequestInfo reqInfo)
       throws IOException {
     try {
-      svc.transitionToStandby();
+      svc.transitionToStandby(reqInfo);
     } catch (RemoteException e) {
       throw e.unwrapRemoteException(ServiceFailedException.class);
     }

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java?rev=1347804&r1=1347803&r2=1347804&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java Thu Jun  7 21:25:34 2012
@@ -28,6 +28,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ha.protocolPB.ZKFCProtocolClientSideTranslatorPB;
 import org.apache.hadoop.net.NetUtils;
 
 import com.google.common.collect.Maps;
@@ -49,6 +50,11 @@ public abstract class HAServiceTarget {
   public abstract InetSocketAddress getAddress();
 
   /**
+   * @return the IPC address of the ZKFC on the target node
+   */
+  public abstract InetSocketAddress getZKFCAddress();
+
+  /**
    * @return a Fencer implementation configured for this target node
    */
   public abstract NodeFencer getFencer();
@@ -76,6 +82,20 @@ public abstract class HAServiceTarget {
         confCopy, factory, timeoutMs);
   }
   
+  /**
+   * @return a proxy to the ZKFC which is associated with this HA service.
+   */
+  public ZKFCProtocol getZKFCProxy(Configuration conf, int timeoutMs)
+      throws IOException {
+    Configuration confCopy = new Configuration(conf);
+    // Lower the timeout so we quickly fail to connect
+    confCopy.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
+    SocketFactory factory = NetUtils.getDefaultSocketFactory(confCopy);
+    return new ZKFCProtocolClientSideTranslatorPB(
+        getZKFCAddress(),
+        confCopy, factory, timeoutMs);
+  }
+  
   public final Map<String, String> getFencingParameters() {
     Map<String, String> ret = Maps.newHashMap();
     addFencingParameters(ret);
@@ -99,4 +119,11 @@ public abstract class HAServiceTarget {
     ret.put(HOST_SUBST_KEY, getAddress().getHostName());
     ret.put(PORT_SUBST_KEY, String.valueOf(getAddress().getPort()));
   }
+
+  /**
+   * @return true if auto failover should be considered enabled
+   */
+  public boolean isAutoFailoverEnabled() {
+    return false;
+  }
 }
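
For illustration only (not part of this commit): a concrete target now also supplies the ZKFC's IPC address and may advertise auto-failover. A partial, hypothetical subclass showing only those two hooks; the class name and constructor arguments are invented, and the remaining abstract methods are left to the real implementation.

    import java.net.InetSocketAddress;
    import org.apache.hadoop.ha.HAServiceTarget;

    public abstract class AutoHATargetSketch extends HAServiceTarget {
      private final InetSocketAddress zkfcAddr;
      private final boolean autoHA;

      protected AutoHATargetSketch(InetSocketAddress zkfcAddr, boolean autoHA) {
        this.zkfcAddr = zkfcAddr;
        this.autoHA = autoHA;
      }

      @Override
      public InetSocketAddress getZKFCAddress() {
        return zkfcAddr;   // used by getZKFCProxy() to reach the ZKFC
      }

      @Override
      public boolean isAutoFailoverEnabled() {
        return autoHA;     // lets HAAdmin refuse unsafe manual transitions
      }
    }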

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java?rev=1347804&r1=1347803&r2=1347804&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java Thu Jun  7 21:25:34 2012
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -43,7 +44,8 @@ import com.google.common.base.Preconditi
  * Classes which need callbacks should implement the {@link Callback}
  * interface.
  */
-class HealthMonitor {
+@InterfaceAudience.Private
+public class HealthMonitor {
   private static final Log LOG = LogFactory.getLog(
       HealthMonitor.class);
 
@@ -75,7 +77,8 @@ class HealthMonitor {
   private HAServiceStatus lastServiceState = new HAServiceStatus(
       HAServiceState.INITIALIZING);
   
-  enum State {
+  @InterfaceAudience.Private
+  public enum State {
     /**
      * The health monitor is still starting up.
      */

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java?rev=1347804&r1=1347803&r2=1347804&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java Thu Jun  7 21:25:34 2012
@@ -18,79 +18,143 @@
 package org.apache.hadoop.ha;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException;
 import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
+import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
+import org.apache.hadoop.ha.HAZKUtil.ZKAuthInfo;
 import org.apache.hadoop.ha.HealthMonitor.State;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.hadoop.util.ToolRunner;
 import org.apache.zookeeper.data.ACL;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 @InterfaceAudience.LimitedPrivate("HDFS")
-public abstract class ZKFailoverController implements Tool {
+public abstract class ZKFailoverController {
 
   static final Log LOG = LogFactory.getLog(ZKFailoverController.class);
   
-  // TODO: this should be namespace-scoped
   public static final String ZK_QUORUM_KEY = "ha.zookeeper.quorum";
   private static final String ZK_SESSION_TIMEOUT_KEY = "ha.zookeeper.session-timeout.ms";
   private static final int ZK_SESSION_TIMEOUT_DEFAULT = 5*1000;
   private static final String ZK_PARENT_ZNODE_KEY = "ha.zookeeper.parent-znode";
+  public static final String ZK_ACL_KEY = "ha.zookeeper.acl";
+  private static final String ZK_ACL_DEFAULT = "world:anyone:rwcda";
+  public static final String ZK_AUTH_KEY = "ha.zookeeper.auth";
   static final String ZK_PARENT_ZNODE_DEFAULT = "/hadoop-ha";
 
+  /**
+   * All of the conf keys used by the ZKFC. This is used in order to allow
+   * them to be overridden on a per-nameservice or per-namenode basis.
+   */
+  protected static final String[] ZKFC_CONF_KEYS = new String[] {
+    ZK_QUORUM_KEY,
+    ZK_SESSION_TIMEOUT_KEY,
+    ZK_PARENT_ZNODE_KEY,
+    ZK_ACL_KEY,
+    ZK_AUTH_KEY
+  };
+  
+
   /** Unable to format the parent znode in ZK */
   static final int ERR_CODE_FORMAT_DENIED = 2;
   /** The parent znode doesn't exist in ZK */
   static final int ERR_CODE_NO_PARENT_ZNODE = 3;
   /** Fencing is not properly configured */
   static final int ERR_CODE_NO_FENCER = 4;
+  /** Automatic failover is not enabled */
+  static final int ERR_CODE_AUTO_FAILOVER_NOT_ENABLED = 5;
+  /** Cannot connect to ZooKeeper */
+  static final int ERR_CODE_NO_ZK = 6;
   
-  private Configuration conf;
+  protected Configuration conf;
+  private String zkQuorum;
+  protected final HAServiceTarget localTarget;
 
   private HealthMonitor healthMonitor;
   private ActiveStandbyElector elector;
-
-  private HAServiceTarget localTarget;
-
-  private String parentZnode;
+  protected ZKFCRpcServer rpcServer;
 
   private State lastHealthState = State.INITIALIZING;
 
   /** Set if a fatal error occurs */
   private String fatalError = null;
 
-  @Override
-  public void setConf(Configuration conf) {
+  /**
+   * A future nanotime before which the ZKFC will not join the election.
+   * This is used during graceful failover.
+   */
+  private long delayJoiningUntilNanotime = 0;
+
+  /** Executor on which {@link #scheduleRecheck(long)} schedules events */
+  private ScheduledExecutorService delayExecutor =
+    Executors.newScheduledThreadPool(1,
+        new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("ZKFC Delay timer #%d")
+            .build());
+
+  private ActiveAttemptRecord lastActiveAttemptRecord;
+  private Object activeAttemptRecordLock = new Object();
+
+  protected ZKFailoverController(Configuration conf, HAServiceTarget localTarget) {
+    this.localTarget = localTarget;
     this.conf = conf;
-    localTarget = getLocalTarget();
   }
   
 
   protected abstract byte[] targetToData(HAServiceTarget target);
-  protected abstract HAServiceTarget getLocalTarget();  
   protected abstract HAServiceTarget dataToTarget(byte[] data);
+  protected abstract void loginAsFCUser() throws IOException;
+  protected abstract void checkRpcAdminAccess()
+      throws AccessControlException, IOException;
+  protected abstract InetSocketAddress getRpcAddressToBindTo();
+  protected abstract PolicyProvider getPolicyProvider();
 
+  /**
+   * Return the name of a znode inside the configured parent znode in which
+   * the ZKFC will do all of its work. This is so that multiple federated
+   * nameservices can run on the same ZK quorum without having to manually
+   * configure them to separate subdirectories.
+   */
+  protected abstract String getScopeInsideParentNode();
 
-  @Override
-  public Configuration getConf() {
-    return conf;
+  public HAServiceTarget getLocalTarget() {
+    return localTarget;
   }
-
-  @Override
+  
   public int run(final String[] args) throws Exception {
-    // TODO: need to hook DFS here to find the NN keytab info, etc,
-    // similar to what DFSHAAdmin does. Annoying that this is in common.
+    if (!localTarget.isAutoFailoverEnabled()) {
+      LOG.fatal("Automatic failover is not enabled for " + localTarget + "." +
+          " Please ensure that automatic failover is enabled in the " +
+          "configuration before running the ZK failover controller.");
+      return ERR_CODE_AUTO_FAILOVER_NOT_ENABLED;
+    }
+    loginAsFCUser();
     try {
       return SecurityUtil.doAsLoginUserOrFatal(new PrivilegedAction<Integer>() {
         @Override
@@ -99,6 +163,10 @@ public abstract class ZKFailoverControll
             return doRun(args);
           } catch (Exception t) {
             throw new RuntimeException(t);
+          } finally {
+            if (elector != null) {
+              elector.terminateConnection();
+            }
           }
         }
       });
@@ -107,6 +175,7 @@ public abstract class ZKFailoverControll
     }
   }
   
+
   private int doRun(String[] args)
       throws HadoopIllegalArgumentException, IOException, InterruptedException {
     initZK();
@@ -129,11 +198,23 @@ public abstract class ZKFailoverControll
       }
     }
     
-    if (!elector.parentZNodeExists()) {
-      LOG.fatal("Unable to start failover controller. " +
-          "Parent znode does not exist.\n" +
-          "Run with -formatZK flag to initialize ZooKeeper.");
-      return ERR_CODE_NO_PARENT_ZNODE;
+    try {
+      if (!elector.parentZNodeExists()) {
+        LOG.fatal("Unable to start failover controller. " +
+            "Parent znode does not exist.\n" +
+            "Run with -formatZK flag to initialize ZooKeeper.");
+        return ERR_CODE_NO_PARENT_ZNODE;
+      }
+    } catch (IOException ioe) {
+      if (ioe.getCause() instanceof KeeperException.ConnectionLossException) {
+        LOG.fatal("Unable to start failover controller. Unable to connect " +
+            "to ZooKeeper quorum at " + zkQuorum + ". Please check the " +
+            "configured value for " + ZK_QUORUM_KEY + " and ensure that " +
+            "ZooKeeper is running.");
+        return ERR_CODE_NO_ZK;
+      } else {
+        throw ioe;
+      }
     }
 
     try {
@@ -145,8 +226,18 @@ public abstract class ZKFailoverControll
       return ERR_CODE_NO_FENCER;
     }
 
+    initRPC();
     initHM();
-    mainLoop();
+    startRPC();
+    try {
+      mainLoop();
+    } finally {
+      rpcServer.stopAndJoin();
+      
+      elector.quitElection(true);
+      healthMonitor.shutdown();
+      healthMonitor.join();
+    }
     return 0;
   }
 
@@ -181,6 +272,7 @@ public abstract class ZKFailoverControll
   }
 
   private boolean confirmFormat() {
+    String parentZnode = getParentZnode();
     System.err.println(
         "===============================================\n" +
         "The configured parent znode " + parentZnode + " already exists.\n" +
@@ -206,16 +298,40 @@ public abstract class ZKFailoverControll
     healthMonitor.addCallback(new HealthCallbacks());
     healthMonitor.start();
   }
+  
+  protected void initRPC() throws IOException {
+    InetSocketAddress bindAddr = getRpcAddressToBindTo();
+    rpcServer = new ZKFCRpcServer(conf, bindAddr, this, getPolicyProvider());
+  }
+
+  protected void startRPC() throws IOException {
+    rpcServer.start();
+  }
+
 
   private void initZK() throws HadoopIllegalArgumentException, IOException {
-    String zkQuorum = conf.get(ZK_QUORUM_KEY);
+    zkQuorum = conf.get(ZK_QUORUM_KEY);
     int zkTimeout = conf.getInt(ZK_SESSION_TIMEOUT_KEY,
         ZK_SESSION_TIMEOUT_DEFAULT);
-    parentZnode = conf.get(ZK_PARENT_ZNODE_KEY,
-        ZK_PARENT_ZNODE_DEFAULT);
-    // TODO: need ZK ACL support in config, also maybe auth!
-    List<ACL> zkAcls = Ids.OPEN_ACL_UNSAFE;
+    // Parse ACLs from configuration.
+    String zkAclConf = conf.get(ZK_ACL_KEY, ZK_ACL_DEFAULT);
+    zkAclConf = HAZKUtil.resolveConfIndirection(zkAclConf);
+    List<ACL> zkAcls = HAZKUtil.parseACLs(zkAclConf);
+    if (zkAcls.isEmpty()) {
+      zkAcls = Ids.CREATOR_ALL_ACL;
+    }
+    
+    // Parse authentication from configuration.
+    String zkAuthConf = conf.get(ZK_AUTH_KEY);
+    zkAuthConf = HAZKUtil.resolveConfIndirection(zkAuthConf);
+    List<ZKAuthInfo> zkAuths;
+    if (zkAuthConf != null) {
+      zkAuths = HAZKUtil.parseAuth(zkAuthConf);
+    } else {
+      zkAuths = Collections.emptyList();
+    }
 
+    // Sanity check configuration.
     Preconditions.checkArgument(zkQuorum != null,
         "Missing required configuration '%s' for ZooKeeper quorum",
         ZK_QUORUM_KEY);
@@ -224,9 +340,19 @@ public abstract class ZKFailoverControll
     
 
     elector = new ActiveStandbyElector(zkQuorum,
-        zkTimeout, parentZnode, zkAcls, new ElectorCallbacks());
+        zkTimeout, getParentZnode(), zkAcls, zkAuths,
+        new ElectorCallbacks());
   }
   
+  private String getParentZnode() {
+    String znode = conf.get(ZK_PARENT_ZNODE_KEY,
+        ZK_PARENT_ZNODE_DEFAULT);
+    if (!znode.endsWith("/")) {
+      znode += "/";
+    }
+    return znode + getScopeInsideParentNode();
+  }
+
   private synchronized void mainLoop() throws InterruptedException {
     while (fatalError == null) {
       wait();
@@ -242,16 +368,30 @@ public abstract class ZKFailoverControll
     notifyAll();
   }
   
-  private synchronized void becomeActive() {
+  private synchronized void becomeActive() throws ServiceFailedException {
     LOG.info("Trying to make " + localTarget + " active...");
     try {
       HAServiceProtocolHelper.transitionToActive(localTarget.getProxy(
-          conf, FailoverController.getRpcTimeoutToNewActive(conf)));
-      LOG.info("Successfully transitioned " + localTarget +
-          " to active state");
+          conf, FailoverController.getRpcTimeoutToNewActive(conf)),
+          createReqInfo());
+      String msg = "Successfully transitioned " + localTarget +
+          " to active state";
+      LOG.info(msg);
+      recordActiveAttempt(new ActiveAttemptRecord(true, msg));
+
     } catch (Throwable t) {
-      LOG.fatal("Couldn't make " + localTarget + " active", t);
-      elector.quitElection(true);
+      String msg = "Couldn't make " + localTarget + " active";
+      LOG.fatal(msg, t);
+      
+      recordActiveAttempt(new ActiveAttemptRecord(false, msg + "\n" +
+          StringUtils.stringifyException(t)));
+
+      if (t instanceof ServiceFailedException) {
+        throw (ServiceFailedException)t;
+      } else {
+        throw new ServiceFailedException("Couldn't transition to active",
+            t);
+      }
 /*
 * TODO:
 * we need to make sure that if we get fenced and then quickly restarted,
@@ -264,12 +404,79 @@ public abstract class ZKFailoverControll
     }
   }
 
+  /**
+   * Store the results of the last attempt to become active.
+   * This is used so that, during manually initiated failover,
+   * we can report back the results of the attempt to become active
+   * to the initiator of the failover.
+   */
+  private void recordActiveAttempt(
+      ActiveAttemptRecord record) {
+    synchronized (activeAttemptRecordLock) {
+      lastActiveAttemptRecord = record;
+      activeAttemptRecordLock.notifyAll();
+    }
+  }
+
+  /**
+   * Wait until one of the following events:
+   * <ul>
+   * <li>Another thread publishes the results of an attempt to become active
+   * using {@link #recordActiveAttempt(ActiveAttemptRecord)}</li>
+   * <li>The node enters bad health status</li>
+   * <li>The specified timeout elapses</li>
+   * </ul>
+   * 
+   * @param timeoutMillis number of millis to wait
+   * @return the published record, or null if the timeout elapses or the
+   * service becomes unhealthy 
+   * @throws InterruptedException if the thread is interrupted.
+   */
+  private ActiveAttemptRecord waitForActiveAttempt(int timeoutMillis)
+      throws InterruptedException {
+    long st = System.nanoTime();
+    long waitUntil = st + TimeUnit.NANOSECONDS.convert(
+        timeoutMillis, TimeUnit.MILLISECONDS);
+    
+    do {
+      // periodically check health state, because entering an
+      // unhealthy state could prevent us from ever attempting to
+      // become active. We can detect this and respond to the user
+      // immediately.
+      synchronized (this) {
+        if (lastHealthState != State.SERVICE_HEALTHY) {
+          // early out if service became unhealthy
+          return null;
+        }
+      }
+
+      synchronized (activeAttemptRecordLock) {
+        if ((lastActiveAttemptRecord != null &&
+            lastActiveAttemptRecord.nanoTime >= st)) {
+          return lastActiveAttemptRecord;
+        }
+        // Only wait 1sec so that we periodically recheck the health state
+        // above.
+        activeAttemptRecordLock.wait(1000);
+      }
+    } while (System.nanoTime() < waitUntil);
+    
+    // Timeout elapsed.
+    LOG.warn(timeoutMillis + "ms timeout elapsed waiting for an attempt " +
+        "to become active");
+    return null;
+  }
+
+  private StateChangeRequestInfo createReqInfo() {
+    return new StateChangeRequestInfo(RequestSource.REQUEST_BY_ZKFC);
+  }
+
   private synchronized void becomeStandby() {
     LOG.info("ZK Election indicated that " + localTarget +
         " should become standby");
     try {
       int timeout = FailoverController.getGracefulFenceTimeout(conf);
-      localTarget.getProxy(conf, timeout).transitionToStandby();
+      localTarget.getProxy(conf, timeout).transitionToStandby(createReqInfo());
       LOG.info("Successfully transitioned " + localTarget +
           " to standby state");
     } catch (Exception e) {
@@ -279,27 +486,336 @@ public abstract class ZKFailoverControll
       // at the same time.
     }
   }
+  
+
+  private synchronized void fenceOldActive(byte[] data) {
+    HAServiceTarget target = dataToTarget(data);
+    
+    try {
+      doFence(target);
+    } catch (Throwable t) {
+      recordActiveAttempt(new ActiveAttemptRecord(false, "Unable to fence old active: " + StringUtils.stringifyException(t)));
+      Throwables.propagate(t);
+    }
+  }
+  
+  private void doFence(HAServiceTarget target) {
+    LOG.info("Should fence: " + target);
+    boolean gracefulWorked = new FailoverController(conf,
+        RequestSource.REQUEST_BY_ZKFC).tryGracefulFence(target);
+    if (gracefulWorked) {
+      // It's possible that it's in standby but just about to go into active,
+      // no? Is there some race here?
+      LOG.info("Successfully transitioned " + target + " to standby " +
+          "state without fencing");
+      return;
+    }
+    
+    try {
+      target.checkFencingConfigured();
+    } catch (BadFencingConfigurationException e) {
+      LOG.error("Couldn't fence old active " + target, e);
+      recordActiveAttempt(new ActiveAttemptRecord(false, "Unable to fence old active"));
+      throw new RuntimeException(e);
+    }
+    
+    if (!target.getFencer().fence(target)) {
+      throw new RuntimeException("Unable to fence " + target);
+    }
+  }
+
+
+  /**
+   * Request from graceful failover to cede active role. Causes
+   * this ZKFC to transition its local node to standby, then quit
+   * the election for the specified period of time, after which it
+   * will rejoin iff it is healthy.
+   */
+  void cedeActive(final int millisToCede)
+      throws AccessControlException, ServiceFailedException, IOException {
+    try {
+      UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          doCedeActive(millisToCede);
+          return null;
+        }
+      });
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
+  
+  private void doCedeActive(int millisToCede) 
+      throws AccessControlException, ServiceFailedException, IOException {
+    int timeout = FailoverController.getGracefulFenceTimeout(conf);
+
+    // Lock elector to maintain lock ordering of elector -> ZKFC
+    synchronized (elector) {
+      synchronized (this) {
+        if (millisToCede <= 0) {
+          delayJoiningUntilNanotime = 0;
+          recheckElectability();
+          return;
+        }
+  
+        LOG.info("Requested by " + UserGroupInformation.getCurrentUser() +
+            " at " + Server.getRemoteAddress() + " to cede active role.");
+        boolean needFence = false;
+        try {
+          localTarget.getProxy(conf, timeout).transitionToStandby(createReqInfo());
+          LOG.info("Successfully ensured local node is in standby mode");
+        } catch (IOException ioe) {
+          LOG.warn("Unable to transition local node to standby: " +
+              ioe.getLocalizedMessage());
+          LOG.warn("Quitting election but indicating that fencing is " +
+              "necessary");
+          needFence = true;
+        }
+        delayJoiningUntilNanotime = System.nanoTime() +
+            TimeUnit.MILLISECONDS.toNanos(millisToCede);
+        elector.quitElection(needFence);
+      }
+    }
+    recheckElectability();
+  }
+  
+  /**
+   * Coordinate a graceful failover to this node.
+   * @throws ServiceFailedException if the node fails to become active
+   * @throws IOException some other error occurs
+   */
+  void gracefulFailoverToYou() throws ServiceFailedException, IOException {
+    try {
+      UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          doGracefulFailover();
+          return null;
+        }
+        
+      });
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Coordinate a graceful failover. This proceeds in several phases:
+   * 1) Pre-flight checks: ensure that the local node is healthy, and
+   * thus a candidate for failover.
+   * 2) Determine the current active node. If it is the local node, no
+   * need to failover - return success.
+   * 3) Ask that node to yield from the election for a number of seconds.
+   * 4) Allow the normal election path to run in other threads. Wait until
+   * we either become unhealthy or we see an election attempt recorded by
+   * the normal code path.
+   * 5) Allow the old active to rejoin the election, so a future
+   * failback is possible.
+   */
+  private void doGracefulFailover()
+      throws ServiceFailedException, IOException, InterruptedException {
+    int timeout = FailoverController.getGracefulFenceTimeout(conf) * 2;
+    
+    // Phase 1: pre-flight checks
+    checkEligibleForFailover();
+    
+    // Phase 2: determine old/current active node. Check that we're not
+    // ourselves active, etc.
+    HAServiceTarget oldActive = getCurrentActive();
+    if (oldActive == null) {
+      // No node is currently active. So, if we aren't already
+      // active ourselves by means of a normal election, then there's
+      // probably something preventing us from becoming active.
+      throw new ServiceFailedException(
+          "No other node is currently active.");
+    }
+    
+    if (oldActive.getAddress().equals(localTarget.getAddress())) {
+      LOG.info("Local node " + localTarget + " is already active. " +
+          "No need to failover. Returning success.");
+      return;
+    }
+    
+    // Phase 3: ask the old active to yield from the election.
+    LOG.info("Asking " + oldActive + " to cede its active state for " +
+        timeout + "ms");
+    ZKFCProtocol oldZkfc = oldActive.getZKFCProxy(conf, timeout);
+    oldZkfc.cedeActive(timeout);
+
+    // Phase 4: wait for the normal election to make the local node
+    // active.
+    ActiveAttemptRecord attempt = waitForActiveAttempt(timeout + 60000);
+    
+    if (attempt == null) {
+      // We didn't even make an attempt to become active.
+      synchronized(this) {
+        if (lastHealthState != State.SERVICE_HEALTHY) {
+          throw new ServiceFailedException("Unable to become active. " +
+            "Service became unhealthy while trying to failover.");          
+        }
+      }
+      
+      throw new ServiceFailedException("Unable to become active. " +
+          "Local node did not get an opportunity to do so from ZooKeeper, " +
+          "or the local node took too long to transition to active.");
+    }
+
+    // Phase 5. At this point, we made some attempt to become active. So we
+    // can tell the old active to rejoin if it wants. This allows a quick
+    // fail-back if we immediately crash.
+    oldZkfc.cedeActive(-1);
+    
+    if (attempt.succeeded) {
+      LOG.info("Successfully became active. " + attempt.status);
+    } else {
+      // Propagate failure
+      String msg = "Failed to become active. " + attempt.status;
+      throw new ServiceFailedException(msg);
+    }
+  }
+
+  /**
+   * Ensure that the local node is in a healthy state, and thus
+   * eligible for graceful failover.
+   * @throws ServiceFailedException if the node is unhealthy
+   */
+  private synchronized void checkEligibleForFailover()
+      throws ServiceFailedException {
+    // Check health
+    if (this.getLastHealthState() != State.SERVICE_HEALTHY) {
+      throw new ServiceFailedException(
+          localTarget + " is not currently healthy. " +
+          "Cannot be failover target");
+    }
+  }
+
+  /**
+   * @return an {@link HAServiceTarget} for the current active node
+   * in the cluster, or null if no node is active.
+   * @throws IOException if a ZK-related issue occurs
+   * @throws InterruptedException if thread is interrupted 
+   */
+  private HAServiceTarget getCurrentActive()
+      throws IOException, InterruptedException {
+    synchronized (elector) {
+      synchronized (this) {
+        byte[] activeData;
+        try {
+          activeData = elector.getActiveData();
+        } catch (ActiveNotFoundException e) {
+          return null;
+        } catch (KeeperException ke) {
+          throw new IOException(
+              "Unexpected ZooKeeper issue fetching active node info", ke);
+        }
+        
+        HAServiceTarget oldActive = dataToTarget(activeData);
+        return oldActive;
+      }
+    }
+  }
+
+  /**
+   * Check the current state of the service, and join the election
+   * if it should be in the election.
+   */
+  private void recheckElectability() {
+    // Maintain lock ordering of elector -> ZKFC
+    synchronized (elector) {
+      synchronized (this) {
+        boolean healthy = lastHealthState == State.SERVICE_HEALTHY;
+    
+        long remainingDelay = delayJoiningUntilNanotime - System.nanoTime(); 
+        if (remainingDelay > 0) {
+          if (healthy) {
+            LOG.info("Would have joined master election, but this node is " +
+                "prohibited from doing so for " +
+                TimeUnit.NANOSECONDS.toMillis(remainingDelay) + " more ms");
+          }
+          scheduleRecheck(remainingDelay);
+          return;
+        }
+    
+        switch (lastHealthState) {
+        case SERVICE_HEALTHY:
+          elector.joinElection(targetToData(localTarget));
+          break;
+          
+        case INITIALIZING:
+          LOG.info("Ensuring that " + localTarget + " does not " +
+              "participate in active master election");
+          elector.quitElection(false);
+          break;
+    
+        case SERVICE_UNHEALTHY:
+        case SERVICE_NOT_RESPONDING:
+          LOG.info("Quitting master election for " + localTarget +
+              " and marking that fencing is necessary");
+          elector.quitElection(true);
+          break;
+          
+        case HEALTH_MONITOR_FAILED:
+          fatalError("Health monitor failed!");
+          break;
+          
+        default:
+          throw new IllegalArgumentException("Unhandled state:" + lastHealthState);
+        }
+      }
+    }
+  }
+  
+  /**
+   * Schedule a call to {@link #recheckElectability()} in the future.
+   */
+  private void scheduleRecheck(long whenNanos) {
+    delayExecutor.schedule(
+        new Runnable() {
+          @Override
+          public void run() {
+            try {
+              recheckElectability();
+            } catch (Throwable t) {
+              fatalError("Failed to recheck electability: " +
+                  StringUtils.stringifyException(t));
+            }
+          }
+        },
+        whenNanos, TimeUnit.NANOSECONDS);
+  }
 
   /**
    * @return the last health state passed to the FC
    * by the HealthMonitor.
    */
   @VisibleForTesting
-  State getLastHealthState() {
+  synchronized State getLastHealthState() {
     return lastHealthState;
   }
+
+  private synchronized void setLastHealthState(HealthMonitor.State newState) {
+    LOG.info("Local service " + localTarget +
+        " entered state: " + newState);
+    lastHealthState = newState;
+  }
   
   @VisibleForTesting
   ActiveStandbyElector getElectorForTests() {
     return elector;
   }
+  
+  @VisibleForTesting
+  ZKFCRpcServer getRpcServerForTests() {
+    return rpcServer;
+  }
 
   /**
    * Callbacks from elector
    */
   class ElectorCallbacks implements ActiveStandbyElectorCallback {
     @Override
-    public void becomeActive() {
+    public void becomeActive() throws ServiceFailedException {
       ZKFailoverController.this.becomeActive();
     }
 
@@ -319,31 +835,13 @@ public abstract class ZKFailoverControll
 
     @Override
     public void fenceOldActive(byte[] data) {
-      HAServiceTarget target = dataToTarget(data);
-      
-      LOG.info("Should fence: " + target);
-      boolean gracefulWorked = new FailoverController(conf)
-          .tryGracefulFence(target);
-      if (gracefulWorked) {
-        // It's possible that it's in standby but just about to go into active,
-        // no? Is there some race here?
-        LOG.info("Successfully transitioned " + target + " to standby " +
-            "state without fencing");
-        return;
-      }
-      
-      try {
-        target.checkFencingConfigured();
-      } catch (BadFencingConfigurationException e) {
-        LOG.error("Couldn't fence old active " + target, e);
-        // TODO: see below todo
-        throw new RuntimeException(e);
-      }
-      
-      if (!target.getFencer().fence(target)) {
-        // TODO: this will end up in some kind of tight loop,
-        // won't it? We need some kind of backoff
-        throw new RuntimeException("Unable to fence " + target);
+      ZKFailoverController.this.fenceOldActive(data);
+    }
+    
+    @Override
+    public String toString() {
+      synchronized (ZKFailoverController.this) {
+        return "Elector callbacks for " + localTarget;
       }
     }
   }
@@ -354,36 +852,21 @@ public abstract class ZKFailoverControll
   class HealthCallbacks implements HealthMonitor.Callback {
     @Override
     public void enteredState(HealthMonitor.State newState) {
-      LOG.info("Local service " + localTarget +
-          " entered state: " + newState);
-      switch (newState) {
-      case SERVICE_HEALTHY:
-        LOG.info("Joining master election for " + localTarget);
-        elector.joinElection(targetToData(localTarget));
-        break;
-        
-      case INITIALIZING:
-        LOG.info("Ensuring that " + localTarget + " does not " +
-            "participate in active master election");
-        elector.quitElection(false);
-        break;
-
-      case SERVICE_UNHEALTHY:
-      case SERVICE_NOT_RESPONDING:
-        LOG.info("Quitting master election for " + localTarget +
-            " and marking that fencing is necessary");
-        elector.quitElection(true);
-        break;
-        
-      case HEALTH_MONITOR_FAILED:
-        fatalError("Health monitor failed!");
-        break;
-        
-      default:
-        throw new IllegalArgumentException("Unhandled state:" + newState);
-      }
-      
-      lastHealthState = newState;
+      setLastHealthState(newState);
+      recheckElectability();
     }
   }
+  
+  private static class ActiveAttemptRecord {
+    private final boolean succeeded;
+    private final String status;
+    private final long nanoTime;
+    
+    public ActiveAttemptRecord(boolean succeeded, String status) {
+      this.succeeded = succeeded;
+      this.status = status;
+      this.nanoTime = System.nanoTime();
+    }
+  }
+
 }

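initZK() above introduces client-facing settings beyond ha.zookeeper.quorum: ha.zookeeper.acl (default world:anyone:rwcda), ha.zookeeper.auth, and value indirection through HAZKUtil.resolveConfIndirection(). A sketch of how a secured deployment might populate them programmatically; the host names, digest hash and file path are placeholders, and the '@file' indirection semantics are assumed from HAZKUtil rather than shown in this hunk:

    import org.apache.hadoop.conf.Configuration;

    public class ZkfcConfExample {
      // Placeholder hosts, digest hash and file path; this only sketches the
      // keys that initZK() reads, it is not a recommended production setup.
      public static Configuration sampleZkfcConf() {
        Configuration conf = new Configuration();
        conf.set("ha.zookeeper.quorum",
            "zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181");
        // Restrict the parent znode to one digest user instead of the default
        // "world:anyone:rwcda".
        conf.set("ha.zookeeper.acl", "digest:zkfcs:PLACEHOLDERHASH=:rwcda");
        // A leading '@' is assumed to make HAZKUtil.resolveConfIndirection()
        // load the value from a local file, keeping credentials out of XML.
        conf.set("ha.zookeeper.auth", "@/path/to/zk-auth.txt");
        return conf;
      }
    }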

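waitForActiveAttempt() above is a bounded wait/notify loop: the election thread publishes an ActiveAttemptRecord and notifies, while the failover initiator waits in one-second slices until a record newer than its start time shows up, the service turns unhealthy, or the deadline passes. A stripped-down, self-contained sketch of that pattern (class and member names here are invented; only the structure mirrors the method in the patch):

    import java.util.concurrent.TimeUnit;

    public class DeadlineWaitSketch {
      private final Object lock = new Object();
      private long lastPublishedNanos = Long.MIN_VALUE;

      /** Publisher side: record that an attempt finished and wake any waiter. */
      public void publish() {
        synchronized (lock) {
          lastPublishedNanos = System.nanoTime();
          lock.notifyAll();
        }
      }

      /** Waiter side: true if an event newer than the call time arrives in time. */
      public boolean awaitEvent(long timeoutMillis) throws InterruptedException {
        long start = System.nanoTime();
        long waitUntil = start + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        do {
          synchronized (lock) {
            if (lastPublishedNanos >= start) {
              return true;
            }
            // Wait at most one second per iteration so other conditions
            // (health state, in the real code) can be rechecked between waits.
            lock.wait(1000);
          }
        } while (System.nanoTime() < waitUntil);
        return false;
      }
    }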

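doCedeActive(), recheckElectability() and scheduleRecheck() above combine a nanotime deadline (delayJoiningUntilNanotime) with a single-thread daemon ScheduledExecutorService, so a node that ceded the active role re-checks its electability once the window expires. A self-contained sketch of just that mechanism, with invented names and a Runnable standing in for the real election logic:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    import com.google.common.util.concurrent.ThreadFactoryBuilder;

    public class DelayedRejoinSketch {
      private final ScheduledExecutorService delayExecutor =
          Executors.newScheduledThreadPool(1,
              new ThreadFactoryBuilder().setDaemon(true)
                  .setNameFormat("Rejoin delay timer #%d").build());
      private long delayJoiningUntilNanotime = 0;

      /** Stay out of the election for millisToCede ms, then recheck. */
      public synchronized void cede(int millisToCede, final Runnable joinElection) {
        delayJoiningUntilNanotime =
            System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(millisToCede);
        recheck(joinElection);
      }

      /** Runs joinElection once the cede window has expired. */
      private synchronized void recheck(final Runnable joinElection) {
        long remainingDelay = delayJoiningUntilNanotime - System.nanoTime();
        if (remainingDelay > 0) {
          // Still inside the cede window: schedule another check when it ends.
          delayExecutor.schedule(new Runnable() {
            @Override
            public void run() {
              recheck(joinElection);
            }
          }, remainingDelay, TimeUnit.NANOSECONDS);
          return;
        }
        joinElection.run();
      }
    }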