kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7119: Handle transient Kerberos errors on server side (#5509)
Date Thu, 16 Aug 2018 08:02:36 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 634f9af  KAFKA-7119: Handle transient Kerberos errors on server side (#5509)
634f9af is described below

commit 634f9af8c0454f46a2510c12c3a155c6fca876e9
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
AuthorDate: Thu Aug 16 09:02:24 2018 +0100

    KAFKA-7119: Handle transient Kerberos errors on server side (#5509)
    
    Don't report retriable Kerberos errors on the server-side as authentication failures to
clients.
    
    Reviewers: Jun Rao <junrao@gmail.com>
---
 .../authenticator/SaslClientAuthenticator.java     |  74 +--------------
 .../authenticator/SaslServerAuthenticator.java     |  25 ++++--
 .../common/security/kerberos/KerberosError.java    | 100 +++++++++++++++++++++
 .../kafka/server/GssapiAuthenticationTest.scala    |   3 +-
 4 files changed, 120 insertions(+), 82 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index 8d6549d..8934e8e 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -42,7 +42,7 @@ import org.apache.kafka.common.requests.SaslHandshakeRequest;
 import org.apache.kafka.common.requests.SaslHandshakeResponse;
 import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
-import org.apache.kafka.common.utils.Java;
+import org.apache.kafka.common.security.kerberos.KerberosError;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,7 +52,6 @@ import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslClient;
 import javax.security.sasl.SaslException;
 import java.io.IOException;
-import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.security.Principal;
@@ -376,7 +375,7 @@ public class SaslClientAuthenticator implements Authenticator {
             Throwable cause = e.getCause();
             // Treat transient Kerberos errors as non-fatal SaslExceptions that are processed
as I/O exceptions
             // and all other failures as fatal SaslAuthenticationException.
-            if (kerberosError != null && kerberosError.retriable)
+            if (kerberosError != null && kerberosError.retriable())
                 throw new SaslException(error, cause);
             else
                 throw new SaslAuthenticationException(error, cause);
@@ -443,73 +442,4 @@ public class SaslClientAuthenticator implements Authenticator {
         }
     }
 
-    /**
-     * Kerberos exceptions that may require special handling. The standard Kerberos error
codes
-     * for these errors are retrieved using KrbException#errorCode() from the underlying
Kerberos
-     * exception thrown during {@link SaslClient#evaluateChallenge(byte[])}.
-     */
-    private enum KerberosError {
-        // (Mechanism level: Server not found in Kerberos database (7) - UNKNOWN_SERVER)
-        // This is retriable, but included here to add extra logging for this case.
-        SERVER_NOT_FOUND(7, false),
-        // (Mechanism level: Client not yet valid - try again later (21))
-        CLIENT_NOT_YET_VALID(21, true),
-        // (Mechanism level: Ticket not yet valid (33) - Ticket not yet valid)])
-        // This could be a small timing window.
-        TICKET_NOT_YET_VALID(33, true),
-        // (Mechanism level: Request is a replay (34) - Request is a replay)
-        // Replay detection used to prevent DoS attacks can result in false positives, so
retry on error.
-        REPLAY(34, true);
-
-
-        private static final Class<?> KRB_EXCEPTION_CLASS;
-        private static final Method KRB_EXCEPTION_RETURN_CODE_METHOD;
-
-        static {
-            try {
-                if (Java.isIbmJdk()) {
-                    KRB_EXCEPTION_CLASS = Class.forName("com.ibm.security.krb5.internal.KrbException");
-                } else {
-                    KRB_EXCEPTION_CLASS = Class.forName("sun.security.krb5.KrbException");
-                }
-                KRB_EXCEPTION_RETURN_CODE_METHOD = KRB_EXCEPTION_CLASS.getMethod("returnCode");
-            } catch (Exception e) {
-                throw new KafkaException("Kerberos exceptions could not be initialized",
e);
-            }
-        }
-
-        private final int errorCode;
-        private final boolean retriable;
-
-        KerberosError(int errorCode, boolean retriable) {
-            this.errorCode = errorCode;
-            this.retriable = retriable;
-        }
-
-        private static KerberosError fromException(Exception exception) {
-            Throwable cause = exception.getCause();
-            while (cause != null && !KRB_EXCEPTION_CLASS.isInstance(cause)) {
-                cause = cause.getCause();
-            }
-            if (cause == null)
-                return null;
-            else {
-                try {
-                    Integer errorCode = (Integer) KRB_EXCEPTION_RETURN_CODE_METHOD.invoke(cause);
-                    return fromErrorCode(errorCode);
-                } catch (Exception e) {
-                    LOG.trace("Kerberos return code could not be determined from {} due to
{}", exception, e);
-                    return null;
-                }
-            }
-        }
-
-        private static KerberosError fromErrorCode(int errorCode) {
-            for (KerberosError error : values()) {
-                if (error.errorCode == errorCode)
-                    return error;
-            }
-            return null;
-        }
-    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index a3f8162..e8f77a5 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -49,6 +49,7 @@ import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
 import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
+import org.apache.kafka.common.security.kerberos.KerberosError;
 import org.apache.kafka.common.security.kerberos.KerberosName;
 import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
 import org.apache.kafka.common.security.scram.ScramLoginModule;
@@ -267,11 +268,9 @@ public class SaslServerAuthenticator implements Authenticator {
                     default:
                         break;
                 }
-            } catch (SaslException | AuthenticationException e) {
+            } catch (AuthenticationException e) {
                 // Exception will be propagated after response is sent to client
-                AuthenticationException authException = (e instanceof AuthenticationException)
?
-                        (AuthenticationException) e : new AuthenticationException("SASL authentication
failed", e);
-                setSaslState(SaslState.FAILED, authException);
+                setSaslState(SaslState.FAILED, e);
             } catch (Exception e) {
                 // In the case of IOExceptions and other unexpected exceptions, fail immediately
                 saslState = SaslState.FAILED;
@@ -378,12 +377,20 @@ public class SaslServerAuthenticator implements Authenticator {
                 // For versions with SASL_AUTHENTICATE header, send a response to SASL_AUTHENTICATE
request even if token is empty.
                 ByteBuffer responseBuf = responseToken == null ? EMPTY_BUFFER : ByteBuffer.wrap(responseToken);
                 sendKafkaResponse(requestContext, new SaslAuthenticateResponse(Errors.NONE,
null, responseBuf));
-            } catch (SaslAuthenticationException | SaslException e) {
-                String errorMessage = e instanceof SaslAuthenticationException ? e.getMessage()
:
-                    "Authentication failed due to invalid credentials with SASL mechanism
" + saslMechanism;
-                sendKafkaResponse(requestContext, new SaslAuthenticateResponse(Errors.SASL_AUTHENTICATION_FAILED,
-                        errorMessage));
+            } catch (SaslAuthenticationException e) {
+                sendKafkaResponse(requestContext, new SaslAuthenticateResponse(Errors.SASL_AUTHENTICATION_FAILED,
e.getMessage()));
                 throw e;
+            } catch (SaslException e) {
+                KerberosError kerberosError = KerberosError.fromException(e);
+                if (kerberosError != null && kerberosError.retriable()) {
+                    // Handle retriable Kerberos exceptions as I/O exceptions rather than
authentication exceptions
+                    throw e;
+                } else {
+                    String errorMessage = "Authentication failed due to invalid credentials
with SASL mechanism " + saslMechanism;
+                    sendKafkaResponse(requestContext, new SaslAuthenticateResponse(Errors.SASL_AUTHENTICATION_FAILED,
+                            errorMessage));
+                    throw new SaslAuthenticationException(errorMessage, e);
+                }
             }
         }
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosError.java
b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosError.java
new file mode 100644
index 0000000..c6be441
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosError.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.kerberos;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator;
+import org.apache.kafka.common.utils.Java;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.sasl.SaslClient;
+import java.lang.reflect.Method;
+
+/**
+ * Kerberos exceptions that may require special handling. The standard Kerberos error codes
+ * for these errors are retrieved using KrbException#errorCode() from the underlying Kerberos
+ * exception thrown during {@link SaslClient#evaluateChallenge(byte[])}.
+ */
+public enum KerberosError {
+    // (Mechanism level: Server not found in Kerberos database (7) - UNKNOWN_SERVER)
+    // This is retriable, but included here to add extra logging for this case.
+    SERVER_NOT_FOUND(7, false),
+    // (Mechanism level: Client not yet valid - try again later (21))
+    CLIENT_NOT_YET_VALID(21, true),
+    // (Mechanism level: Ticket not yet valid (33) - Ticket not yet valid)])
+    // This could be a small timing window.
+    TICKET_NOT_YET_VALID(33, true),
+    // (Mechanism level: Request is a replay (34) - Request is a replay)
+    // Replay detection used to prevent DoS attacks can result in false positives, so retry
on error.
+    REPLAY(34, true);
+
+    private static final Logger log = LoggerFactory.getLogger(SaslClientAuthenticator.class);
+    private static final Class<?> KRB_EXCEPTION_CLASS;
+    private static final Method KRB_EXCEPTION_RETURN_CODE_METHOD;
+
+    static {
+        try {
+            if (Java.isIbmJdk()) {
+                KRB_EXCEPTION_CLASS = Class.forName("com.ibm.security.krb5.internal.KrbException");
+            } else {
+                KRB_EXCEPTION_CLASS = Class.forName("sun.security.krb5.KrbException");
+            }
+            KRB_EXCEPTION_RETURN_CODE_METHOD = KRB_EXCEPTION_CLASS.getMethod("returnCode");
+        } catch (Exception e) {
+            throw new KafkaException("Kerberos exceptions could not be initialized", e);
+        }
+    }
+
+    private final int errorCode;
+    private final boolean retriable;
+
+    KerberosError(int errorCode, boolean retriable) {
+        this.errorCode = errorCode;
+        this.retriable = retriable;
+    }
+
+    public boolean retriable() {
+        return retriable;
+    }
+
+    public static KerberosError fromException(Exception exception) {
+        Throwable cause = exception.getCause();
+        while (cause != null && !KRB_EXCEPTION_CLASS.isInstance(cause)) {
+            cause = cause.getCause();
+        }
+        if (cause == null)
+            return null;
+        else {
+            try {
+                Integer errorCode = (Integer) KRB_EXCEPTION_RETURN_CODE_METHOD.invoke(cause);
+                return fromErrorCode(errorCode);
+            } catch (Exception e) {
+                log.trace("Kerberos return code could not be determined from {} due to {}",
exception, e);
+                return null;
+            }
+        }
+    }
+
+    private static KerberosError fromErrorCode(int errorCode) {
+        for (KerberosError error : values()) {
+            if (error.errorCode == errorCode)
+                return error;
+        }
+        return null;
+    }
+}
diff --git a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
index 04166c6..74b2a15 100644
--- a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
@@ -113,7 +113,8 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup
{
           val disconnectState = selector.disconnected().get(nodeId)
           // Verify that disconnect state is not AUTHENTICATION_FAILED
           if (disconnectState != null)
-            assertEquals(ChannelState.State.AUTHENTICATE, disconnectState.state())
+            assertEquals(s"Authentication failed with exception ${disconnectState.exception()}",
+              ChannelState.State.AUTHENTICATE, disconnectState.state())
           selector.isChannelReady(nodeId) || disconnectState != null
         }, "Client not ready or disconnected within timeout")
         if (selector.isChannelReady(nodeId))


Mime
View raw message