Subject: svn commit: r1203512 [2/4] - in /hbase/branches/0.92: ./ conf/ security/ security/src/ security/src/main/ security/src/main/java/ security/src/main/java/org/ security/src/main/java/org/apache/ security/src/main/java/org/apache/hadoop/ security/src/main...
Date: Fri, 18 Nov 2011 07:13:05 -0000
To: commits@hbase.apache.org
From: garyh@apache.org

Added: hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java URL: http://svn.apache.org/viewvc/hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java?rev=1203512&view=auto ============================================================================== --- hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java (added) +++ hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java Fri Nov 18 07:13:03 2011 @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hbase.security; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ipc.HMasterInterface; +import org.apache.hadoop.hbase.ipc.HMasterRegionInterface; +import org.apache.hadoop.hbase.ipc.HRegionInterface; +import org.apache.hadoop.security.authorize.PolicyProvider; +import org.apache.hadoop.security.authorize.Service; +import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; + +/** + * Implementation of secure Hadoop policy provider for mapping + * protocol interfaces to hbase-policy.xml entries. + */ +public class HBasePolicyProvider extends PolicyProvider { + protected static Service[] services = { + new Service("security.client.protocol.acl", HRegionInterface.class), + new Service("security.admin.protocol.acl", HMasterInterface.class), + new Service("security.masterregion.protocol.acl", HMasterRegionInterface.class) + }; + + @Override + public Service[] getServices() { + return services; + } + + public static void init(Configuration conf, + ServiceAuthorizationManager authManager) { + // set service-level authorization security policy + conf.set("hadoop.policy.file", "hbase-policy.xml"); + if (conf.getBoolean( + ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) { + authManager.refresh(conf, new HBasePolicyProvider()); + } + } +} Added: hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java URL: http://svn.apache.org/viewvc/hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java?rev=1203512&view=auto ============================================================================== --- hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java (added) +++ hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java Fri Nov 18 07:13:03 2011 @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.security; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.callback.NameCallback; +import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.sasl.RealmCallback; +import javax.security.sasl.RealmChoiceCallback; +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslException; +import javax.security.sasl.SaslClient; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod; +import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslStatus; +import org.apache.hadoop.security.SaslInputStream; +import org.apache.hadoop.security.SaslOutputStream; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; + +/** + * A utility class that encapsulates SASL logic for RPC client. + * Copied from org.apache.hadoop.security + */ +public class HBaseSaslRpcClient { + public static final Log LOG = LogFactory.getLog(HBaseSaslRpcClient.class); + + private final SaslClient saslClient; + + /** + * Create a HBaseSaslRpcClient for an authentication method + * + * @param method + * the requested authentication method + * @param token + * token to use if needed by the authentication method + */ + public HBaseSaslRpcClient(AuthMethod method, + Token token, String serverPrincipal) + throws IOException { + switch (method) { + case DIGEST: + if (LOG.isDebugEnabled()) + LOG.debug("Creating SASL " + AuthMethod.DIGEST.getMechanismName() + + " client to authenticate to service at " + token.getService()); + saslClient = Sasl.createSaslClient(new String[] { AuthMethod.DIGEST + .getMechanismName() }, null, null, HBaseSaslRpcServer.SASL_DEFAULT_REALM, + HBaseSaslRpcServer.SASL_PROPS, new SaslClientCallbackHandler(token)); + break; + case KERBEROS: + if (LOG.isDebugEnabled()) { + LOG + .debug("Creating SASL " + AuthMethod.KERBEROS.getMechanismName() + + " client. 
Server's Kerberos principal name is " + + serverPrincipal); + } + if (serverPrincipal == null || serverPrincipal.length() == 0) { + throw new IOException( + "Failed to specify server's Kerberos principal name"); + } + String names[] = HBaseSaslRpcServer.splitKerberosName(serverPrincipal); + if (names.length != 3) { + throw new IOException( + "Kerberos principal name does NOT have the expected hostname part: " + + serverPrincipal); + } + saslClient = Sasl.createSaslClient(new String[] { AuthMethod.KERBEROS + .getMechanismName() }, null, names[0], names[1], + HBaseSaslRpcServer.SASL_PROPS, null); + break; + default: + throw new IOException("Unknown authentication method " + method); + } + if (saslClient == null) + throw new IOException("Unable to find SASL client implementation"); + } + + private static void readStatus(DataInputStream inStream) throws IOException { + int id = inStream.readInt(); // read and discard dummy id + int status = inStream.readInt(); // read status + if (status != SaslStatus.SUCCESS.state) { + throw new RemoteException(WritableUtils.readString(inStream), + WritableUtils.readString(inStream)); + } + } + + /** + * Do client side SASL authentication with server via the given InputStream + * and OutputStream + * + * @param inS + * InputStream to use + * @param outS + * OutputStream to use + * @return true if connection is set up, or false if needs to switch + * to simple Auth. + * @throws IOException + */ + public boolean saslConnect(InputStream inS, OutputStream outS) + throws IOException { + DataInputStream inStream = new DataInputStream(new BufferedInputStream(inS)); + DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream( + outS)); + + try { + byte[] saslToken = new byte[0]; + if (saslClient.hasInitialResponse()) + saslToken = saslClient.evaluateChallenge(saslToken); + if (saslToken != null) { + outStream.writeInt(saslToken.length); + outStream.write(saslToken, 0, saslToken.length); + outStream.flush(); + if (LOG.isDebugEnabled()) + LOG.debug("Have sent token of size " + saslToken.length + + " from initSASLContext."); + } + if (!saslClient.isComplete()) { + readStatus(inStream); + int len = inStream.readInt(); + if (len == HBaseSaslRpcServer.SWITCH_TO_SIMPLE_AUTH) { + if (LOG.isDebugEnabled()) + LOG.debug("Server asks us to fall back to simple auth."); + saslClient.dispose(); + return false; + } + saslToken = new byte[len]; + if (LOG.isDebugEnabled()) + LOG.debug("Will read input token of size " + saslToken.length + + " for processing by initSASLContext"); + inStream.readFully(saslToken); + } + + while (!saslClient.isComplete()) { + saslToken = saslClient.evaluateChallenge(saslToken); + if (saslToken != null) { + if (LOG.isDebugEnabled()) + LOG.debug("Will send token of size " + saslToken.length + + " from initSASLContext."); + outStream.writeInt(saslToken.length); + outStream.write(saslToken, 0, saslToken.length); + outStream.flush(); + } + if (!saslClient.isComplete()) { + readStatus(inStream); + saslToken = new byte[inStream.readInt()]; + if (LOG.isDebugEnabled()) + LOG.debug("Will read input token of size " + saslToken.length + + " for processing by initSASLContext"); + inStream.readFully(saslToken); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("SASL client context established. 
Negotiated QoP: " + + saslClient.getNegotiatedProperty(Sasl.QOP)); + } + return true; + } catch (IOException e) { + try { + saslClient.dispose(); + } catch (SaslException ignored) { + // ignore further exceptions during cleanup + } + throw e; + } + } + + /** + * Get a SASL wrapped InputStream. Can be called only after saslConnect() has + * been called. + * + * @param in + * the InputStream to wrap + * @return a SASL wrapped InputStream + * @throws IOException + */ + public InputStream getInputStream(InputStream in) throws IOException { + if (!saslClient.isComplete()) { + throw new IOException("Sasl authentication exchange hasn't completed yet"); + } + return new SaslInputStream(in, saslClient); + } + + /** + * Get a SASL wrapped OutputStream. Can be called only after saslConnect() has + * been called. + * + * @param out + * the OutputStream to wrap + * @return a SASL wrapped OutputStream + * @throws IOException + */ + public OutputStream getOutputStream(OutputStream out) throws IOException { + if (!saslClient.isComplete()) { + throw new IOException("Sasl authentication exchange hasn't completed yet"); + } + return new SaslOutputStream(out, saslClient); + } + + /** Release resources used by wrapped saslClient */ + public void dispose() throws SaslException { + saslClient.dispose(); + } + + private static class SaslClientCallbackHandler implements CallbackHandler { + private final String userName; + private final char[] userPassword; + + public SaslClientCallbackHandler(Token token) { + this.userName = HBaseSaslRpcServer.encodeIdentifier(token.getIdentifier()); + this.userPassword = HBaseSaslRpcServer.encodePassword(token.getPassword()); + } + + public void handle(Callback[] callbacks) + throws UnsupportedCallbackException { + NameCallback nc = null; + PasswordCallback pc = null; + RealmCallback rc = null; + for (Callback callback : callbacks) { + if (callback instanceof RealmChoiceCallback) { + continue; + } else if (callback instanceof NameCallback) { + nc = (NameCallback) callback; + } else if (callback instanceof PasswordCallback) { + pc = (PasswordCallback) callback; + } else if (callback instanceof RealmCallback) { + rc = (RealmCallback) callback; + } else { + throw new UnsupportedCallbackException(callback, + "Unrecognized SASL client callback"); + } + } + if (nc != null) { + if (LOG.isDebugEnabled()) + LOG.debug("SASL client callback: setting username: " + userName); + nc.setName(userName); + } + if (pc != null) { + if (LOG.isDebugEnabled()) + LOG.debug("SASL client callback: setting userPassword"); + pc.setPassword(userPassword); + } + if (rc != null) { + if (LOG.isDebugEnabled()) + LOG.debug("SASL client callback: setting realm: " + + rc.getDefaultText()); + rc.setText(rc.getDefaultText()); + } + } + } +} Added: hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java URL: http://svn.apache.org/viewvc/hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java?rev=1203512&view=auto ============================================================================== --- hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java (added) +++ hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java Fri Nov 18 07:13:03 2011 @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.security; + +import java.io.ByteArrayInputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Map; +import java.util.TreeMap; + +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.callback.NameCallback; +import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.sasl.AuthorizeCallback; +import javax.security.sasl.RealmCallback; +import javax.security.sasl.Sasl; + +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ipc.HBaseServer; +import org.apache.hadoop.hbase.ipc.SecureServer; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; +import org.apache.hadoop.security.token.SecretManager.InvalidToken; + +/** + * A utility class for dealing with SASL on RPC server + */ +public class HBaseSaslRpcServer { + public static final Log LOG = LogFactory.getLog(HBaseSaslRpcServer.class); + public static final String SASL_DEFAULT_REALM = "default"; + public static final Map SASL_PROPS = + new TreeMap(); + + public static final int SWITCH_TO_SIMPLE_AUTH = -88; + + public static enum QualityOfProtection { + AUTHENTICATION("auth"), + INTEGRITY("auth-int"), + PRIVACY("auth-conf"); + + public final String saslQop; + + private QualityOfProtection(String saslQop) { + this.saslQop = saslQop; + } + + public String getSaslQop() { + return saslQop; + } + } + + public static void init(Configuration conf) { + QualityOfProtection saslQOP = QualityOfProtection.AUTHENTICATION; + String rpcProtection = conf.get("hbase.rpc.protection", + QualityOfProtection.AUTHENTICATION.name().toLowerCase()); + if (QualityOfProtection.INTEGRITY.name().toLowerCase() + .equals(rpcProtection)) { + saslQOP = QualityOfProtection.INTEGRITY; + } else if (QualityOfProtection.PRIVACY.name().toLowerCase().equals( + rpcProtection)) { + saslQOP = QualityOfProtection.PRIVACY; + } + + SASL_PROPS.put(Sasl.QOP, saslQOP.getSaslQop()); + SASL_PROPS.put(Sasl.SERVER_AUTH, "true"); + } + + static String encodeIdentifier(byte[] identifier) { + return new String(Base64.encodeBase64(identifier)); + } + + static byte[] decodeIdentifier(String identifier) { + return Base64.decodeBase64(identifier.getBytes()); + } + + public static T getIdentifier(String id, + SecretManager secretManager) throws 
InvalidToken { + byte[] tokenId = decodeIdentifier(id); + T tokenIdentifier = secretManager.createIdentifier(); + try { + tokenIdentifier.readFields(new DataInputStream(new ByteArrayInputStream( + tokenId))); + } catch (IOException e) { + throw (InvalidToken) new InvalidToken( + "Can't de-serialize tokenIdentifier").initCause(e); + } + return tokenIdentifier; + } + + static char[] encodePassword(byte[] password) { + return new String(Base64.encodeBase64(password)).toCharArray(); + } + + /** Splitting fully qualified Kerberos name into parts */ + public static String[] splitKerberosName(String fullName) { + return fullName.split("[/@]"); + } + + public enum SaslStatus { + SUCCESS (0), + ERROR (1); + + public final int state; + private SaslStatus(int state) { + this.state = state; + } + } + + /** Authentication method */ + public static enum AuthMethod { + SIMPLE((byte) 80, "", AuthenticationMethod.SIMPLE), + KERBEROS((byte) 81, "GSSAPI", AuthenticationMethod.KERBEROS), + DIGEST((byte) 82, "DIGEST-MD5", AuthenticationMethod.TOKEN); + + /** The code for this method. */ + public final byte code; + public final String mechanismName; + public final AuthenticationMethod authenticationMethod; + + private AuthMethod(byte code, String mechanismName, + AuthenticationMethod authMethod) { + this.code = code; + this.mechanismName = mechanismName; + this.authenticationMethod = authMethod; + } + + private static final int FIRST_CODE = values()[0].code; + + /** Return the object represented by the code. */ + private static AuthMethod valueOf(byte code) { + final int i = (code & 0xff) - FIRST_CODE; + return i < 0 || i >= values().length ? null : values()[i]; + } + + /** Return the SASL mechanism name */ + public String getMechanismName() { + return mechanismName; + } + + /** Read from in */ + public static AuthMethod read(DataInput in) throws IOException { + return valueOf(in.readByte()); + } + + /** Write to out */ + public void write(DataOutput out) throws IOException { + out.write(code); + } + }; + + /** CallbackHandler for SASL DIGEST-MD5 mechanism */ + public static class SaslDigestCallbackHandler implements CallbackHandler { + private SecretManager secretManager; + private SecureServer.SecureConnection connection; + + public SaslDigestCallbackHandler( + SecretManager secretManager, + SecureServer.SecureConnection connection) { + this.secretManager = secretManager; + this.connection = connection; + } + + private char[] getPassword(TokenIdentifier tokenid) throws InvalidToken { + return encodePassword(secretManager.retrievePassword(tokenid)); + } + + /** {@inheritDoc} */ + @Override + public void handle(Callback[] callbacks) throws InvalidToken, + UnsupportedCallbackException { + NameCallback nc = null; + PasswordCallback pc = null; + AuthorizeCallback ac = null; + for (Callback callback : callbacks) { + if (callback instanceof AuthorizeCallback) { + ac = (AuthorizeCallback) callback; + } else if (callback instanceof NameCallback) { + nc = (NameCallback) callback; + } else if (callback instanceof PasswordCallback) { + pc = (PasswordCallback) callback; + } else if (callback instanceof RealmCallback) { + continue; // realm is ignored + } else { + throw new UnsupportedCallbackException(callback, + "Unrecognized SASL DIGEST-MD5 Callback"); + } + } + if (pc != null) { + TokenIdentifier tokenIdentifier = getIdentifier(nc.getDefaultName(), secretManager); + char[] password = getPassword(tokenIdentifier); + UserGroupInformation user = null; + user = tokenIdentifier.getUser(); // may throw exception + 
connection.attemptingUser = user; + if (LOG.isDebugEnabled()) { + LOG.debug("SASL server DIGEST-MD5 callback: setting password " + + "for client: " + tokenIdentifier.getUser()); + } + pc.setPassword(password); + } + if (ac != null) { + String authid = ac.getAuthenticationID(); + String authzid = ac.getAuthorizationID(); + if (authid.equals(authzid)) { + ac.setAuthorized(true); + } else { + ac.setAuthorized(false); + } + if (ac.isAuthorized()) { + if (LOG.isDebugEnabled()) { + String username = + getIdentifier(authzid, secretManager).getUser().getUserName(); + LOG.debug("SASL server DIGEST-MD5 callback: setting " + + "canonicalized client ID: " + username); + } + ac.setAuthorizedID(authzid); + } + } + } + } + + /** CallbackHandler for SASL GSSAPI Kerberos mechanism */ + public static class SaslGssCallbackHandler implements CallbackHandler { + + /** {@inheritDoc} */ + @Override + public void handle(Callback[] callbacks) throws + UnsupportedCallbackException { + AuthorizeCallback ac = null; + for (Callback callback : callbacks) { + if (callback instanceof AuthorizeCallback) { + ac = (AuthorizeCallback) callback; + } else { + throw new UnsupportedCallbackException(callback, + "Unrecognized SASL GSSAPI Callback"); + } + } + if (ac != null) { + String authid = ac.getAuthenticationID(); + String authzid = ac.getAuthorizationID(); + if (authid.equals(authzid)) { + ac.setAuthorized(true); + } else { + ac.setAuthorized(false); + } + if (ac.isAuthorized()) { + if (LOG.isDebugEnabled()) + LOG.debug("SASL server GSSAPI callback: setting " + + "canonicalized client ID: " + authzid); + ac.setAuthorizedID(authzid); + } + } + } + } +} Added: hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationKey.java URL: http://svn.apache.org/viewvc/hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationKey.java?rev=1203512&view=auto ============================================================================== --- hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationKey.java (added) +++ hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationKey.java Fri Nov 18 07:13:03 2011 @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.security.token; + +import javax.crypto.SecretKey; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; + +/** + * Represents a secret key used for signing and verifying authentication tokens + * by {@link AuthenticationTokenSecretManager}. 
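// Illustrative sketch, not part of this commit: AuthenticationKey (declared
// just below) is a plain Writable, so a signing key round-trips through
// write()/readFields() as shown here. The key bytes, key id and expiration
// used are made-up example values.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import javax.crypto.spec.SecretKeySpec;
import org.apache.hadoop.hbase.security.token.AuthenticationKey;

public class AuthenticationKeyRoundTrip {
  public static void main(String[] args) throws Exception {
    SecretKeySpec secret = new SecretKeySpec(new byte[] {1, 2, 3, 4}, "HmacSHA1");
    AuthenticationKey original =
        new AuthenticationKey(1, System.currentTimeMillis() + 60000L, secret);

    // write(): VInt key id, VLong expiration, VInt key length, raw key bytes
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    original.write(new DataOutputStream(bytes));

    // readFields() rebuilds the SecretKey from the raw key bytes
    AuthenticationKey copy = new AuthenticationKey();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

    // equals() compares the key id, the expiration and the encoded key material
    System.out.println("round trip ok: " + original.equals(copy));
  }
}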
+ */ +public class AuthenticationKey implements Writable { + private int id; + private long expirationDate; + private SecretKey secret; + + public AuthenticationKey() { + // for Writable + } + + public AuthenticationKey(int keyId, long expirationDate, SecretKey key) { + this.id = keyId; + this.expirationDate = expirationDate; + this.secret = key; + } + + public int getKeyId() { + return id; + } + + public long getExpiration() { + return expirationDate; + } + + public void setExpiration(long timestamp) { + expirationDate = timestamp; + } + + SecretKey getKey() { + return secret; + } + + @Override + public boolean equals(Object obj) { + if (obj == null || !(obj instanceof AuthenticationKey)) { + return false; + } + AuthenticationKey other = (AuthenticationKey)obj; + return id == other.getKeyId() && + expirationDate == other.getExpiration() && + (secret == null ? other.getKey() == null : + other.getKey() != null && + Bytes.equals(secret.getEncoded(), other.getKey().getEncoded())); + } + + @Override + public String toString() { + StringBuilder buf = new StringBuilder(); + buf.append("AuthenticationKey[ ") + .append("id=").append(id) + .append(", expiration=").append(expirationDate) + .append(" ]"); + return buf.toString(); + } + + @Override + public void write(DataOutput out) throws IOException { + WritableUtils.writeVInt(out, id); + WritableUtils.writeVLong(out, expirationDate); + if (secret == null) { + WritableUtils.writeVInt(out, -1); + } else { + byte[] keyBytes = secret.getEncoded(); + WritableUtils.writeVInt(out, keyBytes.length); + out.write(keyBytes); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + id = WritableUtils.readVInt(in); + expirationDate = WritableUtils.readVLong(in); + int keyLength = WritableUtils.readVInt(in); + if (keyLength < 0) { + secret = null; + } else { + byte[] keyBytes = new byte[keyLength]; + in.readFully(keyBytes); + secret = AuthenticationTokenSecretManager.createSecretKey(keyBytes); + } + } +} Added: hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationProtocol.java URL: http://svn.apache.org/viewvc/hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationProtocol.java?rev=1203512&view=auto ============================================================================== --- hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationProtocol.java (added) +++ hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationProtocol.java Fri Nov 18 07:13:03 2011 @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.security.token; + +import java.io.IOException; + +import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; +import org.apache.hadoop.security.token.Token; + +/** + * Defines a custom RPC protocol for obtaining authentication tokens + */ +public interface AuthenticationProtocol extends CoprocessorProtocol { + /** + * Obtains a token capable of authenticating as the current user for future + * connections. + * @return an authentication token for the current user + * @throws IOException If obtaining a token is denied or encounters an error + */ + public Token getAuthenticationToken() + throws IOException; + + /** + * Returns the currently authenticated username. + */ + public String whoami(); +} Added: hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenIdentifier.java URL: http://svn.apache.org/viewvc/hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenIdentifier.java?rev=1203512&view=auto ============================================================================== --- hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenIdentifier.java (added) +++ hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenIdentifier.java Fri Nov 18 07:13:03 2011 @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.security.token; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.TokenIdentifier; + +/** + * Represents the identity information stored in an HBase authentication token. 
+ */ +public class AuthenticationTokenIdentifier extends TokenIdentifier { + public static final byte VERSION = 1; + public static final Text AUTH_TOKEN_TYPE = new Text("HBASE_AUTH_TOKEN"); + + protected String username; + protected int keyId; + protected long issueDate; + protected long expirationDate; + protected long sequenceNumber; + + public AuthenticationTokenIdentifier() { + } + + public AuthenticationTokenIdentifier(String username) { + this.username = username; + } + + public AuthenticationTokenIdentifier(String username, int keyId, + long issueDate, long expirationDate) { + this.username = username; + this.keyId = keyId; + this.issueDate = issueDate; + this.expirationDate = expirationDate; + } + + @Override + public Text getKind() { + return AUTH_TOKEN_TYPE; + } + + @Override + public UserGroupInformation getUser() { + if (username == null || "".equals(username)) { + return null; + } + return UserGroupInformation.createRemoteUser(username); + } + + public String getUsername() { + return username; + } + + void setUsername(String name) { + this.username = name; + } + + public int getKeyId() { + return keyId; + } + + void setKeyId(int id) { + this.keyId = id; + } + + public long getIssueDate() { + return issueDate; + } + + void setIssueDate(long timestamp) { + this.issueDate = timestamp; + } + + public long getExpirationDate() { + return expirationDate; + } + + void setExpirationDate(long timestamp) { + this.expirationDate = timestamp; + } + + public long getSequenceNumber() { + return sequenceNumber; + } + + void setSequenceNumber(long seq) { + this.sequenceNumber = seq; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeByte(VERSION); + WritableUtils.writeString(out, username); + WritableUtils.writeVInt(out, keyId); + WritableUtils.writeVLong(out, issueDate); + WritableUtils.writeVLong(out, expirationDate); + WritableUtils.writeVLong(out, sequenceNumber); + } + + @Override + public void readFields(DataInput in) throws IOException { + byte version = in.readByte(); + if (version != VERSION) { + throw new IOException("Version mismatch in deserialization: " + + "expected="+VERSION+", got="+version); + } + username = WritableUtils.readString(in); + keyId = WritableUtils.readVInt(in); + issueDate = WritableUtils.readVLong(in); + expirationDate = WritableUtils.readVLong(in); + sequenceNumber = WritableUtils.readVLong(in); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other instanceof AuthenticationTokenIdentifier) { + AuthenticationTokenIdentifier ident = (AuthenticationTokenIdentifier)other; + return sequenceNumber == ident.getSequenceNumber() + && keyId == ident.getKeyId() + && issueDate == ident.getIssueDate() + && expirationDate == ident.getExpirationDate() + && (username == null ? 
ident.getUsername() == null : + username.equals(ident.getUsername())); + } + return false; + } + + @Override + public int hashCode() { + return (int)sequenceNumber; + } +} Added: hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenSecretManager.java URL: http://svn.apache.org/viewvc/hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenSecretManager.java?rev=1203512&view=auto ============================================================================== --- hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenSecretManager.java (added) +++ hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenSecretManager.java Fri Nov 18 07:13:03 2011 @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.security.token; + +import javax.crypto.SecretKey; +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.zookeeper.ClusterId; +import org.apache.hadoop.hbase.zookeeper.ZKLeaderManager; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.security.token.SecretManager; +import org.apache.hadoop.security.token.Token; +import org.apache.zookeeper.KeeperException; + +/** + * Manages an internal list of secret keys used to sign new authentication + * tokens as they are generated, and to valid existing tokens used for + * authentication. + * + *

+ * A single instance of {@code AuthenticationTokenSecretManager} will be + * running as the "leader" in a given HBase cluster. The leader is responsible + * for periodically generating new secret keys, which are then distributed to + * followers via ZooKeeper, and for expiring previously used secret keys that + * are no longer needed (as any tokens using them have expired). + *
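// Illustrative sketch, not part of this commit: wiring up the secret manager
// described above. In this patch it is created by the secure RPC server
// (org.apache.hadoop.hbase.ipc.SecureServer); the server name and the two
// intervals below are made-up example values, a running ZooKeeper quorum is
// assumed, and the null Abortable is just a simplification for the sketch.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;

public class SecretManagerWiring {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    ZooKeeperWatcher zk = new ZooKeeperWatcher(conf, "token-secret-manager", null);

    AuthenticationTokenSecretManager mgr = new AuthenticationTokenSecretManager(
        conf, zk, "example-regionserver",
        24 * 60 * 60 * 1000L,      // roll a new signing key once a day
        7 * 24 * 60 * 60 * 1000L); // tokens expire after at most a week
    mgr.start();  // loads existing keys from ZooKeeper and joins the leader election

    // Only the instance that wins the election rolls and expires keys;
    // the others just mirror key changes published in ZooKeeper.
    System.out.println("elected leader: " + mgr.isMaster());
  }
}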

+ */ +public class AuthenticationTokenSecretManager + extends SecretManager { + + static final String NAME_PREFIX = "SecretManager-"; + + private static Log LOG = LogFactory.getLog( + AuthenticationTokenSecretManager.class); + + private long lastKeyUpdate; + private long keyUpdateInterval; + private long tokenMaxLifetime; + private ZKSecretWatcher zkWatcher; + private LeaderElector leaderElector; + private ClusterId clusterId; + + private Map allKeys = + new ConcurrentHashMap(); + private AuthenticationKey currentKey; + + private int idSeq; + private AtomicLong tokenSeq = new AtomicLong(); + private String name; + + /** + * Create a new secret manager instance for generating keys. + * @param conf Configuration to use + * @param zk Connection to zookeeper for handling leader elections + * @param keyUpdateInterval Time (in milliseconds) between rolling a new master key for token signing + * @param tokenMaxLifetime Maximum age (in milliseconds) before a token expires and is no longer valid + */ + /* TODO: Restrict access to this constructor to make rogues instances more difficult. + * For the moment this class is instantiated from + * org.apache.hadoop.hbase.ipc.SecureServer so public access is needed. + */ + public AuthenticationTokenSecretManager(Configuration conf, + ZooKeeperWatcher zk, String serverName, + long keyUpdateInterval, long tokenMaxLifetime) { + this.zkWatcher = new ZKSecretWatcher(conf, zk, this); + this.keyUpdateInterval = keyUpdateInterval; + this.tokenMaxLifetime = tokenMaxLifetime; + this.leaderElector = new LeaderElector(zk, serverName); + this.name = NAME_PREFIX+serverName; + this.clusterId = new ClusterId(zk, zk); + } + + public void start() { + try { + // populate any existing keys + this.zkWatcher.start(); + // try to become leader + this.leaderElector.start(); + } catch (KeeperException ke) { + LOG.error("Zookeeper initialization failed", ke); + } + } + + public void stop() { + this.leaderElector.stop("SecretManager stopping"); + } + + public boolean isMaster() { + return leaderElector.isMaster(); + } + + public String getName() { + return name; + } + + @Override + protected byte[] createPassword(AuthenticationTokenIdentifier identifier) { + long now = EnvironmentEdgeManager.currentTimeMillis(); + AuthenticationKey secretKey = currentKey; + identifier.setKeyId(secretKey.getKeyId()); + identifier.setIssueDate(now); + identifier.setExpirationDate(now + tokenMaxLifetime); + identifier.setSequenceNumber(tokenSeq.getAndIncrement()); + return createPassword(WritableUtils.toByteArray(identifier), + secretKey.getKey()); + } + + @Override + public byte[] retrievePassword(AuthenticationTokenIdentifier identifier) + throws InvalidToken { + long now = EnvironmentEdgeManager.currentTimeMillis(); + if (identifier.getExpirationDate() < now) { + throw new InvalidToken("Token has expired"); + } + AuthenticationKey masterKey = allKeys.get(identifier.getKeyId()); + if (masterKey == null) { + throw new InvalidToken("Unknown master key for token (id="+ + identifier.getKeyId()+")"); + } + // regenerate the password + return createPassword(WritableUtils.toByteArray(identifier), + masterKey.getKey()); + } + + @Override + public AuthenticationTokenIdentifier createIdentifier() { + return new AuthenticationTokenIdentifier(); + } + + public Token generateToken(String username) { + AuthenticationTokenIdentifier ident = + new AuthenticationTokenIdentifier(username); + Token token = + new Token(ident, this); + if (clusterId.hasId()) { + token.setService(new Text(clusterId.getId())); + } + return 
token; + } + + public synchronized void addKey(AuthenticationKey key) throws IOException { + // ignore zk changes when running as master + if (leaderElector.isMaster()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Running as master, ignoring new key "+key.getKeyId()); + } + return; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Adding key "+key.getKeyId()); + } + + allKeys.put(key.getKeyId(), key); + if (currentKey == null || key.getKeyId() > currentKey.getKeyId()) { + currentKey = key; + } + // update current sequence + if (key.getKeyId() > idSeq) { + idSeq = key.getKeyId(); + } + } + + synchronized void removeKey(Integer keyId) { + // ignore zk changes when running as master + if (leaderElector.isMaster()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Running as master, ignoring removed key "+keyId); + } + return; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Removing key "+keyId); + } + + allKeys.remove(keyId); + } + + AuthenticationKey getCurrentKey() { + return currentKey; + } + + AuthenticationKey getKey(int keyId) { + return allKeys.get(keyId); + } + + synchronized void removeExpiredKeys() { + if (!leaderElector.isMaster()) { + LOG.info("Skipping removeExpiredKeys() because not running as master."); + return; + } + + long now = EnvironmentEdgeManager.currentTimeMillis(); + Iterator iter = allKeys.values().iterator(); + while (iter.hasNext()) { + AuthenticationKey key = iter.next(); + if (key.getExpiration() < now) { + if (LOG.isDebugEnabled()) { + LOG.debug("Removing expired key "+key.getKeyId()); + } + iter.remove(); + zkWatcher.removeKeyFromZK(key); + } + } + } + + synchronized void rollCurrentKey() { + if (!leaderElector.isMaster()) { + LOG.info("Skipping rollCurrentKey() because not running as master."); + return; + } + + long now = EnvironmentEdgeManager.currentTimeMillis(); + AuthenticationKey prev = currentKey; + AuthenticationKey newKey = new AuthenticationKey(++idSeq, + Long.MAX_VALUE, // don't allow to expire until it's replaced by a new key + generateSecret()); + allKeys.put(newKey.getKeyId(), newKey); + currentKey = newKey; + zkWatcher.addKeyToZK(newKey); + lastKeyUpdate = now; + + if (prev != null) { + // make sure previous key is still stored + prev.setExpiration(now + tokenMaxLifetime); + allKeys.put(prev.getKeyId(), prev); + zkWatcher.updateKeyInZK(prev); + } + } + + public static SecretKey createSecretKey(byte[] raw) { + return SecretManager.createSecretKey(raw); + } + + private class LeaderElector extends Thread implements Stoppable { + private boolean stopped = false; + /** Flag indicating whether we're in charge of rolling/expiring keys */ + private boolean isMaster = false; + private ZKLeaderManager zkLeader; + + public LeaderElector(ZooKeeperWatcher watcher, String serverName) { + setDaemon(true); + setName("ZKSecretWatcher-leaderElector"); + zkLeader = new ZKLeaderManager(watcher, + ZKUtil.joinZNode(zkWatcher.getRootKeyZNode(), "keymaster"), + Bytes.toBytes(serverName), this); + } + + public boolean isMaster() { + return isMaster; + } + + @Override + public boolean isStopped() { + return stopped; + } + + @Override + public void stop(String reason) { + stopped = true; + // prevent further key generation when stopping + if (isMaster) { + zkLeader.stepDownAsLeader(); + } + isMaster = false; + LOG.info("Stopping leader election, because: "+reason); + interrupt(); + } + + public void run() { + zkLeader.start(); + zkLeader.waitToBecomeLeader(); + isMaster = true; + + while (!stopped) { + long now = EnvironmentEdgeManager.currentTimeMillis(); + + // clear any 
expired + removeExpiredKeys(); + + if (lastKeyUpdate + keyUpdateInterval < now) { + // roll a new master key + rollCurrentKey(); + } + + try { + Thread.sleep(5000); + } catch (InterruptedException ie) { + if (LOG.isDebugEnabled()) { + LOG.debug("Interrupted waiting for next update", ie); + } + } + } + } + } +} Added: hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenSelector.java URL: http://svn.apache.org/viewvc/hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenSelector.java?rev=1203512&view=auto ============================================================================== --- hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenSelector.java (added) +++ hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenSelector.java Fri Nov 18 07:13:03 2011 @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.security.token; + +import java.util.Collection; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.security.token.TokenSelector; + +public class AuthenticationTokenSelector + implements TokenSelector { + + public AuthenticationTokenSelector() { + } + + @Override + public Token selectToken(Text serviceName, + Collection> tokens) { + if (serviceName != null) { + for (Token ident : tokens) { + if (serviceName.equals(ident.getService()) && + AuthenticationTokenIdentifier.AUTH_TOKEN_TYPE.equals(ident.getKind())) { + return (Token)ident; + } + } + } + return null; + } +} Added: hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/TokenProvider.java URL: http://svn.apache.org/viewvc/hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/TokenProvider.java?rev=1203512&view=auto ============================================================================== --- hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/TokenProvider.java (added) +++ hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/TokenProvider.java Fri Nov 18 07:13:03 2011 @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.security.token; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.ipc.RequestContext; +import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.ipc.SecureServer; +import org.apache.hadoop.hbase.security.AccessDeniedException; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager; +import org.apache.hadoop.security.token.Token; + +/** + * Provides a service for obtaining authentication tokens via the + * {@link AuthenticationProtocol} coprocessor protocol. + */ +public class TokenProvider extends BaseEndpointCoprocessor + implements AuthenticationProtocol { + + public static final long VERSION = 0L; + private static Log LOG = LogFactory.getLog(TokenProvider.class); + + private AuthenticationTokenSecretManager secretManager; + + + @Override + public void start(CoprocessorEnvironment env) { + super.start(env); + + // if running at region + if (env instanceof RegionCoprocessorEnvironment) { + RegionCoprocessorEnvironment regionEnv = + (RegionCoprocessorEnvironment)env; + RpcServer server = regionEnv.getRegionServerServices().getRpcServer(); + if (server instanceof SecureServer) { + SecretManager mgr = ((SecureServer)server).getSecretManager(); + if (mgr instanceof AuthenticationTokenSecretManager) { + secretManager = (AuthenticationTokenSecretManager)mgr; + } + } + } + } + + @Override + public Token getAuthenticationToken() + throws IOException { + if (secretManager == null) { + throw new IOException( + "No secret manager configured for token authentication"); + } + + User currentUser = RequestContext.getRequestUser(); + UserGroupInformation ugi = null; + if (currentUser != null) { + ugi = currentUser.getUGI(); + } + if (currentUser == null) { + throw new AccessDeniedException("No authenticated user for request!"); + } else if (ugi.getAuthenticationMethod() != + UserGroupInformation.AuthenticationMethod.KERBEROS) { + LOG.warn("Token generation denied for user="+currentUser.getName() + +", authMethod="+ugi.getAuthenticationMethod()); + throw new AccessDeniedException( + "Token generation only allowed for Kerberos authenticated clients"); + } + + return secretManager.generateToken(currentUser.getName()); + } + + @Override + public String whoami() { + return RequestContext.getRequestUserName(); + } + + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + if (AuthenticationProtocol.class.getName().equals(protocol)) { + return TokenProvider.VERSION; + } + LOG.warn("Unknown protocol requested: "+protocol); + return -1; + } +} Added: hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java URL: 
http://svn.apache.org/viewvc/hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java?rev=1203512&view=auto ============================================================================== --- hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java (added) +++ hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java Fri Nov 18 07:13:03 2011 @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.security.token; + +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.security.PrivilegedExceptionAction; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; + +/** + * Utility methods for obtaining authentication tokens. + */ +public class TokenUtil { + private static Log LOG = LogFactory.getLog(TokenUtil.class); + + /** + * Obtain and return an authentication token for the current user. + * @param conf The configuration for connecting to the cluster + * @return the authentication token instance + */ + public static Token obtainToken( + Configuration conf) throws IOException { + HTable meta = null; + try { + meta = new HTable(conf, ".META."); + AuthenticationProtocol prot = meta.coprocessorProxy( + AuthenticationProtocol.class, HConstants.EMPTY_START_ROW); + return prot.getAuthenticationToken(); + } finally { + if (meta != null) { + meta.close(); + } + } + } + + private static Text getClusterId(Token token) + throws IOException { + return token.getService() != null + ? token.getService() : new Text("default"); + } + + /** + * Obtain an authentication token for the given user and add it to the + * user's credentials. 
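// Illustrative sketch, not part of this commit: calling the helper described
// above from a Kerberos-authenticated client. Assumes a secure cluster with
// the TokenProvider coprocessor loaded.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.security.UserGroupInformation;

public class CacheAuthToken {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    UserGroupInformation current = UserGroupInformation.getCurrentUser();

    // Makes a coprocessor call against the .META. table's TokenProvider
    // endpoint and stores the returned token in the user's credentials,
    // where later secure RPC connections can pick it up.
    TokenUtil.obtainAndCacheToken(conf, current);
  }
}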
+ * @param conf The configuration for connecting to the cluster + * @param user The user for whom to obtain the token + * @throws IOException If making a remote call to the {@link TokenProvider} fails + * @throws InterruptedException If executing as the given user is interrupted + */ + public static void obtainAndCacheToken(final Configuration conf, + UserGroupInformation user) + throws IOException, InterruptedException { + try { + Token token = + user.doAs(new PrivilegedExceptionAction>() { + public Token run() throws Exception { + return obtainToken(conf); + } + }); + + if (token == null) { + throw new IOException("No token returned for user "+user.getUserName()); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Obtained token "+token.getKind().toString()+" for user "+ + user.getUserName()); + } + user.addToken(token); + } catch (IOException ioe) { + throw ioe; + } catch (InterruptedException ie) { + throw ie; + } catch (RuntimeException re) { + throw re; + } catch (Exception e) { + throw new UndeclaredThrowableException(e, + "Unexpected exception obtaining token for user "+user.getUserName()); + } + } + + /** + * Obtain an authentication token on behalf of the given user and add it to + * the credentials for the given map reduce job. + * @param conf The configuration for connecting to the cluster + * @param user The user for whom to obtain the token + * @param job The job instance in which the token should be stored + * @throws IOException If making a remote call to the {@link TokenProvider} fails + * @throws InterruptedException If executing as the given user is interrupted + */ + public static void obtainTokenForJob(final Configuration conf, + UserGroupInformation user, Job job) + throws IOException, InterruptedException { + try { + Token token = + user.doAs(new PrivilegedExceptionAction>() { + public Token run() throws Exception { + return obtainToken(conf); + } + }); + + if (token == null) { + throw new IOException("No token returned for user "+user.getUserName()); + } + Text clusterId = getClusterId(token); + LOG.info("Obtained token "+token.getKind().toString()+" for user "+ + user.getUserName() + " on cluster "+clusterId.toString()); + job.getCredentials().addToken(clusterId, token); + } catch (IOException ioe) { + throw ioe; + } catch (InterruptedException ie) { + throw ie; + } catch (RuntimeException re) { + throw re; + } catch (Exception e) { + throw new UndeclaredThrowableException(e, + "Unexpected exception obtaining token for user "+user.getUserName()); + } + } + + /** + * Obtain an authentication token on behalf of the given user and add it to + * the credentials for the given map reduce job. 
+   * @param user The user for whom to obtain the token
+   * @param job The job configuration in which the token should be stored
+   * @throws IOException If making a remote call to the {@link TokenProvider} fails
+   * @throws InterruptedException If executing as the given user is interrupted
+   */
+  public static void obtainTokenForJob(final JobConf job,
+      UserGroupInformation user)
+      throws IOException, InterruptedException {
+    try {
+      Token<AuthenticationTokenIdentifier> token =
+          user.doAs(new PrivilegedExceptionAction<Token<AuthenticationTokenIdentifier>>() {
+            public Token<AuthenticationTokenIdentifier> run() throws Exception {
+              return obtainToken(job);
+            }
+          });
+
+      if (token == null) {
+        throw new IOException("No token returned for user "+user.getUserName());
+      }
+      Text clusterId = getClusterId(token);
+      LOG.info("Obtained token "+token.getKind().toString()+" for user "+
+          user.getUserName()+" on cluster "+clusterId.toString());
+      job.getCredentials().addToken(clusterId, token);
+    } catch (IOException ioe) {
+      throw ioe;
+    } catch (InterruptedException ie) {
+      throw ie;
+    } catch (RuntimeException re) {
+      throw re;
+    } catch (Exception e) {
+      throw new UndeclaredThrowableException(e,
+          "Unexpected exception obtaining token for user "+user.getUserName());
+    }
+  }
+}

Added: hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/ZKSecretWatcher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/ZKSecretWatcher.java?rev=1203512&view=auto
==============================================================================
--- hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/ZKSecretWatcher.java (added)
+++ hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/ZKSecretWatcher.java Fri Nov 18 07:13:03 2011
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.token;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Synchronizes token encryption keys across cluster nodes.
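+ *
+ * Key data lives under the znode named by "zookeeper.znode.tokenauth.parent"
+ * (default "tokenauth") beneath the cluster's base znode, in a "keys" child
+ * node. Each key is a child znode named by its integer key ID and holding the
+ * Writable-serialized {@link AuthenticationKey}; watch events on these nodes
+ * feed new, changed, and deleted keys into the local
+ * {@link AuthenticationTokenSecretManager}.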
+ */
+public class ZKSecretWatcher extends ZooKeeperListener {
+  private static final String DEFAULT_ROOT_NODE = "tokenauth";
+  private static final String DEFAULT_KEYS_PARENT = "keys";
+  private static Log LOG = LogFactory.getLog(ZKSecretWatcher.class);
+
+  private AuthenticationTokenSecretManager secretManager;
+  private String baseKeyZNode;
+  private String keysParentZNode;
+
+  public ZKSecretWatcher(Configuration conf,
+      ZooKeeperWatcher watcher,
+      AuthenticationTokenSecretManager secretManager) {
+    super(watcher);
+    this.secretManager = secretManager;
+    String keyZNodeParent = conf.get("zookeeper.znode.tokenauth.parent", DEFAULT_ROOT_NODE);
+    this.baseKeyZNode = ZKUtil.joinZNode(watcher.baseZNode, keyZNodeParent);
+    this.keysParentZNode = ZKUtil.joinZNode(baseKeyZNode, DEFAULT_KEYS_PARENT);
+  }
+
+  public void start() throws KeeperException {
+    watcher.registerListener(this);
+    // make sure the base node exists
+    ZKUtil.createWithParents(watcher, keysParentZNode);
+
+    if (ZKUtil.watchAndCheckExists(watcher, keysParentZNode)) {
+      List<ZKUtil.NodeAndData> nodes =
+          ZKUtil.getChildDataAndWatchForNewChildren(watcher, keysParentZNode);
+      refreshNodes(nodes);
+    }
+  }
+
+  @Override
+  public void nodeCreated(String path) {
+    if (path.equals(keysParentZNode)) {
+      try {
+        List<ZKUtil.NodeAndData> nodes =
+            ZKUtil.getChildDataAndWatchForNewChildren(watcher, keysParentZNode);
+        refreshNodes(nodes);
+      } catch (KeeperException ke) {
+        LOG.fatal("Error reading data from zookeeper", ke);
+        watcher.abort("Error reading new key znode "+path, ke);
+      }
+    }
+  }
+
+  @Override
+  public void nodeDeleted(String path) {
+    if (keysParentZNode.equals(ZKUtil.getParent(path))) {
+      String keyId = ZKUtil.getNodeName(path);
+      try {
+        Integer id = new Integer(keyId);
+        secretManager.removeKey(id);
+      } catch (NumberFormatException nfe) {
+        LOG.error("Invalid znode name for key ID '"+keyId+"'", nfe);
+      }
+    }
+  }
+
+  @Override
+  public void nodeDataChanged(String path) {
+    if (keysParentZNode.equals(ZKUtil.getParent(path))) {
+      try {
+        byte[] data = ZKUtil.getDataAndWatch(watcher, path);
+        if (data == null || data.length == 0) {
+          LOG.debug("Ignoring empty node "+path);
+          return;
+        }
+
+        AuthenticationKey key = (AuthenticationKey)Writables.getWritable(data,
+            new AuthenticationKey());
+        secretManager.addKey(key);
+      } catch (KeeperException ke) {
+        LOG.fatal("Error reading data from zookeeper", ke);
+        watcher.abort("Error reading updated key znode "+path, ke);
+      } catch (IOException ioe) {
+        LOG.fatal("Error reading key writables", ioe);
+        watcher.abort("Error reading key writables from znode "+path, ioe);
+      }
+    }
+  }
+
+  @Override
+  public void nodeChildrenChanged(String path) {
+    if (path.equals(keysParentZNode)) {
+      // keys changed
+      try {
+        List<ZKUtil.NodeAndData> nodes =
+            ZKUtil.getChildDataAndWatchForNewChildren(watcher, keysParentZNode);
+        refreshNodes(nodes);
+      } catch (KeeperException ke) {
+        LOG.fatal("Error reading data from zookeeper", ke);
+        watcher.abort("Error reading changed keys from zookeeper", ke);
+      }
+    }
+  }
+
+  public String getRootKeyZNode() {
+    return baseKeyZNode;
+  }
+
+  private void refreshNodes(List<ZKUtil.NodeAndData> nodes) {
+    for (ZKUtil.NodeAndData n : nodes) {
+      String path = n.getNode();
+      String keyId = ZKUtil.getNodeName(path);
+      try {
+        byte[] data = n.getData();
+        if (data == null || data.length == 0) {
+          LOG.debug("Ignoring empty node "+path);
+          continue;
+        }
+        AuthenticationKey key = (AuthenticationKey)Writables.getWritable(
+            data, new AuthenticationKey());
+        secretManager.addKey(key);
+      } catch (IOException ioe) {
+        LOG.fatal("Failed reading new secret key
for id '" + keyId + + "' from zk", ioe); + watcher.abort("Error deserializing key from znode "+path, ioe); + } + } + } + + private String getKeyNode(int keyId) { + return ZKUtil.joinZNode(keysParentZNode, Integer.toString(keyId)); + } + + public void removeKeyFromZK(AuthenticationKey key) { + String keyZNode = getKeyNode(key.getKeyId()); + try { + ZKUtil.deleteNode(watcher, keyZNode); + } catch (KeeperException.NoNodeException nne) { + LOG.error("Non-existent znode "+keyZNode+" for key "+key.getKeyId(), nne); + } catch (KeeperException ke) { + LOG.fatal("Failed removing znode "+keyZNode+" for key "+key.getKeyId(), + ke); + watcher.abort("Unhandled zookeeper error removing znode "+keyZNode+ + " for key "+key.getKeyId(), ke); + } + } + + public void addKeyToZK(AuthenticationKey key) { + String keyZNode = getKeyNode(key.getKeyId()); + try { + byte[] keyData = Writables.getBytes(key); + // TODO: is there any point in retrying beyond what ZK client does? + ZKUtil.createSetData(watcher, keyZNode, keyData); + } catch (KeeperException ke) { + LOG.fatal("Unable to synchronize master key "+key.getKeyId()+ + " to znode "+keyZNode, ke); + watcher.abort("Unable to synchronize secret key "+ + key.getKeyId()+" in zookeeper", ke); + } catch (IOException ioe) { + // this can only happen from an error serializing the key + watcher.abort("Failed serializing key "+key.getKeyId(), ioe); + } + } + + public void updateKeyInZK(AuthenticationKey key) { + String keyZNode = getKeyNode(key.getKeyId()); + try { + byte[] keyData = Writables.getBytes(key); + try { + ZKUtil.updateExistingNodeData(watcher, keyZNode, keyData, -1); + } catch (KeeperException.NoNodeException ne) { + // node was somehow removed, try adding it back + ZKUtil.createSetData(watcher, keyZNode, keyData); + } + } catch (KeeperException ke) { + LOG.fatal("Unable to update master key "+key.getKeyId()+ + " in znode "+keyZNode); + watcher.abort("Unable to synchronize secret key "+ + key.getKeyId()+" in zookeeper", ke); + } catch (IOException ioe) { + // this can only happen from an error serializing the key + watcher.abort("Failed serializing key "+key.getKeyId(), ioe); + } + } +} Added: hbase/branches/0.92/security/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java URL: http://svn.apache.org/viewvc/hbase/branches/0.92/security/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java?rev=1203512&view=auto ============================================================================== --- hbase/branches/0.92/security/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java (added) +++ hbase/branches/0.92/security/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java Fri Nov 18 07:13:03 2011 @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.security.token; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.security.PrivilegedExceptionAction; +import java.util.UUID; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor; +import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; +import org.apache.hadoop.hbase.ipc.HBaseRPC; +import org.apache.hadoop.hbase.ipc.RequestContext; +import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.ipc.SecureRpcEngine; +import org.apache.hadoop.hbase.ipc.SecureServer; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager; + +import org.apache.hadoop.security.token.Token; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Tests for authentication token creation and usage + */ +public class TestTokenAuthentication { + public static interface IdentityProtocol extends CoprocessorProtocol { + public String whoami(); + public String getAuthMethod(); + } + + public static class IdentityCoprocessor extends BaseEndpointCoprocessor + implements IdentityProtocol { + public String whoami() { + return RequestContext.getRequestUserName(); + } + + public String getAuthMethod() { + UserGroupInformation ugi = null; + User user = RequestContext.getRequestUser(); + if (user != null) { + ugi = user.getUGI(); + } + if (ugi != null) { + return ugi.getAuthenticationMethod().toString(); + } + return null; + } + } + + private static HBaseTestingUtility TEST_UTIL; + private static AuthenticationTokenSecretManager secretManager; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + TEST_UTIL = new HBaseTestingUtility(); + Configuration conf = TEST_UTIL.getConfiguration(); + conf.set(HBaseRPC.RPC_ENGINE_PROP, SecureRpcEngine.class.getName()); + conf.set("hbase.coprocessor.region.classes", + IdentityCoprocessor.class.getName()); + TEST_UTIL.startMiniCluster(); + HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0); + RpcServer server = rs.getRpcServer(); + assertTrue(server instanceof SecureServer); + SecretManager mgr = + ((SecureServer)server).getSecretManager(); + assertTrue(mgr instanceof AuthenticationTokenSecretManager); + secretManager = (AuthenticationTokenSecretManager)mgr; + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testTokenCreation() throws Exception { + Token token = + secretManager.generateToken("testuser"); + + AuthenticationTokenIdentifier ident = new AuthenticationTokenIdentifier(); + Writables.getWritable(token.getIdentifier(), ident); + assertEquals("Token username should match", "testuser", + ident.getUsername()); + byte[] passwd = secretManager.retrievePassword(ident); + assertTrue("Token password and password from secret manager should match", + Bytes.equals(token.getPassword(), passwd)); + } + + // @Test - Disable due to kerberos requirement + 
public void testTokenAuthentication() throws Exception { + UserGroupInformation testuser = + UserGroupInformation.createUserForTesting("testuser", new String[]{"testgroup"}); + + testuser.setAuthenticationMethod( + UserGroupInformation.AuthenticationMethod.TOKEN); + final Configuration conf = TEST_UTIL.getConfiguration(); + conf.set("hadoop.security.authentication", "kerberos"); + conf.set("randomkey", UUID.randomUUID().toString()); + testuser.setConfiguration(conf); + Token token = + secretManager.generateToken("testuser"); + testuser.addToken(token); + + // verify the server authenticates us as this token user + testuser.doAs(new PrivilegedExceptionAction() { + public Object run() throws Exception { + HTable table = new HTable(conf, ".META."); + IdentityProtocol prot = table.coprocessorProxy( + IdentityProtocol.class, HConstants.EMPTY_START_ROW); + String myname = prot.whoami(); + assertEquals("testuser", myname); + String authMethod = prot.getAuthMethod(); + assertEquals("TOKEN", authMethod); + return null; + } + }); + } +} Added: hbase/branches/0.92/security/src/test/java/org/apache/hadoop/hbase/security/token/TestZKSecretWatcher.java URL: http://svn.apache.org/viewvc/hbase/branches/0.92/security/src/test/java/org/apache/hadoop/hbase/security/token/TestZKSecretWatcher.java?rev=1203512&view=auto ============================================================================== --- hbase/branches/0.92/security/src/test/java/org/apache/hadoop/hbase/security/token/TestZKSecretWatcher.java (added) +++ hbase/branches/0.92/security/src/test/java/org/apache/hadoop/hbase/security/token/TestZKSecretWatcher.java Fri Nov 18 07:13:03 2011 @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.hbase.security.token;
+
+import static org.junit.Assert.*;
+
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test the synchronization of token authentication master keys through
+ * ZKSecretWatcher
+ */
+public class TestZKSecretWatcher {
+  private static Log LOG = LogFactory.getLog(TestZKSecretWatcher.class);
+  private static HBaseTestingUtility TEST_UTIL;
+  private static AuthenticationTokenSecretManager KEY_MASTER;
+  private static AuthenticationTokenSecretManager KEY_SLAVE;
+  private static AuthenticationTokenSecretManager KEY_SLAVE2;
+  private static AuthenticationTokenSecretManager KEY_SLAVE3;
+
+  private static class MockAbortable implements Abortable {
+    private boolean abort;
+    public void abort(String reason, Throwable e) {
+      LOG.info("Aborting: "+reason, e);
+      abort = true;
+    }
+
+    public boolean isAborted() {
+      return abort;
+    }
+  }
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    TEST_UTIL = new HBaseTestingUtility();
+    TEST_UTIL.startMiniZKCluster();
+    Configuration conf = TEST_UTIL.getConfiguration();
+
+    ZooKeeperWatcher zk = newZK(conf, "server1", new MockAbortable());
+    AuthenticationTokenSecretManager[] tmp = new AuthenticationTokenSecretManager[2];
+    tmp[0] = new AuthenticationTokenSecretManager(
+        conf, zk, "server1", 60*60*1000, 60*1000);
+    tmp[0].start();
+
+    zk = newZK(conf, "server2", new MockAbortable());
+    tmp[1] = new AuthenticationTokenSecretManager(
+        conf, zk, "server2", 60*60*1000, 60*1000);
+    tmp[1].start();
+
+    while (KEY_MASTER == null) {
+      for (int i=0; i<2; i++) {
+        if (tmp[i].isMaster()) {
+          KEY_MASTER = tmp[i];
+          // pick the other manager as the slave
+          KEY_SLAVE = tmp[(i + 1) % 2];
+          break;
+        }
+      }
+      Thread.sleep(500);
+    }
+    LOG.info("Master is "+KEY_MASTER.getName()+
+        ", slave is "+KEY_SLAVE.getName());
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniZKCluster();
+  }
+
+  @Test
+  public void testKeyUpdate() throws Exception {
+    // sanity check
+    assertTrue(KEY_MASTER.isMaster());
+    assertFalse(KEY_SLAVE.isMaster());
+    int maxKeyId = 0;
+
+    KEY_MASTER.rollCurrentKey();
+    AuthenticationKey key1 = KEY_MASTER.getCurrentKey();
+    assertNotNull(key1);
+    LOG.debug("Master current key: "+key1.getKeyId());
+
+    // wait for slave to update
+    Thread.sleep(1000);
+    AuthenticationKey slaveCurrent = KEY_SLAVE.getCurrentKey();
+    assertNotNull(slaveCurrent);
+    assertEquals(key1, slaveCurrent);
+    LOG.debug("Slave current key: "+slaveCurrent.getKeyId());
+
+    // generate two more keys then expire the original
+    KEY_MASTER.rollCurrentKey();
+    AuthenticationKey key2 = KEY_MASTER.getCurrentKey();
+    LOG.debug("Master new current key: "+key2.getKeyId());
+    KEY_MASTER.rollCurrentKey();
+    AuthenticationKey key3 = KEY_MASTER.getCurrentKey();
+    LOG.debug("Master new current key: "+key3.getKeyId());
+
+    // force expire the original key
+    key1.setExpiration(EnvironmentEdgeManager.currentTimeMillis() - 1000);
+    KEY_MASTER.removeExpiredKeys();
+    // verify removed from master
+
assertNull(KEY_MASTER.getKey(key1.getKeyId())); + + // wait for slave to catch up + Thread.sleep(1000); + // make sure the slave has both new keys + AuthenticationKey slave2 = KEY_SLAVE.getKey(key2.getKeyId()); + assertNotNull(slave2); + assertEquals(key2, slave2); + AuthenticationKey slave3 = KEY_SLAVE.getKey(key3.getKeyId()); + assertNotNull(slave3); + assertEquals(key3, slave3); + slaveCurrent = KEY_SLAVE.getCurrentKey(); + assertEquals(key3, slaveCurrent); + LOG.debug("Slave current key: "+slaveCurrent.getKeyId()); + + // verify that the expired key has been removed + assertNull(KEY_SLAVE.getKey(key1.getKeyId())); + + // bring up a new slave + Configuration conf = TEST_UTIL.getConfiguration(); + ZooKeeperWatcher zk = newZK(conf, "server3", new MockAbortable()); + KEY_SLAVE2 = new AuthenticationTokenSecretManager( + conf, zk, "server3", 60*60*1000, 60*1000); + KEY_SLAVE2.start(); + + Thread.sleep(1000); + // verify the new slave has current keys (and not expired) + slave2 = KEY_SLAVE2.getKey(key2.getKeyId()); + assertNotNull(slave2); + assertEquals(key2, slave2); + slave3 = KEY_SLAVE2.getKey(key3.getKeyId()); + assertNotNull(slave3); + assertEquals(key3, slave3); + slaveCurrent = KEY_SLAVE2.getCurrentKey(); + assertEquals(key3, slaveCurrent); + assertNull(KEY_SLAVE2.getKey(key1.getKeyId())); + + // test leader failover + KEY_MASTER.stop(); + + // wait for master to stop + Thread.sleep(1000); + assertFalse(KEY_MASTER.isMaster()); + + // check for a new master + AuthenticationTokenSecretManager[] mgrs = + new AuthenticationTokenSecretManager[]{ KEY_SLAVE, KEY_SLAVE2 }; + AuthenticationTokenSecretManager newMaster = null; + int tries = 0; + while (newMaster == null && tries++ < 5) { + for (AuthenticationTokenSecretManager mgr : mgrs) { + if (mgr.isMaster()) { + newMaster = mgr; + break; + } + } + if (newMaster == null) { + Thread.sleep(500); + } + } + assertNotNull(newMaster); + + AuthenticationKey current = newMaster.getCurrentKey(); + // new master will immediately roll the current key, so it's current may be greater + assertTrue(current.getKeyId() >= slaveCurrent.getKeyId()); + LOG.debug("New master, current key: "+current.getKeyId()); + + // roll the current key again on new master and verify the key ID increments + newMaster.rollCurrentKey(); + AuthenticationKey newCurrent = newMaster.getCurrentKey(); + LOG.debug("New master, rolled new current key: "+newCurrent.getKeyId()); + assertTrue(newCurrent.getKeyId() > current.getKeyId()); + + // add another slave + ZooKeeperWatcher zk3 = newZK(conf, "server4", new MockAbortable()); + KEY_SLAVE3 = new AuthenticationTokenSecretManager( + conf, zk3, "server4", 60*60*1000, 60*1000); + KEY_SLAVE3.start(); + Thread.sleep(5000); + + // check master failover again + newMaster.stop(); + + // wait for master to stop + Thread.sleep(5000); + assertFalse(newMaster.isMaster()); + + // check for a new master + mgrs = new AuthenticationTokenSecretManager[]{ KEY_SLAVE, KEY_SLAVE2, KEY_SLAVE3 }; + newMaster = null; + tries = 0; + while (newMaster == null && tries++ < 5) { + for (AuthenticationTokenSecretManager mgr : mgrs) { + if (mgr.isMaster()) { + newMaster = mgr; + break; + } + } + if (newMaster == null) { + Thread.sleep(500); + } + } + assertNotNull(newMaster); + + AuthenticationKey current2 = newMaster.getCurrentKey(); + // new master will immediately roll the current key, so it's current may be greater + assertTrue(current2.getKeyId() >= newCurrent.getKeyId()); + LOG.debug("New master 2, current key: "+current2.getKeyId()); + + // roll the current key 
again on new master and verify the key ID increments + newMaster.rollCurrentKey(); + AuthenticationKey newCurrent2 = newMaster.getCurrentKey(); + LOG.debug("New master 2, rolled new current key: "+newCurrent2.getKeyId()); + assertTrue(newCurrent2.getKeyId() > current2.getKeyId()); + } + + private static ZooKeeperWatcher newZK(Configuration conf, String name, + Abortable abort) throws Exception { + Configuration copy = HBaseConfiguration.create(conf); + ZooKeeperWatcher zk = new ZooKeeperWatcher(copy, name, abort); + return zk; + } +}
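
For reference (not part of the commit above), a minimal sketch of how client code might use the TokenUtil API added here when submitting a MapReduce job against a secure cluster. The class name and job name below are illustrative only; cluster settings are assumed to come from an hbase-site.xml on the classpath.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.security.token.TokenUtil;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.security.UserGroupInformation;

    public class TokenSetupExample {
      public static void main(String[] args) throws Exception {
        // Cluster connection settings are read from hbase-site.xml.
        Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conf, "example-job");  // hypothetical job name
        // Obtain an HBase authentication token as the submitting user and add
        // it to the job's credentials so the launched tasks can authenticate
        // to the cluster without Kerberos credentials of their own.
        UserGroupInformation user = UserGroupInformation.getCurrentUser();
        TokenUtil.obtainTokenForJob(conf, user, job);
      }
    }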