kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 1.1 updated: MINOR: Use enum for close mode in Selector instead of two booleans (#4559)
Date Tue, 13 Feb 2018 18:23:16 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 3bd07ec  MINOR: Use enum for close mode in Selector instead of two booleans (#4559)
3bd07ec is described below

commit 3bd07eca98ccb992b2033112fceba9ab62ef917c
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
AuthorDate: Tue Feb 13 10:19:33 2018 -0800

    MINOR: Use enum for close mode in Selector instead of two booleans (#4559)
---
 .../org/apache/kafka/common/network/Selector.java  | 45 +++++++++++++---------
 1 file changed, 26 insertions(+), 19 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index ed037b3..1b2d1a2 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -87,6 +87,18 @@ public class Selector implements Selectable, AutoCloseable {
 
     public static final long NO_IDLE_TIMEOUT_MS = -1;
 
+    private enum CloseMode {
+        GRACEFUL(true),            // process outstanding staged receives, notify disconnect
+        NOTIFY_ONLY(true),         // discard any outstanding receives, notify disconnect
+        DISCARD_NO_NOTIFY(false);  // discard any outstanding receives, no disconnect notification
+
+        boolean notifyDisconnect;
+
+        CloseMode(boolean notifyDisconnect) {
+            this.notifyDisconnect = notifyDisconnect;
+        }
+    }
+
     private final Logger log;
     private final java.nio.channels.Selector nioSelector;
     private final Map<String, KafkaChannel> channels;
@@ -327,7 +339,7 @@ public class Selector implements Selectable, AutoCloseable {
                 channel.state(ChannelState.FAILED_SEND);
                 // ensure notification via `disconnected` when `failedSends` are processed in the next poll
                 this.failedSends.add(connectionId);
-                close(channel, false, false);
+                close(channel, CloseMode.DISCARD_NO_NOTIFY);
                 if (!(e instanceof CancelledKeyException)) {
                     log.error("Unexpected exception during send, closing connection {} and rethrowing exception {}",
                             connectionId, e);
@@ -507,7 +519,7 @@ public class Selector implements Selectable, AutoCloseable {
 
                 /* cancel any defunct sockets */
                 if (!key.isValid())
-                    close(channel, true, true);
+                    close(channel, CloseMode.GRACEFUL);
 
             } catch (Exception e) {
                 String desc = channel.socketDescription();
@@ -517,7 +529,7 @@ public class Selector implements Selectable, AutoCloseable {
                     log.debug("Connection with {} disconnected due to authentication exception", desc, e);
                 else
                     log.warn("Unexpected error from {}; closing connection", desc, e);
-                close(channel, !sendFailed, true);
+                close(channel, sendFailed ? CloseMode.NOTIFY_ONLY : CloseMode.GRACEFUL);
             } finally {
                 maybeRecordTimePerConnection(channel, channelStartTimeNanos);
             }
@@ -627,7 +639,7 @@ public class Selector implements Selectable, AutoCloseable {
                     log.trace("About to close the idle connection from {} due to being idle for {} millis",
                             connectionId, (currentTimeNanos - expiredConnection.getValue()) / 1000 / 1000);
                 channel.state(ChannelState.EXPIRED);
-                close(channel, true, true);
+                close(channel, CloseMode.GRACEFUL);
             }
         }
     }
@@ -681,7 +693,7 @@ public class Selector implements Selectable, AutoCloseable {
             // There is no disconnect notification for local close, but updating
             // channel state here anyway to avoid confusion.
             channel.state(ChannelState.LOCAL_CLOSE);
-            close(channel, false, false);
+            close(channel, CloseMode.DISCARD_NO_NOTIFY);
         } else {
             KafkaChannel closingChannel = this.closingChannels.remove(id);
             // Close any closing channel, leave the channel in the state in which closing was triggered
@@ -692,20 +704,15 @@ public class Selector implements Selectable, AutoCloseable {
 
     /**
      * Begin closing this connection.
+     * If 'closeMode' is `CloseMode.GRACEFUL`, the channel is disconnected here, but staged receives
+     * are processed. The channel is closed when there are no outstanding receives or if a send is
+     * requested. For other values of `closeMode`, outstanding receives are discarded and the channel
+     * is closed immediately.
      *
-     * If 'processOutstanding' is true, the channel is disconnected here, but staged receives are
-     * processed. The channel is closed when there are no outstanding receives or if a send
-     * is requested. The channel will be added to disconnect list when it is actually closed.
-     *
-     * If 'processOutstanding' is false, outstanding receives are discarded and the channel is
-     * closed immediately. The channel will not be added to disconnected list and it is the
-     * responsibility of the caller to handle disconnect notifications.
+     * The channel will be added to disconnect list when it is actually closed if `closeMode.notifyDisconnect`
+     * is true.
      */
-    private void close(KafkaChannel channel, boolean processOutstanding, boolean notifyDisconnect) {
-
-        if (processOutstanding && !notifyDisconnect)
-            throw new IllegalStateException("Disconnect notification required for remote disconnect after processing outstanding requests");
-
+    private void close(KafkaChannel channel, CloseMode closeMode) {
         channel.disconnect();
 
         // Ensure that `connected` does not have closed channels. This could happen if `prepare` throws an exception
@@ -719,12 +726,12 @@ public class Selector implements Selectable, AutoCloseable {
         // a send fails or all outstanding receives are processed. Mute state of disconnected channels
         // are tracked to ensure that requests are processed one-by-one by the broker to preserve ordering.
         Deque<NetworkReceive> deque = this.stagedReceives.get(channel);
-        if (processOutstanding && deque != null && !deque.isEmpty()) {
+        if (closeMode == CloseMode.GRACEFUL && deque != null && !deque.isEmpty()) {
             // stagedReceives will be moved to completedReceives later along with receives from other channels
             closingChannels.put(channel.id(), channel);
             log.debug("Tracking closing connection {} to process outstanding requests", channel.id());
         } else
-            doClose(channel, notifyDisconnect);
+            doClose(channel, closeMode.notifyDisconnect);
         this.channels.remove(channel.id());
 
         if (idleExpiryManager != null)

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.

Mime
View raw message