kafka-commits mailing list archives

From j...@apache.org
Subject kafka git commit: KAFKA-6258; SSLTransportLayer should keep reading from socket until either the buffer is full or the socket has no more data
Date Mon, 18 Dec 2017 19:16:30 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk e86f70ed2 -> 066bfc314


KAFKA-6258; SSLTransportLayer should keep reading from socket until either the buffer is full
or the socket has no more data

When the consumer uses plaintext and there is remaining data in the consumer's buffer, consumer.poll()
reads all data available from the socket buffer into the consumer buffer. However, if the consumer
uses SSL and there is remaining data, consumer.poll() may read only 16 KB (the size of SslTransportLayer.appReadBuffer)
from the socket buffer. This reduces the efficiency of consumer.poll(), because the user has to call
poll() more times to get the same amount of data.
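
The difference between the two behaviors can be sketched with plain ByteBuffers. This is only
an illustration, not the SslTransportLayer code: the class ReadLoopSketch, the methods readOnce
and readAll, and the constant APP_BUFFER_SIZE are hypothetical names, and a ByteBuffer stands in
for the socket.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration; this is NOT the actual SslTransportLayer implementation.
class ReadLoopSketch {
    // Assumed size of the intermediate application buffer (16 KB).
    static final int APP_BUFFER_SIZE = 16 * 1024;

    // Old behavior: move at most one intermediate buffer's worth of bytes per call.
    static int readOnce(ByteBuffer socket, ByteBuffer dst) {
        int n = Math.min(APP_BUFFER_SIZE, Math.min(socket.remaining(), dst.remaining()));
        for (int i = 0; i < n; i++)
            dst.put(socket.get());
        return n;
    }

    // New behavior: keep reading until dst is full or the source has no more data.
    static int readAll(ByteBuffer socket, ByteBuffer dst) {
        int total = 0;
        while (dst.remaining() > 0) {
            int n = readOnce(socket, dst);
            if (n <= 0)
                break; // nothing left to read
            total += n;
        }
        return total;
    }

    public static void main(String[] args) {
        ByteBuffer socket = ByteBuffer.allocate(80 * 1024);  // 80 KB pending on the "socket"
        ByteBuffer dst = ByteBuffer.allocate(100 * 1024);    // destination has room for all of it
        System.out.println(readOnce(socket.duplicate(), dst.duplicate())); // prints 16384
        System.out.println(readAll(socket.duplicate(), dst.duplicate()));  // prints 81920
    }
}
```

In this sketch a single readOnce() call leaves 64 KB unread, while readAll() drains everything
in one call.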

Furthermore, we observe that for users who naively sleep for a constant time after each consumer.poll(),
some partitions lag behind after they switch from plaintext to SSL. Here is an explanation of
why this can happen.

Say there is 1 partition at 1 MB/sec and 9 partitions at 32 KB/sec. The leaders of these partitions
are all different, and the consumer is consuming all 10 partitions. Let's also assume that the socket
read buffer is large enough and the consumer sleeps 1 sec between consumer.poll() calls. 1 sec is
long enough for the consumer to receive the FetchResponse back from the broker.

When the consumer uses plaintext, each consumer.poll() reads all data from the socket buffer,
which means about 1 MB is read from the large partition on each poll.

When the consumer uses SSL, each consumer.poll() is likely to find that some data is already available
in memory. In this case the consumer reads only 16 KB from the other sockets, in particular
the socket for the broker with the large partition. The throughput of the large partition
is then limited to 16 KB/sec.
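
The arithmetic behind this limit can be checked with a simplified model: one poll per second,
a fixed arrival rate, and a cap on how much each poll reads from the socket. The class and
method names (PollThroughputSketch, consumedAfter) are hypothetical, and the model ignores
fetch sizing and the other partitions.

```java
// Hypothetical model of "sleep 1 sec between polls"; it is not Kafka consumer code.
class PollThroughputSketch {
    // Returns the bytes consumed after the given number of one-second poll cycles.
    static long consumedAfter(int seconds, long arrivalPerSec, long maxReadPerPoll) {
        long buffered = 0; // bytes waiting in the socket buffer
        long consumed = 0; // bytes handed to the application so far
        for (int s = 0; s < seconds; s++) {
            buffered += arrivalPerSec;                      // data arriving during the sleep
            long read = Math.min(buffered, maxReadPerPoll); // one poll reads at most the cap
            buffered -= read;
            consumed += read;
        }
        return consumed;
    }

    public static void main(String[] args) {
        long oneMb = 1L << 20;
        // Capped at 16 KB per poll (the SSL case described above): 16 KB/sec.
        System.out.println(consumedAfter(10, oneMb, 16 * 1024));      // prints 163840
        // No cap (the plaintext case): the full 1 MB/sec.
        System.out.println(consumedAfter(10, oneMb, Long.MAX_VALUE)); // prints 10485760
    }
}
```

Under these assumptions the capped consumer falls behind by roughly 1 MB minus 16 KB every second.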

Arguably a user should not sleep 1 sec if their consumer is lagging behind. But on the Kafka dev side
it is nice to keep the previous behavior and optimize consumer.poll() to read as much data
from the socket as possible.

Author: Dong Lin <lindong28@gmail.com>

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Jason Gustafson <jason@confluent.io>

Closes #4248 from lindong28/KAFKA-6258


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/066bfc31
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/066bfc31
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/066bfc31

Branch: refs/heads/trunk
Commit: 066bfc314c912aa90283a1c1c53a958237d1adff
Parents: e86f70e
Author: Dong Lin <lindong28@gmail.com>
Authored: Mon Dec 18 11:15:09 2017 -0800
Committer: Jason Gustafson <jason@confluent.io>
Committed: Mon Dec 18 11:15:09 2017 -0800

----------------------------------------------------------------------
 .../kafka/common/network/SslTransportLayer.java | 25 ++++++----
 .../kafka/common/network/NioEchoServer.java     |  9 +++-
 .../kafka/common/network/SslSelectorTest.java   | 51 +++++++++++++++++++-
 .../common/network/SslTransportLayerTest.java   | 48 ++++++++++++++++++
 4 files changed, 121 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/066bfc31/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index 69ca037..49f1d66 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -89,7 +89,7 @@ public class SslTransportLayer implements TransportLayer {
         this.netReadBuffer = ByteBuffer.allocate(netReadBufferSize());
         this.netWriteBuffer = ByteBuffer.allocate(netWriteBufferSize());
         this.appReadBuffer = ByteBuffer.allocate(applicationBufferSize());
-        
+
         //clear & set netRead & netWrite buffers
         netWriteBuffer.position(0);
         netWriteBuffer.limit(0);
@@ -482,7 +482,8 @@ public class SslTransportLayer implements TransportLayer {
 
 
     /**
-    * Reads a sequence of bytes from this channel into the given buffer.
+    * Reads a sequence of bytes from this channel into the given buffer. Reads as much as possible
+    * until either the dst buffer is full or there is no more data in the socket.
     *
     * @param dst The buffer into which bytes are to be transferred
     * @return The number of bytes read, possible zero or -1 if the channel has reached end-of-stream
@@ -500,8 +501,10 @@ public class SslTransportLayer implements TransportLayer {
             read = readFromAppBuffer(dst);
         }
 
-        int netread = 0;
-        if (dst.remaining() > 0) {
+        boolean isClosed = false;
+        // Each loop reads at most once from the socket.
+        while (dst.remaining() > 0) {
+            int netread = 0;
             netReadBuffer = Utils.ensureCapacity(netReadBuffer, netReadBufferSize());
             if (netReadBuffer.remaining() > 0)
                 netread = readFromSocketChannel();
@@ -547,15 +550,19 @@ public class SslTransportLayer implements TransportLayer {
                     // If data has been read and unwrapped, return the data. Close will be handled on the next poll.
                     if (appReadBuffer.position() == 0 && read == 0)
                         throw new EOFException();
-                    else
+                    else {
+                        isClosed = true;
                         break;
+                    }
                 }
             }
+            if (read == 0 && netread < 0)
+                throw new EOFException("EOF during read");
+            if (netread <= 0 || isClosed)
+                break;
         }
         // If data has been read and unwrapped, return the data even if end-of-stream, channel will be closed
         // on a subsequent poll.
-        if (read == 0 && netread < 0)
-            throw new EOFException("EOF during read");
         return read;
     }
 
@@ -771,7 +778,7 @@ public class SslTransportLayer implements TransportLayer {
     protected int netReadBufferSize() {
         return sslEngine.getSession().getPacketBufferSize();
     }
-    
+
     protected int netWriteBufferSize() {
         return sslEngine.getSession().getPacketBufferSize();
     }
@@ -779,7 +786,7 @@ public class SslTransportLayer implements TransportLayer {
     protected int applicationBufferSize() {
         return sslEngine.getSession().getApplicationBufferSize();
     }
-    
+
     protected ByteBuffer netReadBuffer() {
         return netReadBuffer;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/066bfc31/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index ad587b9..e27a31d 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -62,6 +62,7 @@ public class NioEchoServer extends Thread {
     private volatile WritableByteChannel outputChannel;
     private final CredentialCache credentialCache;
     private final Metrics metrics;
+    private int numSent = 0;
 
     public NioEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol, AbstractConfig config,
             String serverHost, ChannelBuilder channelBuilder, CredentialCache credentialCache) throws Exception {
@@ -157,8 +158,10 @@ public class NioEchoServer extends Thread {
                         selector.unmute(channelId);
                     }
                 }
-                for (Send send : selector.completedSends())
+                for (Send send : selector.completedSends()) {
                     selector.unmute(send.destination());
+                    numSent += 1;
+                }
 
             }
         } catch (IOException e) {
@@ -166,6 +169,10 @@ public class NioEchoServer extends Thread {
         }
     }
 
+    public int numSent() {
+        return numSent;
+    }
+
     private String id(SocketChannel channel) {
         return channel.socket().getLocalAddress().getHostAddress() + ":" + channel.socket().getLocalPort() + "-" +
                 channel.socket().getInetAddress().getHostAddress() + ":" + channel.socket().getPort();

http://git-wip-us.apache.org/repos/asf/kafka/blob/066bfc31/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index dc062ea..96c7bc2 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -16,10 +16,13 @@
  */
 package org.apache.kafka.common.network;
 
+import java.nio.channels.SelectionKey;
+import javax.net.ssl.SSLEngine;
 import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.memory.SimpleMemoryPool;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.security.ssl.SslFactory;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestCondition;
@@ -83,8 +86,14 @@ public class SslSelectorTest extends SelectorTest {
     public void testDisconnectWithIntermediateBufferedBytes() throws Exception {
         int requestSize = 100 * 1024;
         final String node = "0";
-        connect(node, new InetSocketAddress("localhost", server.port));
         String request = TestUtils.randomString(requestSize);
+
+        this.selector.close();
+
+        this.channelBuilder = new TestSslChannelBuilder(Mode.CLIENT);
+        this.channelBuilder.configure(sslClientConfigs);
+        this.selector = new Selector(5000, metrics, time, "MetricGroup", channelBuilder, new LogContext());
+        connect(node, new InetSocketAddress("localhost", server.port));
         selector.send(createSend(node, request));
 
         TestUtils.waitForCondition(new TestCondition() {
@@ -142,7 +151,7 @@ public class SslSelectorTest extends SelectorTest {
         Map<String, Object> sslServerConfigs = TestSslUtils.createSslConfig(false, true, Mode.SERVER, trustStoreFile, "server");
         channelBuilder = new SslChannelBuilder(Mode.SERVER);
         channelBuilder.configure(sslServerConfigs);
-        selector = new Selector(NetworkReceive.UNLIMITED, 5000, metrics, time, "MetricGroup",

+        selector = new Selector(NetworkReceive.UNLIMITED, 5000, metrics, time, "MetricGroup",
                 new HashMap<String, String>(), true, false, channelBuilder, pool, new LogContext());
 
         try (ServerSocketChannel ss = ServerSocketChannel.open()) {
@@ -224,4 +233,42 @@ public class SslSelectorTest extends SelectorTest {
         return new SslSender(serverAddress, payload);
     }
 
+    private static class TestSslChannelBuilder extends SslChannelBuilder {
+
+        public TestSslChannelBuilder(Mode mode) {
+            super(mode);
+        }
+
+        @Override
+        protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key, String host) throws IOException {
+            SocketChannel socketChannel = (SocketChannel) key.channel();
+            SSLEngine sslEngine = sslFactory.createSslEngine(host, socketChannel.socket().getPort());
+            TestSslTransportLayer transportLayer = new TestSslTransportLayer(id, key, sslEngine);
+            transportLayer.startHandshake();
+            return transportLayer;
+        }
+
+        /*
+         * TestSslTransportLayer will read from socket once every two tries. This increases
+         * the chance that there will be bytes buffered in the transport layer after read().
+         */
+        class TestSslTransportLayer extends SslTransportLayer {
+            boolean muteSocket = false;
+
+            public TestSslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException {
+                super(channelId, key, sslEngine);
+            }
+
+            @Override
+            protected int readFromSocketChannel() throws IOException {
+                if (muteSocket) {
+                    muteSocket = false;
+                    return 0;
+                }
+                muteSocket = true;
+                return super.readFromSocketChannel();
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/066bfc31/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index 7cc2808..1f55246 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.network;
 
+import java.util.List;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
@@ -28,6 +29,7 @@ import org.apache.kafka.common.security.ssl.SslFactory;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -540,6 +542,52 @@ public class SslTransportLayerTest {
     }
 
     /**
+     * selector.poll() should be able to fetch more data than netReadBuffer from the socket.
+     */
+    @Test
+    public void testSelectorPollReadSize() throws Exception {
+        String node = "0";
+        server = createEchoServer(SecurityProtocol.SSL);
+        createSelector(sslClientConfigs, 16384, 16384, 16384);
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
+        selector.connect(node, addr, 102400, 102400);
+        NetworkTestUtils.checkClientConnection(selector, node, 81920, 1);
+
+        // Send a message of 80K. This is 5X as large as the socket buffer. It should take at least three selector.poll()
+        // to read this message from socket if the SslTransportLayer.read() does not read all data from socket buffer.
+        String message = TestUtils.randomString(81920);
+        selector.send(new NetworkSend(node, ByteBuffer.wrap(message.getBytes())));
+
+        // Send the message to echo server
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                try {
+                    selector.poll(100L);
+                } catch (IOException e) {
+                    return false;
+                }
+                return selector.completedSends().size() > 0;
+            }
+        }, "Timed out waiting for message to be sent");
+
+        // Wait for echo server to send the message back
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return server.numSent() >= 2;
+            }
+        }, "Timed out waiting for echo server to send message");
+
+        // Read the message from socket with only one poll()
+        selector.poll(1000L);
+
+        List<NetworkReceive> receiveList = selector.completedReceives();
+        assertEquals(1, receiveList.size());
+        assertEquals(message, new String(Utils.toArray(receiveList.get(0).payload())));
+    }
+
+    /**
      * Tests handling of BUFFER_UNDERFLOW during unwrap when network read buffer is smaller than SSL session packet buffer size.
      */
     @Test

