Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 23015104C0 for ; Fri, 13 Feb 2015 19:33:26 +0000 (UTC) Received: (qmail 50604 invoked by uid 500); 13 Feb 2015 19:33:26 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 50489 invoked by uid 500); 13 Feb 2015 19:33:26 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 50471 invoked by uid 99); 13 Feb 2015 19:33:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 13 Feb 2015 19:33:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BFA63E07F3; Fri, 13 Feb 2015 19:33:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: elserj@apache.org To: commits@accumulo.apache.org Date: Fri, 13 Feb 2015 19:33:27 -0000 Message-Id: <4218794efbfa480cb79a471ab2d07e68@git.apache.org> In-Reply-To: <5f6fb70a98c84cd3af3ebccb397e8250@git.apache.org> References: <5f6fb70a98c84cd3af3ebccb397e8250@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/7] accumulo git commit: ACCUMULO-3513 Add delegation token support for kerberos configurations http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandler.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandler.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandler.java index f85505d..150f0d3 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandler.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandler.java @@ -24,8 +24,10 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode; import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.DelegationToken; import org.apache.accumulo.core.client.security.tokens.KerberosToken; import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.rpc.SaslConnectionParams.SaslMechanism; import org.apache.accumulo.core.security.thrift.TCredentials; import org.apache.accumulo.server.security.SystemCredentials.SystemToken; import org.apache.accumulo.server.security.UserImpersonation; @@ -81,6 +83,19 @@ public class TCredentialsUpdatingInvocationHandler implements InvocationHandl } Class tokenClass = getTokenClassFromName(tcreds.tokenClassName); + + // The Accumulo principal extracted from the SASL transport + final String principal = UGIAssumingProcessor.rpcPrincipal(); + + // If we authenticated the user over DIGEST-MD5 and they have a DelegationToken, the principals should match + if (SaslMechanism.DIGEST_MD5 == UGIAssumingProcessor.rpcMechanism() && DelegationToken.class.isAssignableFrom(tokenClass)) { + if (!principal.equals(tcreds.principal)) { + log.warn("{} issued RPC with delegation token over DIGEST-MD5 as the Accumulo principal {}. Disallowing RPC", principal, tcreds.principal); + throw new ThriftSecurityException("RPC principal did not match provided Accumulo principal", SecurityErrorCode.BAD_CREDENTIALS); + } + return; + } + // If the authentication token isn't a KerberosToken if (!KerberosToken.class.isAssignableFrom(tokenClass) && !SystemToken.class.isAssignableFrom(tokenClass)) { // Don't include messages about SystemToken since it's internal @@ -88,9 +103,6 @@ public class TCredentialsUpdatingInvocationHandler implements InvocationHandl throw new ThriftSecurityException("Did not receive a valid token", SecurityErrorCode.BAD_CREDENTIALS); } - // The Accumulo principal extracted from the SASL transport - final String principal = UGIAssumingProcessor.rpcPrincipal(); - if (null == principal) { log.debug("Found KerberosToken in TCredentials, but did not receive principal from SASL processor"); throw new ThriftSecurityException("Did not extract principal from Thrift SASL processor", SecurityErrorCode.BAD_CREDENTIALS); http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java index f1f8963..558b02e 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java @@ -34,7 +34,6 @@ import javax.net.ssl.SSLServerSocket; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.rpc.SaslConnectionParams; import org.apache.accumulo.core.rpc.SslConnectionParams; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.UGIAssumingTransportFactory; @@ -150,7 +149,7 @@ public class TServerUtils { try { HostAndPort addr = HostAndPort.fromParts(hostname, port); return TServerUtils.startTServer(addr, serverType, timedProcessor, serverName, threadName, minThreads, simpleTimerThreadpoolSize, - timeBetweenThreadChecks, maxMessageSize, service.getServerSslParams(), service.getServerSaslParams(), service.getClientTimeoutInMillis()); + timeBetweenThreadChecks, maxMessageSize, service.getServerSslParams(), service.getSaslParams(), service.getClientTimeoutInMillis()); } catch (TTransportException ex) { log.error("Unable to start TServer", ex); if (ex.getCause() == null || ex.getCause().getClass() == BindException.class) { @@ -380,7 +379,7 @@ public class TServerUtils { } public static ServerAddress createSaslThreadPoolServer(HostAndPort address, TProcessor processor, TProtocolFactory protocolFactory, long socketTimeout, - SaslConnectionParams params, final String serverName, String threadName, final int numThreads, final int numSTThreads, long timeBetweenThreadChecks) + SaslServerConnectionParams params, final String serverName, String threadName, final int numThreads, final int numSTThreads, long timeBetweenThreadChecks) throws TTransportException { // We'd really prefer to use THsHaServer (or similar) to avoid 1 RPC == 1 Thread that the TThreadPoolServer does, // but sadly this isn't the case. Because TSaslTransport needs to issue a handshake when it open()'s which will fail @@ -388,7 +387,7 @@ public class TServerUtils { log.info("Creating SASL thread pool thrift server on listening on {}:{}", address.getHostText(), address.getPort()); TServerSocket transport = new TServerSocket(address.getPort(), (int) socketTimeout); - final String hostname, fqdn; + String hostname, fqdn; try { hostname = InetAddress.getByName(address.getHostText()).getCanonicalHostName(); fqdn = InetAddress.getLocalHost().getCanonicalHostName(); @@ -396,10 +395,15 @@ public class TServerUtils { throw new TTransportException(e); } + // If we can't get a real hostname from the provided host test, use the hostname from DNS for localhost + if ("0.0.0.0".equals(hostname)) { + hostname = fqdn; + } + // ACCUMULO-3497 an easy sanity check we can perform for the user when SASL is enabled. Clients and servers have to agree upon the FQDN // so that the SASL handshake can occur. If the provided hostname doesn't match the FQDN for this host, fail quickly and inform them to update // their configuration. - if (!"0.0.0.0".equals(hostname) && !hostname.equals(fqdn)) { + if (!hostname.equals(fqdn)) { log.error( "Expected hostname of '{}' but got '{}'. Ensure the entries in the Accumulo hosts files (e.g. masters, slaves) are the FQDN for each host when using SASL.", fqdn, hostname); @@ -413,7 +417,7 @@ public class TServerUtils { throw new TTransportException(e); } - log.trace("Logged in as {}, creating TSsaslServerTransport factory as {}/{}", serverUser, params.getKerberosServerPrimary(), hostname); + log.debug("Logged in as {}, creating TSaslServerTransport factory with {}/{}", serverUser, params.getKerberosServerPrimary(), hostname); // Make the SASL transport factory with the instance and primary from the kerberos server principal, SASL properties // and the SASL callback handler from Hadoop to ensure authorization ID is the authentication ID. Despite the 'protocol' argument seeming to be useless, it @@ -422,6 +426,14 @@ public class TServerUtils { saslTransportFactory.addServerDefinition(ThriftUtil.GSSAPI, params.getKerberosServerPrimary(), hostname, params.getSaslProperties(), new SaslRpcServer.SaslGssCallbackHandler()); + if (null != params.getSecretManager()) { + log.info("Adding DIGEST-MD5 server definition for delegation tokens"); + saslTransportFactory.addServerDefinition(ThriftUtil.DIGEST_MD5, params.getKerberosServerPrimary(), hostname, params.getSaslProperties(), + new SaslServerDigestCallbackHandler(params.getSecretManager())); + } else { + log.info("SecretManager is null, not adding support for delegation token authentication"); + } + // Make sure the TTransportFactory is performing a UGI.doAs TTransportFactory ugiTransportFactory = new UGIAssumingTransportFactory(saslTransportFactory, serverUser); @@ -440,7 +452,7 @@ public class TServerUtils { public static ServerAddress startTServer(AccumuloConfiguration conf, HostAndPort address, ThriftServerType serverType, TProcessor processor, String serverName, String threadName, int numThreads, int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, - SaslConnectionParams saslParams, long serverSocketTimeout) throws TTransportException { + SaslServerConnectionParams saslParams, long serverSocketTimeout) throws TTransportException { if (ThriftServerType.SASL == serverType) { processor = updateSaslProcessor(serverType, processor); @@ -452,11 +464,11 @@ public class TServerUtils { /** * @see #startTServer(HostAndPort, ThriftServerType, TimedProcessor, TProtocolFactory, String, String, int, int, long, long, SslConnectionParams, - * SaslConnectionParams, long) + * org.apache.accumulo.core.rpc.SaslConnectionParams, long) */ public static ServerAddress startTServer(HostAndPort address, ThriftServerType serverType, TimedProcessor processor, String serverName, String threadName, - int numThreads, int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, SaslConnectionParams saslParams, - long serverSocketTimeout) throws TTransportException { + int numThreads, int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, + SaslServerConnectionParams saslParams, long serverSocketTimeout) throws TTransportException { return startTServer(address, serverType, processor, ThriftUtil.protocolFactory(), serverName, threadName, numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize, sslParams, saslParams, serverSocketTimeout); } @@ -468,7 +480,7 @@ public class TServerUtils { */ public static ServerAddress startTServer(HostAndPort address, ThriftServerType serverType, TimedProcessor processor, TProtocolFactory protocolFactory, String serverName, String threadName, int numThreads, int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, - SaslConnectionParams saslParams, long serverSocketTimeout) throws TTransportException { + SaslServerConnectionParams saslParams, long serverSocketTimeout) throws TTransportException { // This is presently not supported. It's hypothetically possible, I believe, to work, but it would require changes in how the transports // work at the Thrift layer to ensure that both the SSL and SASL handshakes function. SASL's quality of protection addresses privacy issues. http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/main/java/org/apache/accumulo/server/rpc/UGIAssumingProcessor.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/UGIAssumingProcessor.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/UGIAssumingProcessor.java index ab106a6..48d18f4 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/UGIAssumingProcessor.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/UGIAssumingProcessor.java @@ -20,6 +20,7 @@ import java.io.IOException; import javax.security.sasl.SaslServer; +import org.apache.accumulo.core.rpc.SaslConnectionParams.SaslMechanism; import org.apache.hadoop.security.UserGroupInformation; import org.apache.thrift.TException; import org.apache.thrift.TProcessor; @@ -40,6 +41,8 @@ public class UGIAssumingProcessor implements TProcessor { private static final Logger log = LoggerFactory.getLogger(UGIAssumingProcessor.class); public static final ThreadLocal rpcPrincipal = new ThreadLocal(); + public static final ThreadLocal rpcMechanism = new ThreadLocal(); + private final TProcessor wrapped; private final UserGroupInformation loginUser; @@ -60,6 +63,14 @@ public class UGIAssumingProcessor implements TProcessor { return rpcPrincipal.get(); } + public static ThreadLocal getRpcPrincipalThreadLocal() { + return rpcPrincipal; + } + + public static SaslMechanism rpcMechanism() { + return rpcMechanism.get(); + } + @Override public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException { TTransport trans = inProt.getTransport(); @@ -71,20 +82,42 @@ public class UGIAssumingProcessor implements TProcessor { String authId = saslServer.getAuthorizationID(); String endUser = authId; - log.trace("Received SASL RPC from {}", endUser); + SaslMechanism mechanism; + try { + mechanism = SaslMechanism.get(saslServer.getMechanismName()); + } catch (Exception e) { + log.error("Failed to process RPC with SASL mechanism {}", saslServer.getMechanismName()); + throw e; + } - UserGroupInformation clientUgi = UserGroupInformation.createProxyUser(endUser, loginUser); - final String remoteUser = clientUgi.getUserName(); + switch (mechanism) { + case GSSAPI: + UserGroupInformation clientUgi = UserGroupInformation.createProxyUser(endUser, loginUser); + final String remoteUser = clientUgi.getUserName(); - try { - // Set the principal in the ThreadLocal for access to get authorizations - rpcPrincipal.set(remoteUser); + try { + // Set the principal in the ThreadLocal for access to get authorizations + rpcPrincipal.set(remoteUser); - return wrapped.process(inProt, outProt); - } finally { - // Unset the principal after we're done using it just to be sure that it's not incorrectly - // used in the same thread down the line. - rpcPrincipal.set(null); + return wrapped.process(inProt, outProt); + } finally { + // Unset the principal after we're done using it just to be sure that it's not incorrectly + // used in the same thread down the line. + rpcPrincipal.set(null); + } + case DIGEST_MD5: + // The CallbackHandler, after deserializing the TokenIdentifier in the name, has already updated + // the rpcPrincipal for us. We don't need to do it again here. + try { + rpcMechanism.set(mechanism); + return wrapped.process(inProt, outProt); + } finally { + // Unset the mechanism after we're done using it just to be sure that it's not incorrectly + // used in the same thread down the line. + rpcMechanism.set(null); + } + default: + throw new IllegalArgumentException("Cannot process SASL mechanism " + mechanism); } } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java b/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java index cc7a7cd..283cba3 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java +++ b/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java @@ -454,4 +454,18 @@ public class AuditedSecurityOperation extends SecurityOperation { throw e; } } + + public static final String DELEGATION_TOKEN_AUDIT_TEMPLATE = "requested delegation token"; + + @Override + public boolean canObtainDelegationToken(TCredentials credentials) throws ThriftSecurityException { + try { + boolean result = super.canObtainDelegationToken(credentials); + audit(credentials, result, DELEGATION_TOKEN_AUDIT_TEMPLATE); + return result; + } catch (ThriftSecurityException e) { + audit(credentials, false, DELEGATION_TOKEN_AUDIT_TEMPLATE); + throw e; + } + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java index 7adb46e..0b0f212 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java +++ b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java @@ -830,4 +830,8 @@ public class SecurityOperation { return hasSystemPermissionWithNamespaceId(credentials, SystemPermission.ALTER_NAMESPACE, namespaceId, false); } + public boolean canObtainDelegationToken(TCredentials credentials) throws ThriftSecurityException { + authenticate(credentials); + return hasSystemPermission(credentials, SystemPermission.OBTAIN_DELEGATION_TOKEN, false); + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java b/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java index 51d50a1..6a915c6 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java +++ b/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java @@ -33,7 +33,6 @@ import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.conf.SiteConfiguration; -import org.apache.accumulo.core.rpc.SaslConnectionParams; import org.apache.accumulo.core.security.Credentials; import org.apache.accumulo.core.security.thrift.TCredentials; import org.apache.accumulo.core.util.Base64; @@ -69,8 +68,7 @@ public final class SystemCredentials extends Credentials { check_permission(); String principal = SYSTEM_PRINCIPAL; AccumuloConfiguration conf = SiteConfiguration.getInstance(); - SaslConnectionParams saslParams = SaslConnectionParams.forConfig(conf); - if (null != saslParams) { + if (conf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) { // Use the server's kerberos principal as the Accumulo principal. We could also unwrap the principal server-side, but the principal for SystemCredentials // isnt' actually used anywhere, so it really doesn't matter. We can't include the kerberos principal in the SystemToken as it would break equality when // different Accumulo servers are using different kerberos principals are their accumulo principal http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/main/java/org/apache/accumulo/server/security/delegation/AuthenticationKey.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/delegation/AuthenticationKey.java b/server/base/src/main/java/org/apache/accumulo/server/security/delegation/AuthenticationKey.java new file mode 100644 index 0000000..134502a --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/security/delegation/AuthenticationKey.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.server.security.delegation; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.nio.ByteBuffer; + +import javax.crypto.SecretKey; + +import org.apache.accumulo.core.security.thrift.TAuthenticationKey; +import org.apache.accumulo.core.util.ThriftMessageUtil; +import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; + +/** + * Represents a secret key used for signing and verifying authentication tokens by {@link AuthenticationTokenSecretManager}. + */ +public class AuthenticationKey implements Writable { + private TAuthenticationKey authKey; + private SecretKey secret; + + public AuthenticationKey() { + // for Writable + } + + public AuthenticationKey(int keyId, long creationDate, long expirationDate, SecretKey key) { + checkNotNull(key); + authKey = new TAuthenticationKey(ByteBuffer.wrap(key.getEncoded())); + authKey.setCreationDate(creationDate); + authKey.setKeyId(keyId); + authKey.setExpirationDate(expirationDate); + this.secret = key; + } + + public int getKeyId() { + checkNotNull(authKey); + return authKey.getKeyId(); + } + + public long getCreationDate() { + checkNotNull(authKey); + return authKey.getCreationDate(); + } + + public void setCreationDate(long creationDate) { + checkNotNull(authKey); + authKey.setCreationDate(creationDate); + } + + public long getExpirationDate() { + checkNotNull(authKey); + return authKey.getExpirationDate(); + } + + public void setExpirationDate(long expirationDate) { + checkNotNull(authKey); + authKey.setExpirationDate(expirationDate); + } + + SecretKey getKey() { + return secret; + } + + void setKey(SecretKey secret) { + this.secret = secret; + } + + @Override + public int hashCode() { + if (null == authKey) { + return 1; + } + HashCodeBuilder hcb = new HashCodeBuilder(29, 31); + hcb.append(authKey.getKeyId()).append(authKey.getExpirationDate()).append(authKey.getCreationDate()).append(secret.getEncoded()); + return hcb.toHashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null || !(obj instanceof AuthenticationKey)) { + return false; + } + AuthenticationKey other = (AuthenticationKey) obj; + // authKey might be null due to writable nature + if (null == authKey && null != other.authKey) { + return false; + } + return authKey.equals(other.authKey); + } + + @Override + public String toString() { + StringBuilder buf = new StringBuilder(); + buf.append("AuthenticationKey["); + if (null == authKey) { + buf.append("null]"); + } else { + buf.append("id=").append(authKey.getKeyId()).append(", expiration=").append(authKey.getExpirationDate()).append(", creation=") + .append(authKey.getCreationDate()).append("]"); + } + return buf.toString(); + } + + @Override + public void write(DataOutput out) throws IOException { + if (null == authKey) { + WritableUtils.writeVInt(out, 0); + return; + } + ThriftMessageUtil util = new ThriftMessageUtil(); + ByteBuffer serialized = util.serialize(authKey); + WritableUtils.writeVInt(out, serialized.limit() - serialized.arrayOffset()); + out.write(serialized.array(), serialized.arrayOffset(), serialized.limit()); + } + + @Override + public void readFields(DataInput in) throws IOException { + int length = WritableUtils.readVInt(in); + if (0 == length) { + return; + } + + ThriftMessageUtil util = new ThriftMessageUtil(); + byte[] bytes = new byte[length]; + in.readFully(bytes); + authKey = util.deserialize(bytes, new TAuthenticationKey()); + secret = AuthenticationTokenSecretManager.createSecretKey(authKey.getSecret()); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/main/java/org/apache/accumulo/server/security/delegation/AuthenticationTokenKeyManager.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/delegation/AuthenticationTokenKeyManager.java b/server/base/src/main/java/org/apache/accumulo/server/security/delegation/AuthenticationTokenKeyManager.java new file mode 100644 index 0000000..3582cfd --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/security/delegation/AuthenticationTokenKeyManager.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.security.delegation; + +import java.util.List; + +import org.apache.accumulo.core.util.Daemon; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Service that handles generation of the secret key used to create delegation tokens. + */ +public class AuthenticationTokenKeyManager extends Daemon { + private static final Logger log = LoggerFactory.getLogger(AuthenticationTokenKeyManager.class); + + private final AuthenticationTokenSecretManager secretManager; + private final ZooAuthenticationKeyDistributor keyDistributor; + + private long lastKeyUpdate = 0; + private long keyUpdateInterval; + private long tokenMaxLifetime; + private int idSeq = 0; + private volatile boolean keepRunning = true, initialized = false; + + /** + * Construct the key manager which will generate new AuthenticationKeys to generate and verify delegation tokens + * + * @param mgr + * The SecretManager in use + * @param dist + * The implementation to distribute AuthenticationKeys to ZooKeeper + * @param keyUpdateInterval + * The frequency, in milliseconds, that new AuthenticationKeys are created + * @param tokenMaxLifetime + * The lifetime, in milliseconds, of generated AuthenticationKeys (and subsequently delegation tokens). + */ + public AuthenticationTokenKeyManager(AuthenticationTokenSecretManager mgr, ZooAuthenticationKeyDistributor dist, long keyUpdateInterval, + long tokenMaxLifetime) { + super("Delegation Token Key Manager"); + this.secretManager = mgr; + this.keyDistributor = dist; + this.keyUpdateInterval = keyUpdateInterval; + this.tokenMaxLifetime = tokenMaxLifetime; + } + + @VisibleForTesting + void setKeepRunning(boolean keepRunning) { + this.keepRunning = keepRunning; + } + + public boolean isInitialized() { + return initialized; + } + + public void gracefulStop() { + keepRunning = false; + } + + @Override + public void run() { + // Make sure to initialize the secret manager with keys already in ZK + updateStateFromCurrentKeys(); + initialized = true; + + while (keepRunning) { + long now = System.currentTimeMillis(); + + _run(now); + + try { + Thread.sleep(5000); + } catch (InterruptedException ie) { + log.debug("Interrupted waiting for next update", ie); + } + } + } + + @VisibleForTesting + void updateStateFromCurrentKeys() { + try { + List currentKeys = keyDistributor.getCurrentKeys(); + if (!currentKeys.isEmpty()) { + for (AuthenticationKey key : currentKeys) { + // Ensure that we don't create new Keys with duplicate keyIds for keys that already exist + // It's not a big concern if we happen to duplicate keyIds for already expired keys. + if (key.getKeyId() > idSeq) { + idSeq = key.getKeyId(); + } + secretManager.addKey(key); + } + log.info("Added {} existing AuthenticationKeys into the local cache from ZooKeeper", currentKeys.size()); + + // Try to use the last key instead of creating a new one right away. This will present more expected + // functionality if the active master happens to die for some reasonn + AuthenticationKey currentKey = secretManager.getCurrentKey(); + if (null != currentKey) { + log.info("Updating last key update to {} from current secret manager key", currentKey.getCreationDate()); + lastKeyUpdate = currentKey.getCreationDate(); + } + } + } catch (KeeperException | InterruptedException e) { + log.warn("Failed to fetch existing AuthenticationKeys from ZooKeeper"); + } + } + + @VisibleForTesting + long getLastKeyUpdate() { + return lastKeyUpdate; + } + + @VisibleForTesting + int getIdSeq() { + return idSeq; + } + + /** + * Internal "run" method which performs the actual work. + * + * @param now + * The current time in millis since epoch. + */ + void _run(long now) { + // clear any expired keys + int removedKeys = secretManager.removeExpiredKeys(keyDistributor); + if (removedKeys > 0) { + log.debug("Removed {} expired keys from the local cache", removedKeys); + } + + if (lastKeyUpdate + keyUpdateInterval < now) { + log.debug("Key update interval passed, creating new authentication key"); + + // Increment the idSeq and use the new value as the unique ID + AuthenticationKey newKey = new AuthenticationKey(++idSeq, now, now + tokenMaxLifetime, secretManager.generateSecret()); + + log.debug("Created new {}", newKey.toString()); + + // Will set to be the current key given the idSeq + secretManager.addKey(newKey); + + // advertise it to tabletservers + try { + keyDistributor.advertise(newKey); + } catch (KeeperException | InterruptedException e) { + log.error("Failed to advertise AuthenticationKey in ZooKeeper. Exiting.", e); + throw new RuntimeException(e); + } + + lastKeyUpdate = now; + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/main/java/org/apache/accumulo/server/security/delegation/AuthenticationTokenSecretManager.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/delegation/AuthenticationTokenSecretManager.java b/server/base/src/main/java/org/apache/accumulo/server/security/delegation/AuthenticationTokenSecretManager.java new file mode 100644 index 0000000..99173d2 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/security/delegation/AuthenticationTokenSecretManager.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.server.security.delegation; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +import javax.crypto.SecretKey; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.admin.DelegationTokenConfig; +import org.apache.accumulo.core.client.security.tokens.DelegationToken; +import org.apache.accumulo.core.security.AuthenticationTokenIdentifier; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.token.SecretManager; +import org.apache.hadoop.security.token.Token; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; + +/** + * Manages an internal list of secret keys used to sign new authentication tokens as they are generated, and to validate existing tokens used for + * authentication. + * + * Each TabletServer, in addition to the Master, has an instance of this {@link SecretManager} so that each can authenticate requests from clients presenting + * delegation tokens. The Master will also run an instance of {@link AuthenticationTokenKeyManager} which handles generation of new keys and removal of old + * keys. That class will call the methods here to ensure the in-memory cache is consistent with what is advertised in ZooKeeper. + */ +public class AuthenticationTokenSecretManager extends SecretManager { + + private static final Logger log = LoggerFactory.getLogger(AuthenticationTokenSecretManager.class); + + private final Instance instance; + private final long tokenMaxLifetime; + private final ConcurrentHashMap allKeys = new ConcurrentHashMap(); + private AuthenticationKey currentKey; + + /** + * Create a new secret manager instance for generating keys. + * + * @param instance + * Accumulo instance + * @param tokenMaxLifetime + * Maximum age (in milliseconds) before a token expires and is no longer valid + */ + public AuthenticationTokenSecretManager(Instance instance, long tokenMaxLifetime) { + checkNotNull(instance); + checkArgument(tokenMaxLifetime > 0, "Max lifetime must be positive"); + this.instance = instance; + this.tokenMaxLifetime = tokenMaxLifetime; + } + + @Override + protected byte[] createPassword(AuthenticationTokenIdentifier identifier) { + DelegationTokenConfig cfg = identifier.getConfig(); + + long now = System.currentTimeMillis(); + final AuthenticationKey secretKey = currentKey; + identifier.setKeyId(secretKey.getKeyId()); + identifier.setIssueDate(now); + long expiration = now + tokenMaxLifetime; + // Catch overflow + if (expiration < now) { + expiration = Long.MAX_VALUE; + } + identifier.setExpirationDate(expiration); + + // Limit the lifetime if the user requests it + if (null != cfg) { + long requestedLifetime = cfg.getTokenLifetime(TimeUnit.MILLISECONDS); + if (0 < requestedLifetime) { + long requestedExpirationDate = identifier.getIssueDate() + requestedLifetime; + // Catch overflow again + if (requestedExpirationDate < identifier.getIssueDate()) { + requestedExpirationDate = Long.MAX_VALUE; + } + // Ensure that the user doesn't try to extend the expiration date -- they may only limit it + if (requestedExpirationDate > identifier.getExpirationDate()) { + throw new RuntimeException("Requested token lifetime exceeds configured maximum"); + } + log.trace("Overriding token expiration date from {} to {}", identifier.getExpirationDate(), requestedExpirationDate); + identifier.setExpirationDate(requestedExpirationDate); + } + } + + identifier.setInstanceId(instance.getInstanceID()); + return createPassword(identifier.getBytes(), secretKey.getKey()); + } + + @Override + public byte[] retrievePassword(AuthenticationTokenIdentifier identifier) throws InvalidToken { + long now = System.currentTimeMillis(); + if (identifier.getExpirationDate() < now) { + throw new InvalidToken("Token has expired"); + } + if (identifier.getIssueDate() > now) { + throw new InvalidToken("Token issued in the future"); + } + AuthenticationKey masterKey = allKeys.get(identifier.getKeyId()); + if (masterKey == null) { + throw new InvalidToken("Unknown master key for token (id=" + identifier.getKeyId() + ")"); + } + // regenerate the password + return createPassword(identifier.getBytes(), masterKey.getKey()); + } + + @Override + public AuthenticationTokenIdentifier createIdentifier() { + // Return our TokenIdentifier implementation + return new AuthenticationTokenIdentifier(); + } + + /** + * Generates a delegation token for the user with the provided {@code username}. + * + * @param username + * The client to generate the delegation token for. + * @param cfg + * A configuration object for obtaining the delegation token + * @return A delegation token for {@code username} created using the {@link #currentKey}. + */ + public Entry,AuthenticationTokenIdentifier> generateToken(String username, DelegationTokenConfig cfg) + throws AccumuloException { + checkNotNull(username); + checkNotNull(cfg); + + final AuthenticationTokenIdentifier id = new AuthenticationTokenIdentifier(username, cfg); + + final StringBuilder svcName = new StringBuilder(DelegationToken.SERVICE_NAME); + if (null != id.getInstanceId()) { + svcName.append("-").append(id.getInstanceId()); + } + // Create password will update the state on the identifier given currentKey. Need to call this before serializing the identifier + byte[] password; + try { + password = createPassword(id); + } catch (RuntimeException e) { + throw new AccumuloException(e.getMessage()); + } + // The use of the ServiceLoader inside Token doesn't work to automatically get the Identifier + // Explicitly returning the identifier also saves an extra deserialization + Token token = new Token(id.getBytes(), password, id.getKind(), new Text(svcName.toString())); + return Maps.immutableEntry(token, id); + } + + /** + * Add the provided {@code key} to the in-memory copy of all {@link AuthenticationKey}s. + * + * @param key + * The key to add. + */ + public synchronized void addKey(AuthenticationKey key) { + checkNotNull(key); + + log.debug("Adding AuthenticationKey with keyId {}", key.getKeyId()); + + allKeys.put(key.getKeyId(), key); + if (currentKey == null || key.getKeyId() > currentKey.getKeyId()) { + currentKey = key; + } + } + + /** + * Removes the {@link AuthenticationKey} from the local cache of keys using the provided {@link keyId}. + * + * @param keyId + * The unique ID for the {@link AuthenticationKey} to remove. + * @return True if the key was removed, otherwise false. + */ + synchronized boolean removeKey(Integer keyId) { + checkNotNull(keyId); + + log.debug("Removing AuthenticatioKey with keyId {}", keyId); + + return null != allKeys.remove(keyId); + } + + /** + * The current {@link AuthenticationKey}, may be null. + * + * @return The current key, or null. + */ + @VisibleForTesting + AuthenticationKey getCurrentKey() { + return currentKey; + } + + @VisibleForTesting + Map getKeys() { + return allKeys; + } + + /** + * Inspect each key cached in {@link #allKeys} and remove it if the expiration date has passed. For each removed local {@link AuthenticationKey}, the key is + * also removed from ZooKeeper using the provided {@code keyDistributor} instance. + * + * @param keyDistributor + * ZooKeeper key distribution class + */ + synchronized int removeExpiredKeys(ZooAuthenticationKeyDistributor keyDistributor) { + long now = System.currentTimeMillis(); + int keysRemoved = 0; + Iterator> iter = allKeys.entrySet().iterator(); + while (iter.hasNext()) { + Entry entry = iter.next(); + AuthenticationKey key = entry.getValue(); + if (key.getExpirationDate() < now) { + log.debug("Removing expired delegation token key {}", key.getKeyId()); + iter.remove(); + keysRemoved++; + try { + keyDistributor.remove(key); + } catch (KeeperException | InterruptedException e) { + log.error("Failed to remove AuthenticationKey from ZooKeeper. Exiting", e); + throw new RuntimeException(e); + } + } + } + return keysRemoved; + } + + synchronized boolean isCurrentKeySet() { + return null != currentKey; + } + + /** + * Atomic operation to remove all AuthenticationKeys + */ + public synchronized void removeAllKeys() { + allKeys.clear(); + currentKey = null; + } + + @Override + protected SecretKey generateSecret() { + // Method in the parent is a different package, provide the explicit override so we can use it directly in our package. + return super.generateSecret(); + } + + public static SecretKey createSecretKey(byte[] raw) { + return SecretManager.createSecretKey(raw); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/main/java/org/apache/accumulo/server/security/delegation/ZooAuthenticationKeyDistributor.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/delegation/ZooAuthenticationKeyDistributor.java b/server/base/src/main/java/org/apache/accumulo/server/security/delegation/ZooAuthenticationKeyDistributor.java new file mode 100644 index 0000000..515b036 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/security/delegation/ZooAuthenticationKeyDistributor.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.security.delegation; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.accumulo.fate.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Id; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Class that manages distribution of {@link AuthenticationKey}s, Accumulo's secret in the delegation token model, to other Accumulo nodes via ZooKeeper. + */ +public class ZooAuthenticationKeyDistributor { + private static final Logger log = LoggerFactory.getLogger(ZooAuthenticationKeyDistributor.class); + + private final ZooReaderWriter zk; + private final String baseNode; + private boolean initialized = false; + + public ZooAuthenticationKeyDistributor(ZooReaderWriter zk, String baseNode) { + checkNotNull(zk); + checkNotNull(baseNode); + this.zk = zk; + this.baseNode = baseNode; + } + + /** + * Ensures that ZooKeeper is in a correct state to perform distribution of {@link AuthenticationKey}s. + */ + public synchronized void initialize() throws KeeperException, InterruptedException { + if (initialized) { + return; + } + + if (!zk.exists(baseNode)) { + if (!zk.putPrivatePersistentData(baseNode, new byte[0], NodeExistsPolicy.FAIL)) { + throw new AssertionError("Got false from putPrivatePersistentData method"); + } + } else { + List acls = zk.getACL(baseNode, new Stat()); + if (1 == acls.size()) { + ACL actualAcl = acls.get(0), expectedAcl = ZooUtil.PRIVATE.get(0); + Id actualId = actualAcl.getId(); + // The expected outcome from ZooUtil.PRIVATE + if (actualAcl.getPerms() == expectedAcl.getPerms() && actualId.getScheme().equals("digest") && actualId.getId().startsWith("accumulo:")) { + initialized = true; + return; + } + } else { + log.error("Saw more than one ACL on the node"); + } + + log.error("Expected {} to have ACLs {} but was {}", baseNode, ZooUtil.PRIVATE, acls); + throw new IllegalStateException("Delegation token secret key node in ZooKeeper is not protected."); + } + + initialized = true; + } + + /** + * Fetch all {@link AuthenticationKey}s currently stored in ZooKeeper beneath the configured {@code baseNode}. + * + * @return A list of {@link AuthenticationKey}s + */ + public List getCurrentKeys() throws KeeperException, InterruptedException { + checkState(initialized, "Not initialized"); + List children = zk.getChildren(baseNode); + + // Shortcircuit to avoid a list creation + if (children.isEmpty()) { + return Collections. emptyList(); + } + + // Deserialize each byte[] into an AuthenticationKey + List keys = new ArrayList<>(children.size()); + for (String child : children) { + byte[] data = zk.getData(qualifyPath(child), null); + if (null != data) { + AuthenticationKey key = new AuthenticationKey(); + try { + key.readFields(new DataInputStream(new ByteArrayInputStream(data))); + } catch (IOException e) { + throw new AssertionError("Error reading from in-memory buffer which should not happen", e); + } + keys.add(key); + } + } + + return keys; + } + + /** + * Add the given {@link AuthenticationKey} to ZooKeeper. + * + * @param newKey + * The key to add to ZooKeeper + */ + public synchronized void advertise(AuthenticationKey newKey) throws KeeperException, InterruptedException { + checkState(initialized, "Not initialized"); + checkNotNull(newKey); + + // Make sure the node doesn't already exist + String path = qualifyPath(newKey); + if (zk.exists(path)) { + log.warn("AuthenticationKey with ID '{}' already exists in ZooKeeper", newKey.getKeyId()); + return; + } + + // Serialize it + ByteArrayOutputStream baos = new ByteArrayOutputStream(4096); + try { + newKey.write(new DataOutputStream(baos)); + } catch (IOException e) { + throw new AssertionError("Should not get exception writing to in-memory buffer", e); + } + + byte[] serializedKey = baos.toByteArray(); + + log.debug("Advertising AuthenticationKey with keyId {} in ZooKeeper at {}", newKey.getKeyId(), path); + + // Put it into ZK with the private ACL + zk.putPrivatePersistentData(path, serializedKey, NodeExistsPolicy.FAIL); + } + + /** + * Remove the given {@link AuthenticationKey} from ZooKeeper. If the node for the provided {@code key} doesn't exist in ZooKeeper, a warning is printed but an + * error is not thrown. Since there is only a single process managing ZooKeeper at one time, any inconsistencies should be client error. + * + * @param key + * The key to remove from ZooKeeper + */ + public synchronized void remove(AuthenticationKey key) throws KeeperException, InterruptedException { + checkState(initialized, "Not initialized"); + checkNotNull(key); + + String path = qualifyPath(key); + if (!zk.exists(path)) { + log.warn("AuthenticationKey with ID '{}' doesn't exist in ZooKeeper", key.getKeyId()); + return; + } + + log.debug("Removing AuthenticationKey with keyId {} from ZooKeeper at {}", key.getKeyId(), path); + + // Delete the node, any version + zk.delete(path, -1); + } + + String qualifyPath(String keyId) { + return baseNode + "/" + keyId; + } + + String qualifyPath(AuthenticationKey key) { + return qualifyPath(Integer.toString(key.getKeyId())); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/main/java/org/apache/accumulo/server/security/delegation/ZooAuthenticationKeyWatcher.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/delegation/ZooAuthenticationKeyWatcher.java b/server/base/src/main/java/org/apache/accumulo/server/security/delegation/ZooAuthenticationKeyWatcher.java new file mode 100644 index 0000000..2913343 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/security/delegation/ZooAuthenticationKeyWatcher.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.security.delegation; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; + +import org.apache.accumulo.fate.zookeeper.ZooReader; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Watch ZooKeeper to notice changes in the published keys so that authenticate can properly occur using delegation tokens. + */ +public class ZooAuthenticationKeyWatcher implements Watcher { + private static final Logger log = LoggerFactory.getLogger(ZooAuthenticationKeyWatcher.class); + + private final AuthenticationTokenSecretManager secretManager; + private final ZooReader zk; + private final String baseNode; + + public ZooAuthenticationKeyWatcher(AuthenticationTokenSecretManager secretManager, ZooReader zk, String baseNode) { + this.secretManager = secretManager; + this.zk = zk; + this.baseNode = baseNode; + } + + @Override + public void process(WatchedEvent event) { + if (EventType.None == event.getType()) { + switch (event.getState()) { + case Disconnected: // Intentional fall through of case + case Expired: // ZooReader is handling the Expiration of the original ZooKeeper object for us + log.debug("ZooKeeper connection disconnected, clearing secret manager"); + secretManager.removeAllKeys(); + break; + case SyncConnected: + log.debug("ZooKeeper reconnected, updating secret manager"); + try { + updateAuthKeys(); + } catch (KeeperException | InterruptedException e) { + log.error("Failed to update secret manager after ZooKeeper reconnect"); + } + break; + default: + log.warn("Unhandled: " + event); + } + + // Nothing more to do for EventType.None + return; + } + + String path = event.getPath(); + if (null == path) { + return; + } + + if (!path.startsWith(baseNode)) { + log.info("Ignoring event for path: {}", path); + return; + } + + try { + if (path.equals(baseNode)) { + processBaseNode(event); + } else { + processChildNode(event); + } + } catch (KeeperException | InterruptedException e) { + log.error("Failed to communicate with ZooKeeper", e); + } + } + + /** + * Process the {@link WatchedEvent} for the base znode that the {@link AuthenticationKey}s are stored in. + */ + void processBaseNode(WatchedEvent event) throws KeeperException, InterruptedException { + switch (event.getType()) { + case NodeDeleted: + // The parent node was deleted, no children are possible, remove all keys + log.debug("Parent ZNode was deleted, removing all AuthenticationKeys"); + secretManager.removeAllKeys(); + break; + case None: + // Not connected, don't care + break; + case NodeCreated: // intentional fall-through to NodeChildrenChanged + case NodeChildrenChanged: + // Process each child, and reset the watcher on the parent node. We know that the node exists + updateAuthKeys(event.getPath()); + break; + case NodeDataChanged: + // The data on the parent changed. We aren't storing anything there so it's a noop + break; + default: + log.warn("Unsupported event type: {}", event.getType()); + break; + } + } + + /** + * Entry point to seed the local {@link AuthenticationKey} cache from ZooKeeper and set the first watcher for future updates in ZooKeeper. + */ + public void updateAuthKeys() throws KeeperException, InterruptedException { + // Might cause two watchers on baseNode, but only at startup for each tserver. + if (zk.exists(baseNode, this)) { + log.info("Added {} existing AuthenticationKeys to local cache from ZooKeeper", updateAuthKeys(baseNode)); + } + } + + private int updateAuthKeys(String path) throws KeeperException, InterruptedException { + int keysAdded = 0; + for (String child : zk.getChildren(path, this)) { + String childPath = path + "/" + child; + // Get the node data and reset the watcher + AuthenticationKey key = deserializeKey(zk.getData(childPath, this, null)); + secretManager.addKey(key); + keysAdded++; + } + return keysAdded; + } + + /** + * Process the {@link WatchedEvent} for a node which represents an {@link AuthenticationKey} + */ + void processChildNode(WatchedEvent event) throws KeeperException, InterruptedException { + final String path = event.getPath(); + switch (event.getType()) { + case NodeDeleted: + // Key expired + if (null == path) { + log.error("Got null path for NodeDeleted event"); + return; + } + + // Pull off the base ZK path and the '/' separator + String childName = path.substring(baseNode.length() + 1); + secretManager.removeKey(Integer.parseInt(childName)); + break; + case None: + // Not connected, don't care. We'll update when we're reconnected + break; + case NodeCreated: + // New key created + if (null == path) { + log.error("Got null path for NodeCreated event"); + return; + } + // Get the data and reset the watcher + AuthenticationKey key = deserializeKey(zk.getData(path, this, null)); + log.debug("Adding AuthenticationKey with keyId {}", key.getKeyId()); + secretManager.addKey(key); + break; + case NodeDataChanged: + // Key changed, could happen on restart after not running Accumulo. + if (null == path) { + log.error("Got null path for NodeDataChanged event"); + return; + } + // Get the data and reset the watcher + AuthenticationKey newKey = deserializeKey(zk.getData(path, this, null)); + // Will overwrite the old key if one exists + secretManager.addKey(newKey); + break; + case NodeChildrenChanged: + // no children for the children.. + log.warn("Unexpected NodeChildrenChanged event for authentication key node {}", path); + break; + default: + log.warn("Unsupported event type: {}", event.getType()); + break; + } + } + + /** + * Deserialize the bytes into an {@link AuthenticationKey} + */ + AuthenticationKey deserializeKey(byte[] serializedKey) { + AuthenticationKey key = new AuthenticationKey(); + try { + key.readFields(new DataInputStream(new ByteArrayInputStream(serializedKey))); + } catch (IOException e) { + throw new AssertionError("Failed to read from an in-memory buffer"); + } + return key; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java b/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java index 08fa55b..369fa89 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java @@ -27,6 +27,7 @@ import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode; import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.DelegationToken; import org.apache.accumulo.core.client.security.tokens.KerberosToken; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.SiteConfiguration; @@ -131,7 +132,7 @@ public class KerberosAuthenticator implements Authenticator { } // User is authenticated at the transport layer -- nothing extra is necessary - if (token instanceof KerberosToken) { + if (token instanceof KerberosToken || token instanceof DelegationToken) { return true; } return false; http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/test/java/org/apache/accumulo/server/AccumuloServerContextTest.java ---------------------------------------------------------------------- diff --git a/server/base/src/test/java/org/apache/accumulo/server/AccumuloServerContextTest.java b/server/base/src/test/java/org/apache/accumulo/server/AccumuloServerContextTest.java index 49a60a6..92b6be8 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/AccumuloServerContextTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/AccumuloServerContextTest.java @@ -16,6 +16,10 @@ */ package org.apache.accumulo.server; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.security.PrivilegedExceptionAction; import java.util.Iterator; import java.util.Map.Entry; @@ -24,12 +28,15 @@ import org.apache.accumulo.core.client.ClientConfiguration; import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty; import org.apache.accumulo.core.client.impl.ClientContext; import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.conf.SiteConfiguration; -import org.apache.accumulo.core.rpc.SaslConnectionParams; +import org.apache.accumulo.core.security.Credentials; import org.apache.accumulo.server.conf.ServerConfigurationFactory; +import org.apache.accumulo.server.rpc.SaslServerConnectionParams; import org.apache.accumulo.server.rpc.ThriftServerType; +import org.apache.accumulo.server.security.SystemCredentials.SystemToken; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.security.UserGroupInformation; @@ -69,17 +76,27 @@ public class AccumuloServerContextTest { final AccumuloConfiguration conf = ClientContext.convertClientConfig(clientConf); SiteConfiguration siteConfig = EasyMock.createMock(SiteConfiguration.class); + EasyMock.expect(siteConfig.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)).andReturn(true); + + // Deal with SystemToken being private + PasswordToken pw = new PasswordToken("fake"); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + pw.write(new DataOutputStream(baos)); + SystemToken token = new SystemToken(); + token.readFields(new DataInputStream(new ByteArrayInputStream(baos.toByteArray()))); + ServerConfigurationFactory factory = EasyMock.createMock(ServerConfigurationFactory.class); EasyMock.expect(factory.getConfiguration()).andReturn(conf).anyTimes(); EasyMock.expect(factory.getSiteConfiguration()).andReturn(siteConfig).anyTimes(); EasyMock.expect(factory.getInstance()).andReturn(instance).anyTimes(); AccumuloServerContext context = EasyMock.createMockBuilder(AccumuloServerContext.class).addMockedMethod("enforceKerberosLogin") - .addMockedMethod("getConfiguration").addMockedMethod("getServerConfigurationFactory").createMock(); + .addMockedMethod("getConfiguration").addMockedMethod("getServerConfigurationFactory").addMockedMethod("getCredentials").createMock(); context.enforceKerberosLogin(); EasyMock.expectLastCall().anyTimes(); EasyMock.expect(context.getConfiguration()).andReturn(conf).anyTimes(); EasyMock.expect(context.getServerConfigurationFactory()).andReturn(factory).anyTimes(); + EasyMock.expect(context.getCredentials()).andReturn(new Credentials("accumulo/hostname@FAKE.COM", token)).once(); // Just make the SiteConfiguration delegate to our ClientConfiguration (by way of the AccumuloConfiguration) // Presently, we only need get(Property) and iterator(). @@ -101,8 +118,8 @@ public class AccumuloServerContextTest { EasyMock.replay(factory, context, siteConfig); Assert.assertEquals(ThriftServerType.SASL, context.getThriftServerType()); - SaslConnectionParams saslParams = context.getServerSaslParams(); - Assert.assertEquals(SaslConnectionParams.forConfig(conf), saslParams); + SaslServerConnectionParams saslParams = context.getSaslParams(); + Assert.assertEquals(new SaslServerConnectionParams(conf, token), saslParams); Assert.assertEquals(username, saslParams.getPrincipal()); EasyMock.verify(factory, context, siteConfig); http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/test/java/org/apache/accumulo/server/rpc/SaslDigestCallbackHandlerTest.java ---------------------------------------------------------------------- diff --git a/server/base/src/test/java/org/apache/accumulo/server/rpc/SaslDigestCallbackHandlerTest.java b/server/base/src/test/java/org/apache/accumulo/server/rpc/SaslDigestCallbackHandlerTest.java new file mode 100644 index 0000000..6c965ff --- /dev/null +++ b/server/base/src/test/java/org/apache/accumulo/server/rpc/SaslDigestCallbackHandlerTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.rpc; + +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.util.Map.Entry; + +import javax.crypto.KeyGenerator; +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.UnsupportedCallbackException; + +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.admin.DelegationTokenConfig; +import org.apache.accumulo.core.rpc.SaslDigestCallbackHandler; +import org.apache.accumulo.core.security.AuthenticationTokenIdentifier; +import org.apache.accumulo.server.security.delegation.AuthenticationKey; +import org.apache.accumulo.server.security.delegation.AuthenticationTokenSecretManager; +import org.apache.hadoop.security.token.Token; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class SaslDigestCallbackHandlerTest { + + /** + * Allows access to the methods on SaslDigestCallbackHandler + */ + private static class SaslTestDigestCallbackHandler extends SaslDigestCallbackHandler { + @Override + public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException { + throw new UnsupportedOperationException(); + } + } + + // From org.apache.hadoop.security.token.SecretManager + private static final String DEFAULT_HMAC_ALGORITHM = "HmacSHA1"; + private static final int KEY_LENGTH = 64; + private static KeyGenerator keyGen; + + @BeforeClass + public static void setupKeyGenerator() throws Exception { + // From org.apache.hadoop.security.token.SecretManager + keyGen = KeyGenerator.getInstance(DEFAULT_HMAC_ALGORITHM); + keyGen.init(KEY_LENGTH); + } + + private SaslTestDigestCallbackHandler handler; + private DelegationTokenConfig cfg; + + @Before + public void setup() { + handler = new SaslTestDigestCallbackHandler(); + cfg = new DelegationTokenConfig(); + } + + @Test + public void testIdentifierSerialization() throws IOException { + AuthenticationTokenIdentifier identifier = new AuthenticationTokenIdentifier("user", 1, 100l, 1000l, "instanceid"); + byte[] serialized = identifier.getBytes(); + String name = handler.encodeIdentifier(serialized); + + byte[] reserialized = handler.decodeIdentifier(name); + assertArrayEquals(serialized, reserialized); + + AuthenticationTokenIdentifier copy = new AuthenticationTokenIdentifier(); + copy.readFields(new DataInputStream(new ByteArrayInputStream(reserialized))); + + assertEquals(identifier, copy); + } + + @Test + public void testTokenSerialization() throws Exception { + Instance instance = createMock(Instance.class); + AuthenticationTokenSecretManager secretManager = new AuthenticationTokenSecretManager(instance, 1000l); + expect(instance.getInstanceID()).andReturn("instanceid"); + + replay(instance); + + secretManager.addKey(new AuthenticationKey(1, 0l, 100l, keyGen.generateKey())); + Entry,AuthenticationTokenIdentifier> entry = secretManager.generateToken("user", cfg); + byte[] password = entry.getKey().getPassword(); + char[] encodedPassword = handler.encodePassword(password); + + char[] computedPassword = handler.getPassword(secretManager, entry.getValue()); + + verify(instance); + + assertArrayEquals(computedPassword, encodedPassword); + } + + @Test + public void testTokenAndIdentifierSerialization() throws Exception { + Instance instance = createMock(Instance.class); + AuthenticationTokenSecretManager secretManager = new AuthenticationTokenSecretManager(instance, 1000l); + expect(instance.getInstanceID()).andReturn("instanceid"); + + replay(instance); + + secretManager.addKey(new AuthenticationKey(1, 0l, 1000 * 100l, keyGen.generateKey())); + Entry,AuthenticationTokenIdentifier> entry = secretManager.generateToken("user", cfg); + byte[] password = entry.getKey().getPassword(); + char[] encodedPassword = handler.encodePassword(password); + String name = handler.encodeIdentifier(entry.getValue().getBytes()); + + byte[] decodedIdentifier = handler.decodeIdentifier(name); + AuthenticationTokenIdentifier identifier = new AuthenticationTokenIdentifier(); + identifier.readFields(new DataInputStream(new ByteArrayInputStream(decodedIdentifier))); + char[] computedPassword = handler.getPassword(secretManager, identifier); + + verify(instance); + + assertArrayEquals(computedPassword, encodedPassword); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/test/java/org/apache/accumulo/server/rpc/SaslServerConnectionParamsTest.java ---------------------------------------------------------------------- diff --git a/server/base/src/test/java/org/apache/accumulo/server/rpc/SaslServerConnectionParamsTest.java b/server/base/src/test/java/org/apache/accumulo/server/rpc/SaslServerConnectionParamsTest.java new file mode 100644 index 0000000..39bf9e4 --- /dev/null +++ b/server/base/src/test/java/org/apache/accumulo/server/rpc/SaslServerConnectionParamsTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.rpc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.security.PrivilegedExceptionAction; +import java.util.Map; + +import javax.security.sasl.Sasl; + +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty; +import org.apache.accumulo.core.client.impl.ClientContext; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.rpc.SaslConnectionParams; +import org.apache.accumulo.core.rpc.SaslConnectionParams.QualityOfProtection; +import org.apache.accumulo.core.rpc.SaslConnectionParams.SaslMechanism; +import org.apache.accumulo.server.security.SystemCredentials.SystemToken; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.Before; +import org.junit.Test; + +public class SaslServerConnectionParamsTest { + + private UserGroupInformation testUser; + private String username; + + @Before + public void setup() throws Exception { + System.setProperty("java.security.krb5.realm", "accumulo"); + System.setProperty("java.security.krb5.kdc", "fake"); + Configuration conf = new Configuration(false); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + UserGroupInformation.setConfiguration(conf); + testUser = UserGroupInformation.createUserForTesting("test_user", new String[0]); + username = testUser.getUserName(); + } + + @Test + public void testDefaultParamsAsServer() throws Exception { + testUser.doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + final ClientConfiguration clientConf = ClientConfiguration.loadDefault(); + + // The primary is the first component of the principal + final String primary = "accumulo"; + clientConf.withSasl(true, primary); + + final AccumuloConfiguration rpcConf = ClientContext.convertClientConfig(clientConf); + assertEquals("true", clientConf.get(ClientProperty.INSTANCE_RPC_SASL_ENABLED)); + + // Deal with SystemToken being private + PasswordToken pw = new PasswordToken("fake"); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + pw.write(new DataOutputStream(baos)); + SystemToken token = new SystemToken(); + token.readFields(new DataInputStream(new ByteArrayInputStream(baos.toByteArray()))); + + final SaslConnectionParams saslParams = new SaslServerConnectionParams(rpcConf, token); + assertEquals(primary, saslParams.getKerberosServerPrimary()); + assertEquals(SaslMechanism.GSSAPI, saslParams.getMechanism()); + assertNull(saslParams.getCallbackHandler()); + + final QualityOfProtection defaultQop = QualityOfProtection.get(Property.RPC_SASL_QOP.getDefaultValue()); + assertEquals(defaultQop, saslParams.getQualityOfProtection()); + + Map properties = saslParams.getSaslProperties(); + assertEquals(1, properties.size()); + assertEquals(defaultQop.getQuality(), properties.get(Sasl.QOP)); + assertEquals(username, saslParams.getPrincipal()); + return null; + } + }); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/test/java/org/apache/accumulo/server/security/delegation/AuthenticationKeyTest.java ---------------------------------------------------------------------- diff --git a/server/base/src/test/java/org/apache/accumulo/server/security/delegation/AuthenticationKeyTest.java b/server/base/src/test/java/org/apache/accumulo/server/security/delegation/AuthenticationKeyTest.java new file mode 100644 index 0000000..02e22aa --- /dev/null +++ b/server/base/src/test/java/org/apache/accumulo/server/security/delegation/AuthenticationKeyTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.security.delegation; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import javax.crypto.KeyGenerator; +import javax.crypto.SecretKey; + +import org.junit.BeforeClass; +import org.junit.Test; + +public class AuthenticationKeyTest { + // From org.apache.hadoop.security.token.SecretManager + private static final String DEFAULT_HMAC_ALGORITHM = "HmacSHA1"; + private static final int KEY_LENGTH = 64; + private static KeyGenerator keyGen; + + @BeforeClass + public static void setupKeyGenerator() throws Exception { + // From org.apache.hadoop.security.token.SecretManager + keyGen = KeyGenerator.getInstance(DEFAULT_HMAC_ALGORITHM); + keyGen.init(KEY_LENGTH); + } + + @Test(expected = NullPointerException.class) + public void testNullSecretKey() { + new AuthenticationKey(0, 0, 0, null); + } + + @Test + public void testAuthKey() { + SecretKey secretKey = keyGen.generateKey(); + int keyId = 20; + long creationDate = 38383838l, expirationDate = 83838383l; + AuthenticationKey authKey = new AuthenticationKey(keyId, creationDate, expirationDate, secretKey); + assertEquals(secretKey, authKey.getKey()); + assertEquals(keyId, authKey.getKeyId()); + assertEquals(expirationDate, authKey.getExpirationDate()); + + // Empty instance + AuthenticationKey badCopy = new AuthenticationKey(); + + assertNotEquals(badCopy, authKey); + assertNotEquals(badCopy.hashCode(), authKey.hashCode()); + + // Different object, same arguments + AuthenticationKey goodCopy = new AuthenticationKey(keyId, creationDate, expirationDate, secretKey); + assertEquals(authKey, goodCopy); + assertEquals(authKey.hashCode(), goodCopy.hashCode()); + } + + @Test + public void testWritable() throws IOException { + SecretKey secretKey = keyGen.generateKey(); + int keyId = 20; + long creationDate = 38383838l, expirationDate = 83838383l; + AuthenticationKey authKey = new AuthenticationKey(keyId, creationDate, expirationDate, secretKey); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(baos); + authKey.write(out); + byte[] serialized = baos.toByteArray(); + + DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized)); + AuthenticationKey copy = new AuthenticationKey(); + copy.readFields(in); + + assertEquals(authKey, copy); + assertEquals(authKey.hashCode(), copy.hashCode()); + assertEquals(secretKey, copy.getKey()); + assertEquals(keyId, copy.getKeyId()); + assertEquals(expirationDate, copy.getExpirationDate()); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/test/java/org/apache/accumulo/server/security/delegation/AuthenticationTokenKeyManagerTest.java ---------------------------------------------------------------------- diff --git a/server/base/src/test/java/org/apache/accumulo/server/security/delegation/AuthenticationTokenKeyManagerTest.java b/server/base/src/test/java/org/apache/accumulo/server/security/delegation/AuthenticationTokenKeyManagerTest.java new file mode 100644 index 0000000..bc2968a --- /dev/null +++ b/server/base/src/test/java/org/apache/accumulo/server/security/delegation/AuthenticationTokenKeyManagerTest.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.security.delegation; + +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.concurrent.CountDownLatch; + +import javax.crypto.KeyGenerator; +import javax.crypto.SecretKey; + +import org.easymock.EasyMock; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AuthenticationTokenKeyManagerTest { + private static final Logger log = LoggerFactory.getLogger(AuthenticationTokenKeyManagerTest.class); + + // From org.apache.hadoop.security.token.SecretManager + private static final String DEFAULT_HMAC_ALGORITHM = "HmacSHA1"; + private static final int KEY_LENGTH = 64; + private static KeyGenerator keyGen; + + @BeforeClass + public static void setupKeyGenerator() throws Exception { + // From org.apache.hadoop.security.token.SecretManager + keyGen = KeyGenerator.getInstance(DEFAULT_HMAC_ALGORITHM); + keyGen.init(KEY_LENGTH); + } + + private AuthenticationTokenSecretManager secretManager; + private ZooAuthenticationKeyDistributor zooDistributor; + + @Before + public void setupMocks() { + secretManager = createMock(AuthenticationTokenSecretManager.class); + zooDistributor = createMock(ZooAuthenticationKeyDistributor.class); + } + + @Test + public void testIntervalNotPassed() { + long updateInterval = 5 * 1000l; + long tokenLifetime = 100 * 1000l; + AuthenticationTokenKeyManager keyManager = new AuthenticationTokenKeyManager(secretManager, zooDistributor, updateInterval, tokenLifetime); + + // Have never updated the key + assertEquals(0l, keyManager.getLastKeyUpdate()); + + // Always check for expired keys to remove + expect(secretManager.removeExpiredKeys(zooDistributor)).andReturn(0); + + replay(secretManager, zooDistributor); + + // Run at time 0. Last run time is still 0. 0 + 5000 > 0, so we won't generate a new key + keyManager._run(0); + + verify(secretManager, zooDistributor); + } + + @Test + public void testIntervalHasPassed() throws Exception { + long updateInterval = 0 * 1000l; + long tokenLifetime = 100 * 1000l; + long runTime = 10l; + SecretKey secretKey = keyGen.generateKey(); + + AuthenticationKey authKey = new AuthenticationKey(1, runTime, runTime + tokenLifetime, secretKey); + AuthenticationTokenKeyManager keyManager = new AuthenticationTokenKeyManager(secretManager, zooDistributor, updateInterval, tokenLifetime); + + // Have never updated the key + assertEquals(0l, keyManager.getLastKeyUpdate()); + + // Always check for expired keys to remove + expect(secretManager.removeExpiredKeys(zooDistributor)).andReturn(0); + expect(secretManager.generateSecret()).andReturn(secretKey); + secretManager.addKey(authKey); + expectLastCall().once(); + zooDistributor.advertise(authKey); + expectLastCall().once(); + + replay(secretManager, zooDistributor); + + // Run at time 10. Last run time is still 0. 0 + 10 > 0, so we will generate a new key + keyManager._run(runTime); + + verify(secretManager, zooDistributor); + + // Last key update time should match when we ran + assertEquals(runTime, keyManager.getLastKeyUpdate()); + // KeyManager uses the incremented value for the new AuthKey (the current idSeq will match the keyId for the last generated key) + assertEquals(authKey.getKeyId(), keyManager.getIdSeq()); + } + + @Test(timeout = 30 * 1000) + public void testStopLoop() throws InterruptedException { + final AuthenticationTokenKeyManager keyManager = EasyMock.createMockBuilder(AuthenticationTokenKeyManager.class).addMockedMethod("_run") + .addMockedMethod("updateStateFromCurrentKeys").createMock(); + final CountDownLatch latch = new CountDownLatch(1); + + // Mock out the _run and updateStateFromCurrentKeys method so we just get the logic from "run()" + keyManager._run(EasyMock.anyLong()); + expectLastCall().once(); + keyManager.updateStateFromCurrentKeys(); + expectLastCall().once(); + + replay(keyManager); + + keyManager.setKeepRunning(true); + + // Wrap another Runnable around our KeyManager so we know when the thread is actually run as it's "async" when the method will actually be run after we call + // thread.start() + Thread t = new Thread(new Runnable() { + @Override + public void run() { + log.info("Thread running"); + latch.countDown(); + keyManager.run(); + } + }); + + log.info("Starting thread"); + t.start(); + + // Wait for the thread to start + latch.await(); + log.info("Latch fired"); + + // Wait a little bit to let the first call to _run() happen (avoid exiting the loop before any calls to _run()) + Thread.sleep(1000); + + log.info("Finished waiting, stopping keymanager"); + + keyManager.gracefulStop(); + + log.info("Waiting for thread to exit naturally"); + + t.join(); + + verify(keyManager); + } + + @Test + public void testExistingKeysAreAddedAtStartup() throws Exception { + long updateInterval = 0 * 1000l; + long tokenLifetime = 100 * 1000l; + SecretKey secretKey1 = keyGen.generateKey(), secretKey2 = keyGen.generateKey(); + + AuthenticationKey authKey1 = new AuthenticationKey(1, 0, tokenLifetime, secretKey1), authKey2 = new AuthenticationKey(2, tokenLifetime, tokenLifetime * 2, + secretKey2); + AuthenticationTokenKeyManager keyManager = new AuthenticationTokenKeyManager(secretManager, zooDistributor, updateInterval, tokenLifetime); + + // Have never updated the key + assertEquals(0l, keyManager.getLastKeyUpdate()); + + // Always check for expired keys to remove + expect(zooDistributor.getCurrentKeys()).andReturn(Arrays.asList(authKey1, authKey2)); + secretManager.addKey(authKey1); + expectLastCall().once(); + secretManager.addKey(authKey2); + expectLastCall().once(); + expect(secretManager.getCurrentKey()).andReturn(authKey2).once(); + + replay(secretManager, zooDistributor); + + // Initialize the state from zookeeper + keyManager.updateStateFromCurrentKeys(); + + verify(secretManager, zooDistributor); + + assertEquals(authKey2.getKeyId(), keyManager.getIdSeq()); + assertEquals(authKey2.getCreationDate(), keyManager.getLastKeyUpdate()); + } +}