kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: MINOR: Remove TLS renegotiation code
Date Fri, 27 Oct 2017 15:40:44 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 4e8ad90b9 -> f4e9c84c5


MINOR: Remove TLS renegotiation code

TLS renegotiation has been disabled since the start, and
because it has been removed in TLS 1.3, there are no plans
to ever support it.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>

Closes #4034 from ijuma/remove-tls-renegotiation-support


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f4e9c84c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f4e9c84c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f4e9c84c

Branch: refs/heads/trunk
Commit: f4e9c84c52fbb7ec6a4ca50c9ac35cacbd0df082
Parents: 4e8ad90
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Fri Oct 27 16:40:39 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Fri Oct 27 16:40:39 2017 +0100

----------------------------------------------------------------------
 .../kafka/common/network/SslTransportLayer.java | 47 +++++++-------
 .../kafka/common/network/SslSelectorTest.java   | 68 +-------------------
 .../common/network/SslTransportLayerTest.java   |  2 +-
 3 files changed, 26 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e9c84c/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index f5e1e70..51ebbc1 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -57,7 +57,6 @@ public class SslTransportLayer implements TransportLayer {
     private final SSLEngine sslEngine;
     private final SelectionKey key;
     private final SocketChannel socketChannel;
-    private final boolean enableRenegotiation;
 
     private HandshakeStatus handshakeStatus;
     private SSLEngineResult handshakeResult;
@@ -69,25 +68,23 @@ public class SslTransportLayer implements TransportLayer {
     private ByteBuffer emptyBuf = ByteBuffer.allocate(0);
 
     public static SslTransportLayer create(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException {
-        // Disable renegotiation by default until we have fixed the known issues with the existing implementation
-        SslTransportLayer transportLayer = new SslTransportLayer(channelId, key, sslEngine, false);
+        SslTransportLayer transportLayer = new SslTransportLayer(channelId, key, sslEngine);
         transportLayer.startHandshake();
         return transportLayer;
     }
 
     // Prefer `create`, only use this in tests
-    SslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine, boolean enableRenegotiation) throws IOException {
+    SslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException {
         this.channelId = channelId;
         this.key = key;
         this.socketChannel = (SocketChannel) key.channel();
         this.sslEngine = sslEngine;
-        this.enableRenegotiation = enableRenegotiation;
     }
 
-    /**
-     * starts sslEngine handshake process
-     */
+    // Visible for testing
     protected void startHandshake() throws IOException {
+        if (state != null)
+            throw new IllegalStateException("startHandshake() can only be called once, state " + state);
 
         this.netReadBuffer = ByteBuffer.allocate(netReadBufferSize());
         this.netWriteBuffer = ByteBuffer.allocate(netWriteBufferSize());
@@ -240,9 +237,10 @@ public class SslTransportLayer implements TransportLayer {
     */
     @Override
     public void handshake() throws IOException {
-        // Reset state to support renegotiation. This can be removed if renegotiation support is removed.
         if (state == State.READY)
-            state = State.HANDSHAKE;
+            throw renegotiationException();
+        if (state == State.CLOSING)
+            throw closingException();
 
         int read = 0;
         try {
@@ -369,12 +367,13 @@ public class SslTransportLayer implements TransportLayer {
         }
     }
 
-    private void renegotiate() throws IOException {
-        if (!enableRenegotiation)
-            throw new SSLHandshakeException("Renegotiation is not supported");
-        handshake();
+    private SSLHandshakeException renegotiationException() throws IOException {
+        return new SSLHandshakeException("Renegotiation is not supported");
     }
 
+    private IllegalStateException closingException() {
+        throw new IllegalStateException("Channel is in closing state");
+    }
 
     /**
      * Executes the SSLEngine tasks needed.
@@ -513,10 +512,10 @@ public class SslTransportLayer implements TransportLayer {
                 netReadBuffer.compact();
                 // handle ssl renegotiation.
                 if (unwrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING && unwrapResult.getStatus() == Status.OK) {
-                    log.trace("SSLChannel Read begin renegotiation channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
-                              channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
-                    renegotiate();
-                    break;
+                    log.trace("Renegotiation requested, but it is not supported, channelId {}, " +
+                        "appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}", channelId,
+                        appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
+                    throw renegotiationException();
                 }
 
                 if (unwrapResult.getStatus() == Status.OK) {
@@ -615,8 +614,10 @@ public class SslTransportLayer implements TransportLayer {
     @Override
     public int write(ByteBuffer src) throws IOException {
         int written = 0;
-        if (state == State.CLOSING) throw new IllegalStateException("Channel is in closing state");
-        if (state != State.READY) return written;
+        if (state == State.CLOSING)
+            throw closingException();
+        if (state != State.READY)
+            return written;
 
         if (!flush(netWriteBuffer))
             return written;
@@ -626,10 +627,8 @@ public class SslTransportLayer implements TransportLayer {
         netWriteBuffer.flip();
 
         //handle ssl renegotiation
-        if (wrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING && wrapResult.getStatus() == Status.OK) {
-            renegotiate();
-            return written;
-        }
+        if (wrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING && wrapResult.getStatus() == Status.OK)
+            throw renegotiationException();
 
         if (wrapResult.getStatus() == Status.OK) {
             written = wrapResult.bytesConsumed();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e9c84c/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index f6af817..bf2e77c 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.memory.SimpleMemoryPool;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
-import org.apache.kafka.common.security.ssl.SslFactory;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestSslUtils;
@@ -31,7 +30,6 @@ import org.junit.Test;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.nio.channels.SelectionKey;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
@@ -80,72 +78,10 @@ public class SslSelectorTest extends SelectorTest {
     }
 
     /**
-     * Tests that SSL renegotiation initiated by the server are handled correctly by the client
-     * @throws Exception
+     * Renegotiation is not supported since it is potentially unsafe and it has been removed in TLS 1.3
      */
     @Test
-    public void testRenegotiation() throws Exception {
-        ChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT) {
-            @Override
-            protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key, String host) throws IOException {
-                SocketChannel socketChannel = (SocketChannel) key.channel();
-                SslTransportLayer transportLayer = new SslTransportLayer(id, key,
-                    sslFactory.createSslEngine(host, socketChannel.socket().getPort()),
-                    true);
-                transportLayer.startHandshake();
-                return transportLayer;
-            }
-        };
-        channelBuilder.configure(sslClientConfigs);
-        Selector selector = new Selector(5000, metrics, time, "MetricGroup2", channelBuilder, new LogContext());
-        try {
-            int reqs = 500;
-            String node = "0";
-            // create connections
-            InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
-            selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
-            // send echo requests and receive responses
-            int requests = 0;
-            int responses = 0;
-            int renegotiates = 0;
-            while (!selector.isChannelReady(node)) {
-                selector.poll(1000L);
-            }
-            selector.send(createSend(node, node + "-" + 0));
-            requests++;
-
-            // loop until we complete all requests
-            while (responses < reqs) {
-                selector.poll(0L);
-                if (responses >= 100 && renegotiates == 0) {
-                    renegotiates++;
-                    server.renegotiate();
-                }
-                assertEquals("No disconnects should have occurred.", 0, selector.disconnected().size());
-
-                // handle any responses we may have gotten
-                for (NetworkReceive receive : selector.completedReceives()) {
-                    String[] pieces = asString(receive).split("-");
-                    assertEquals("Should be in the form 'conn-counter'", 2, pieces.length);
-                    assertEquals("Check the source", receive.source(), pieces[0]);
-                    assertEquals("Check that the receive has kindly been rewound", 0, receive.payload().position());
-                    assertEquals("Check the request counter", responses, Integer.parseInt(pieces[1]));
-                    responses++;
-                }
-
-                // prepare new sends for the next round
-                for (int i = 0; i < selector.completedSends().size() && requests < reqs && selector.isChannelReady(node); i++, requests++) {
-                    selector.send(createSend(node, node + "-" + requests));
-                }
-            }
-        } finally {
-            selector.close();
-        }
-    }
-
-    @Test
-    public void testDisabledRenegotiation() throws Exception {
+    public void testRenegotiationFails() throws Exception {
         String node = "0";
         // create connections
         InetSocketAddress addr = new InetSocketAddress("localhost", server.port);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e9c84c/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index a782627..7cc2808 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -817,7 +817,7 @@ public class SslTransportLayerTest {
             private final AtomicInteger numDelayedFlushesRemaining;
 
             public TestSslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException {
-                super(channelId, key, sslEngine, false);
+                super(channelId, key, sslEngine);
                 this.netReadBufSize = new ResizeableBufferSize(netReadBufSizeOverride);
                 this.netWriteBufSize = new ResizeableBufferSize(netWriteBufSizeOverride);
                 this.appBufSize = new ResizeableBufferSize(appBufSizeOverride);


Mime
View raw message