hadoop-common-commits mailing list archives

From: szets...@apache.org
Subject: svn commit: r1346682 [2/3] - in /hadoop/common/branches/HDFS-3092/hadoop-common-project: ./ hadoop-annotations/ hadoop-auth-examples/ hadoop-auth/ hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/ hadoop-common/ hadoop-common/de...
Date: Wed, 06 Jun 2012 00:17:54 GMT
Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java Wed Jun  6 00:17:38 2012
@@ -60,6 +60,31 @@ public interface HAServiceProtocol {
       return name;
     }
   }
+  
+  public static enum RequestSource {
+    REQUEST_BY_USER,
+    REQUEST_BY_USER_FORCED,
+    REQUEST_BY_ZKFC;
+  }
+  
+  /**
+   * Information describing the source for a request to change state.
+   * This is used to differentiate requests from automatic vs CLI
+   * failover controllers, and in the future may include epoch
+   * information.
+   */
+  public static class StateChangeRequestInfo {
+    private final RequestSource source;
+
+    public StateChangeRequestInfo(RequestSource source) {
+      super();
+      this.source = source;
+    }
+
+    public RequestSource getSource() {
+      return source;
+    }
+  }
 
   /**
   * Monitor the health of the service. This is periodically called by the HA
@@ -95,7 +120,8 @@ public interface HAServiceProtocol {
    * @throws IOException
    *           if other errors happen
    */
-  public void transitionToActive() throws ServiceFailedException,
+  public void transitionToActive(StateChangeRequestInfo reqInfo)
+                                   throws ServiceFailedException,
                                           AccessControlException,
                                           IOException;
 
@@ -110,7 +136,8 @@ public interface HAServiceProtocol {
    * @throws IOException
    *           if other errors happen
    */
-  public void transitionToStandby() throws ServiceFailedException,
+  public void transitionToStandby(StateChangeRequestInfo reqInfo)
+                                    throws ServiceFailedException,
                                            AccessControlException,
                                            IOException;
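
A caller-side sketch of the new signature (the wrapper class below is
illustrative, not part of this patch; only the HAServiceProtocol API it
calls comes from the change above):

    import java.io.IOException;

    import org.apache.hadoop.ha.HAServiceProtocol;
    import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
    import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;

    public class ManualFailoverSketch {
      /** Ask a service to become active on behalf of a CLI user. */
      public static void makeActive(HAServiceProtocol svc) throws IOException {
        // Tag the request so the target can distinguish a user-initiated
        // transition from one driven by the automatic (ZKFC) controller.
        StateChangeRequestInfo reqInfo =
            new StateChangeRequestInfo(RequestSource.REQUEST_BY_USER);
        svc.transitionToActive(reqInfo);
      }
    }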
 

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocolHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocolHelper.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocolHelper.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocolHelper.java Wed Jun  6 00:17:38 2012
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
 import org.apache.hadoop.ipc.RemoteException;
 
 /**
@@ -30,7 +31,8 @@ import org.apache.hadoop.ipc.RemoteExcep
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public class HAServiceProtocolHelper {
-  public static void monitorHealth(HAServiceProtocol svc)
+  public static void monitorHealth(HAServiceProtocol svc,
+      StateChangeRequestInfo reqInfo)
       throws IOException {
     try {
       svc.monitorHealth();
@@ -39,19 +41,21 @@ public class HAServiceProtocolHelper {
     }
   }
 
-  public static void transitionToActive(HAServiceProtocol svc)
+  public static void transitionToActive(HAServiceProtocol svc,
+      StateChangeRequestInfo reqInfo)
       throws IOException {
     try {
-      svc.transitionToActive();
+      svc.transitionToActive(reqInfo);
     } catch (RemoteException e) {
       throw e.unwrapRemoteException(ServiceFailedException.class);
     }
   }
 
-  public static void transitionToStandby(HAServiceProtocol svc)
+  public static void transitionToStandby(HAServiceProtocol svc,
+      StateChangeRequestInfo reqInfo)
       throws IOException {
     try {
-      svc.transitionToStandby();
+      svc.transitionToStandby(reqInfo);
     } catch (RemoteException e) {
       throw e.unwrapRemoteException(ServiceFailedException.class);
     }
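
The helper's value for remote callers is unwrapping RemoteException back
into ServiceFailedException; a sketch of calling it with the new
request-info argument (the wrapper class is illustrative):

    import java.io.IOException;

    import org.apache.hadoop.ha.HAServiceProtocol;
    import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
    import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
    import org.apache.hadoop.ha.HAServiceProtocolHelper;

    public class HelperUsageSketch {
      public static void makeStandby(HAServiceProtocol svc) throws IOException {
        StateChangeRequestInfo reqInfo =
            new StateChangeRequestInfo(RequestSource.REQUEST_BY_USER);
        // A ServiceFailedException thrown on the server side surfaces here
        // unwrapped rather than as a generic RemoteException.
        HAServiceProtocolHelper.transitionToStandby(svc, reqInfo);
      }
    }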

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java Wed Jun  6 00:17:38 2012
@@ -28,6 +28,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ha.protocolPB.ZKFCProtocolClientSideTranslatorPB;
 import org.apache.hadoop.net.NetUtils;
 
 import com.google.common.collect.Maps;
@@ -49,6 +50,11 @@ public abstract class HAServiceTarget {
   public abstract InetSocketAddress getAddress();
 
   /**
+   * @return the IPC address of the ZKFC on the target node
+   */
+  public abstract InetSocketAddress getZKFCAddress();
+
+  /**
    * @return a Fencer implementation configured for this target node
    */
   public abstract NodeFencer getFencer();
@@ -76,6 +82,20 @@ public abstract class HAServiceTarget {
         confCopy, factory, timeoutMs);
   }
   
+  /**
+   * @return a proxy to the ZKFC which is associated with this HA service.
+   */
+  public ZKFCProtocol getZKFCProxy(Configuration conf, int timeoutMs)
+      throws IOException {
+    Configuration confCopy = new Configuration(conf);
+    // Lower the timeout so we quickly fail to connect
+    confCopy.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
+    SocketFactory factory = NetUtils.getDefaultSocketFactory(confCopy);
+    return new ZKFCProtocolClientSideTranslatorPB(
+        getZKFCAddress(),
+        confCopy, factory, timeoutMs);
+  }
+  
   public final Map<String, String> getFencingParameters() {
     Map<String, String> ret = Maps.newHashMap();
     addFencingParameters(ret);
@@ -99,4 +119,11 @@ public abstract class HAServiceTarget {
     ret.put(HOST_SUBST_KEY, getAddress().getHostName());
     ret.put(PORT_SUBST_KEY, String.valueOf(getAddress().getPort()));
   }
+
+  /**
+   * @return true if auto failover should be considered enabled
+   */
+  public boolean isAutoFailoverEnabled() {
+    return false;
+  }
 }
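
A minimal concrete target wiring in the new ZKFC hooks might look like the
sketch below; the class name and constructor are hypothetical, and it
assumes checkFencingConfigured() is the one remaining abstract method at
this revision:

    import java.net.InetSocketAddress;

    import org.apache.hadoop.ha.BadFencingConfigurationException;
    import org.apache.hadoop.ha.HAServiceTarget;
    import org.apache.hadoop.ha.NodeFencer;

    public class StaticHATarget extends HAServiceTarget {
      private final InetSocketAddress addr;
      private final InetSocketAddress zkfcAddr;
      private final NodeFencer fencer;

      public StaticHATarget(InetSocketAddress addr,
          InetSocketAddress zkfcAddr, NodeFencer fencer) {
        this.addr = addr;
        this.zkfcAddr = zkfcAddr;
        this.fencer = fencer;
      }

      @Override
      public InetSocketAddress getAddress() { return addr; }

      @Override
      public InetSocketAddress getZKFCAddress() { return zkfcAddr; }

      @Override
      public NodeFencer getFencer() { return fencer; }

      @Override
      public void checkFencingConfigured()
          throws BadFencingConfigurationException {
        if (fencer == null) {
          throw new BadFencingConfigurationException("no fencer configured");
        }
      }

      @Override
      public boolean isAutoFailoverEnabled() {
        return true; // opt this target into the ZKFC-managed failover path
      }
    }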

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java Wed Jun  6 00:17:38 2012
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -43,7 +44,8 @@ import com.google.common.base.Preconditi
  * Classes which need callbacks should implement the {@link Callback}
  * interface.
  */
-class HealthMonitor {
+@InterfaceAudience.Private
+public class HealthMonitor {
   private static final Log LOG = LogFactory.getLog(
       HealthMonitor.class);
 
@@ -75,7 +77,8 @@ class HealthMonitor {
   private HAServiceStatus lastServiceState = new HAServiceStatus(
       HAServiceState.INITIALIZING);
   
-  enum State {
+  @InterfaceAudience.Private
+  public enum State {
     /**
      * The health monitor is still starting up.
      */
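
With HealthMonitor now public, code outside the package can observe health
transitions. A sketch, assuming the (Configuration, HAServiceTarget)
constructor used by the ZKFC and that the Callback interface is public
alongside the class:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ha.HAServiceTarget;
    import org.apache.hadoop.ha.HealthMonitor;

    public class HealthWatchSketch {
      public static HealthMonitor watch(Configuration conf,
          final HAServiceTarget target) {
        HealthMonitor hm = new HealthMonitor(conf, target);
        hm.addCallback(new HealthMonitor.Callback() {
          @Override
          public void enteredState(HealthMonitor.State newState) {
            // React to transitions such as SERVICE_HEALTHY or
            // SERVICE_NOT_RESPONDING as they are reported.
            System.out.println(target + " entered " + newState);
          }
        });
        hm.start();
        return hm;
      }
    }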

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java Wed Jun  6 00:17:38 2012
@@ -18,79 +18,143 @@
 package org.apache.hadoop.ha;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException;
 import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
+import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
+import org.apache.hadoop.ha.HAZKUtil.ZKAuthInfo;
 import org.apache.hadoop.ha.HealthMonitor.State;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.hadoop.util.ToolRunner;
 import org.apache.zookeeper.data.ACL;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 @InterfaceAudience.LimitedPrivate("HDFS")
-public abstract class ZKFailoverController implements Tool {
+public abstract class ZKFailoverController {
 
   static final Log LOG = LogFactory.getLog(ZKFailoverController.class);
   
-  // TODO: this should be namespace-scoped
   public static final String ZK_QUORUM_KEY = "ha.zookeeper.quorum";
   private static final String ZK_SESSION_TIMEOUT_KEY = "ha.zookeeper.session-timeout.ms";
   private static final int ZK_SESSION_TIMEOUT_DEFAULT = 5*1000;
   private static final String ZK_PARENT_ZNODE_KEY = "ha.zookeeper.parent-znode";
+  public static final String ZK_ACL_KEY = "ha.zookeeper.acl";
+  private static final String ZK_ACL_DEFAULT = "world:anyone:rwcda";
+  public static final String ZK_AUTH_KEY = "ha.zookeeper.auth";
   static final String ZK_PARENT_ZNODE_DEFAULT = "/hadoop-ha";
 
+  /**
+   * All of the conf keys used by the ZKFC. This is used in order to allow
+   * them to be overridden on a per-nameservice or per-namenode basis.
+   */
+  protected static final String[] ZKFC_CONF_KEYS = new String[] {
+    ZK_QUORUM_KEY,
+    ZK_SESSION_TIMEOUT_KEY,
+    ZK_PARENT_ZNODE_KEY,
+    ZK_ACL_KEY,
+    ZK_AUTH_KEY
+  };
+  
+
   /** Unable to format the parent znode in ZK */
   static final int ERR_CODE_FORMAT_DENIED = 2;
   /** The parent znode doesn't exist in ZK */
   static final int ERR_CODE_NO_PARENT_ZNODE = 3;
   /** Fencing is not properly configured */
   static final int ERR_CODE_NO_FENCER = 4;
+  /** Automatic failover is not enabled */
+  static final int ERR_CODE_AUTO_FAILOVER_NOT_ENABLED = 5;
+  /** Cannot connect to ZooKeeper */
+  static final int ERR_CODE_NO_ZK = 6;
   
-  private Configuration conf;
+  protected Configuration conf;
+  private String zkQuorum;
+  protected final HAServiceTarget localTarget;
 
   private HealthMonitor healthMonitor;
   private ActiveStandbyElector elector;
-
-  private HAServiceTarget localTarget;
-
-  private String parentZnode;
+  protected ZKFCRpcServer rpcServer;
 
   private State lastHealthState = State.INITIALIZING;
 
   /** Set if a fatal error occurs */
   private String fatalError = null;
 
-  @Override
-  public void setConf(Configuration conf) {
+  /**
+   * A future nanotime before which the ZKFC will not join the election.
+   * This is used during graceful failover.
+   */
+  private long delayJoiningUntilNanotime = 0;
+
+  /** Executor on which {@link #scheduleRecheck(long)} schedules events */
+  private ScheduledExecutorService delayExecutor =
+    Executors.newScheduledThreadPool(1,
+        new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("ZKFC Delay timer #%d")
+            .build());
+
+  private ActiveAttemptRecord lastActiveAttemptRecord;
+  private Object activeAttemptRecordLock = new Object();
+
+  protected ZKFailoverController(Configuration conf, HAServiceTarget localTarget) {
+    this.localTarget = localTarget;
     this.conf = conf;
-    localTarget = getLocalTarget();
   }
   
 
   protected abstract byte[] targetToData(HAServiceTarget target);
-  protected abstract HAServiceTarget getLocalTarget();  
   protected abstract HAServiceTarget dataToTarget(byte[] data);
+  protected abstract void loginAsFCUser() throws IOException;
+  protected abstract void checkRpcAdminAccess()
+      throws AccessControlException, IOException;
+  protected abstract InetSocketAddress getRpcAddressToBindTo();
+  protected abstract PolicyProvider getPolicyProvider();
 
+  /**
+   * Return the name of a znode inside the configured parent znode in which
+   * the ZKFC will do all of its work. This is so that multiple federated
+   * nameservices can run on the same ZK quorum without having to manually
+   * configure them into separate subdirectories.
+   */
+  protected abstract String getScopeInsideParentNode();
 
-  @Override
-  public Configuration getConf() {
-    return conf;
+  public HAServiceTarget getLocalTarget() {
+    return localTarget;
   }
-
-  @Override
+  
   public int run(final String[] args) throws Exception {
-    // TODO: need to hook DFS here to find the NN keytab info, etc,
-    // similar to what DFSHAAdmin does. Annoying that this is in common.
+    if (!localTarget.isAutoFailoverEnabled()) {
+      LOG.fatal("Automatic failover is not enabled for " + localTarget + "." +
+          " Please ensure that automatic failover is enabled in the " +
+          "configuration before running the ZK failover controller.");
+      return ERR_CODE_AUTO_FAILOVER_NOT_ENABLED;
+    }
+    loginAsFCUser();
     try {
       return SecurityUtil.doAsLoginUserOrFatal(new PrivilegedAction<Integer>() {
         @Override
@@ -99,6 +163,10 @@ public abstract class ZKFailoverControll
             return doRun(args);
           } catch (Exception t) {
             throw new RuntimeException(t);
+          } finally {
+            if (elector != null) {
+              elector.terminateConnection();
+            }
           }
         }
       });
@@ -107,6 +175,7 @@ public abstract class ZKFailoverControll
     }
   }
   
+
   private int doRun(String[] args)
       throws HadoopIllegalArgumentException, IOException, InterruptedException {
     initZK();
@@ -129,11 +198,23 @@ public abstract class ZKFailoverControll
       }
     }
     
-    if (!elector.parentZNodeExists()) {
-      LOG.fatal("Unable to start failover controller. " +
-          "Parent znode does not exist.\n" +
-          "Run with -formatZK flag to initialize ZooKeeper.");
-      return ERR_CODE_NO_PARENT_ZNODE;
+    try {
+      if (!elector.parentZNodeExists()) {
+        LOG.fatal("Unable to start failover controller. " +
+            "Parent znode does not exist.\n" +
+            "Run with -formatZK flag to initialize ZooKeeper.");
+        return ERR_CODE_NO_PARENT_ZNODE;
+      }
+    } catch (IOException ioe) {
+      if (ioe.getCause() instanceof KeeperException.ConnectionLossException) {
+        LOG.fatal("Unable to start failover controller. Unable to connect " +
+            "to ZooKeeper quorum at " + zkQuorum + ". Please check the " +
+            "configured value for " + ZK_QUORUM_KEY + " and ensure that " +
+            "ZooKeeper is running.");
+        return ERR_CODE_NO_ZK;
+      } else {
+        throw ioe;
+      }
     }
 
     try {
@@ -145,8 +226,18 @@ public abstract class ZKFailoverControll
       return ERR_CODE_NO_FENCER;
     }
 
+    initRPC();
     initHM();
-    mainLoop();
+    startRPC();
+    try {
+      mainLoop();
+    } finally {
+      rpcServer.stopAndJoin();
+      
+      elector.quitElection(true);
+      healthMonitor.shutdown();
+      healthMonitor.join();
+    }
     return 0;
   }
 
@@ -181,6 +272,7 @@ public abstract class ZKFailoverControll
   }
 
   private boolean confirmFormat() {
+    String parentZnode = getParentZnode();
     System.err.println(
         "===============================================\n" +
         "The configured parent znode " + parentZnode + " already exists.\n" +
@@ -206,16 +298,40 @@ public abstract class ZKFailoverControll
     healthMonitor.addCallback(new HealthCallbacks());
     healthMonitor.start();
   }
+  
+  protected void initRPC() throws IOException {
+    InetSocketAddress bindAddr = getRpcAddressToBindTo();
+    rpcServer = new ZKFCRpcServer(conf, bindAddr, this, getPolicyProvider());
+  }
+
+  protected void startRPC() throws IOException {
+    rpcServer.start();
+  }
+
 
   private void initZK() throws HadoopIllegalArgumentException, IOException {
-    String zkQuorum = conf.get(ZK_QUORUM_KEY);
+    zkQuorum = conf.get(ZK_QUORUM_KEY);
     int zkTimeout = conf.getInt(ZK_SESSION_TIMEOUT_KEY,
         ZK_SESSION_TIMEOUT_DEFAULT);
-    parentZnode = conf.get(ZK_PARENT_ZNODE_KEY,
-        ZK_PARENT_ZNODE_DEFAULT);
-    // TODO: need ZK ACL support in config, also maybe auth!
-    List<ACL> zkAcls = Ids.OPEN_ACL_UNSAFE;
+    // Parse ACLs from configuration.
+    String zkAclConf = conf.get(ZK_ACL_KEY, ZK_ACL_DEFAULT);
+    zkAclConf = HAZKUtil.resolveConfIndirection(zkAclConf);
+    List<ACL> zkAcls = HAZKUtil.parseACLs(zkAclConf);
+    if (zkAcls.isEmpty()) {
+      zkAcls = Ids.CREATOR_ALL_ACL;
+    }
+    
+    // Parse authentication from configuration.
+    String zkAuthConf = conf.get(ZK_AUTH_KEY);
+    zkAuthConf = HAZKUtil.resolveConfIndirection(zkAuthConf);
+    List<ZKAuthInfo> zkAuths;
+    if (zkAuthConf != null) {
+      zkAuths = HAZKUtil.parseAuth(zkAuthConf);
+    } else {
+      zkAuths = Collections.emptyList();
+    }
 
+    // Sanity check configuration.
     Preconditions.checkArgument(zkQuorum != null,
         "Missing required configuration '%s' for ZooKeeper quorum",
         ZK_QUORUM_KEY);
@@ -224,9 +340,19 @@ public abstract class ZKFailoverControll
     
 
     elector = new ActiveStandbyElector(zkQuorum,
-        zkTimeout, parentZnode, zkAcls, new ElectorCallbacks());
+        zkTimeout, getParentZnode(), zkAcls, zkAuths,
+        new ElectorCallbacks());
   }
   
+  private String getParentZnode() {
+    String znode = conf.get(ZK_PARENT_ZNODE_KEY,
+        ZK_PARENT_ZNODE_DEFAULT);
+    if (!znode.endsWith("/")) {
+      znode += "/";
+    }
+    return znode + getScopeInsideParentNode();
+  }
+
   private synchronized void mainLoop() throws InterruptedException {
     while (fatalError == null) {
       wait();
@@ -242,16 +368,30 @@ public abstract class ZKFailoverControll
     notifyAll();
   }
   
-  private synchronized void becomeActive() {
+  private synchronized void becomeActive() throws ServiceFailedException {
     LOG.info("Trying to make " + localTarget + " active...");
     try {
       HAServiceProtocolHelper.transitionToActive(localTarget.getProxy(
-          conf, FailoverController.getRpcTimeoutToNewActive(conf)));
-      LOG.info("Successfully transitioned " + localTarget +
-          " to active state");
+          conf, FailoverController.getRpcTimeoutToNewActive(conf)),
+          createReqInfo());
+      String msg = "Successfully transitioned " + localTarget +
+          " to active state";
+      LOG.info(msg);
+      recordActiveAttempt(new ActiveAttemptRecord(true, msg));
+
     } catch (Throwable t) {
-      LOG.fatal("Couldn't make " + localTarget + " active", t);
-      elector.quitElection(true);
+      String msg = "Couldn't make " + localTarget + " active";
+      LOG.fatal(msg, t);
+      
+      recordActiveAttempt(new ActiveAttemptRecord(false, msg + "\n" +
+          StringUtils.stringifyException(t)));
+
+      if (t instanceof ServiceFailedException) {
+        throw (ServiceFailedException)t;
+      } else {
+        throw new ServiceFailedException("Couldn't transition to active",
+            t);
+      }
 /*
 * TODO:
 * we need to make sure that if we get fenced and then quickly restarted,
@@ -264,12 +404,79 @@ public abstract class ZKFailoverControll
     }
   }
 
+  /**
+   * Store the results of the last attempt to become active.
+   * This is used so that, during manually initiated failover,
+   * we can report back the results of the attempt to become active
+   * to the initiator of the failover.
+   */
+  private void recordActiveAttempt(
+      ActiveAttemptRecord record) {
+    synchronized (activeAttemptRecordLock) {
+      lastActiveAttemptRecord = record;
+      activeAttemptRecordLock.notifyAll();
+    }
+  }
+
+  /**
+   * Wait until one of the following events:
+   * <ul>
+   * <li>Another thread publishes the results of an attempt to become active
+   * using {@link #recordActiveAttempt(ActiveAttemptRecord)}</li>
+   * <li>The node enters bad health status</li>
+   * <li>The specified timeout elapses</li>
+   * </ul>
+   * 
+   * @param timeoutMillis number of millis to wait
+   * @return the published record, or null if the timeout elapses or the
+   * service becomes unhealthy 
+   * @throws InterruptedException if the thread is interrupted.
+   */
+  private ActiveAttemptRecord waitForActiveAttempt(int timeoutMillis)
+      throws InterruptedException {
+    long st = System.nanoTime();
+    long waitUntil = st + TimeUnit.NANOSECONDS.convert(
+        timeoutMillis, TimeUnit.MILLISECONDS);
+    
+    do {
+      // periodically check health state, because entering an
+      // unhealthy state could prevent us from ever attempting to
+      // become active. We can detect this and respond to the user
+      // immediately.
+      synchronized (this) {
+        if (lastHealthState != State.SERVICE_HEALTHY) {
+          // early out if service became unhealthy
+          return null;
+        }
+      }
+
+      synchronized (activeAttemptRecordLock) {
+        if ((lastActiveAttemptRecord != null &&
+            lastActiveAttemptRecord.nanoTime >= st)) {
+          return lastActiveAttemptRecord;
+        }
+        // Only wait 1sec so that we periodically recheck the health state
+        // above.
+        activeAttemptRecordLock.wait(1000);
+      }
+    } while (System.nanoTime() < waitUntil);
+    
+    // Timeout elapsed.
+    LOG.warn(timeoutMillis + "ms timeout elapsed waiting for an attempt " +
+        "to become active");
+    return null;
+  }
+
+  private StateChangeRequestInfo createReqInfo() {
+    return new StateChangeRequestInfo(RequestSource.REQUEST_BY_ZKFC);
+  }
+
   private synchronized void becomeStandby() {
     LOG.info("ZK Election indicated that " + localTarget +
         " should become standby");
     try {
       int timeout = FailoverController.getGracefulFenceTimeout(conf);
-      localTarget.getProxy(conf, timeout).transitionToStandby();
+      localTarget.getProxy(conf, timeout).transitionToStandby(createReqInfo());
       LOG.info("Successfully transitioned " + localTarget +
           " to standby state");
     } catch (Exception e) {
@@ -279,27 +486,336 @@ public abstract class ZKFailoverControll
       // at the same time.
     }
   }
+  
+
+  private synchronized void fenceOldActive(byte[] data) {
+    HAServiceTarget target = dataToTarget(data);
+    
+    try {
+      doFence(target);
+    } catch (Throwable t) {
+      recordActiveAttempt(new ActiveAttemptRecord(false, "Unable to fence old active: " + StringUtils.stringifyException(t)));
+      Throwables.propagate(t);
+    }
+  }
+  
+  private void doFence(HAServiceTarget target) {
+    LOG.info("Should fence: " + target);
+    boolean gracefulWorked = new FailoverController(conf,
+        RequestSource.REQUEST_BY_ZKFC).tryGracefulFence(target);
+    if (gracefulWorked) {
+      // It's possible that it's in standby but just about to go into active,
+      // no? Is there some race here?
+      LOG.info("Successfully transitioned " + target + " to standby " +
+          "state without fencing");
+      return;
+    }
+    
+    try {
+      target.checkFencingConfigured();
+    } catch (BadFencingConfigurationException e) {
+      LOG.error("Couldn't fence old active " + target, e);
+      recordActiveAttempt(new ActiveAttemptRecord(false, "Unable to fence old active"));
+      throw new RuntimeException(e);
+    }
+    
+    if (!target.getFencer().fence(target)) {
+      throw new RuntimeException("Unable to fence " + target);
+    }
+  }
+
+
+  /**
+   * Request from graceful failover to cede active role. Causes
+   * this ZKFC to transition its local node to standby, then quit
+   * the election for the specified period of time, after which it
+   * will rejoin iff it is healthy.
+   */
+  void cedeActive(final int millisToCede)
+      throws AccessControlException, ServiceFailedException, IOException {
+    try {
+      UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          doCedeActive(millisToCede);
+          return null;
+        }
+      });
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
+  
+  private void doCedeActive(int millisToCede) 
+      throws AccessControlException, ServiceFailedException, IOException {
+    int timeout = FailoverController.getGracefulFenceTimeout(conf);
+
+    // Lock elector to maintain lock ordering of elector -> ZKFC
+    synchronized (elector) {
+      synchronized (this) {
+        if (millisToCede <= 0) {
+          delayJoiningUntilNanotime = 0;
+          recheckElectability();
+          return;
+        }
+  
+        LOG.info("Requested by " + UserGroupInformation.getCurrentUser() +
+            " at " + Server.getRemoteAddress() + " to cede active role.");
+        boolean needFence = false;
+        try {
+          localTarget.getProxy(conf, timeout).transitionToStandby(createReqInfo());
+          LOG.info("Successfully ensured local node is in standby mode");
+        } catch (IOException ioe) {
+          LOG.warn("Unable to transition local node to standby: " +
+              ioe.getLocalizedMessage());
+          LOG.warn("Quitting election but indicating that fencing is " +
+              "necessary");
+          needFence = true;
+        }
+        delayJoiningUntilNanotime = System.nanoTime() +
+            TimeUnit.MILLISECONDS.toNanos(millisToCede);
+        elector.quitElection(needFence);
+      }
+    }
+    recheckElectability();
+  }
+  
+  /**
+   * Coordinate a graceful failover to this node.
+   * @throws ServiceFailedException if the node fails to become active
+   * @throws IOException some other error occurs
+   */
+  void gracefulFailoverToYou() throws ServiceFailedException, IOException {
+    try {
+      UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          doGracefulFailover();
+          return null;
+        }
+        
+      });
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Coordinate a graceful failover. This proceeds in several phases:
+   * 1) Pre-flight checks: ensure that the local node is healthy, and
+   * thus a candidate for failover.
+   * 2) Determine the current active node. If it is the local node, no
+   * need to failover - return success.
+   * 3) Ask that node to yield from the election for a number of seconds.
+   * 4) Allow the normal election path to run in other threads. Wait until
+   * we either become unhealthy or we see an election attempt recorded by
+   * the normal code path.
+   * 5) Allow the old active to rejoin the election, so a future
+   * failback is possible.
+   */
+  private void doGracefulFailover()
+      throws ServiceFailedException, IOException, InterruptedException {
+    int timeout = FailoverController.getGracefulFenceTimeout(conf) * 2;
+    
+    // Phase 1: pre-flight checks
+    checkEligibleForFailover();
+    
+    // Phase 2: determine old/current active node. Check that we're not
+    // ourselves active, etc.
+    HAServiceTarget oldActive = getCurrentActive();
+    if (oldActive == null) {
+      // No node is currently active. So, if we aren't already
+      // active ourselves by means of a normal election, then there's
+      // probably something preventing us from becoming active.
+      throw new ServiceFailedException(
+          "No other node is currently active.");
+    }
+    
+    if (oldActive.getAddress().equals(localTarget.getAddress())) {
+      LOG.info("Local node " + localTarget + " is already active. " +
+          "No need to failover. Returning success.");
+      return;
+    }
+    
+    // Phase 3: ask the old active to yield from the election.
+    LOG.info("Asking " + oldActive + " to cede its active state for " +
+        timeout + "ms");
+    ZKFCProtocol oldZkfc = oldActive.getZKFCProxy(conf, timeout);
+    oldZkfc.cedeActive(timeout);
+
+    // Phase 4: wait for the normal election to make the local node
+    // active.
+    ActiveAttemptRecord attempt = waitForActiveAttempt(timeout + 60000);
+    
+    if (attempt == null) {
+      // We didn't even make an attempt to become active.
+      synchronized(this) {
+        if (lastHealthState != State.SERVICE_HEALTHY) {
+          throw new ServiceFailedException("Unable to become active. " +
+            "Service became unhealthy while trying to failover.");          
+        }
+      }
+      
+      throw new ServiceFailedException("Unable to become active. " +
+          "Local node did not get an opportunity to do so from ZooKeeper, " +
+          "or the local node took too long to transition to active.");
+    }
+
+    // Phase 5. At this point, we made some attempt to become active. So we
+    // can tell the old active to rejoin if it wants. This allows a quick
+    // fail-back if we immediately crash.
+    oldZkfc.cedeActive(-1);
+    
+    if (attempt.succeeded) {
+      LOG.info("Successfully became active. " + attempt.status);
+    } else {
+      // Propagate failure
+      String msg = "Failed to become active. " + attempt.status;
+      throw new ServiceFailedException(msg);
+    }
+  }
+
+  /**
+   * Ensure that the local node is in a healthy state, and thus
+   * eligible for graceful failover.
+   * @throws ServiceFailedException if the node is unhealthy
+   */
+  private synchronized void checkEligibleForFailover()
+      throws ServiceFailedException {
+    // Check health
+    if (this.getLastHealthState() != State.SERVICE_HEALTHY) {
+      throw new ServiceFailedException(
+          localTarget + " is not currently healthy. " +
+          "Cannot be failover target");
+    }
+  }
+
+  /**
+   * @return an {@link HAServiceTarget} for the current active node
+   * in the cluster, or null if no node is active.
+   * @throws IOException if a ZK-related issue occurs
+   * @throws InterruptedException if thread is interrupted 
+   */
+  private HAServiceTarget getCurrentActive()
+      throws IOException, InterruptedException {
+    synchronized (elector) {
+      synchronized (this) {
+        byte[] activeData;
+        try {
+          activeData = elector.getActiveData();
+        } catch (ActiveNotFoundException e) {
+          return null;
+        } catch (KeeperException ke) {
+          throw new IOException(
+              "Unexpected ZooKeeper issue fetching active node info", ke);
+        }
+        
+        HAServiceTarget oldActive = dataToTarget(activeData);
+        return oldActive;
+      }
+    }
+  }
+
+  /**
+   * Check the current state of the service, and join the election
+   * if it should be in the election.
+   */
+  private void recheckElectability() {
+    // Maintain lock ordering of elector -> ZKFC
+    synchronized (elector) {
+      synchronized (this) {
+        boolean healthy = lastHealthState == State.SERVICE_HEALTHY;
+    
+        long remainingDelay = delayJoiningUntilNanotime - System.nanoTime(); 
+        if (remainingDelay > 0) {
+          if (healthy) {
+            LOG.info("Would have joined master election, but this node is " +
+                "prohibited from doing so for " +
+                TimeUnit.NANOSECONDS.toMillis(remainingDelay) + " more ms");
+          }
+          scheduleRecheck(remainingDelay);
+          return;
+        }
+    
+        switch (lastHealthState) {
+        case SERVICE_HEALTHY:
+          elector.joinElection(targetToData(localTarget));
+          break;
+          
+        case INITIALIZING:
+          LOG.info("Ensuring that " + localTarget + " does not " +
+              "participate in active master election");
+          elector.quitElection(false);
+          break;
+    
+        case SERVICE_UNHEALTHY:
+        case SERVICE_NOT_RESPONDING:
+          LOG.info("Quitting master election for " + localTarget +
+              " and marking that fencing is necessary");
+          elector.quitElection(true);
+          break;
+          
+        case HEALTH_MONITOR_FAILED:
+          fatalError("Health monitor failed!");
+          break;
+          
+        default:
+          throw new IllegalArgumentException("Unhandled state:" + lastHealthState);
+        }
+      }
+    }
+  }
+  
+  /**
+   * Schedule a call to {@link #recheckElectability()} in the future.
+   */
+  private void scheduleRecheck(long whenNanos) {
+    delayExecutor.schedule(
+        new Runnable() {
+          @Override
+          public void run() {
+            try {
+              recheckElectability();
+            } catch (Throwable t) {
+              fatalError("Failed to recheck electability: " +
+                  StringUtils.stringifyException(t));
+            }
+          }
+        },
+        whenNanos, TimeUnit.NANOSECONDS);
+  }
 
   /**
    * @return the last health state passed to the FC
    * by the HealthMonitor.
    */
   @VisibleForTesting
-  State getLastHealthState() {
+  synchronized State getLastHealthState() {
     return lastHealthState;
   }
+
+  private synchronized void setLastHealthState(HealthMonitor.State newState) {
+    LOG.info("Local service " + localTarget +
+        " entered state: " + newState);
+    lastHealthState = newState;
+  }
   
   @VisibleForTesting
   ActiveStandbyElector getElectorForTests() {
     return elector;
   }
+  
+  @VisibleForTesting
+  ZKFCRpcServer getRpcServerForTests() {
+    return rpcServer;
+  }
 
   /**
    * Callbacks from elector
    */
   class ElectorCallbacks implements ActiveStandbyElectorCallback {
     @Override
-    public void becomeActive() {
+    public void becomeActive() throws ServiceFailedException {
       ZKFailoverController.this.becomeActive();
     }
 
@@ -319,31 +835,13 @@ public abstract class ZKFailoverControll
 
     @Override
     public void fenceOldActive(byte[] data) {
-      HAServiceTarget target = dataToTarget(data);
-      
-      LOG.info("Should fence: " + target);
-      boolean gracefulWorked = new FailoverController(conf)
-          .tryGracefulFence(target);
-      if (gracefulWorked) {
-        // It's possible that it's in standby but just about to go into active,
-        // no? Is there some race here?
-        LOG.info("Successfully transitioned " + target + " to standby " +
-            "state without fencing");
-        return;
-      }
-      
-      try {
-        target.checkFencingConfigured();
-      } catch (BadFencingConfigurationException e) {
-        LOG.error("Couldn't fence old active " + target, e);
-        // TODO: see below todo
-        throw new RuntimeException(e);
-      }
-      
-      if (!target.getFencer().fence(target)) {
-        // TODO: this will end up in some kind of tight loop,
-        // won't it? We need some kind of backoff
-        throw new RuntimeException("Unable to fence " + target);
+      ZKFailoverController.this.fenceOldActive(data);
+    }
+    
+    @Override
+    public String toString() {
+      synchronized (ZKFailoverController.this) {
+        return "Elector callbacks for " + localTarget;
       }
     }
   }
@@ -354,36 +852,21 @@ public abstract class ZKFailoverControll
   class HealthCallbacks implements HealthMonitor.Callback {
     @Override
     public void enteredState(HealthMonitor.State newState) {
-      LOG.info("Local service " + localTarget +
-          " entered state: " + newState);
-      switch (newState) {
-      case SERVICE_HEALTHY:
-        LOG.info("Joining master election for " + localTarget);
-        elector.joinElection(targetToData(localTarget));
-        break;
-        
-      case INITIALIZING:
-        LOG.info("Ensuring that " + localTarget + " does not " +
-            "participate in active master election");
-        elector.quitElection(false);
-        break;
-
-      case SERVICE_UNHEALTHY:
-      case SERVICE_NOT_RESPONDING:
-        LOG.info("Quitting master election for " + localTarget +
-            " and marking that fencing is necessary");
-        elector.quitElection(true);
-        break;
-        
-      case HEALTH_MONITOR_FAILED:
-        fatalError("Health monitor failed!");
-        break;
-        
-      default:
-        throw new IllegalArgumentException("Unhandled state:" + newState);
-      }
-      
-      lastHealthState = newState;
+      setLastHealthState(newState);
+      recheckElectability();
     }
   }
+  
+  private static class ActiveAttemptRecord {
+    private final boolean succeeded;
+    private final String status;
+    private final long nanoTime;
+    
+    public ActiveAttemptRecord(boolean succeeded, String status) {
+      this.succeeded = succeeded;
+      this.status = status;
+      this.nanoTime = System.nanoTime();
+    }
+  }
+
 }
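
A sketch of wiring up the new ZooKeeper security keys (the values are
illustrative; ACL and auth strings follow ZooKeeper's scheme:id syntax,
and a value beginning with "@" is treated as a file to read via
resolveConfIndirection, which keeps digest credentials out of *-site.xml):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ha.ZKFailoverController;

    public class ZkfcConfSketch {
      public static Configuration buildConf() {
        Configuration conf = new Configuration();
        conf.set(ZKFailoverController.ZK_QUORUM_KEY,
            "zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181");
        // Restrict the ZKFC znodes rather than relying on the
        // world:anyone:rwcda default ACL.
        conf.set(ZKFailoverController.ZK_ACL_KEY,
            "sasl:zkfc@EXAMPLE.COM:rwcda");
        // Credentials are read from the named file instead of the XML.
        conf.set(ZKFailoverController.ZK_AUTH_KEY, "@/etc/hadoop/zk-auth.txt");
        return conf;
      }
    }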

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolClientSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolClientSideTranslatorPB.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolClientSideTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolClientSideTranslatorPB.java Wed Jun  6 00:17:38 2012
@@ -30,13 +30,14 @@ import org.apache.hadoop.ha.HAServicePro
 import org.apache.hadoop.ha.HAServiceStatus;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.GetServiceStatusRequestProto;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.GetServiceStatusResponseProto;
+import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAStateChangeRequestInfoProto;
+import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HARequestSource;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceStateProto;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.MonitorHealthRequestProto;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.TransitionToActiveRequestProto;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.TransitionToStandbyRequestProto;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -57,10 +58,6 @@ public class HAServiceProtocolClientSide
   private final static RpcController NULL_CONTROLLER = null;
   private final static MonitorHealthRequestProto MONITOR_HEALTH_REQ = 
       MonitorHealthRequestProto.newBuilder().build();
-  private final static TransitionToActiveRequestProto TRANSITION_TO_ACTIVE_REQ = 
-      TransitionToActiveRequestProto.newBuilder().build();
-  private final static TransitionToStandbyRequestProto TRANSITION_TO_STANDBY_REQ = 
-      TransitionToStandbyRequestProto.newBuilder().build();
   private final static GetServiceStatusRequestProto GET_SERVICE_STATUS_REQ = 
       GetServiceStatusRequestProto.newBuilder().build();
   
@@ -94,18 +91,25 @@ public class HAServiceProtocolClientSide
   }
 
   @Override
-  public void transitionToActive() throws IOException {
+  public void transitionToActive(StateChangeRequestInfo reqInfo) throws IOException {
     try {
-      rpcProxy.transitionToActive(NULL_CONTROLLER, TRANSITION_TO_ACTIVE_REQ);
+      TransitionToActiveRequestProto req =
+          TransitionToActiveRequestProto.newBuilder()
+            .setReqInfo(convert(reqInfo)).build();
+
+      rpcProxy.transitionToActive(NULL_CONTROLLER, req);
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }
   }
 
   @Override
-  public void transitionToStandby() throws IOException {
+  public void transitionToStandby(StateChangeRequestInfo reqInfo) throws IOException {
     try {
-      rpcProxy.transitionToStandby(NULL_CONTROLLER, TRANSITION_TO_STANDBY_REQ);
+      TransitionToStandbyRequestProto req =
+        TransitionToStandbyRequestProto.newBuilder()
+          .setReqInfo(convert(reqInfo)).build();
+      rpcProxy.transitionToStandby(NULL_CONTROLLER, req);
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }
@@ -143,6 +147,27 @@ public class HAServiceProtocolClientSide
     }
   }
   
+  private HAStateChangeRequestInfoProto convert(StateChangeRequestInfo reqInfo) {
+    HARequestSource src;
+    switch (reqInfo.getSource()) {
+    case REQUEST_BY_USER:
+      src = HARequestSource.REQUEST_BY_USER;
+      break;
+    case REQUEST_BY_USER_FORCED:
+      src = HARequestSource.REQUEST_BY_USER_FORCED;
+      break;
+    case REQUEST_BY_ZKFC:
+      src = HARequestSource.REQUEST_BY_ZKFC;
+      break;
+    default:
+      throw new IllegalArgumentException("Bad source: " + reqInfo.getSource());
+    }
+    return HAStateChangeRequestInfoProto.newBuilder()
+        .setReqSource(src)
+        .build();
+  }
+
+
   @Override
   public void close() {
     RPC.stopProxy(rpcProxy);

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolServerSideTranslatorPB.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolServerSideTranslatorPB.java Wed Jun  6 00:17:38 2012
@@ -19,12 +19,17 @@ package org.apache.hadoop.ha.protocolPB;
 
 import java.io.IOException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
+import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
 import org.apache.hadoop.ha.HAServiceStatus;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.GetServiceStatusRequestProto;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.GetServiceStatusResponseProto;
+import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAStateChangeRequestInfoProto;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceStateProto;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.MonitorHealthRequestProto;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.MonitorHealthResponseProto;
@@ -56,6 +61,8 @@ public class HAServiceProtocolServerSide
       TransitionToActiveResponseProto.newBuilder().build();
   private static final TransitionToStandbyResponseProto TRANSITION_TO_STANDBY_RESP = 
       TransitionToStandbyResponseProto.newBuilder().build();
+  private static final Log LOG = LogFactory.getLog(
+      HAServiceProtocolServerSideTranslatorPB.class);
   
   public HAServiceProtocolServerSideTranslatorPB(HAServiceProtocol server) {
     this.server = server;
@@ -71,13 +78,33 @@ public class HAServiceProtocolServerSide
       throw new ServiceException(e);
     }
   }
+  
+  private StateChangeRequestInfo convert(HAStateChangeRequestInfoProto proto) {
+    RequestSource src;
+    switch (proto.getReqSource()) {
+    case REQUEST_BY_USER:
+      src = RequestSource.REQUEST_BY_USER;
+      break;
+    case REQUEST_BY_USER_FORCED:
+      src = RequestSource.REQUEST_BY_USER_FORCED;
+      break;
+    case REQUEST_BY_ZKFC:
+      src = RequestSource.REQUEST_BY_ZKFC;
+      break;
+    default:
+      LOG.warn("Unknown request source: " + proto.getReqSource());
+      src = null;
+    }
+    
+    return new StateChangeRequestInfo(src);
+  }
 
   @Override
   public TransitionToActiveResponseProto transitionToActive(
       RpcController controller, TransitionToActiveRequestProto request)
       throws ServiceException {
     try {
-      server.transitionToActive();
+      server.transitionToActive(convert(request.getReqInfo()));
       return TRANSITION_TO_ACTIVE_RESP;
     } catch(IOException e) {
       throw new ServiceException(e);
@@ -89,7 +116,7 @@ public class HAServiceProtocolServerSide
       RpcController controller, TransitionToStandbyRequestProto request)
       throws ServiceException {
     try {
-      server.transitionToStandby();
+      server.transitionToStandby(convert(request.getReqInfo()));
       return TRANSITION_TO_STANDBY_RESP;
     } catch(IOException e) {
       throw new ServiceException(e);

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer.java Wed Jun  6 00:17:38 2012
@@ -96,7 +96,7 @@ public class HttpServer implements Filte
   // The ServletContext attribute where the daemon Configuration
   // gets stored.
   public static final String CONF_CONTEXT_ATTRIBUTE = "hadoop.conf";
-  static final String ADMINS_ACL = "admins.acl";
+  public static final String ADMINS_ACL = "admins.acl";
   public static final String SPNEGO_FILTER = "SpnegoFilter";
 
   public static final String BIND_ADDRESS = "bind.address";
@@ -792,7 +792,7 @@ public class HttpServer implements Filte
    * 
    * @param servletContext
    * @param request
-   * @param response
+   * @param response used to send the error response if the user does not have admin access.
    * @return true if admin-authorized, false otherwise
    * @throws IOException
    */
@@ -814,18 +814,33 @@ public class HttpServer implements Filte
                          "authorized to access this page.");
       return false;
     }
+    
+    if (servletContext.getAttribute(ADMINS_ACL) != null &&
+        !userHasAdministratorAccess(servletContext, remoteUser)) {
+      response.sendError(HttpServletResponse.SC_UNAUTHORIZED, "User "
+          + remoteUser + " is unauthorized to access this page.");
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Get the admin ACLs from the given ServletContext and check if the given
+   * user is in the ACL.
+   * 
+   * @param servletContext the context containing the admin ACL.
+   * @param remoteUser the remote user to check for.
+   * @return true if the user is present in the ACL, false if no ACL is set or
+   *         the user is not present
+   */
+  public static boolean userHasAdministratorAccess(ServletContext servletContext,
+      String remoteUser) {
     AccessControlList adminsAcl = (AccessControlList) servletContext
         .getAttribute(ADMINS_ACL);
     UserGroupInformation remoteUserUGI =
         UserGroupInformation.createRemoteUser(remoteUser);
-    if (adminsAcl != null) {
-      if (!adminsAcl.isUserAllowed(remoteUserUGI)) {
-        response.sendError(HttpServletResponse.SC_UNAUTHORIZED, "User "
-            + remoteUser + " is unauthorized to access this page.");
-        return false;
-      }
-    }
-    return true;
+    return adminsAcl != null && adminsAcl.isUserAllowed(remoteUserUGI);
   }
 
   /**
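
A sketch of guarding a servlet with the newly public pieces; the servlet
itself is hypothetical, and it mirrors hasAdministratorAccess() by
consulting the ACL only when one has been registered in the context:

    import java.io.IOException;

    import javax.servlet.ServletContext;
    import javax.servlet.http.HttpServlet;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;

    import org.apache.hadoop.http.HttpServer;

    public class AdminOnlyServlet extends HttpServlet {
      @Override
      protected void doGet(HttpServletRequest req, HttpServletResponse resp)
          throws IOException {
        ServletContext ctx = getServletContext();
        String remoteUser = req.getRemoteUser();
        // Only consult the ACL if one is set, as hasAdministratorAccess does.
        if (ctx.getAttribute(HttpServer.ADMINS_ACL) != null &&
            (remoteUser == null ||
             !HttpServer.userHasAdministratorAccess(ctx, remoteUser))) {
          resp.sendError(HttpServletResponse.SC_UNAUTHORIZED,
              "User " + remoteUser + " is unauthorized to access this page.");
          return;
        }
        resp.getWriter().println("admin-only content");
      }
    }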

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/lib/StaticUserWebFilter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/lib/StaticUserWebFilter.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/lib/StaticUserWebFilter.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/lib/StaticUserWebFilter.java Wed Jun  6 00:17:38 2012
@@ -37,15 +37,15 @@ import org.apache.hadoop.http.FilterInit
 
 import javax.servlet.Filter;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
+
 /**
  * Provides a servlet filter that pretends to authenticate a fake user (Dr.Who)
  * so that the web UI is usable for a secure cluster without authentication.
  */
 public class StaticUserWebFilter extends FilterInitializer {
   static final String DEPRECATED_UGI_KEY = "dfs.web.ugi";
-  
-  static final String USERNAME_KEY = "hadoop.http.staticuser.user";
-  static final String USERNAME_DEFAULT = "dr.who";
 
   private static final Log LOG = LogFactory.getLog(StaticUserWebFilter.class);
 
@@ -112,7 +112,7 @@ public class StaticUserWebFilter extends
 
     @Override
     public void init(FilterConfig conf) throws ServletException {
-      this.username = conf.getInitParameter(USERNAME_KEY);
+      this.username = conf.getInitParameter(HADOOP_HTTP_STATIC_USER);
       this.user = new User(username);
     }
     
@@ -123,7 +123,7 @@ public class StaticUserWebFilter extends
     HashMap<String, String> options = new HashMap<String, String>();
     
     String username = getUsernameFromConf(conf);
-    options.put(USERNAME_KEY, username);
+    options.put(HADOOP_HTTP_STATIC_USER, username);
 
     container.addFilter("static_user_filter", 
                         StaticUserFilter.class.getName(), 
@@ -139,11 +139,12 @@ public class StaticUserWebFilter extends
       // We can't use the normal configuration deprecation mechanism here
       // since we need to split out the username from the configured UGI.
       LOG.warn(DEPRECATED_UGI_KEY + " should not be used. Instead, use " + 
-               USERNAME_KEY + ".");
+          HADOOP_HTTP_STATIC_USER + ".");
       String[] parts = oldStyleUgi.split(",");
       return parts[0];
     } else {
-      return conf.get(USERNAME_KEY, USERNAME_DEFAULT);
+      return conf.get(HADOOP_HTTP_STATIC_USER,
+        DEFAULT_HADOOP_HTTP_STATIC_USER);
     }
   }
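
Downstream code that previously duplicated the filter's private
USERNAME_KEY can now use the shared constant; a sketch with an
illustrative user name:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CommonConfigurationKeys;

    public class StaticUserConfSketch {
      public static Configuration withStaticUser() {
        Configuration conf = new Configuration();
        // The key string is unchanged: "hadoop.http.staticuser.user".
        conf.set(CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER, "webuser");
        return conf;
      }
    }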
 

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java Wed Jun  6 00:17:38 2012
@@ -807,7 +807,7 @@ public class SequenceFile {
   }
   
   /** Write key/value pairs to a sequence-format file. */
-  public static class Writer implements java.io.Closeable {
+  public static class Writer implements java.io.Closeable, Syncable {
     private Configuration conf;
     FSDataOutputStream out;
     boolean ownOutputStream = true;
@@ -1193,13 +1193,31 @@ public class SequenceFile {
       }
     }
 
-    /** flush all currently written data to the file system */
+    /**
+     * flush all currently written data to the file system
+     * @deprecated Use {@link #hsync()} or {@link #hflush()} instead
+     */
+    @Deprecated
     public void syncFs() throws IOException {
       if (out != null) {
         out.hflush();  // flush contents to file system
       }
     }
 
+    @Override
+    public void hsync() throws IOException {
+      if (out != null) {
+        out.hsync();
+      }
+    }
+
+    @Override
+    public void hflush() throws IOException {
+      if (out != null) {
+        out.hflush();
+      }
+    }
+    
     /** Returns the configuration of this file. */
     Configuration getConf() { return conf; }
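
A sketch of the replacement for the deprecated syncFs(): hflush() for
visibility to readers, hsync() when on-disk durability is also wanted
(the path and record types are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    public class SyncableWriterSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("/tmp/sketch.seq");
        SequenceFile.Writer writer = SequenceFile.createWriter(
            fs, conf, path, LongWritable.class, Text.class);
        try {
          writer.append(new LongWritable(1), new Text("first"));
          writer.hflush(); // flush so concurrent readers can see the record
          writer.append(new LongWritable(2), new Text("second"));
          writer.hsync();  // additionally request an on-disk sync
        } finally {
          writer.close();
        }
      }
    }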
     

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Text.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Text.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Text.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Text.java Wed Jun  6 00:17:38 2012
@@ -236,6 +236,11 @@ public class Text extends BinaryComparab
 
   /**
    * Clear the string to empty.
+   *
+   * <em>Note</em>: For performance reasons, this call does not clear the
+   * underlying byte array that is retrievable via {@link #getBytes()}.
+   * In order to free the byte-array memory, call {@link #set(byte[])}
+   * with an empty byte array (for example, <code>new byte[0]</code>).
    */
   public void clear() {
     length = 0;

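The behavior described in the note is easy to demonstrate; a hypothetical
sketch comparing the logical length with the retained backing array:

import org.apache.hadoop.io.Text;

public class TextClearSketch {
  public static void main(String[] args) {
    Text t = new Text("a fairly long string that grows the backing array");
    t.clear();
    // Logical length is 0, but the backing array is still allocated.
    System.out.println(t.getLength() + " / " + t.getBytes().length);
    t.set(new byte[0]);  // actually releases the old backing array
    System.out.println(t.getLength() + " / " + t.getBytes().length);
  }
}
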
Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Writable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Writable.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Writable.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Writable.java Wed Jun  6 00:17:38 2012
@@ -39,20 +39,23 @@ import org.apache.hadoop.classification.
  * <p>Example:</p>
  * <p><blockquote><pre>
  *     public class MyWritable implements Writable {
- *       // Some data     
+ *       // Some data
  *       private int counter;
  *       private long timestamp;
- *       
+ *
+ *       // Default constructor to allow (de)serialization
+ *       MyWritable() { }
+ *
  *       public void write(DataOutput out) throws IOException {
  *         out.writeInt(counter);
  *         out.writeLong(timestamp);
  *       }
- *       
+ *
  *       public void readFields(DataInput in) throws IOException {
  *         counter = in.readInt();
  *         timestamp = in.readLong();
  *       }
- *       
+ *
  *       public static MyWritable read(DataInput in) throws IOException {
  *         MyWritable w = new MyWritable();
  *         w.readFields(in);

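To illustrate why the added no-arg constructor matters, a hedged round-trip
sketch for the MyWritable example (assumed compiled in the same package),
using DataOutputBuffer and DataInputBuffer from org.apache.hadoop.io:

import java.io.IOException;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class MyWritableRoundTrip {
  public static void main(String[] args) throws IOException {
    MyWritable original = new MyWritable();

    // Serialize to an in-memory buffer.
    DataOutputBuffer out = new DataOutputBuffer();
    original.write(out);

    // Deserialize into a fresh instance; frameworks that create the
    // instance reflectively need exactly this no-arg constructor.
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    MyWritable copy = new MyWritable();
    copy.readFields(in);
  }
}
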
Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionCodecFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionCodecFactory.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionCodecFactory.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionCodecFactory.java Wed Jun  6 00:17:38 2012
@@ -109,8 +109,12 @@ public class CompressionCodecFactory {
     List<Class<? extends CompressionCodec>> result
       = new ArrayList<Class<? extends CompressionCodec>>();
     // Add codec classes discovered via service loading
-    for (CompressionCodec codec : CODEC_PROVIDERS) {
-      result.add(codec.getClass());
+    synchronized (CODEC_PROVIDERS) {
+      // CODEC_PROVIDERS is a lazy collection. Synchronize so it is
+      // thread-safe. See HADOOP-8406.
+      for (CompressionCodec codec : CODEC_PROVIDERS) {
+        result.add(codec.getClass());
+      }
     }
     // Add codec classes from configuration
     String codecsString = conf.get("io.compression.codecs");

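The fix generalizes to any shared ServiceLoader: its iterator instantiates
providers lazily and is not thread-safe, so concurrent iteration must go
through a common lock. A minimal sketch with a hypothetical provider
interface:

import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

public class LoaderGuardSketch {
  /** Hypothetical provider interface discovered via META-INF/services. */
  public interface Codec {}

  // Shared and lazily populated, like CODEC_PROVIDERS above.
  private static final ServiceLoader<Codec> PROVIDERS =
      ServiceLoader.load(Codec.class);

  public static List<Class<?>> codecClasses() {
    List<Class<?>> result = new ArrayList<Class<?>>();
    // Hold one lock for the whole iteration, mirroring the fix above.
    synchronized (PROVIDERS) {
      for (Codec c : PROVIDERS) {
        result.add(c.getClass());
      }
    }
    return result;
  }
}
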
Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Wed Jun  6 00:17:38 2012
@@ -53,6 +53,8 @@ import org.apache.hadoop.fs.CommonConfig
 import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
 import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadHeaderProto;
 import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadOperationProto;
+import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcResponseHeaderProto;
+import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcStatusProto;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
@@ -845,24 +847,24 @@ public class Client {
       touch();
       
       try {
-        int id = in.readInt();                    // try to read an id
-
+        RpcResponseHeaderProto response = 
+            RpcResponseHeaderProto.parseDelimitedFrom(in);
+        int callId = response.getCallId();
         if (LOG.isDebugEnabled())
-          LOG.debug(getName() + " got value #" + id);
-
-        Call call = calls.get(id);
+          LOG.debug(getName() + " got value #" + callId);
 
-        int state = in.readInt();     // read call status
-        if (state == Status.SUCCESS.state) {
+        Call call = calls.get(callId);
+        RpcStatusProto status = response.getStatus();
+        if (status == RpcStatusProto.SUCCESS) {
           Writable value = ReflectionUtils.newInstance(valueClass, conf);
           value.readFields(in);                 // read value
           call.setRpcResponse(value);
-          calls.remove(id);
-        } else if (state == Status.ERROR.state) {
+          calls.remove(callId);
+        } else if (status == RpcStatusProto.ERROR) {
           call.setException(new RemoteException(WritableUtils.readString(in),
                                                 WritableUtils.readString(in)));
-          calls.remove(id);
-        } else if (state == Status.FATAL.state) {
+          calls.remove(callId);
+        } else if (status == RpcStatusProto.FATAL) {
           // Close the connection
           markClosed(new RemoteException(WritableUtils.readString(in), 
                                          WritableUtils.readString(in)));

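Both sides of the new wire format pair writeDelimitedTo() with
parseDelimitedFrom(). A self-contained round-trip sketch using the generated
header classes imported above (call id and status values are arbitrary):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcResponseHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcStatusProto;

public class HeaderRoundTripSketch {
  public static void main(String[] args) throws Exception {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    // Server side: write the length-delimited header before the payload.
    RpcResponseHeaderProto.newBuilder()
        .setCallId(42)
        .setStatus(RpcStatusProto.SUCCESS)
        .build()
        .writeDelimitedTo(out);
    // Client side: read the header back, as receiveResponse now does.
    RpcResponseHeaderProto header = RpcResponseHeaderProto
        .parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray()));
    System.out.println(header.getCallId() + " " + header.getStatus());
  }
}
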
Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java Wed Jun  6 00:17:38 2012
@@ -396,24 +396,44 @@ public class ProtobufRpcEngine implement
        * it is.</li>
        * </ol>
        */
-      public Writable call(RPC.Server server, String protocol,
+      public Writable call(RPC.Server server, String connectionProtocolName,
           Writable writableRequest, long receiveTime) throws Exception {
         RpcRequestWritable request = (RpcRequestWritable) writableRequest;
         HadoopRpcRequestProto rpcRequest = request.message;
         String methodName = rpcRequest.getMethodName();
-        String protoName = rpcRequest.getDeclaringClassProtocolName();
+        
+        
+        /**
+         * RPCs for a particular interface (i.e. protocol) are done using an
+         * IPC connection that is set up via rpcProxy.
+         * The rpcProxy has a declared protocol name that is
+         * sent from client to server at connection time.
+         *
+         * Each RPC call also sends a protocol name
+         * (called declaringClassProtocolName). This name is usually the same
+         * as the connection protocol name, except in some cases.
+         * For example, meta-protocols such as ProtocolInfoProto, which
+         * fetch information about a protocol, reuse an existing connection
+         * but need to indicate that the actual protocol is different
+         * (i.e. ProtocolInfoProto); in this case the
+         * declaringClassProtocolName field is set to ProtocolInfoProto.
+         */
+
+        String declaringClassProtoName = 
+            rpcRequest.getDeclaringClassProtocolName();
         long clientVersion = rpcRequest.getClientProtocolVersion();
         if (server.verbose)
-          LOG.info("Call: protocol=" + protocol + ", method=" + methodName);
+          LOG.info("Call: connectionProtocolName=" + connectionProtocolName + 
+              ", method=" + methodName);
         
-        ProtoClassProtoImpl protocolImpl = getProtocolImpl(server, protoName,
-            clientVersion);
+        ProtoClassProtoImpl protocolImpl = getProtocolImpl(server, 
+                              declaringClassProtoName, clientVersion);
         BlockingService service = (BlockingService) protocolImpl.protocolImpl;
         MethodDescriptor methodDescriptor = service.getDescriptorForType()
             .findMethodByName(methodName);
         if (methodDescriptor == null) {
-          String msg = "Unknown method " + methodName + " called on " + protocol
-              + " protocol.";
+          String msg = "Unknown method " + methodName + " called on " 
+                                + connectionProtocolName + " protocol.";
           LOG.warn(msg);
           throw new RpcServerException(msg);
         }

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Wed Jun  6 00:17:38 2012
@@ -1339,7 +1339,7 @@ public abstract class Server {
               + CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION
               + ") is configured as simple. Please configure another method "
               + "like kerberos or digest.");
-            setupResponse(authFailedResponse, authFailedCall, Status.FATAL,
+            setupResponse(authFailedResponse, authFailedCall, RpcStatusProto.FATAL,
                 null, ae.getClass().getName(), ae.getMessage());
             responder.doRespond(authFailedCall);
             throw ae;
@@ -1420,7 +1420,7 @@ public abstract class Server {
         Call fakeCall =  new Call(-1, null, this);
         // Versions 3 and greater can interpret this exception
         // response in the same manner
-        setupResponse(buffer, fakeCall, Status.FATAL,
+        setupResponseOldVersionFatal(buffer, fakeCall,
             null, VersionMismatch.class.getName(), errMsg);
 
         responder.doRespond(fakeCall);
@@ -1443,7 +1443,7 @@ public abstract class Server {
       ByteArrayOutputStream buffer = new ByteArrayOutputStream();
 
       Call fakeCall = new Call(-1, null, this);
-      setupResponse(buffer, fakeCall, Status.FATAL, null,
+      setupResponse(buffer, fakeCall, RpcStatusProto.FATAL, null,
           IpcException.class.getName(), errMsg);
       responder.doRespond(fakeCall);
     }
@@ -1579,7 +1579,7 @@ public abstract class Server {
             new Call(header.getCallId(), null, this);
         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
 
-        setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
+        setupResponse(responseBuffer, readParamsFailedCall, RpcStatusProto.FATAL, null,
             IOException.class.getName(),
             "Unknown rpc kind "  + header.getRpcKind());
         responder.doRespond(readParamsFailedCall);
@@ -1597,7 +1597,7 @@ public abstract class Server {
             new Call(header.getCallId(), null, this);
         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
 
-        setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
+        setupResponse(responseBuffer, readParamsFailedCall, RpcStatusProto.FATAL, null,
             t.getClass().getName(),
             "IPC server unable to read call parameters: " + t.getMessage());
         responder.doRespond(readParamsFailedCall);
@@ -1627,7 +1627,7 @@ public abstract class Server {
         rpcMetrics.incrAuthorizationSuccesses();
       } catch (AuthorizationException ae) {
         rpcMetrics.incrAuthorizationFailures();
-        setupResponse(authFailedResponse, authFailedCall, Status.FATAL, null,
+        setupResponse(authFailedResponse, authFailedCall, RpcStatusProto.FATAL, null,
             ae.getClass().getName(), ae.getMessage());
         responder.doRespond(authFailedCall);
         return false;
@@ -1725,8 +1725,8 @@ public abstract class Server {
             // responder.doResponse() since setupResponse may use
             // SASL to encrypt response data and SASL enforces
             // its own message ordering.
-            setupResponse(buf, call, (error == null) ? Status.SUCCESS
-                : Status.ERROR, value, errorClass, error);
+            setupResponse(buf, call, (error == null) ? RpcStatusProto.SUCCESS
+                : RpcStatusProto.ERROR, value, errorClass, error);
             
             // Discard the large buf and reset it back to smaller size 
             // to free up heap
@@ -1859,41 +1859,80 @@ public abstract class Server {
   /**
    * Setup response for the IPC Call.
    * 
-   * @param response buffer to serialize the response into
+   * @param responseBuf buffer to serialize the response into
    * @param call {@link Call} to which we are setting up the response
-   * @param status {@link Status} of the IPC call
+   * @param status of the IPC call
    * @param rv return value for the IPC Call, if the call was successful
   * @param errorClass error class, if the call failed
    * @param error error message, if the call failed
    * @throws IOException
    */
-  private void setupResponse(ByteArrayOutputStream response, 
-                             Call call, Status status, 
+  private void setupResponse(ByteArrayOutputStream responseBuf,
+                             Call call, RpcStatusProto status, 
                              Writable rv, String errorClass, String error) 
   throws IOException {
-    response.reset();
-    DataOutputStream out = new DataOutputStream(response);
-    out.writeInt(call.callId);                // write call id
-    out.writeInt(status.state);           // write status
+    responseBuf.reset();
+    DataOutputStream out = new DataOutputStream(responseBuf);
+    RpcResponseHeaderProto.Builder response =  
+        RpcResponseHeaderProto.newBuilder();
+    response.setCallId(call.callId);
+    response.setStatus(status);
 
-    if (status == Status.SUCCESS) {
+
+    if (status == RpcStatusProto.SUCCESS) {
       try {
+        response.build().writeDelimitedTo(out);
         rv.write(out);
       } catch (Throwable t) {
         LOG.warn("Error serializing call response for call " + call, t);
         // Call back to same function - this is OK since the
         // buffer is reset at the top, and since status is changed
         // to ERROR it won't infinite loop.
-        setupResponse(response, call, Status.ERROR,
+        setupResponse(responseBuf, call, RpcStatusProto.ERROR,
             null, t.getClass().getName(),
             StringUtils.stringifyException(t));
         return;
       }
     } else {
+      if (status == RpcStatusProto.FATAL) {
+        response.setServerIpcVersionNum(Server.CURRENT_VERSION);
+      }
+      response.build().writeDelimitedTo(out);
       WritableUtils.writeString(out, errorClass);
       WritableUtils.writeString(out, error);
     }
     if (call.connection.useWrap) {
+      wrapWithSasl(responseBuf, call);
+    }
+    call.setResponse(ByteBuffer.wrap(responseBuf.toByteArray()));
+  }
+  
+  /**
+   * Setup response for the IPC Call on Fatal Error from a 
+   * client that is using an old version of Hadoop.
+   * The response is serialized using the previous protocol's response
+   * layout.
+   * 
+   * @param response buffer to serialize the response into
+   * @param call {@link Call} to which we are setting up the response
+   * @param rv return value for the IPC Call, if the call was successful
+   * @param errorClass error class, if the call failed
+   * @param error error message, if the call failed
+   * @throws IOException
+   */
+  private void setupResponseOldVersionFatal(ByteArrayOutputStream response, 
+                             Call call,
+                             Writable rv, String errorClass, String error) 
+  throws IOException {
+    final int OLD_VERSION_FATAL_STATUS = -1;
+    response.reset();
+    DataOutputStream out = new DataOutputStream(response);
+    out.writeInt(call.callId);                // write call id
+    out.writeInt(OLD_VERSION_FATAL_STATUS);   // write FATAL_STATUS
+    WritableUtils.writeString(out, errorClass);
+    WritableUtils.writeString(out, error);
+
+    if (call.connection.useWrap) {
       wrapWithSasl(response, call);
     }
     call.setResponse(ByteBuffer.wrap(response.toByteArray()));

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/jmx/JMXJsonServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/jmx/JMXJsonServlet.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/jmx/JMXJsonServlet.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/jmx/JMXJsonServlet.java Wed Jun  6 00:17:38 2012
@@ -34,6 +34,7 @@ import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 import javax.management.ReflectionException;
+import javax.management.RuntimeErrorException;
 import javax.management.RuntimeMBeanException;
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.CompositeType;
@@ -317,6 +318,11 @@ public class JMXJsonServlet extends Http
         LOG.error("getting attribute "+attName+" of "+oname+" threw an exception", e);
       }
       return;
+    } catch (RuntimeErrorException e) {
+      // RuntimeErrorException happens when an unexpected Error occurs in
+      // getAttribute; see for example https://issues.apache.org/jira/browse/DAEMON-120
+      LOG.debug("getting attribute "+attName+" of "+oname+" threw an exception", e);
+      return;
     } catch (AttributeNotFoundException e) {
       //Ignored the attribute was not found, which should never happen because the bean
       //just told us that it has this attribute, but if this happens just don't output

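RuntimeErrorException is javax.management's wrapper for an Error raised
inside an attribute getter. A hedged sketch of the catch ordering against
the platform MBean server (bean and attribute names are illustrative):

import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.RuntimeErrorException;

public class JmxReadSketch {
  public static void main(String[] args) {
    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
    try {
      Object uptime = mbs.getAttribute(
          new ObjectName("java.lang:type=Runtime"), "Uptime");
      System.out.println("uptime = " + uptime);
    } catch (RuntimeErrorException e) {
      // An Error thrown by the getter arrives wrapped here; log quietly
      // and move on, as the servlet now does.
      System.err.println("attribute read failed: " + e.getCause());
    } catch (Exception e) {
      // Checked JMX exceptions (InstanceNotFound, Reflection, ...).
      System.err.println("attribute read failed: " + e);
    }
  }
}
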
Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/Node.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/Node.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/Node.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/Node.java Wed Jun  6 00:17:38 2012
@@ -40,6 +40,7 @@ public interface Node {
    * @param location the location
    */
   public void setNetworkLocation(String location);
+
   /** @return this node's name */
   public String getName();
 

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NodeBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NodeBase.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NodeBase.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NodeBase.java Wed Jun  6 00:17:38 2012
@@ -110,7 +110,7 @@ public class NodeBase implements Node {
    * @return the path of a node
    */
   public static String getPath(Node node) {
-    return node.getNetworkLocation()+PATH_SEPARATOR_STR+node.getName();
+    return node.getNetworkLocation() + PATH_SEPARATOR_STR + node.getName();
   }
   
   /** @return this node's path as its string representation */

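A small usage sketch for getPath(), with hypothetical names:

import org.apache.hadoop.net.NodeBase;

public class NodePathSketch {
  public static void main(String[] args) {
    // NodeBase(name, location); getPath joins them with the separator.
    NodeBase node = new NodeBase("host1", "/dc1/rack1");
    System.out.println(NodeBase.getPath(node));  // prints /dc1/rack1/host1
  }
}
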
Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java Wed Jun  6 00:17:38 2012
@@ -18,10 +18,15 @@
 
 package org.apache.hadoop.security.token;
 
+import com.google.common.collect.Maps;
+
+import java.io.ByteArrayInputStream;
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Map;
 import java.util.ServiceLoader;
 
 import org.apache.commons.codec.binary.Base64;
@@ -37,6 +42,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * The client-side form of the token.
@@ -45,6 +51,9 @@ import org.apache.hadoop.io.WritableUtil
 @InterfaceStability.Evolving
 public class Token<T extends TokenIdentifier> implements Writable {
   public static final Log LOG = LogFactory.getLog(Token.class);
+  
+  private static Map<Text, Class<? extends TokenIdentifier>> tokenKindMap;
+  
   private byte[] identifier;
   private byte[] password;
   private Text kind;
@@ -100,13 +109,49 @@ public class Token<T extends TokenIdenti
   }
 
   /**
-   * Get the token identifier
-   * @return the token identifier
+   * Get the token identifier's byte representation
+   * @return the token identifier's byte representation
    */
   public byte[] getIdentifier() {
     return identifier;
   }
   
+  private static synchronized Class<? extends TokenIdentifier>
+      getClassForIdentifier(Text kind) {
+    if (tokenKindMap == null) {
+      tokenKindMap = Maps.newHashMap();
+      for (TokenIdentifier id : ServiceLoader.load(TokenIdentifier.class)) {
+        tokenKindMap.put(id.getKind(), id.getClass());
+      }
+    }
+    Class<? extends TokenIdentifier> cls = tokenKindMap.get(kind);
+    if (cls == null) {
+      LOG.warn("Cannot find class for token kind " + kind);
+      return null;
+    }
+    return cls;
+  }
+  
+  /**
+   * Get the token identifier object, or null if it could not be constructed
+   * (because the class could not be loaded, for example).
+   * @return the token identifier, or null
+   * @throws IOException 
+   */
+  @SuppressWarnings("unchecked")
+  public T decodeIdentifier() throws IOException {
+    Class<? extends TokenIdentifier> cls = getClassForIdentifier(getKind());
+    if (cls == null) {
+      return null;
+    }
+    TokenIdentifier tokenIdentifier = ReflectionUtils.newInstance(cls, null);
+    ByteArrayInputStream buf = new ByteArrayInputStream(identifier);
+    DataInputStream in = new DataInputStream(buf);  
+    tokenIdentifier.readFields(in);
+    in.close();
+    return (T) tokenIdentifier;
+  }
+  
   /**
    * Get the token password/secret
    * @return the token password/secret
@@ -260,16 +305,31 @@ public class Token<T extends TokenIdenti
       buffer.append(num);
     }
   }
+  
+  private void identifierToString(StringBuilder buffer) {
+    T id = null;
+    try {
+      id = decodeIdentifier();
+    } catch (IOException e) {
+      // handle in the finally block
+    } finally {
+      if (id != null) {
+        buffer.append("(").append(id).append(")");
+      } else {
+        addBinaryBuffer(buffer, identifier);
+      }
+    }
+  }
 
   @Override
   public String toString() {
     StringBuilder buffer = new StringBuilder();
-    buffer.append("Ident: ");
-    addBinaryBuffer(buffer, identifier);
-    buffer.append(", Kind: ");
+    buffer.append("Kind: ");
     buffer.append(kind.toString());
     buffer.append(", Service: ");
     buffer.append(service.toString());
+    buffer.append(", Ident: ");
+    identifierToString(buffer);
     return buffer.toString();
   }
   

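A sketch of how callers might use the new decodeIdentifier() (helper name
hypothetical): it returns null rather than throwing when no TokenIdentifier
class is registered for the token's kind, which is also what the reworked
toString() relies on:

import java.io.IOException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;

public class TokenDescribeSketch {
  static String describe(Token<? extends TokenIdentifier> token)
      throws IOException {
    TokenIdentifier id = token.decodeIdentifier();  // null if kind unknown
    return (id != null)
        ? "decoded identifier: " + id
        : "opaque token: " + token;  // toString() falls back to hex ident
  }
}
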
Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-env.sh
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-env.sh?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-env.sh (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-env.sh Wed Jun  6 00:17:38 2012
@@ -48,10 +48,14 @@ done
 export HADOOP_OPTS="-Djava.net.preferIPv4Stack=true $HADOOP_CLIENT_OPTS"
 
 # Command specific options appended to HADOOP_OPTS when specified
-export HADOOP_NAMENODE_OPTS="-Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-INFO,RFAS} -Dhdfs.audit.logger=INFO,RFAAUDIT $HADOOP_NAMENODE_OPTS"
+export HADOOP_NAMENODE_OPTS="-Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-INFO,RFAS} -Dhdfs.audit.logger=${HDFS_AUDIT_LOGGER:-INFO,NullAppender} $HADOOP_NAMENODE_OPTS"
 export HADOOP_DATANODE_OPTS="-Dhadoop.security.logger=ERROR,RFAS $HADOOP_DATANODE_OPTS"
 
-export HADOOP_SECONDARYNAMENODE_OPTS="-Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-INFO,RFAS} -Dhdfs.audit.logger=INFO,RFAAUDIT $HADOOP_SECONDARYNAMENODE_OPTS"
+export HADOOP_SECONDARYNAMENODE_OPTS="-Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-INFO,RFAS} -Dhdfs.audit.logger=${HDFS_AUDIT_LOGGER:-INFO,NullAppender} $HADOOP_SECONDARYNAMENODE_OPTS"
+
+# The ZKFC does not need a large heap, and keeping it small avoids
+# any potential for long GC pauses
+export HADOOP_ZKFC_OPTS="-Xmx256m $HADOOP_ZKFC_OPTS"
 
 # The following applies to multiple commands (fs, dfs, fsck, distcp etc)
 export HADOOP_CLIENT_OPTS="-Xmx128m $HADOOP_CLIENT_OPTS"

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-policy.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-policy.xml?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-policy.xml (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-policy.xml Wed Jun  6 00:17:38 2012
@@ -223,6 +223,12 @@
     <description>ACL for HAService protocol used by HAAdmin to manage the
       active and stand-by states of namenode.</description>
   </property>
+  <property>
+    <name>security.zkfc.protocol.acl</name>
+    <value>*</value>
+    <description>ACL for access to the ZK Failover Controller.
+    </description>
+  </property>
 
    <property>
       <name>security.mrhs.client.protocol.acl</name>

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/log4j.properties
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/log4j.properties?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/log4j.properties (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/log4j.properties Wed Jun  6 00:17:38 2012
@@ -102,7 +102,7 @@ log4j.appender.TLA.layout.ConversionPatt
 #
 #Security appender
 #
-hadoop.security.logger=INFO,console
+hadoop.security.logger=INFO,NullAppender
 hadoop.security.log.maxfilesize=256MB
 hadoop.security.log.maxbackupindex=20
 log4j.category.SecurityLogger=${hadoop.security.logger}
@@ -126,7 +126,7 @@ log4j.appender.DRFAS.DatePattern=.yyyy-M
 #
 # hdfs audit logging
 #
-hdfs.audit.logger=INFO,console
+hdfs.audit.logger=INFO,NullAppender
 hdfs.audit.log.maxfilesize=256MB
 hdfs.audit.log.maxbackupindex=20
 log4j.logger.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=${hdfs.audit.logger}
@@ -141,7 +141,7 @@ log4j.appender.RFAAUDIT.MaxBackupIndex=$
 #
 # mapred audit logging
 #
-mapred.audit.logger=INFO,console
+mapred.audit.logger=INFO,NullAppender
 mapred.audit.log.maxfilesize=256MB
 mapred.audit.log.maxbackupindex=20
 log4j.logger.org.apache.hadoop.mapred.AuditLogger=${mapred.audit.logger}

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/proto/HAServiceProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/proto/HAServiceProtocol.proto?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/proto/HAServiceProtocol.proto (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/proto/HAServiceProtocol.proto Wed Jun  6 00:17:38 2012
@@ -27,6 +27,16 @@ enum HAServiceStateProto {
   STANDBY = 2;
 }
 
+enum HARequestSource {
+  REQUEST_BY_USER = 0;
+  REQUEST_BY_USER_FORCED = 1;
+  REQUEST_BY_ZKFC = 2;
+}
+
+message HAStateChangeRequestInfoProto {
+  required HARequestSource reqSource = 1;
+}
+
 /**
  * void request
  */
@@ -43,6 +53,7 @@ message MonitorHealthResponseProto { 
  * void request
  */
 message TransitionToActiveRequestProto { 
+  required HAStateChangeRequestInfoProto reqInfo = 1;
 }
 
 /**
@@ -55,6 +66,7 @@ message TransitionToActiveResponseProto 
  * void request
  */
 message TransitionToStandbyRequestProto { 
+  required HAStateChangeRequestInfoProto reqInfo = 1;
 }
 
 /**

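Callers would populate the new required field through the generated
builders. A sketch assuming the generated classes land in
org.apache.hadoop.ha.proto.HAServiceProtocolProtos (the java_package option
is not shown in this diff):

import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HARequestSource;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAStateChangeRequestInfoProto;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.TransitionToActiveRequestProto;

public class HaRequestSketch {
  public static void main(String[] args) {
    // reqInfo is required, so every transition request must carry a source.
    TransitionToActiveRequestProto req = TransitionToActiveRequestProto
        .newBuilder()
        .setReqInfo(HAStateChangeRequestInfoProto.newBuilder()
            .setReqSource(HARequestSource.REQUEST_BY_ZKFC))
        .build();
    System.out.println(req);
  }
}
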

