kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7505: Process incoming bytes on write error to report SSL failures (#5800)
Date Thu, 18 Oct 2018 14:08:53 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 53d0674  KAFKA-7505: Process incoming bytes on write error to report SSL failures
(#5800)
53d0674 is described below

commit 53d0674662f965f12bdbf2720a076de687f5be70
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
AuthorDate: Thu Oct 18 15:08:42 2018 +0100

    KAFKA-7505: Process incoming bytes on write error to report SSL failures (#5800)
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>
---
 .../kafka/common/network/SslTransportLayer.java    | 52 ++++++++++++++--------
 1 file changed, 33 insertions(+), 19 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index a5ff06d..a6696f7 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -254,11 +254,12 @@ public class SslTransportLayer implements TransportLayer {
             throw closingException();
 
         int read = 0;
+        boolean readable = key.isReadable();
         try {
             // Read any available bytes before attempting any writes to ensure that handshake
failures
             // reported by the peer are processed even if writes fail (since peer closes
connection
             // if handshake fails)
-            if (key.isReadable())
+            if (readable)
                 read = readFromSocketChannel();
 
             doHandshake();
@@ -267,15 +268,16 @@ public class SslTransportLayer implements TransportLayer {
         } catch (IOException e) {
             maybeThrowSslAuthenticationException();
 
-            // this exception could be due to a write. If there is data available to unwrap,
-            // process the data so that any SSL handshake exceptions are reported
-            if (handshakeStatus == HandshakeStatus.NEED_UNWRAP && netReadBuffer.position()
> 0) {
-                try {
-                    handshakeUnwrap(false);
-                } catch (SSLException e1) {
-                    maybeProcessHandshakeFailure(e1, false, e);
-                }
+            // This exception could be due to a write. If there is data available to unwrap
in the buffer, or data available
+            // in the socket channel to read and unwrap, process the data so that any SSL
handshake exceptions are reported.
+            try {
+                do {
+                    handshakeUnwrap(false, true);
+                } while (readable && readFromSocketChannel() > 0);
+            } catch (SSLException e1) {
+                maybeProcessHandshakeFailure(e1, false, e);
             }
+
             // If we get here, this is not a handshake failure, throw the original IOException
             throw e;
         }
@@ -334,7 +336,7 @@ public class SslTransportLayer implements TransportLayer {
                 log.trace("SSLHandshake NEED_UNWRAP channelId {}, appReadBuffer pos {}, netReadBuffer
pos {}, netWriteBuffer pos {}",
                           channelId, appReadBuffer.position(), netReadBuffer.position(),
netWriteBuffer.position());
                 do {
-                    handshakeResult = handshakeUnwrap(read);
+                    handshakeResult = handshakeUnwrap(read, false);
                     if (handshakeResult.getStatus() == Status.BUFFER_OVERFLOW) {
                         int currentAppBufferSize = applicationBufferSize();
                         appReadBuffer = Utils.ensureCapacity(appReadBuffer, currentAppBufferSize);
@@ -456,12 +458,13 @@ public class SslTransportLayer implements TransportLayer {
     }
 
     /**
-    * Perform handshake unwrap
-    * @param doRead boolean
-    * @return SSLEngineResult
-    * @throws IOException
-    */
-    private SSLEngineResult handshakeUnwrap(boolean doRead) throws IOException {
+     * Perform handshake unwrap
+     * @param doRead boolean If true, read more from the socket channel
+     * @param ignoreHandshakeStatus If true, continue to unwrap if data available regardless
of handshake status
+     * @return SSLEngineResult
+     * @throws IOException
+     */
+    private SSLEngineResult handshakeUnwrap(boolean doRead, boolean ignoreHandshakeStatus)
throws IOException {
         log.trace("SSLHandshake handshakeUnwrap {}", channelId);
         SSLEngineResult result;
         int read = 0;
@@ -470,6 +473,7 @@ public class SslTransportLayer implements TransportLayer {
         boolean cont;
         do {
             //prepare the buffer with the incoming data
+            int position = netReadBuffer.position();
             netReadBuffer.flip();
             result = sslEngine.unwrap(netReadBuffer, appReadBuffer);
             netReadBuffer.compact();
@@ -478,8 +482,9 @@ public class SslTransportLayer implements TransportLayer {
                 result.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
                 handshakeStatus = runDelegatedTasks();
             }
-            cont = result.getStatus() == SSLEngineResult.Status.OK &&
-                handshakeStatus == HandshakeStatus.NEED_UNWRAP;
+            cont = (result.getStatus() == SSLEngineResult.Status.OK &&
+                    handshakeStatus == HandshakeStatus.NEED_UNWRAP) ||
+                    (ignoreHandshakeStatus && netReadBuffer.position() != position);
             log.trace("SSLHandshake handshakeUnwrap: handshakeStatus {} status {}", handshakeStatus,
result.getStatus());
         } while (netReadBuffer.position() != 0 && cont);
 
@@ -828,8 +833,17 @@ public class SslTransportLayer implements TransportLayer {
 
         state = State.HANDSHAKE_FAILED;
         handshakeException = new SslAuthenticationException("SSL handshake failed", sslException);
-        if (!flush || flush(netWriteBuffer))
+
+        // Attempt to flush any outgoing bytes. If flush doesn't complete, delay exception
handling until outgoing bytes
+        // are flushed. If write fails because remote end has closed the channel, log the
I/O exception and  continue to
+        // handle the handshake failure as an authentication exception.
+        try {
+            if (!flush || flush(netWriteBuffer))
+                throw handshakeException;
+        } catch (IOException e) {
+            log.debug("Failed to flush all bytes before closing channel", e);
             throw handshakeException;
+        }
     }
 
     // SSL handshake failures are typically thrown as SSLHandshakeException, SSLProtocolException,


Mime
View raw message