From: szetszwo@apache.org
To: common-commits@hadoop.apache.org
Reply-To: common-dev@hadoop.apache.org
Subject: svn commit: r1346682 [2/3] - in /hadoop/common/branches/HDFS-3092/hadoop-common-project: ./ hadoop-annotations/ hadoop-auth-examples/ hadoop-auth/ hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/ hadoop-common/ hadoop-common/de...
Date: Wed, 06 Jun 2012 00:17:54 -0000
Message-Id: <20120606001758.44B602388A33@eris.apache.org>

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java Wed Jun  6 00:17:38 2012
@@ -60,6 +60,31 @@ public interface HAServiceProtocol {
       return name;
     }
   }
+
+  public static enum RequestSource {
+    REQUEST_BY_USER,
+    REQUEST_BY_USER_FORCED,
+    REQUEST_BY_ZKFC;
+  }
+
+  /**
+   * Information describing the source for a request to change state.
+   * This is used to differentiate requests from automatic vs CLI
+   * failover controllers, and in the future may include epoch
+   * information.
+   */
+  public static class StateChangeRequestInfo {
+    private final RequestSource source;
+
+    public StateChangeRequestInfo(RequestSource source) {
+      super();
+      this.source = source;
+    }
+
+    public RequestSource getSource() {
+      return source;
+    }
+  }
 
   /**
    * Monitor the health of the service. This is periodically called by the HA
@@ -95,7 +120,8 @@ public interface HAServiceProtocol {
    * @throws IOException
    *           if other errors happen
    */
-  public void transitionToActive() throws ServiceFailedException,
+  public void transitionToActive(StateChangeRequestInfo reqInfo)
+                                   throws ServiceFailedException,
                                           AccessControlException,
                                           IOException;
 
@@ -110,7 +136,8 @@ public interface HAServiceProtocol {
    * @throws IOException
    *           if other errors happen
    */
-  public void transitionToStandby() throws ServiceFailedException,
+  public void transitionToStandby(StateChangeRequestInfo reqInfo)
+                                    throws ServiceFailedException,
                                            AccessControlException,
                                            IOException;

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocolHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocolHelper.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocolHelper.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocolHelper.java Wed Jun  6 00:17:38 2012
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
 import org.apache.hadoop.ipc.RemoteException;
 
 /**
@@ -30,7 +31,8 @@ import org.apache.hadoop.ipc.RemoteExcep
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public class HAServiceProtocolHelper {
-  public static void monitorHealth(HAServiceProtocol svc)
+  public static void monitorHealth(HAServiceProtocol svc,
+      StateChangeRequestInfo reqInfo)
       throws IOException {
     try {
       svc.monitorHealth();
@@ -39,19 +41,21 @@ public class HAServiceProtocolHelper {
     }
   }
 
-  public static void transitionToActive(HAServiceProtocol svc)
+  public static void transitionToActive(HAServiceProtocol svc,
+      StateChangeRequestInfo reqInfo)
       throws IOException {
     try {
-      svc.transitionToActive();
+      svc.transitionToActive(reqInfo);
     } catch (RemoteException e) {
       throw e.unwrapRemoteException(ServiceFailedException.class);
     }
   }
 
-  public static void transitionToStandby(HAServiceProtocol svc)
+  public static void transitionToStandby(HAServiceProtocol svc,
+      StateChangeRequestInfo reqInfo)
       throws IOException {
     try {
-      svc.transitionToStandby();
+      svc.transitionToStandby(reqInfo);
     } catch (RemoteException e) {
       throw e.unwrapRemoteException(ServiceFailedException.class);
     }

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java Wed Jun  6 00:17:38 2012
@@ -28,6 +28,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ha.protocolPB.ZKFCProtocolClientSideTranslatorPB;
 import org.apache.hadoop.net.NetUtils;
 
 import com.google.common.collect.Maps;
@@ -49,6 +50,11 @@ public abstract class HAServiceTarget {
   public abstract InetSocketAddress getAddress();
 
   /**
+   * @return the IPC address of the ZKFC on the target node
+   */
+  public abstract InetSocketAddress getZKFCAddress();
+
+  /**
    * @return a Fencer implementation configured for this target node
    */
   public abstract NodeFencer getFencer();
@@ -76,6 +82,20 @@ public abstract class HAServiceTarget {
         confCopy, factory, timeoutMs);
   }
 
+  /**
+   * @return a proxy to the ZKFC which is associated with this HA service.
+   */
+  public ZKFCProtocol getZKFCProxy(Configuration conf, int timeoutMs)
+      throws IOException {
+    Configuration confCopy = new Configuration(conf);
+    // Lower the timeout so we quickly fail to connect
+    confCopy.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
+    SocketFactory factory = NetUtils.getDefaultSocketFactory(confCopy);
+    return new ZKFCProtocolClientSideTranslatorPB(
+        getZKFCAddress(),
+        confCopy, factory, timeoutMs);
+  }
+
   public final Map<String, String> getFencingParameters() {
     Map<String, String> ret = Maps.newHashMap();
     addFencingParameters(ret);
@@ -99,4 +119,11 @@ public abstract class HAServiceTarget {
     ret.put(HOST_SUBST_KEY, getAddress().getHostName());
     ret.put(PORT_SUBST_KEY, String.valueOf(getAddress().getPort()));
   }
+
+  /**
+   * @return true if auto failover should be considered enabled
+   */
+  public boolean isAutoFailoverEnabled() {
+    return false;
+  }
 }

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java Wed Jun  6 00:17:38 2012
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -43,7 +44,8 @@ import com.google.common.base.Preconditi
  * Classes which need callbacks should implement the {@link Callback}
  * interface.
  */
-class HealthMonitor {
+@InterfaceAudience.Private
+public class HealthMonitor {
   private static final Log LOG = LogFactory.getLog(
       HealthMonitor.class);
 
@@ -75,7 +77,8 @@ class HealthMonitor {
   private HAServiceStatus lastServiceState = new HAServiceStatus(
       HAServiceState.INITIALIZING);
 
-  enum State {
+  @InterfaceAudience.Private
+  public enum State {
     /**
      * The health monitor is still starting up.
      */

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java Wed Jun  6 00:17:38 2012
@@ -18,79 +18,143 @@ package org.apache.hadoop.ha;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException;
 import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
+import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
+import org.apache.hadoop.ha.HAZKUtil.ZKAuthInfo;
 import org.apache.hadoop.ha.HealthMonitor.State;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.hadoop.util.ToolRunner;
 import org.apache.zookeeper.data.ACL;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 @InterfaceAudience.LimitedPrivate("HDFS")
-public abstract class ZKFailoverController implements Tool {
+public abstract class ZKFailoverController {
 
   static final Log LOG = LogFactory.getLog(ZKFailoverController.class);
   
-  // TODO: this should be namespace-scoped
   public static final String ZK_QUORUM_KEY = "ha.zookeeper.quorum";
   private static final String ZK_SESSION_TIMEOUT_KEY =
       "ha.zookeeper.session-timeout.ms";
   private static final int ZK_SESSION_TIMEOUT_DEFAULT = 5*1000;
   private static final String ZK_PARENT_ZNODE_KEY = "ha.zookeeper.parent-znode";
+  public static final String ZK_ACL_KEY = "ha.zookeeper.acl";
+  private static final String ZK_ACL_DEFAULT = "world:anyone:rwcda";
+  public static final String ZK_AUTH_KEY = "ha.zookeeper.auth";
   static final String ZK_PARENT_ZNODE_DEFAULT = "/hadoop-ha";
 
+  /**
+   * All of the conf keys used by the ZKFC. This is used in order to allow
+   * them to be overridden on a per-nameservice or per-namenode basis.
+   */
+  protected static final String[] ZKFC_CONF_KEYS = new String[] {
+    ZK_QUORUM_KEY,
+    ZK_SESSION_TIMEOUT_KEY,
+    ZK_PARENT_ZNODE_KEY,
+    ZK_ACL_KEY,
+    ZK_AUTH_KEY
+  };
+
   /** Unable to format the parent znode in ZK */
   static final int ERR_CODE_FORMAT_DENIED = 2;
   /** The parent znode doesn't exist in ZK */
   static final int ERR_CODE_NO_PARENT_ZNODE = 3;
   /** Fencing is not properly configured */
   static final int ERR_CODE_NO_FENCER = 4;
+  /** Automatic failover is not enabled */
+  static final int ERR_CODE_AUTO_FAILOVER_NOT_ENABLED = 5;
+  /** Cannot connect to ZooKeeper */
+  static final int ERR_CODE_NO_ZK = 6;
   
-  private Configuration conf;
+  protected Configuration conf;
+  private String zkQuorum;
+  protected final HAServiceTarget localTarget;
 
   private HealthMonitor healthMonitor;
   private ActiveStandbyElector elector;
-
-  private HAServiceTarget localTarget;
-
-  private String parentZnode;
+  protected ZKFCRpcServer rpcServer;
 
   private State lastHealthState = State.INITIALIZING;
 
   /** Set if a fatal error occurs */
   private String fatalError = null;
 
-  @Override
-  public void setConf(Configuration conf) {
+  /**
+   * A future nanotime before which the ZKFC will not join the election.
+   * This is used during graceful failover.
+   */
+  private long delayJoiningUntilNanotime = 0;
+
+  /** Executor on which {@link #scheduleRecheck(long)} schedules events */
+  private ScheduledExecutorService delayExecutor =
+      Executors.newScheduledThreadPool(1,
+          new ThreadFactoryBuilder().setDaemon(true)
+              .setNameFormat("ZKFC Delay timer #%d")
+              .build());
+
+  private ActiveAttemptRecord lastActiveAttemptRecord;
+  private Object activeAttemptRecordLock = new Object();
+
+  protected ZKFailoverController(Configuration conf, HAServiceTarget localTarget) {
+    this.localTarget = localTarget;
     this.conf = conf;
-    localTarget = getLocalTarget();
   }
 
   protected abstract byte[] targetToData(HAServiceTarget target);
-  protected abstract HAServiceTarget getLocalTarget();
  protected abstract HAServiceTarget dataToTarget(byte[] data);
+  protected abstract void loginAsFCUser() throws IOException;
+  protected abstract void checkRpcAdminAccess()
+      throws AccessControlException, IOException;
+  protected abstract InetSocketAddress getRpcAddressToBindTo();
+  protected abstract PolicyProvider getPolicyProvider();
 
+  /**
+   * Return the name of a znode inside the configured parent znode in which
+   * the ZKFC will do all of its work. This is so that multiple federated
+   * nameservices can run on the same ZK quorum without having to manually
+   * configure them to separate subdirectories.
+   */
+  protected abstract String getScopeInsideParentNode();
 
-  @Override
-  public Configuration getConf() {
-    return conf;
+  public HAServiceTarget getLocalTarget() {
+    return localTarget;
   }
-
-  @Override
+  
   public int run(final String[] args) throws Exception {
-    // TODO: need to hook DFS here to find the NN keytab info, etc,
-    // similar to what DFSHAAdmin does. Annoying that this is in common.
+    if (!localTarget.isAutoFailoverEnabled()) {
+      LOG.fatal("Automatic failover is not enabled for " + localTarget + "."
+          + " Please ensure that automatic failover is enabled in the "
+          + "configuration before running the ZK failover controller.");
+      return ERR_CODE_AUTO_FAILOVER_NOT_ENABLED;
+    }
+    loginAsFCUser();
     try {
       return SecurityUtil.doAsLoginUserOrFatal(new PrivilegedAction<Integer>() {
         @Override
@@ -99,6 +163,10 @@ public abstract class ZKFailoverControll
             return doRun(args);
           } catch (Exception t) {
            throw new RuntimeException(t);
+          } finally {
+            if (elector != null) {
+              elector.terminateConnection();
+            }
          }
        }
      });
@@ -107,6 +175,7 @@ public abstract class ZKFailoverControll
    }
  }
  
+
  private int doRun(String[] args)
      throws HadoopIllegalArgumentException, IOException, InterruptedException {
    initZK();
@@ -129,11 +198,23 @@ public abstract class ZKFailoverControll
      }
    }

-    if (!elector.parentZNodeExists()) {
-      LOG.fatal("Unable to start failover controller. " +
-          "Parent znode does not exist.\n" +
-          "Run with -formatZK flag to initialize ZooKeeper.");
-      return ERR_CODE_NO_PARENT_ZNODE;
+    try {
+      if (!elector.parentZNodeExists()) {
+        LOG.fatal("Unable to start failover controller. " +
+            "Parent znode does not exist.\n" +
+            "Run with -formatZK flag to initialize ZooKeeper.");
+        return ERR_CODE_NO_PARENT_ZNODE;
+      }
+    } catch (IOException ioe) {
+      if (ioe.getCause() instanceof KeeperException.ConnectionLossException) {
+        LOG.fatal("Unable to start failover controller. Unable to connect " +
+            "to ZooKeeper quorum at " + zkQuorum + ". Please check the " +
+            "configured value for " + ZK_QUORUM_KEY + " and ensure that " +
+            "ZooKeeper is running.");
+        return ERR_CODE_NO_ZK;
+      } else {
+        throw ioe;
+      }
    }

    try {
@@ -145,8 +226,18 @@ public abstract class ZKFailoverControll
      return ERR_CODE_NO_FENCER;
    }

+    initRPC();
    initHM();
-    mainLoop();
+    startRPC();
+    try {
+      mainLoop();
+    } finally {
+      rpcServer.stopAndJoin();
+      
+      elector.quitElection(true);
+      healthMonitor.shutdown();
+      healthMonitor.join();
+    }
    return 0;
  }

@@ -181,6 +272,7 @@ public abstract class ZKFailoverControll
  }

  private boolean confirmFormat() {
+    String parentZnode = getParentZnode();
    System.err.println(
        "===============================================\n" +
        "The configured parent znode " + parentZnode + " already exists.\n" +
@@ -206,16 +298,40 @@ public abstract class ZKFailoverControll
    healthMonitor.addCallback(new HealthCallbacks());
    healthMonitor.start();
  }
+  
+  protected void initRPC() throws IOException {
+    InetSocketAddress bindAddr = getRpcAddressToBindTo();
+    rpcServer = new ZKFCRpcServer(conf, bindAddr, this, getPolicyProvider());
+  }
+
+  protected void startRPC() throws IOException {
+    rpcServer.start();
+  }
+
  
  private void initZK() throws HadoopIllegalArgumentException, IOException {
-    String zkQuorum = conf.get(ZK_QUORUM_KEY);
+    zkQuorum = conf.get(ZK_QUORUM_KEY);
    int zkTimeout = conf.getInt(ZK_SESSION_TIMEOUT_KEY,
        ZK_SESSION_TIMEOUT_DEFAULT);
-    parentZnode = conf.get(ZK_PARENT_ZNODE_KEY,
-        ZK_PARENT_ZNODE_DEFAULT);
-    // TODO: need ZK ACL support in config, also maybe auth!
-    List<ACL> zkAcls = Ids.OPEN_ACL_UNSAFE;
+    // Parse ACLs from configuration.
+    String zkAclConf = conf.get(ZK_ACL_KEY, ZK_ACL_DEFAULT);
+    zkAclConf = HAZKUtil.resolveConfIndirection(zkAclConf);
+    List<ACL> zkAcls = HAZKUtil.parseACLs(zkAclConf);
+    if (zkAcls.isEmpty()) {
+      zkAcls = Ids.CREATOR_ALL_ACL;
+    }
+    
+    // Parse authentication from configuration.
+    String zkAuthConf = conf.get(ZK_AUTH_KEY);
+    zkAuthConf = HAZKUtil.resolveConfIndirection(zkAuthConf);
+    List<ZKAuthInfo> zkAuths;
+    if (zkAuthConf != null) {
+      zkAuths = HAZKUtil.parseAuth(zkAuthConf);
+    } else {
+      zkAuths = Collections.emptyList();
+    }
+
+    // Sanity check configuration.
    Preconditions.checkArgument(zkQuorum != null,
        "Missing required configuration '%s' for ZooKeeper quorum",
        ZK_QUORUM_KEY);
@@ -224,9 +340,19 @@ public abstract class ZKFailoverControll
    
    elector = new ActiveStandbyElector(zkQuorum,
-        zkTimeout, parentZnode, zkAcls, new ElectorCallbacks());
+        zkTimeout, getParentZnode(), zkAcls, zkAuths,
+        new ElectorCallbacks());
  }
  
+  private String getParentZnode() {
+    String znode = conf.get(ZK_PARENT_ZNODE_KEY,
+        ZK_PARENT_ZNODE_DEFAULT);
+    if (!znode.endsWith("/")) {
+      znode += "/";
+    }
+    return znode + getScopeInsideParentNode();
+  }
+
  private synchronized void mainLoop() throws InterruptedException {
    while (fatalError == null) {
      wait();
@@ -242,16 +368,30 @@ public abstract class ZKFailoverControll
    notifyAll();
  }
  
-  private synchronized void becomeActive() {
+  private synchronized void becomeActive() throws ServiceFailedException {
    LOG.info("Trying to make " + localTarget + " active...");
    try {
      HAServiceProtocolHelper.transitionToActive(localTarget.getProxy(
-          conf, FailoverController.getRpcTimeoutToNewActive(conf)));
-      LOG.info("Successfully transitioned " + localTarget +
-          " to active state");
+          conf, FailoverController.getRpcTimeoutToNewActive(conf)),
+          createReqInfo());
+      String msg = "Successfully transitioned " + localTarget +
+          " to active state";
+      LOG.info(msg);
+      recordActiveAttempt(new ActiveAttemptRecord(true, msg));
+
    } catch (Throwable t) {
-      LOG.fatal("Couldn't make " + localTarget + " active", t);
-      elector.quitElection(true);
+      String msg = "Couldn't make " + localTarget + " active";
+      LOG.fatal(msg, t);
+      
+      recordActiveAttempt(new ActiveAttemptRecord(false, msg + "\n" +
+          StringUtils.stringifyException(t)));
+
+      if (t instanceof ServiceFailedException) {
+        throw (ServiceFailedException)t;
+      } else {
+        throw new ServiceFailedException("Couldn't transition to active",
+            t);
+      }
 /*
 * TODO:
 * we need to make sure that if we get fenced and then quickly restarted,
@@ -264,12 +404,79 @@ public abstract class ZKFailoverControll
    }
  }

+  /**
+   * Store the results of the last attempt to become active.
+   * This is used so that, during manually initiated failover,
+   * we can report back the results of the attempt to become active
+   * to the initiator of the failover.
+   */
+  private void recordActiveAttempt(
+      ActiveAttemptRecord record) {
+    synchronized (activeAttemptRecordLock) {
+      lastActiveAttemptRecord = record;
+      activeAttemptRecordLock.notifyAll();
+    }
+  }
+
+  /**
+   * Wait until one of the following events:
+   * <ul>
+   * <li>Another thread publishes the results of an attempt to become active
+   *     using {@link #recordActiveAttempt(ActiveAttemptRecord)}</li>
+   * <li>The node enters bad health status</li>
+   * <li>The specified timeout elapses</li>
+   * </ul>
+   *
+   * @param timeoutMillis number of millis to wait
+   * @return the published record, or null if the timeout elapses or the
+   *         service becomes unhealthy
+   * @throws InterruptedException if the thread is interrupted.
+   */
+  private ActiveAttemptRecord waitForActiveAttempt(int timeoutMillis)
+      throws InterruptedException {
+    long st = System.nanoTime();
+    long waitUntil = st + TimeUnit.NANOSECONDS.convert(
+        timeoutMillis, TimeUnit.MILLISECONDS);
+    
+    do {
+      // periodically check health state, because entering an
+      // unhealthy state could prevent us from ever attempting to
+      // become active. We can detect this and respond to the user
+      // immediately.
+      synchronized (this) {
+        if (lastHealthState != State.SERVICE_HEALTHY) {
+          // early out if service became unhealthy
+          return null;
+        }
+      }
+      
+      synchronized (activeAttemptRecordLock) {
+        if ((lastActiveAttemptRecord != null &&
+            lastActiveAttemptRecord.nanoTime >= st)) {
+          return lastActiveAttemptRecord;
+        }
+        // Only wait 1sec so that we periodically recheck the health state
+        // above.
+        activeAttemptRecordLock.wait(1000);
+      }
+    } while (System.nanoTime() < waitUntil);
+    
+    // Timeout elapsed.
+    LOG.warn(timeoutMillis + "ms timeout elapsed waiting for an attempt " +
+        "to become active");
+    return null;
+  }
+
+  private StateChangeRequestInfo createReqInfo() {
+    return new StateChangeRequestInfo(RequestSource.REQUEST_BY_ZKFC);
+  }
+
   private synchronized void becomeStandby() {
     LOG.info("ZK Election indicated that " + localTarget +
         " should become standby");
     try {
       int timeout = FailoverController.getGracefulFenceTimeout(conf);
-      localTarget.getProxy(conf, timeout).transitionToStandby();
+      localTarget.getProxy(conf, timeout).transitionToStandby(createReqInfo());
       LOG.info("Successfully transitioned " + localTarget +
           " to standby state");
     } catch (Exception e) {
@@ -279,27 +486,336 @@ public abstract class ZKFailoverControll
       // at the same time.
     }
   }
+  
+
+  private synchronized void fenceOldActive(byte[] data) {
+    HAServiceTarget target = dataToTarget(data);
+    
+    try {
+      doFence(target);
+    } catch (Throwable t) {
+      recordActiveAttempt(new ActiveAttemptRecord(false, "Unable to fence old active: " + StringUtils.stringifyException(t)));
+      Throwables.propagate(t);
+    }
+  }
+  
+  private void doFence(HAServiceTarget target) {
+    LOG.info("Should fence: " + target);
+    boolean gracefulWorked = new FailoverController(conf,
+        RequestSource.REQUEST_BY_ZKFC).tryGracefulFence(target);
+    if (gracefulWorked) {
+      // It's possible that it's in standby but just about to go into active,
+      // no? Is there some race here?
+      LOG.info("Successfully transitioned " + target + " to standby " +
+          "state without fencing");
+      return;
+    }
+    
+    try {
+      target.checkFencingConfigured();
+    } catch (BadFencingConfigurationException e) {
+      LOG.error("Couldn't fence old active " + target, e);
+      recordActiveAttempt(new ActiveAttemptRecord(false, "Unable to fence old active"));
+      throw new RuntimeException(e);
+    }
+    
+    if (!target.getFencer().fence(target)) {
+      throw new RuntimeException("Unable to fence " + target);
+    }
+  }
+
+
+  /**
+   * Request from graceful failover to cede active role. Causes
+   * this ZKFC to transition its local node to standby, then quit
+   * the election for the specified period of time, after which it
+   * will rejoin iff it is healthy.
+   */
+  void cedeActive(final int millisToCede)
+      throws AccessControlException, ServiceFailedException, IOException {
+    try {
+      UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          doCedeActive(millisToCede);
+          return null;
+        }
+      });
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
+  
+  private void doCedeActive(int millisToCede) 
+      throws AccessControlException, ServiceFailedException, IOException {
+    int timeout = FailoverController.getGracefulFenceTimeout(conf);
+
+    // Lock elector to maintain lock ordering of elector -> ZKFC
+    synchronized (elector) {
+      synchronized (this) {
+        if (millisToCede <= 0) {
+          delayJoiningUntilNanotime = 0;
+          recheckElectability();
+          return;
+        }
+  
+        LOG.info("Requested by " + UserGroupInformation.getCurrentUser() +
+            " at " + Server.getRemoteAddress() + " to cede active role.");
+        boolean needFence = false;
+        try {
+          localTarget.getProxy(conf, timeout).transitionToStandby(createReqInfo());
+          LOG.info("Successfully ensured local node is in standby mode");
+        } catch (IOException ioe) {
+          LOG.warn("Unable to transition local node to standby: " +
+              ioe.getLocalizedMessage());
+          LOG.warn("Quitting election but indicating that fencing is " +
+              "necessary");
+          needFence = true;
+        }
+        delayJoiningUntilNanotime = System.nanoTime() +
+            TimeUnit.MILLISECONDS.toNanos(millisToCede);
+        elector.quitElection(needFence);
+      }
+    }
+    recheckElectability();
+  }
+  
+  /**
+   * Coordinate a graceful failover to this node.
+   * @throws ServiceFailedException if the node fails to become active
+   * @throws IOException if some other error occurs
+   */
+  void gracefulFailoverToYou() throws ServiceFailedException, IOException {
+    try {
+      UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          doGracefulFailover();
+          return null;
+        }
+        
+      });
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Coordinate a graceful failover. This proceeds in several phases:
+   * 1) Pre-flight checks: ensure that the local node is healthy, and
+   *    thus a candidate for failover.
+   * 2) Determine the current active node. If it is the local node, no
+   *    need to failover - return success.
+   * 3) Ask that node to yield from the election for a number of seconds.
+   * 4) Allow the normal election path to run in other threads. Wait until
+   *    we either become unhealthy or we see an election attempt recorded by
+   *    the normal code path.
+   * 5) Allow the old active to rejoin the election, so a future
+   *    failback is possible.
+   */
+  private void doGracefulFailover()
+      throws ServiceFailedException, IOException, InterruptedException {
+    int timeout = FailoverController.getGracefulFenceTimeout(conf) * 2;
+    
+    // Phase 1: pre-flight checks
+    checkEligibleForFailover();
+    
+    // Phase 2: determine old/current active node. Check that we're not
+    // ourselves active, etc.
+    HAServiceTarget oldActive = getCurrentActive();
+    if (oldActive == null) {
+      // No node is currently active. So, if we aren't already
+      // active ourselves by means of a normal election, then there's
+      // probably something preventing us from becoming active.
+      throw new ServiceFailedException(
+          "No other node is currently active.");
+    }
+    
+    if (oldActive.getAddress().equals(localTarget.getAddress())) {
+      LOG.info("Local node " + localTarget + " is already active. " +
+          "No need to failover. Returning success.");
+      return;
+    }
+    
+    // Phase 3: ask the old active to yield from the election.
+    LOG.info("Asking " + oldActive + " to cede its active state for " +
+        timeout + "ms");
+    ZKFCProtocol oldZkfc = oldActive.getZKFCProxy(conf, timeout);
+    oldZkfc.cedeActive(timeout);
+    
+    // Phase 4: wait for the normal election to make the local node
+    // active.
+    ActiveAttemptRecord attempt = waitForActiveAttempt(timeout + 60000);
+    
+    if (attempt == null) {
+      // We didn't even make an attempt to become active.
+      synchronized(this) {
+        if (lastHealthState != State.SERVICE_HEALTHY) {
+          throw new ServiceFailedException("Unable to become active. " +
+              "Service became unhealthy while trying to failover.");
+        }
+      }
+      
+      throw new ServiceFailedException("Unable to become active. " +
+          "Local node did not get an opportunity to do so from ZooKeeper, " +
+          "or the local node took too long to transition to active.");
+    }
+
+    // Phase 5. At this point, we made some attempt to become active. So we
+    // can tell the old active to rejoin if it wants. This allows a quick
+    // fail-back if we immediately crash.
+    oldZkfc.cedeActive(-1);
+    
+    if (attempt.succeeded) {
+      LOG.info("Successfully became active. " + attempt.status);
+    } else {
+      // Propagate failure
+      String msg = "Failed to become active. " + attempt.status;
+      throw new ServiceFailedException(msg);
+    }
+  }
+
+  /**
+   * Ensure that the local node is in a healthy state, and thus
+   * eligible for graceful failover.
+   * @throws ServiceFailedException if the node is unhealthy
+   */
+  private synchronized void checkEligibleForFailover()
+      throws ServiceFailedException {
+    // Check health
+    if (this.getLastHealthState() != State.SERVICE_HEALTHY) {
+      throw new ServiceFailedException(
+          localTarget + " is not currently healthy. " +
+          "Cannot be failover target");
+    }
+  }
+
+  /**
+   * @return an {@link HAServiceTarget} for the current active node
+   * in the cluster, or null if no node is active.
+   * @throws IOException if a ZK-related issue occurs
+   * @throws InterruptedException if thread is interrupted
+   */
+  private HAServiceTarget getCurrentActive()
+      throws IOException, InterruptedException {
+    synchronized (elector) {
+      synchronized (this) {
+        byte[] activeData;
+        try {
+          activeData = elector.getActiveData();
+        } catch (ActiveNotFoundException e) {
+          return null;
+        } catch (KeeperException ke) {
+          throw new IOException(
+              "Unexpected ZooKeeper issue fetching active node info", ke);
+        }
+        
+        HAServiceTarget oldActive = dataToTarget(activeData);
+        return oldActive;
+      }
+    }
+  }
+
+  /**
+   * Check the current state of the service, and join the election
+   * if it should be in the election.
+   */
+  private void recheckElectability() {
+    // Maintain lock ordering of elector -> ZKFC
+    synchronized (elector) {
+      synchronized (this) {
+        boolean healthy = lastHealthState == State.SERVICE_HEALTHY;
+    
+        long remainingDelay = delayJoiningUntilNanotime - System.nanoTime(); 
+        if (remainingDelay > 0) {
+          if (healthy) {
+            LOG.info("Would have joined master election, but this node is " +
+                "prohibited from doing so for " +
+                TimeUnit.NANOSECONDS.toMillis(remainingDelay) + " more ms");
+          }
+          scheduleRecheck(remainingDelay);
+          return;
+        }
+    
+        switch (lastHealthState) {
+        case SERVICE_HEALTHY:
+          elector.joinElection(targetToData(localTarget));
+          break;
+          
+        case INITIALIZING:
+          LOG.info("Ensuring that " + localTarget + " does not " +
+              "participate in active master election");
+          elector.quitElection(false);
+          break;
+    
+        case SERVICE_UNHEALTHY:
+        case SERVICE_NOT_RESPONDING:
+          LOG.info("Quitting master election for " + localTarget +
+              " and marking that fencing is necessary");
+          elector.quitElection(true);
+          break;
+    
+        case HEALTH_MONITOR_FAILED:
+          fatalError("Health monitor failed!");
+          break;
+    
+        default:
+          throw new IllegalArgumentException("Unhandled state:" + lastHealthState);
+        }
+      }
+    }
+  }
+  
+  /**
+   * Schedule a call to {@link #recheckElectability()} in the future.
+   */
+  private void scheduleRecheck(long whenNanos) {
+    delayExecutor.schedule(
+        new Runnable() {
+          @Override
+          public void run() {
+            try {
+              recheckElectability();
+            } catch (Throwable t) {
+              fatalError("Failed to recheck electability: " +
+                  StringUtils.stringifyException(t));
+            }
+          }
+        },
+        whenNanos, TimeUnit.NANOSECONDS);
+  }
 
   /**
    * @return the last health state passed to the FC
    * by the HealthMonitor.
    */
   @VisibleForTesting
-  State getLastHealthState() {
+  synchronized State getLastHealthState() {
     return lastHealthState;
   }
+  
+  private synchronized void setLastHealthState(HealthMonitor.State newState) {
+    LOG.info("Local service " + localTarget +
+        " entered state: " + newState);
+    lastHealthState = newState;
+  }
   
   @VisibleForTesting
   ActiveStandbyElector getElectorForTests() {
     return elector;
   }
+  
+  @VisibleForTesting
+  ZKFCRpcServer getRpcServerForTests() {
+    return rpcServer;
+  }
 
   /**
    * Callbacks from elector
    */
   class ElectorCallbacks implements ActiveStandbyElectorCallback {
     @Override
-    public void becomeActive() {
+    public void becomeActive() throws ServiceFailedException {
       ZKFailoverController.this.becomeActive();
     }
 
@@ -319,31 +835,13 @@ public abstract class ZKFailoverControll
 
     @Override
     public void fenceOldActive(byte[] data) {
-      HAServiceTarget target = dataToTarget(data);
-      
-      LOG.info("Should fence: " + target);
-      boolean gracefulWorked = new FailoverController(conf)
-          .tryGracefulFence(target);
-      if (gracefulWorked) {
-        // It's possible that it's in standby but just about to go into active,
-        // no? Is there some race here?
-        LOG.info("Successfully transitioned " + target + " to standby " +
-            "state without fencing");
-        return;
-      }
-      
-      try {
-        target.checkFencingConfigured();
-      } catch (BadFencingConfigurationException e) {
-        LOG.error("Couldn't fence old active " + target, e);
-        // TODO: see below todo
-        throw new RuntimeException(e);
-      }
-      
-      if (!target.getFencer().fence(target)) {
-        // TODO: this will end up in some kind of tight loop,
-        // won't it? We need some kind of backoff
-        throw new RuntimeException("Unable to fence " + target);
+      ZKFailoverController.this.fenceOldActive(data);
+    }
+
+    @Override
+    public String toString() {
+      synchronized (ZKFailoverController.this) {
+        return "Elector callbacks for " + localTarget;
      }
    }
  }
@@ -354,36 +852,21 @@ public abstract class ZKFailoverControll
  class HealthCallbacks implements HealthMonitor.Callback {
    @Override
    public void enteredState(HealthMonitor.State newState) {
-      LOG.info("Local service " + localTarget +
-          " entered state: " + newState);
-      switch (newState) {
-      case SERVICE_HEALTHY:
-        LOG.info("Joining master election for " + localTarget);
-        elector.joinElection(targetToData(localTarget));
-        break;
-        
-      case INITIALIZING:
-        LOG.info("Ensuring that " + localTarget + " does not " +
-            "participate in active master election");
-        elector.quitElection(false);
-        break;
-
-      case SERVICE_UNHEALTHY:
-      case SERVICE_NOT_RESPONDING:
-        LOG.info("Quitting master election for " + localTarget +
-            " and marking that fencing is necessary");
-        elector.quitElection(true);
-        break;
-        
-      case HEALTH_MONITOR_FAILED:
-        fatalError("Health monitor failed!");
-        break;
-        
-      default:
-        throw new IllegalArgumentException("Unhandled state:" + newState);
-      }
-      
-      lastHealthState = newState;
+      setLastHealthState(newState);
+      recheckElectability();
    }
  }
+  
+  private static class ActiveAttemptRecord {
+    private final boolean succeeded;
+    private final String status;
+    private final long nanoTime;
+    
+    public ActiveAttemptRecord(boolean succeeded, String status) {
+      this.succeeded = succeeded;
+      this.status = status;
+      this.nanoTime = System.nanoTime();
+    }
+  }
+
 }

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolClientSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolClientSideTranslatorPB.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolClientSideTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolClientSideTranslatorPB.java Wed Jun  6 00:17:38 2012
@@ -30,13 +30,14 @@ import org.apache.hadoop.ha.HAServicePro
 import org.apache.hadoop.ha.HAServiceStatus;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.GetServiceStatusRequestProto;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.GetServiceStatusResponseProto;
+import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAStateChangeRequestInfoProto;
+import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HARequestSource;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceStateProto;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.MonitorHealthRequestProto;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.TransitionToActiveRequestProto;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.TransitionToStandbyRequestProto;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -57,10 +58,6 @@ public class HAServiceProtocolClientSide
   private final static RpcController NULL_CONTROLLER = null;
   private final static MonitorHealthRequestProto MONITOR_HEALTH_REQ = 
       MonitorHealthRequestProto.newBuilder().build();
-  private final static TransitionToActiveRequestProto TRANSITION_TO_ACTIVE_REQ = 
-      TransitionToActiveRequestProto.newBuilder().build();
-  private final static TransitionToStandbyRequestProto TRANSITION_TO_STANDBY_REQ = 
-      TransitionToStandbyRequestProto.newBuilder().build();
   private final static GetServiceStatusRequestProto GET_SERVICE_STATUS_REQ = 
       GetServiceStatusRequestProto.newBuilder().build();
   
@@ -94,18 +91,25 @@ public class HAServiceProtocolClientSide
   }
 
   @Override
-  public void transitionToActive() throws IOException {
+  public void transitionToActive(StateChangeRequestInfo reqInfo) throws IOException {
     try {
-      rpcProxy.transitionToActive(NULL_CONTROLLER, TRANSITION_TO_ACTIVE_REQ);
+      TransitionToActiveRequestProto req =
+          TransitionToActiveRequestProto.newBuilder()
+            .setReqInfo(convert(reqInfo)).build();
+
+      rpcProxy.transitionToActive(NULL_CONTROLLER, req);
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }
   }
 
   @Override
-  public void transitionToStandby() throws IOException {
+  public void transitionToStandby(StateChangeRequestInfo reqInfo) throws IOException {
     try {
-      rpcProxy.transitionToStandby(NULL_CONTROLLER, TRANSITION_TO_STANDBY_REQ);
+      TransitionToStandbyRequestProto req =
+          TransitionToStandbyRequestProto.newBuilder()
+            .setReqInfo(convert(reqInfo)).build();
+      rpcProxy.transitionToStandby(NULL_CONTROLLER, req);
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }
@@ -143,6 +147,27 @@ public class HAServiceProtocolClientSide
     }
   }
 
+  private HAStateChangeRequestInfoProto convert(StateChangeRequestInfo reqInfo) {
+    HARequestSource src;
+    switch (reqInfo.getSource()) {
+    case REQUEST_BY_USER:
+      src = HARequestSource.REQUEST_BY_USER;
+      break;
+    case REQUEST_BY_USER_FORCED:
+      src = HARequestSource.REQUEST_BY_USER_FORCED;
+      break;
+    case REQUEST_BY_ZKFC:
+      src = HARequestSource.REQUEST_BY_ZKFC;
+      break;
+    default:
+      throw new IllegalArgumentException("Bad source: " + reqInfo.getSource());
+    }
+    return HAStateChangeRequestInfoProto.newBuilder()
+        .setReqSource(src)
+        .build();
+  }
+
+
   @Override
   public void close() {
     RPC.stopProxy(rpcProxy);

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolServerSideTranslatorPB.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolServerSideTranslatorPB.java Wed Jun  6 00:17:38 2012
@@ -19,12 +19,17 @@ package org.apache.hadoop.ha.protocolPB;
 
 import java.io.IOException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
+import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
 import org.apache.hadoop.ha.HAServiceStatus;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.GetServiceStatusRequestProto;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.GetServiceStatusResponseProto;
+import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAStateChangeRequestInfoProto;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceStateProto;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.MonitorHealthRequestProto;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.MonitorHealthResponseProto;
@@ -56,6 +61,8 @@ public class HAServiceProtocolServerSide
       TransitionToActiveResponseProto.newBuilder().build();
   private static final TransitionToStandbyResponseProto TRANSITION_TO_STANDBY_RESP = 
       TransitionToStandbyResponseProto.newBuilder().build();
+  private static final Log LOG = LogFactory.getLog(
+      HAServiceProtocolServerSideTranslatorPB.class);
   
   public HAServiceProtocolServerSideTranslatorPB(HAServiceProtocol server) {
     this.server = server;
@@ -71,13 +78,33 @@ public class HAServiceProtocolServerSide
       throw new ServiceException(e);
     }
   }
+  
+  private StateChangeRequestInfo convert(HAStateChangeRequestInfoProto proto) {
+    RequestSource src;
+    switch (proto.getReqSource()) {
+    case REQUEST_BY_USER:
+      src = RequestSource.REQUEST_BY_USER;
+      break;
+    case REQUEST_BY_USER_FORCED:
+      src = RequestSource.REQUEST_BY_USER_FORCED;
+      break;
+    case REQUEST_BY_ZKFC:
+      src = RequestSource.REQUEST_BY_ZKFC;
+      break;
+    default:
+      LOG.warn("Unknown request source: " + proto.getReqSource());
+      src = null;
+    }
+    
+    return new StateChangeRequestInfo(src);
+  }
 
   @Override
   public TransitionToActiveResponseProto transitionToActive(
       RpcController controller, TransitionToActiveRequestProto request)
       throws ServiceException {
     try {
-      server.transitionToActive();
+      server.transitionToActive(convert(request.getReqInfo()));
       return TRANSITION_TO_ACTIVE_RESP;
     } catch(IOException e) {
       throw new ServiceException(e);
@@ -89,7 +116,7 @@ public class HAServiceProtocolServerSide
       RpcController controller, TransitionToStandbyRequestProto request)
       throws ServiceException {
     try {
-      server.transitionToStandby();
+      server.transitionToStandby(convert(request.getReqInfo()));
       return TRANSITION_TO_STANDBY_RESP;
     } catch(IOException e) {
       throw new ServiceException(e);

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer.java Wed Jun  6 00:17:38 2012
@@ -96,7 +96,7 @@ public class HttpServer implements Filte
   // The ServletContext attribute where the daemon Configuration
   // gets stored.
   public static final String CONF_CONTEXT_ATTRIBUTE = "hadoop.conf";
-  static final String ADMINS_ACL = "admins.acl";
+  public static final String ADMINS_ACL = "admins.acl";
   public static final String SPNEGO_FILTER = "SpnegoFilter";
 
   public static final String BIND_ADDRESS = "bind.address";
@@ -792,7 +792,7 @@ public class HttpServer implements Filte
    *
    * @param servletContext
    * @param request
-   * @param response
+   * @param response used to send the error response if user does not have admin access.
    * @return true if admin-authorized, false otherwise
    * @throws IOException
    */
@@ -814,18 +814,33 @@ public class HttpServer implements Filte
           "authorized to access this page.");
       return false;
     }
+
+    if (servletContext.getAttribute(ADMINS_ACL) != null &&
+        !userHasAdministratorAccess(servletContext, remoteUser)) {
+      response.sendError(HttpServletResponse.SC_UNAUTHORIZED, "User "
+          + remoteUser + " is unauthorized to access this page.");
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Get the admin ACLs from the given ServletContext and check if the given
+   * user is in the ACL.
+   * 
+   * @param servletContext the context containing the admin ACL.
+   * @param remoteUser the remote user to check for.
+   * @return true if the user is present in the ACL, false if no ACL is set or
+   *         the user is not present
+   */
+  public static boolean userHasAdministratorAccess(ServletContext servletContext,
+      String remoteUser) {
     AccessControlList adminsAcl = (AccessControlList) servletContext
         .getAttribute(ADMINS_ACL);
     UserGroupInformation remoteUserUGI =
         UserGroupInformation.createRemoteUser(remoteUser);
-    if (adminsAcl != null) {
-      if (!adminsAcl.isUserAllowed(remoteUserUGI)) {
-        response.sendError(HttpServletResponse.SC_UNAUTHORIZED, "User "
-            + remoteUser + " is unauthorized to access this page.");
-        return false;
-      }
-    }
-    return true;
+    return adminsAcl != null && adminsAcl.isUserAllowed(remoteUserUGI);
   }
 
   /**

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/lib/StaticUserWebFilter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/lib/StaticUserWebFilter.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/lib/StaticUserWebFilter.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/lib/StaticUserWebFilter.java Wed Jun  6 00:17:38 2012
@@ -37,15 +37,15 @@ import org.apache.hadoop.http.FilterInit
 
 import javax.servlet.Filter;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
+
 /**
  * Provides a servlet filter that pretends to authenticate a fake user (Dr.Who)
  * so that the web UI is usable for a secure cluster without authentication.
 */
 public class StaticUserWebFilter extends FilterInitializer {
   static final String DEPRECATED_UGI_KEY = "dfs.web.ugi";
-  
-  static final String USERNAME_KEY = "hadoop.http.staticuser.user";
-  static final String USERNAME_DEFAULT = "dr.who";
   
   private static final Log LOG = LogFactory.getLog(StaticUserWebFilter.class);
   
@@ -112,7 +112,7 @@ public class StaticUserWebFilter extends
 
     @Override
     public void init(FilterConfig conf) throws ServletException {
-      this.username = conf.getInitParameter(USERNAME_KEY);
+      this.username = conf.getInitParameter(HADOOP_HTTP_STATIC_USER);
       this.user = new User(username);
     }
     
@@ -123,7 +123,7 @@ public class StaticUserWebFilter extends
     HashMap<String, String> options = new HashMap<String, String>();
     
     String username = getUsernameFromConf(conf);
-    options.put(USERNAME_KEY, username);
+    options.put(HADOOP_HTTP_STATIC_USER, username);
     
     container.addFilter("static_user_filter", 
                         StaticUserFilter.class.getName(), 
@@ -139,11 +139,12 @@ public class StaticUserWebFilter extends
       // We can't use the normal configuration deprecation mechanism here
       // since we need to split out the username from the configured UGI.
       LOG.warn(DEPRECATED_UGI_KEY + " should not be used. Instead, use " + 
-          USERNAME_KEY + ".");
+          HADOOP_HTTP_STATIC_USER + ".");
       String[] parts = oldStyleUgi.split(",");
       return parts[0];
     } else {
-      return conf.get(USERNAME_KEY, USERNAME_DEFAULT);
+      return conf.get(HADOOP_HTTP_STATIC_USER,
+          DEFAULT_HADOOP_HTTP_STATIC_USER);
     }
   }
 

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java Wed Jun  6 00:17:38 2012
@@ -807,7 +807,7 @@ public class SequenceFile {
   }
   
   /** Write key/value pairs to a sequence-format file. */
-  public static class Writer implements java.io.Closeable {
+  public static class Writer implements java.io.Closeable, Syncable {
     private Configuration conf;
     FSDataOutputStream out;
     boolean ownOutputStream = true;
@@ -1193,13 +1193,31 @@ public class SequenceFile {
       }
     }
 
-    /** flush all currently written data to the file system */
+    /**
+     * flush all currently written data to the file system
+     * @deprecated Use {@link #hsync()} or {@link #hflush()} instead
+     */
+    @Deprecated
     public void syncFs() throws IOException {
       if (out != null) {
         out.hflush();  // flush contents to file system
       }
     }
 
+    @Override
+    public void hsync() throws IOException {
+      if (out != null) {
+        out.hsync();
+      }
+    }
+
+    @Override
+    public void hflush() throws IOException {
+      if (out != null) {
+        out.hflush();
+      }
+    }
+    
     /** Returns the configuration of this file. */
     Configuration getConf() { return conf; }

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Text.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Text.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Text.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Text.java Wed Jun  6 00:17:38 2012
@@ -236,6 +236,11 @@ public class Text extends BinaryComparab
 
   /**
    * Clear the string to empty.
+   *
+   * Note: For performance reasons, this call does not clear the
+   * underlying byte array that is retrievable via {@link #getBytes()}.
+   * In order to free the byte-array memory, call {@link #set(byte[])}
+   * with an empty byte array (For example, new byte[0]).
    */
   public void clear() {
     length = 0;

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Writable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Writable.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Writable.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Writable.java Wed Jun  6 00:17:38 2012
@@ -39,20 +39,23 @@ import org.apache.hadoop.classification.
  * <p>Example:</p>
  * <blockquote><pre>
  *     public class MyWritable implements Writable {
- *       // Some data     
+ *       // Some data
  *       private int counter;
  *       private long timestamp;
- *       
+ *
+ *       // Default constructor to allow (de)serialization
+ *       MyWritable() { }
+ *
  *       public void write(DataOutput out) throws IOException {
  *         out.writeInt(counter);
  *         out.writeLong(timestamp);
  *       }
- *       
+ *
  *       public void readFields(DataInput in) throws IOException {
  *         counter = in.readInt();
  *         timestamp = in.readLong();
  *       }
- *       
+ *
  *       public static MyWritable read(DataInput in) throws IOException {
  *         MyWritable w = new MyWritable();
  *         w.readFields(in);
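Aside on the Writable javadoc change above: the example gains a no-arg
constructor because Hadoop's serialization and RPC layers instantiate
Writable implementations reflectively (see the ReflectionUtils.newInstance
call in the Client.java hunk further down) and only afterwards call
readFields(). A minimal, self-contained sketch of the pattern the updated
example describes; the class name MyWritable and the round-trip in main()
are illustrative only, not part of the patch:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInput;
    import java.io.DataInputStream;
    import java.io.DataOutput;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.io.Writable;

    public class MyWritable implements Writable {
      private int counter;
      private long timestamp;

      // Default constructor so the framework can instantiate the class
      // reflectively before calling readFields().
      public MyWritable() { }

      @Override
      public void write(DataOutput out) throws IOException {
        out.writeInt(counter);
        out.writeLong(timestamp);
      }

      @Override
      public void readFields(DataInput in) throws IOException {
        counter = in.readInt();
        timestamp = in.readLong();
      }

      // Round-trip demo: serialize to bytes, then deserialize into a
      // fresh instance created via the default constructor.
      public static void main(String[] args) throws IOException {
        MyWritable w = new MyWritable();
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        w.write(new DataOutputStream(bytes));

        MyWritable copy = new MyWritable();
        copy.readFields(new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray())));
      }
    }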

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionCodecFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionCodecFactory.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionCodecFactory.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionCodecFactory.java Wed Jun  6 00:17:38 2012
@@ -109,8 +109,12 @@ public class CompressionCodecFactory {
     List<Class<? extends CompressionCodec>> result
       = new ArrayList<Class<? extends CompressionCodec>>();
     // Add codec classes discovered via service loading
-    for (CompressionCodec codec : CODEC_PROVIDERS) {
-      result.add(codec.getClass());
+    synchronized (CODEC_PROVIDERS) {
+      // CODEC_PROVIDERS is a lazy collection. Synchronize so it is
+      // thread-safe. See HADOOP-8406.
+      for (CompressionCodec codec : CODEC_PROVIDERS) {
+        result.add(codec.getClass());
+      }
     }
     // Add codec classes from configuration
     String codecsString = conf.get("io.compression.codecs");
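For context on the synchronization introduced above: CODEC_PROVIDERS is a
java.util.ServiceLoader, which instantiates providers lazily while it is
being iterated and is not thread-safe, so unsynchronized concurrent
iteration can corrupt its internal cache (the HADOOP-8406 race). A
standalone sketch of the same guard, using Runnable as a stand-in service
interface rather than CompressionCodec:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.ServiceLoader;

    public class ProviderSnapshot {
      // The loader discovers and instantiates providers lazily during
      // iteration, so all iteration must hold a common lock.
      private static final ServiceLoader<Runnable> PROVIDERS =
          ServiceLoader.load(Runnable.class);

      public static List<Class<? extends Runnable>> snapshotClasses() {
        List<Class<? extends Runnable>> result =
            new ArrayList<Class<? extends Runnable>>();
        synchronized (PROVIDERS) {
          for (Runnable r : PROVIDERS) {
            result.add(r.getClass());
          }
        }
        return result;
      }
    }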

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Wed Jun  6 00:17:38 2012
@@ -53,6 +53,8 @@ import org.apache.hadoop.fs.CommonConfig
 import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
 import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadHeaderProto;
 import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadOperationProto;
+import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcResponseHeaderProto;
+import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcStatusProto;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
@@ -845,24 +847,24 @@ public class Client {
       touch();
       
       try {
-        int id = in.readInt();                    // try to read an id
-
+        RpcResponseHeaderProto response = 
+            RpcResponseHeaderProto.parseDelimitedFrom(in);
+        int callId = response.getCallId();
         if (LOG.isDebugEnabled())
-          LOG.debug(getName() + " got value #" + id);
-
-        Call call = calls.get(id);
+          LOG.debug(getName() + " got value #" + callId);
 
-        int state = in.readInt();     // read call status
-        if (state == Status.SUCCESS.state) {
+        Call call = calls.get(callId);
+        RpcStatusProto status = response.getStatus();
+        if (status == RpcStatusProto.SUCCESS) {
           Writable value = ReflectionUtils.newInstance(valueClass, conf);
           value.readFields(in);                 // read value
           call.setRpcResponse(value);
-          calls.remove(id);
-        } else if (state == Status.ERROR.state) {
+          calls.remove(callId);
+        } else if (status == RpcStatusProto.ERROR) {
           call.setException(new RemoteException(WritableUtils.readString(in),
                                                 WritableUtils.readString(in)));
-          calls.remove(id);
-        } else if (state == Status.FATAL.state) {
+          calls.remove(callId);
+        } else if (status == RpcStatusProto.FATAL) {
           // Close the connection
           markClosed(new RemoteException(WritableUtils.readString(in), 
                                          WritableUtils.readString(in)));
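A hedged sketch of the framing this hunk switches to: protobuf's writeDelimitedTo()/parseDelimitedFrom() pair length-prefixes the header with a varint, so the Writable-encoded payload can still follow on the same stream (the stream in is assumed to be positioned at a response boundary):

    RpcResponseHeaderProto header =
        RpcResponseHeaderProto.parseDelimitedFrom(in);  // varint + header
    switch (header.getStatus()) {
      case SUCCESS:  // next on the stream: the call's Writable return value
        break;
      case ERROR:    // next: errorClass and error message strings
        break;
      case FATAL:    // next: same strings; then the connection is closed
        break;
    }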

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java Wed Jun  6 00:17:38 2012
@@ -396,24 +396,44 @@ public class ProtobufRpcEngine implement
        * it is.
        * 
        */
-      public Writable call(RPC.Server server, String protocol,
+      public Writable call(RPC.Server server, String connectionProtocolName,
           Writable writableRequest, long receiveTime) throws Exception {
         RpcRequestWritable request = (RpcRequestWritable) writableRequest;
         HadoopRpcRequestProto rpcRequest = request.message;
         String methodName = rpcRequest.getMethodName();
-        String protoName = rpcRequest.getDeclaringClassProtocolName();
+        
+        /** 
+         * RPCs for a particular interface (i.e. protocol) are made over an
+         * IPC connection that is set up using rpcProxy. The rpcProxy has a
+         * declared protocol name that is sent from client to server at
+         * connection time.
+         * 
+         * Each RPC call also sends a protocol name (called
+         * declaringClassProtocolName). This name is usually the same as
+         * the connection protocol name, except in some cases. For example,
+         * meta-protocols such as ProtocolInfoProto, which fetch information
+         * about a protocol, reuse an existing connection but need to
+         * indicate that the actual protocol is different (i.e. it is
+         * ProtocolInfoProto); in that case the declaringClassProtocolName
+         * field is set to ProtocolInfoProto.
+         */
+
+        String declaringClassProtoName = 
+            rpcRequest.getDeclaringClassProtocolName();
         long clientVersion = rpcRequest.getClientProtocolVersion();
         if (server.verbose)
-          LOG.info("Call: protocol=" + protocol + ", method=" + methodName);
+          LOG.info("Call: connectionProtocolName=" + connectionProtocolName + 
+              ", method=" + methodName);
         
-        ProtoClassProtoImpl protocolImpl = getProtocolImpl(server, protoName,
-            clientVersion);
+        ProtoClassProtoImpl protocolImpl = getProtocolImpl(server, 
+                              declaringClassProtoName, clientVersion);
         BlockingService service = (BlockingService) protocolImpl.protocolImpl;
         MethodDescriptor methodDescriptor = service.getDescriptorForType()
             .findMethodByName(methodName);
         if (methodDescriptor == null) {
-          String msg = "Unknown method " + methodName + " called on " + protocol
-              + " protocol.";
+          String msg = "Unknown method " + methodName + " called on " 
+                                + connectionProtocolName + " protocol.";
           LOG.warn(msg);
           throw new RpcServerException(msg);
         }
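To make the comment above concrete, a small illustration (the two variable names come from the patch; the protocol class names are hypothetical examples, not taken from this diff):

    String connectionProtocolName = "org.apache.hadoop.ha.HAServiceProtocolPB";
    boolean isMetaCall = true;  // e.g. a query about the protocol itself
    String declaringClassProtocolName = isMetaCall
        ? "org.apache.hadoop.ipc.ProtocolMetaInfoPB"  // reuses the connection
        : connectionProtocolName;                     // ordinary call: same name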

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Wed Jun  6 00:17:38 2012
@@ -1339,7 +1339,7 @@ public abstract class Server {
               + CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION
               + ") is configured as simple. Please configure another method "
               + "like kerberos or digest.");
-            setupResponse(authFailedResponse, authFailedCall, Status.FATAL,
+            setupResponse(authFailedResponse, authFailedCall, RpcStatusProto.FATAL,
                 null, ae.getClass().getName(), ae.getMessage());
             responder.doRespond(authFailedCall);
             throw ae;
@@ -1420,7 +1420,7 @@ public abstract class Server {
         Call fakeCall =  new Call(-1, null, this);
         // Versions 3 and greater can interpret this exception
         // response in the same manner
-        setupResponse(buffer, fakeCall, Status.FATAL,
+        setupResponseOldVersionFatal(buffer, fakeCall,
             null, VersionMismatch.class.getName(), errMsg);
 
         responder.doRespond(fakeCall);
@@ -1443,7 +1443,7 @@ public abstract class Server {
       ByteArrayOutputStream buffer = new ByteArrayOutputStream();
 
       Call fakeCall = new Call(-1, null, this);
-      setupResponse(buffer, fakeCall, Status.FATAL, null,
+      setupResponse(buffer, fakeCall, RpcStatusProto.FATAL, null,
           IpcException.class.getName(), errMsg);
       responder.doRespond(fakeCall);
     }
@@ -1579,7 +1579,7 @@ public abstract class Server {
             new Call(header.getCallId(), null, this);
         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
 
-        setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
+        setupResponse(responseBuffer, readParamsFailedCall, RpcStatusProto.FATAL, null,
             IOException.class.getName(),
             "Unknown rpc kind "  + header.getRpcKind());
         responder.doRespond(readParamsFailedCall);
@@ -1597,7 +1597,7 @@ public abstract class Server {
             new Call(header.getCallId(), null, this);
         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
 
-        setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
+        setupResponse(responseBuffer, readParamsFailedCall, RpcStatusProto.FATAL, null,
             t.getClass().getName(),
             "IPC server unable to read call parameters: " + t.getMessage());
         responder.doRespond(readParamsFailedCall);
@@ -1627,7 +1627,7 @@ public abstract class Server {
         rpcMetrics.incrAuthorizationSuccesses();
       } catch (AuthorizationException ae) {
         rpcMetrics.incrAuthorizationFailures();
-        setupResponse(authFailedResponse, authFailedCall, Status.FATAL, null,
+        setupResponse(authFailedResponse, authFailedCall, RpcStatusProto.FATAL, null,
             ae.getClass().getName(), ae.getMessage());
         responder.doRespond(authFailedCall);
         return false;
@@ -1725,8 +1725,8 @@ public abstract class Server {
             // responder.doResponse() since setupResponse may use
             // SASL to encrypt response data and SASL enforces
             // its own message ordering.
-            setupResponse(buf, call, (error == null) ? Status.SUCCESS
-                : Status.ERROR, value, errorClass, error);
+            setupResponse(buf, call, (error == null) ? RpcStatusProto.SUCCESS
+                : RpcStatusProto.ERROR, value, errorClass, error);
             
             // Discard the large buf and reset it back to smaller size 
             // to free up heap
@@ -1859,41 +1859,80 @@ public abstract class Server {
   /**
    * Setup response for the IPC Call.
    * 
-   * @param response buffer to serialize the response into
+   * @param responseBuf buffer to serialize the response into
    * @param call {@link Call} to which we are setting up the response
-   * @param status {@link Status} of the IPC call
+   * @param status the status of the IPC call
    * @param rv return value for the IPC Call, if the call was successful
   * @param errorClass error class, if the call failed
    * @param error error message, if the call failed
    * @throws IOException
    */
-  private void setupResponse(ByteArrayOutputStream response, 
-                             Call call, Status status, 
+  private void setupResponse(ByteArrayOutputStream responseBuf,
+                             Call call, RpcStatusProto status, 
                              Writable rv, String errorClass, String error) 
   throws IOException {
-    response.reset();
-    DataOutputStream out = new DataOutputStream(response);
-    out.writeInt(call.callId);                // write call id
-    out.writeInt(status.state);           // write status
+    responseBuf.reset();
+    DataOutputStream out = new DataOutputStream(responseBuf);
+    RpcResponseHeaderProto.Builder response =  
+        RpcResponseHeaderProto.newBuilder();
+    response.setCallId(call.callId);
+    response.setStatus(status);
 
-    if (status == Status.SUCCESS) {
+
+    if (status == RpcStatusProto.SUCCESS) {
       try {
+        response.build().writeDelimitedTo(out);
         rv.write(out);
       } catch (Throwable t) {
         LOG.warn("Error serializing call response for call " + call, t);
         // Call back to same function - this is OK since the
         // buffer is reset at the top, and since status is changed
         // to ERROR it won't infinite loop.
-        setupResponse(response, call, Status.ERROR,
+        setupResponse(responseBuf, call, RpcStatusProto.ERROR,
             null, t.getClass().getName(),
             StringUtils.stringifyException(t));
         return;
       }
     } else {
+      if (status == RpcStatusProto.FATAL) {
+        response.setServerIpcVersionNum(Server.CURRENT_VERSION);
+      }
+      response.build().writeDelimitedTo(out);
       WritableUtils.writeString(out, errorClass);
       WritableUtils.writeString(out, error);
     }
     if (call.connection.useWrap) {
+      wrapWithSasl(responseBuf, call);
+    }
+    call.setResponse(ByteBuffer.wrap(responseBuf.toByteArray()));
+  }
+  
+  /**
+   * Setup response for the IPC Call on Fatal Error from a 
+   * client that is using an old version of Hadoop.
+   * The response is serialized using the previous protocol's response
+   * layout.
+   * 
+   * @param response buffer to serialize the response into
+   * @param call {@link Call} to which we are setting up the response
+   * @param rv return value for the IPC Call, if the call was successful
+   * @param errorClass error class, if the call failed
+   * @param error error message, if the call failed
+   * @throws IOException
+   */
+  private void setupResponseOldVersionFatal(ByteArrayOutputStream response, 
+                             Call call,
+                             Writable rv, String errorClass, String error) 
+  throws IOException {
+    final int OLD_VERSION_FATAL_STATUS = -1;
+    response.reset();
+    DataOutputStream out = new DataOutputStream(response);
+    out.writeInt(call.callId);                // write call id
+    out.writeInt(OLD_VERSION_FATAL_STATUS);   // write FATAL_STATUS
+    WritableUtils.writeString(out, errorClass);
+    WritableUtils.writeString(out, error);
+
+    if (call.connection.useWrap) {
       wrapWithSasl(response, call);
     }
     call.setResponse(ByteBuffer.wrap(response.toByteArray()));
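A hedged sketch of the new success-path framing that setupResponse() now writes (in-memory stream; SASL wrapping and the error paths omitted):

    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buf);
    RpcResponseHeaderProto header = RpcResponseHeaderProto.newBuilder()
        .setCallId(42)                      // illustrative call id
        .setStatus(RpcStatusProto.SUCCESS)
        .build();
    header.writeDelimitedTo(out);           // varint length + header bytes
    // For SUCCESS the Writable return value follows; for ERROR/FATAL the
    // errorClass/error strings follow, and a FATAL header additionally
    // carries setServerIpcVersionNum(Server.CURRENT_VERSION).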

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/jmx/JMXJsonServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/jmx/JMXJsonServlet.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/jmx/JMXJsonServlet.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/jmx/JMXJsonServlet.java Wed Jun  6 00:17:38 2012
@@ -34,6 +34,7 @@ import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 import javax.management.ReflectionException;
+import javax.management.RuntimeErrorException;
 import javax.management.RuntimeMBeanException;
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.CompositeType;
@@ -317,6 +318,11 @@ public class JMXJsonServlet extends Http
         LOG.error("getting attribute "+attName+" of "+oname+" threw an exception", e);
       }
       return;
+    } catch (RuntimeErrorException e) {
+      // RuntimeErrorException happens when an unexpected failure occurs in
+      // getAttribute; see, for example, https://issues.apache.org/jira/browse/DAEMON-120
+      LOG.debug("getting attribute "+attName+" of "+oname+" threw an exception", e);
+      return;
     } catch (AttributeNotFoundException e) {
       //Ignored the attribute was not found, which should never happen because the bean
       //just told us that it has this attribute, but if this happens just don't output
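For context, a hedged sketch of the failure mode the new catch covers: JMX wraps a java.lang.Error thrown by an MBean getter in RuntimeErrorException (mBeanServer, oname and attName are the servlet's fields and locals):

    try {
      Object value = mBeanServer.getAttribute(oname, attName);
    } catch (RuntimeErrorException e) {
      // e.getTargetError() is the underlying Error (as in DAEMON-120);
      // log at debug and skip the attribute instead of failing the page.
      LOG.debug("getting attribute " + attName + " of " + oname
          + " threw an exception", e);
    }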

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/Node.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/Node.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/Node.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/Node.java Wed Jun  6 00:17:38 2012
@@ -40,6 +40,7 @@ public interface Node {
    * @param location the location
    */
   public void setNetworkLocation(String location);
+
   /** @return this node's name */
   public String getName();
 

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NodeBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NodeBase.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NodeBase.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NodeBase.java Wed Jun  6 00:17:38 2012
@@ -110,7 +110,7 @@ public class NodeBase implements Node {
    * @return the path of a node
    */
   public static String getPath(Node node) {
-    return node.getNetworkLocation()+PATH_SEPARATOR_STR+node.getName();
+    return node.getNetworkLocation() + PATH_SEPARATOR_STR + node.getName();
   }
   
   /** @return this node's path as its string representation */
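A quick worked example of getPath() (values illustrative; NodeBase(String name, String location) is the existing constructor):

    Node n = new NodeBase("datanode1", "/rack1");
    assert NodeBase.getPath(n).equals("/rack1/datanode1");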

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java Wed Jun  6 00:17:38 2012
@@ -18,10 +18,15 @@
 
 package org.apache.hadoop.security.token;
 
+import com.google.common.collect.Maps;
+
+import java.io.ByteArrayInputStream;
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Map;
 import java.util.ServiceLoader;
 
 import org.apache.commons.codec.binary.Base64;
@@ -37,6 +42,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * The client-side form of the token.
@@ -45,6 +51,9 @@ import org.apache.hadoop.io.WritableUtil
 @InterfaceStability.Evolving
 public class Token implements Writable {
   public static final Log LOG = LogFactory.getLog(Token.class);
+  
+  private static Map<Text, Class<? extends TokenIdentifier>> tokenKindMap;
+  
   private byte[] identifier;
   private byte[] password;
   private Text kind;
@@ -100,13 +109,49 @@ public class Token<T extends TokenIdentifier> implements
+  private static Class<? extends TokenIdentifier>
+      getClassForIdentifier(Text kind) {
+    if (tokenKindMap == null) {
+      tokenKindMap = Maps.newHashMap();
+      for (TokenIdentifier id : ServiceLoader.load(TokenIdentifier.class)) {
+        tokenKindMap.put(id.getKind(), id.getClass());
+      }
+    }
+    Class<? extends TokenIdentifier> cls = tokenKindMap.get(kind);
+    if (cls == null) {
+      LOG.warn("Cannot find class for token kind " + kind);
+      return null;
+    }
+    return cls;
+  }
+  
+  /**
+   * Get the token identifier object, or null if it could not be constructed
+   * (because the class could not be loaded, for example).
+   * @return the token identifier, or null
+   * @throws IOException 
+   */
+  @SuppressWarnings("unchecked")
+  public T decodeIdentifier() throws IOException {
+    Class<? extends TokenIdentifier> cls = getClassForIdentifier(getKind());
+    if (cls == null) {
+      return null;
+    }
+    TokenIdentifier tokenIdentifier = ReflectionUtils.newInstance(cls, null);
+    ByteArrayInputStream buf = new ByteArrayInputStream(identifier);
+    DataInputStream in = new DataInputStream(buf);  
+    tokenIdentifier.readFields(in);
+    in.close();
+    return (T) tokenIdentifier;
+  }
+  
   /**
    * Get the token password/secret
    * @return the token password/secret
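Hedged usage sketch for the new decodeIdentifier() (obtaining the token is out of scope here, so fetchToken() is an assumed helper; kinds resolve through the ServiceLoader cache built above):

    Token<? extends TokenIdentifier> token = fetchToken();  // assumed helper
    TokenIdentifier id = token.decodeIdentifier();
    if (id == null) {
      // no TokenIdentifier implementation for token.getKind() on the classpath
    }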
@@ -260,16 +305,31 @@ public class Token<T extends TokenIdentifier> implements

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-policy.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-policy.xml?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-policy.xml (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-policy.xml Wed Jun  6 00:17:38 2012
       ACL for HAService protocol used by HAAdmin to manage the
       active and stand-by states of namenode.</description>
   </property>
 
+  <property>
+    <name>security.zkfc.protocol.acl</name>
+    <value>*</value>
+    <description>ACL for access to the ZK Failover Controller
+    </description>
+  </property>
 
   <property>
     <name>security.mrhs.client.protocol.acl</name>

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/log4j.properties
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/log4j.properties?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/log4j.properties (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/log4j.properties Wed Jun  6 00:17:38 2012
@@ -102,7 +102,7 @@ log4j.appender.TLA.layout.ConversionPatt
 #
 #Security appender
 #
-hadoop.security.logger=INFO,console
+hadoop.security.logger=INFO,NullAppender
 hadoop.security.log.maxfilesize=256MB
 hadoop.security.log.maxbackupindex=20
 log4j.category.SecurityLogger=${hadoop.security.logger}
@@ -126,7 +126,7 @@ log4j.appender.DRFAS.DatePattern=.yyyy-M
 #
 # hdfs audit logging
 #
-hdfs.audit.logger=INFO,console
+hdfs.audit.logger=INFO,NullAppender
 hdfs.audit.log.maxfilesize=256MB
 hdfs.audit.log.maxbackupindex=20
 log4j.logger.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=${hdfs.audit.logger}
@@ -141,7 +141,7 @@ log4j.appender.RFAAUDIT.MaxBackupIndex=$
 #
 # mapred audit logging
 #
-mapred.audit.logger=INFO,console
+mapred.audit.logger=INFO,NullAppender
 mapred.audit.log.maxfilesize=256MB
 mapred.audit.log.maxbackupindex=20
 log4j.logger.org.apache.hadoop.mapred.AuditLogger=${mapred.audit.logger}
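Since the templates now default to NullAppender, a site that wants these logs back can override the variables in its own log4j.properties or via -D system properties (a sketch; DRFAS and RFAAUDIT are appenders defined earlier in this file, MRAUDIT is assumed):

    hadoop.security.logger=INFO,DRFAS
    hdfs.audit.logger=INFO,RFAAUDIT
    mapred.audit.logger=INFO,MRAUDIT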

Modified: hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/proto/HAServiceProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/proto/HAServiceProtocol.proto?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/proto/HAServiceProtocol.proto (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-common-project/hadoop-common/src/main/proto/HAServiceProtocol.proto Wed Jun  6 00:17:38 2012
@@ -27,6 +27,16 @@ enum HAServiceStateProto {
   STANDBY = 2;
 }
 
+enum HARequestSource {
+  REQUEST_BY_USER = 0;
+  REQUEST_BY_USER_FORCED = 1;
+  REQUEST_BY_ZKFC = 2;
+}
+
+message HAStateChangeRequestInfoProto {
+  required HARequestSource reqSource = 1;
+}
+
 /**
  * void request
  */
@@ -43,6 +53,7 @@ message MonitorHealthResponseProto { 
  * void request
  */
 message TransitionToActiveRequestProto { 
+  required HAStateChangeRequestInfoProto reqInfo = 1;
 }
 
 /**
@@ -55,6 +66,7 @@ message TransitionToActiveResponseProto 
  * void request
  */
 message TransitionToStandbyRequestProto { 
+  required HAStateChangeRequestInfoProto reqInfo = 1;
 }
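Since reqInfo is now a required field, every client must populate it. A hedged Java sketch using the usual protoc-generated builders (class nesting and imports depend on the generated HAServiceProtocolProtos outer class):

    HAStateChangeRequestInfoProto reqInfo = HAStateChangeRequestInfoProto
        .newBuilder()
        .setReqSource(HARequestSource.REQUEST_BY_USER)
        .build();
    TransitionToActiveRequestProto req = TransitionToActiveRequestProto
        .newBuilder()
        .setReqInfo(reqInfo)
        .build();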
 
 /**