From commits-return-10502-archive-asf-public=cust-asf.ponee.io@kafka.apache.org Mon Oct 8 11:26:58 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 1349D180652 for ; Mon, 8 Oct 2018 11:26:57 +0200 (CEST) Received: (qmail 11559 invoked by uid 500); 8 Oct 2018 09:26:57 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 11549 invoked by uid 99); 8 Oct 2018 09:26:57 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 08 Oct 2018 09:26:57 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 7474482C17; Mon, 8 Oct 2018 09:26:56 +0000 (UTC) Date: Mon, 08 Oct 2018 09:26:55 +0000 To: "commits@kafka.apache.org" Subject: [kafka] branch 2.1 updated: KAFKA-7462: Make token optional for OAuthBearerLoginModule (#5733) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <153899081468.8682.7838413999579243984@gitbox.apache.org> From: rsivaram@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: kafka X-Git-Refname: refs/heads/2.1 X-Git-Reftype: branch X-Git-Oldrev: 5ad2dc36dd71aef04674200ca0afa4188edb54ba X-Git-Newrev: 6c42934dcfe358c242fc57f4a938b1139b96d720 X-Git-Rev: 6c42934dcfe358c242fc57f4a938b1139b96d720 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.1 by this push: new 6c42934 KAFKA-7462: Make token optional for OAuthBearerLoginModule (#5733) 6c42934 is described below commit 6c42934dcfe358c242fc57f4a938b1139b96d720 Author: Rajini Sivaram AuthorDate: Mon Oct 8 10:14:25 2018 +0100 KAFKA-7462: Make token optional for OAuthBearerLoginModule (#5733) OAuthBearerLoginModule is used both on the server-side and client-side (similar to login modules for other mechanisms). OAUTHBEARER tokens are client credentials used only on the client-side to authenticate with servers, but the current implementation requires tokens to be provided on the server-side even if OAUTHBEARER is not used for inter-broker communication. This commit makes tokens optional for server-side login context to allow brokers to be configured without a token when OAUT [...] Reviewers: Ron Dagostino , Jun Rao --- .../oauthbearer/OAuthBearerLoginModule.java | 107 ++++++++++++++------- .../oauthbearer/OAuthBearerTokenCallback.java | 4 +- .../OAuthBearerUnsecuredLoginCallbackHandler.java | 8 ++ .../authenticator/SaslAuthenticatorTest.java | 35 +++++++ 4 files changed, 119 insertions(+), 35 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java index 1dcd199..e3a7810 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java +++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java @@ -236,6 +236,21 @@ import org.slf4j.LoggerFactory; * @see SaslConfigs#SASL_LOGIN_REFRESH_BUFFER_SECONDS_DOC */ public class OAuthBearerLoginModule implements LoginModule { + + /** + * Login state transitions: + * Initial state: NOT_LOGGED_IN + * login() : NOT_LOGGED_IN => LOGGED_IN_NOT_COMMITTED + * commit() : LOGGED_IN_NOT_COMMITTED => COMMITTED + * abort() : LOGGED_IN_NOT_COMMITTED => NOT_LOGGED_IN + * logout() : Any state => NOT_LOGGED_IN + */ + private enum LoginState { + NOT_LOGGED_IN, + LOGGED_IN_NOT_COMMITTED, + COMMITTED + } + /** * The SASL Mechanism name for OAuth 2: {@code OAUTHBEARER} */ @@ -248,6 +263,7 @@ public class OAuthBearerLoginModule implements LoginModule { private OAuthBearerToken myCommittedToken = null; private SaslExtensions extensionsRequiringCommit = null; private SaslExtensions myCommittedExtensions = null; + private LoginState loginState; static { OAuthBearerSaslClientProvider.initialize(); // not part of public API @@ -266,17 +282,29 @@ public class OAuthBearerLoginModule implements LoginModule { @Override public boolean login() throws LoginException { - if (tokenRequiringCommit != null) - throw new IllegalStateException(String.format( + if (loginState == LoginState.LOGGED_IN_NOT_COMMITTED) { + if (tokenRequiringCommit != null) + throw new IllegalStateException(String.format( "Already have an uncommitted token with private credential token count=%d", committedTokenCount())); - if (myCommittedToken != null) - throw new IllegalStateException(String.format( + else + throw new IllegalStateException("Already logged in without a token"); + } + if (loginState == LoginState.COMMITTED) { + if (myCommittedToken != null) + throw new IllegalStateException(String.format( "Already have a committed token with private credential token count=%d; must login on another login context or logout here first before reusing the same login context", committedTokenCount())); + else + throw new IllegalStateException("Login has already been committed without a token"); + } identifyToken(); - identifyExtensions(); + if (tokenRequiringCommit != null) + identifyExtensions(); + else + log.debug("Logged in without a token, this login cannot be used to establish client connections"); + loginState = LoginState.LOGGED_IN_NOT_COMMITTED; log.info("Login succeeded; invoke commit() to commit it; current committed token count={}", committedTokenCount()); return true; @@ -292,7 +320,7 @@ public class OAuthBearerLoginModule implements LoginModule { } tokenRequiringCommit = tokenCallback.token(); - if (tokenRequiringCommit == null) { + if (tokenCallback.errorCode() != null) { log.info("Login failed: {} : {} (URI={})", tokenCallback.errorCode(), tokenCallback.errorDescription(), tokenCallback.errorUri()); throw new LoginException(tokenCallback.errorDescription()); @@ -322,64 +350,77 @@ public class OAuthBearerLoginModule implements LoginModule { @Override public boolean logout() { - if (tokenRequiringCommit != null) + if (loginState == LoginState.LOGGED_IN_NOT_COMMITTED) throw new IllegalStateException( "Cannot call logout() immediately after login(); need to first invoke commit() or abort()"); - if (myCommittedToken == null) { + if (loginState != LoginState.COMMITTED) { if (log.isDebugEnabled()) log.debug("Nothing here to log out"); return false; } - log.info("Logging out my token; current committed token count = {}", committedTokenCount()); - for (Iterator iterator = subject.getPrivateCredentials().iterator(); iterator.hasNext();) { - Object privateCredential = iterator.next(); - if (privateCredential == myCommittedToken) { - iterator.remove(); - myCommittedToken = null; - break; + if (myCommittedToken != null) { + log.info("Logging out my token; current committed token count = {}", committedTokenCount()); + for (Iterator iterator = subject.getPrivateCredentials().iterator(); iterator.hasNext(); ) { + Object privateCredential = iterator.next(); + if (privateCredential == myCommittedToken) { + iterator.remove(); + myCommittedToken = null; + break; + } } - } - log.info("Done logging out my token; committed token count is now {}", committedTokenCount()); + log.info("Done logging out my token; committed token count is now {}", committedTokenCount()); + } else + log.debug("No tokens to logout for this login"); - log.info("Logging out my extensions"); - if (subject.getPublicCredentials().removeIf(e -> myCommittedExtensions == e)) - myCommittedExtensions = null; - log.info("Done logging out my extensions"); + if (myCommittedExtensions != null) { + log.info("Logging out my extensions"); + if (subject.getPublicCredentials().removeIf(e -> myCommittedExtensions == e)) + myCommittedExtensions = null; + log.info("Done logging out my extensions"); + } else + log.debug("No extensions to logout for this login"); + loginState = LoginState.NOT_LOGGED_IN; return true; } @Override public boolean commit() { - if (tokenRequiringCommit == null) { + if (loginState != LoginState.LOGGED_IN_NOT_COMMITTED) { if (log.isDebugEnabled()) log.debug("Nothing here to commit"); return false; } - log.info("Committing my token; current committed token count = {}", committedTokenCount()); - subject.getPrivateCredentials().add(tokenRequiringCommit); - myCommittedToken = tokenRequiringCommit; - tokenRequiringCommit = null; - log.info("Done committing my token; committed token count is now {}", committedTokenCount()); + if (tokenRequiringCommit != null) { + log.info("Committing my token; current committed token count = {}", committedTokenCount()); + subject.getPrivateCredentials().add(tokenRequiringCommit); + myCommittedToken = tokenRequiringCommit; + tokenRequiringCommit = null; + log.info("Done committing my token; committed token count is now {}", committedTokenCount()); + } else + log.debug("No tokens to commit, this login cannot be used to establish client connections"); - subject.getPublicCredentials().add(extensionsRequiringCommit); - myCommittedExtensions = extensionsRequiringCommit; - extensionsRequiringCommit = null; + if (extensionsRequiringCommit != null) { + subject.getPublicCredentials().add(extensionsRequiringCommit); + myCommittedExtensions = extensionsRequiringCommit; + extensionsRequiringCommit = null; + } + loginState = LoginState.COMMITTED; return true; } @Override public boolean abort() { - if (tokenRequiringCommit != null) { + if (loginState == LoginState.LOGGED_IN_NOT_COMMITTED) { log.info("Login aborted"); tokenRequiringCommit = null; extensionsRequiringCommit = null; + loginState = LoginState.NOT_LOGGED_IN; return true; } - if (log.isDebugEnabled()) - log.debug("Nothing here to abort"); + log.debug("Nothing here to abort"); return false; } diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerTokenCallback.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerTokenCallback.java index 62ce492..3f4f269 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerTokenCallback.java +++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerTokenCallback.java @@ -90,10 +90,10 @@ public class OAuthBearerTokenCallback implements Callback { * Set the token. All error-related values are cleared. * * @param token - * the mandatory token to set + * the optional token to set */ public void token(OAuthBearerToken token) { - this.token = Objects.requireNonNull(token); + this.token = token; this.errorCode = null; this.errorDescription = null; this.errorUri = null; diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java index 8d259e3..e7a4f2c 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java +++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java @@ -182,6 +182,14 @@ public class OAuthBearerUnsecuredLoginCallbackHandler implements AuthenticateCal private void handleTokenCallback(OAuthBearerTokenCallback callback) { if (callback.token() != null) throw new IllegalArgumentException("Callback had a token already"); + if (moduleOptions.isEmpty()) { + log.debug("Token not provided, this login cannot be used to establish client connections"); + callback.token(null); + return; + } + if (moduleOptions.keySet().stream().noneMatch(name -> !name.startsWith(EXTENSION_PREFIX))) { + throw new OAuthBearerConfigException("Extensions provided in login context without a token"); + } String principalClaimNameValue = optionValue(PRINCIPAL_CLAIM_NAME_OPTION); String principalClaimName = principalClaimNameValue != null && !principalClaimNameValue.trim().isEmpty() ? principalClaimNameValue.trim() diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java index dfefabb..297cba5 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java @@ -93,6 +93,7 @@ import org.apache.kafka.common.security.authenticator.TestDigestLoginModule.Dige import org.apache.kafka.common.security.plain.internals.PlainServerCallbackHandler; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -1208,6 +1209,40 @@ public class SaslAuthenticatorTest { } /** + * Tests OAUTHBEARER client channels without tokens for the server. + */ + @Test + public void testValidSaslOauthBearerMechanismWithoutServerTokens() throws Exception { + String node = "0"; + SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL; + saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER"); + saslServerConfigs.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, Arrays.asList("OAUTHBEARER")); + saslClientConfigs.put(SaslConfigs.SASL_JAAS_CONFIG, + TestJaasConfig.jaasConfigProperty("OAUTHBEARER", Collections.singletonMap("unsecuredLoginStringClaim_sub", TestJaasConfig.USERNAME))); + saslServerConfigs.put("listener.name.sasl_ssl.oauthbearer." + SaslConfigs.SASL_JAAS_CONFIG, + TestJaasConfig.jaasConfigProperty("OAUTHBEARER", Collections.emptyMap())); + + // Server without a token should start up successfully and authenticate clients. + server = createEchoServer(securityProtocol); + createAndCheckClientConnection(securityProtocol, node); + + // Client without a token should fail to connect + saslClientConfigs.put(SaslConfigs.SASL_JAAS_CONFIG, + TestJaasConfig.jaasConfigProperty("OAUTHBEARER", Collections.emptyMap())); + createAndCheckClientConnectionFailure(securityProtocol, node); + + // Server with extensions, but without a token should fail to start up since it could indicate a configuration error + saslServerConfigs.put("listener.name.sasl_ssl.oauthbearer." + SaslConfigs.SASL_JAAS_CONFIG, + TestJaasConfig.jaasConfigProperty("OAUTHBEARER", Collections.singletonMap("unsecuredLoginExtension_test", "something"))); + try { + createEchoServer(securityProtocol); + fail("Server created with invalid login config containing extensions without a token"); + } catch (Throwable e) { + assertTrue("Unexpected exception " + Utils.stackTrace(e), e.getCause() instanceof LoginException); + } + } + + /** * Tests OAUTHBEARER fails the connection when the client presents a token with * insufficient scope . */