kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-3378; Client blocks forever if SocketChannel connects instantly
Date Fri, 18 Mar 2016 23:13:37 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 4332175c1 -> b6c6291e1


KAFKA-3378; Client blocks forever if SocketChannel connects instantly

This is a different implementation to the one in #1085 by Larkin Lowrey (llowrey). The hard
part here was actually finding the problem and all credit goes to llowrey.

This PR also fixes our handling of `finishConnect` (we now check the return value).

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jun Rao

Closes #1094 from ijuma/KAFKA-3378-instantly-connecting-socket-channels


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b6c6291e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b6c6291e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b6c6291e

Branch: refs/heads/trunk
Commit: b6c6291e1f42b2ca1fbcdaf662a2a868c8881921
Parents: 4332175
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Fri Mar 18 16:13:33 2016 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Fri Mar 18 16:13:33 2016 -0700

----------------------------------------------------------------------
 .../kafka/common/network/KafkaChannel.java      |   4 +-
 .../common/network/PlaintextTransportLayer.java |   8 +-
 .../apache/kafka/common/network/Selector.java   | 161 ++++++++++---------
 .../kafka/common/network/SslTransportLayer.java |   8 +-
 .../kafka/common/network/TransportLayer.java    |   2 +-
 5 files changed, 101 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b6c6291e/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index defcc24..f72f91b 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -68,8 +68,8 @@ public class KafkaChannel {
     }
 
 
-    public void finishConnect() throws IOException {
-        transportLayer.finishConnect();
+    public boolean finishConnect() throws IOException {
+        return transportLayer.finishConnect();
     }
 
     public boolean isConnected() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6c6291e/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
index 8949e5e..3db4345 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
@@ -50,9 +50,11 @@ public class PlaintextTransportLayer implements TransportLayer {
     }
 
     @Override
-    public void finishConnect() throws IOException {
-        socketChannel.finishConnect();
-        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
+    public boolean finishConnect() throws IOException {
+        boolean connected = socketChannel.finishConnect();
+        if (connected)
+            key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
+        return connected;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6c6291e/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 8bb3348..f9e232d 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -24,6 +24,7 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Deque;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -41,7 +42,6 @@ import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
-import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -84,6 +84,7 @@ public class Selector implements Selectable {
     private final List<Send> completedSends;
     private final List<NetworkReceive> completedReceives;
     private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
+    private final Set<SelectionKey> immediatelyConnectedKeys;
     private final List<String> disconnected;
     private final List<String> connected;
     private final List<String> failedSends;
@@ -114,18 +115,19 @@ public class Selector implements Selectable {
         this.time = time;
         this.metricGrpPrefix = metricGrpPrefix;
         this.metricTags = metricTags;
-        this.channels = new HashMap<String, KafkaChannel>();
-        this.completedSends = new ArrayList<Send>();
-        this.completedReceives = new ArrayList<NetworkReceive>();
-        this.stagedReceives = new HashMap<KafkaChannel, Deque<NetworkReceive>>();
-        this.connected = new ArrayList<String>();
-        this.disconnected = new ArrayList<String>();
-        this.failedSends = new ArrayList<String>();
+        this.channels = new HashMap<>();
+        this.completedSends = new ArrayList<>();
+        this.completedReceives = new ArrayList<>();
+        this.stagedReceives = new HashMap<>();
+        this.immediatelyConnectedKeys = new HashSet<>();
+        this.connected = new ArrayList<>();
+        this.disconnected = new ArrayList<>();
+        this.failedSends = new ArrayList<>();
         this.sensors = new SelectorMetrics(metrics);
         this.channelBuilder = channelBuilder;
         // initial capacity and load factor are default, we set them explicitly because we want to set accessOrder = true
-        this.lruConnections = new LinkedHashMap<String, Long>(16, .75F, true);
-        currentTimeNanos = new SystemTime().nanoseconds();
+        this.lruConnections = new LinkedHashMap<>(16, .75F, true);
+        currentTimeNanos = time.nanoseconds();
         nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos;
         this.metricsPerConnection = metricsPerConnection;
     }
@@ -161,8 +163,9 @@ public class Selector implements Selectable {
         if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
             socket.setReceiveBufferSize(receiveBufferSize);
         socket.setTcpNoDelay(true);
+        boolean connected;
         try {
-            socketChannel.connect(address);
+            connected = socketChannel.connect(address);
         } catch (UnresolvedAddressException e) {
             socketChannel.close();
             throw new IOException("Can't resolve address: " + address, e);
@@ -174,6 +177,13 @@ public class Selector implements Selectable {
         KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
         key.attach(channel);
         this.channels.put(id, channel);
+
+        if (connected) {
+            // OP_CONNECT won't trigger for immediately connected channels
+            log.debug("Immediately connected to node {}", channel.id());
+            immediatelyConnectedKeys.add(key);
+            key.interestOps(0);
+        }
     }
 
     /**
@@ -206,17 +216,15 @@ public class Selector implements Selectable {
             close(id);
         try {
             this.nioSelector.close();
-        } catch (IOException e) {
+        } catch (IOException | SecurityException e) {
             log.error("Exception closing nioSelector:", e);
-        } catch (SecurityException se) {
-            log.error("Exception closing nioSelector:", se);
         }
         sensors.close();
         channelBuilder.close();
     }
 
     /**
-     * Queue the given request for sending in the subsequent {@poll(long)} calls
+     * Queue the given request for sending in the subsequent {@link #poll(long)} calls
      * @param send The request to send
      */
     public void send(Send send) {
@@ -235,7 +243,7 @@ public class Selector implements Selectable {
      *
      * When this call is completed the user can check for completed sends, receives, connections or disconnects using
      * {@link #completedSends()}, {@link #completedReceives()}, {@link #connected()}, {@link #disconnected()}. These
-     * lists will be cleared at the beginning of each {@link #poll(long)} call and repopulated by the call if there is
+     * lists will be cleared at the beginning of each `poll` call and repopulated by the call if there is
      * any completed I/O.
      *
      * In the "Plaintext" setting, we are using socketChannel to read & write to the network. But for the "SSL" setting,
@@ -258,9 +266,12 @@ public class Selector implements Selectable {
     public void poll(long timeout) throws IOException {
         if (timeout < 0)
             throw new IllegalArgumentException("timeout should be >= 0");
+
         clear();
-        if (hasStagedReceives())
+
+        if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
             timeout = 0;
+
         /* check ready keys */
         long startSelect = time.nanoseconds();
         int readyKeys = select(timeout);
@@ -268,72 +279,78 @@ public class Selector implements Selectable {
         currentTimeNanos = endSelect;
         this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
 
-        if (readyKeys > 0) {
-            Set<SelectionKey> keys = this.nioSelector.selectedKeys();
-            Iterator<SelectionKey> iter = keys.iterator();
-            while (iter.hasNext()) {
-                SelectionKey key = iter.next();
-                iter.remove();
-                KafkaChannel channel = channel(key);
-
-                // register all per-connection metrics at once
-                sensors.maybeRegisterConnectionMetrics(channel.id());
-                lruConnections.put(channel.id(), currentTimeNanos);
-
-                try {
-                    /* complete any connections that have finished their handshake */
-                    if (key.isConnectable()) {
-                        channel.finishConnect();
+        if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
+            pollSelectionKeys(this.nioSelector.selectedKeys());
+            pollSelectionKeys(immediatelyConnectedKeys);
+        }
+
+        addToCompletedReceives();
+
+        long endIo = time.nanoseconds();
+        this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
+        maybeCloseOldestConnection();
+    }
+
+    private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys) {
+        Iterator<SelectionKey> iterator = selectionKeys.iterator();
+        while (iterator.hasNext()) {
+            SelectionKey key = iterator.next();
+            iterator.remove();
+            KafkaChannel channel = channel(key);
+
+            // register all per-connection metrics at once
+            sensors.maybeRegisterConnectionMetrics(channel.id());
+            lruConnections.put(channel.id(), currentTimeNanos);
+
+            try {
+
+                /* complete any connections that have finished their handshake */
+                if (key.isConnectable()) {
+                    if (channel.finishConnect()) {
                         this.connected.add(channel.id());
                         this.sensors.connectionCreated.record();
-                    }
+                    } else
+                        continue;
+                }
 
-                    /* if channel is not ready finish prepare */
-                    if (channel.isConnected() && !channel.ready())
-                        channel.prepare();
+                /* if channel is not ready finish prepare */
+                if (channel.isConnected() && !channel.ready())
+                    channel.prepare();
 
-                    /* if channel is ready read from any connections that have readable data */
-                    if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
-                        NetworkReceive networkReceive;
-                        while ((networkReceive = channel.read()) != null)
-                            addToStagedReceives(channel, networkReceive);
-                    }
+                /* if channel is ready read from any connections that have readable data */
+                if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
+                    NetworkReceive networkReceive;
+                    while ((networkReceive = channel.read()) != null)
+                        addToStagedReceives(channel, networkReceive);
+                }
 
-                    /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
-                    if (channel.ready() && key.isWritable()) {
-                        Send send = channel.write();
-                        if (send != null) {
-                            this.completedSends.add(send);
-                            this.sensors.recordBytesSent(channel.id(), send.size());
-                        }
+                /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
+                if (channel.ready() && key.isWritable()) {
+                    Send send = channel.write();
+                    if (send != null) {
+                        this.completedSends.add(send);
+                        this.sensors.recordBytesSent(channel.id(), send.size());
                     }
+                }
 
-                    /* cancel any defunct sockets */
-                    if (!key.isValid()) {
-                        close(channel);
-                        this.disconnected.add(channel.id());
-                    }
-                } catch (Exception e) {
-                    String desc = channel.socketDescription();
-                    if (e instanceof IOException)
-                        log.debug("Connection with {} disconnected", desc, e);
-                    else
-                        log.warn("Unexpected error from {}; closing connection", desc, e);
+                /* cancel any defunct sockets */
+                if (!key.isValid()) {
                     close(channel);
                     this.disconnected.add(channel.id());
                 }
+
+            } catch (Exception e) {
+                String desc = channel.socketDescription();
+                if (e instanceof IOException)
+                    log.debug("Connection with {} disconnected", desc, e);
+                else
+                    log.warn("Unexpected error from {}; closing connection", desc, e);
+                close(channel);
+                this.disconnected.add(channel.id());
             }
         }
-
-        addToCompletedReceives();
-
-        long endIo = time.nanoseconds();
-        this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
-        maybeCloseOldestConnection();
     }
 
-
-
     @Override
     public List<Send> completedSends() {
         return this.completedSends;
@@ -468,9 +485,7 @@ public class Selector implements Selectable {
     @Override
     public boolean isChannelReady(String id) {
         KafkaChannel channel = this.channels.get(id);
-        if (channel == null)
-            return false;
-        return channel.ready();
+        return channel != null && channel.ready();
     }
 
     private KafkaChannel channelOrFail(String id) {
@@ -645,7 +660,7 @@ public class Selector implements Selectable {
                 if (nodeRequest == null) {
                     String metricGrpName = metricGrpPrefix + "-node-metrics";
 
-                    Map<String, String> tags = new LinkedHashMap<String, String>(metricTags);
+                    Map<String, String> tags = new LinkedHashMap<>(metricTags);
                     tags.put("node-id", "node-" + connectionId);
 
                     nodeRequest = sensor(nodeRequestName);

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6c6291e/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index 27e2ea9..d18d6b7 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -106,9 +106,11 @@ public class SslTransportLayer implements TransportLayer {
      * does socketChannel.finishConnect()
      */
     @Override
-    public void finishConnect() throws IOException {
-        socketChannel.finishConnect();
-        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
+    public boolean finishConnect() throws IOException {
+        boolean connected = socketChannel.finishConnect();
+        if (connected)
+            key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
+        return connected;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6c6291e/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
index 258d89d..092df4d 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
@@ -43,7 +43,7 @@ public interface TransportLayer extends ScatteringByteChannel, GatheringByteChan
     /**
      * Finishes the process of connecting a socket channel.
      */
-    void finishConnect() throws IOException;
+    boolean finishConnect() throws IOException;
 
     /**
      * disconnect socketChannel


Mime
View raw message