hc-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ol...@apache.org
Subject svn commit: r1775675 - in /httpcomponents/httpcore/trunk: httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/ httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/ h...
Date Thu, 22 Dec 2016 15:32:35 GMT
Author: olegk
Date: Thu Dec 22 15:32:35 2016
New Revision: 1775675

URL: http://svn.apache.org/viewvc?rev=1775675&view=rev
Log:
Redesign of graceful I/O reactor shutdown logic

Modified:
    httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java
    httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
    httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/AsyncRequester.java
    httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/AsyncServer.java
    httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/IOReactorExecutor.java
    httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractMultiworkerIOReactor.java
    httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java
    httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java
    httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorImpl.java
    httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/SessionRequest.java
    httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/SessionRequestImpl.java

Modified: httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java?rev=1775675&r1=1775674&r2=1775675&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java
(original)
+++ httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java
Thu Dec 22 15:32:35 2016
@@ -90,7 +90,7 @@ public class TestDefaultListeningIOReact
                     }
                 };
             }
-        }, reactorConfig);
+        }, reactorConfig, null);
     }
 
     @After

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java?rev=1775675&r1=1775674&r2=1775675&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
(original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
Thu Dec 22 15:32:35 2016
@@ -280,7 +280,7 @@ abstract class AbstractHttp1StreamDuplex
         } finally {
             outputLock.unlock();
         }
-        if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) < 0) {
+        if (connState.compareTo(ConnectionState.SHUTDOWN) < 0) {
             if (isOutputReady()) {
                 produceOutput();
             } else {

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/AsyncRequester.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/AsyncRequester.java?rev=1775675&r1=1775674&r2=1775675&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/AsyncRequester.java
(original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/AsyncRequester.java
Thu Dec 22 15:32:35 2016
@@ -68,8 +68,10 @@ public class AsyncRequester extends IORe
     DefaultConnectingIOReactor createIOReactor(
             final IOEventHandlerFactory ioEventHandlerFactory,
             final IOReactorConfig ioReactorConfig,
-            final ThreadFactory threadFactory) throws IOException {
-        return new DefaultConnectingIOReactor(ioEventHandlerFactory, ioReactorConfig, threadFactory);
+            final ThreadFactory threadFactory,
+            final Callback<IOSession> sessionShutdownCallback) throws IOException {
+        return new DefaultConnectingIOReactor(
+                ioEventHandlerFactory, ioReactorConfig, threadFactory, sessionShutdownCallback);
     }
 
     private NamedEndpoint toEndpoint(final HttpHost host) {

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/AsyncServer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/AsyncServer.java?rev=1775675&r1=1775674&r2=1775675&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/AsyncServer.java
(original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/AsyncServer.java
Thu Dec 22 15:32:35 2016
@@ -53,16 +53,14 @@ public class AsyncServer extends IOReact
                 sessionShutdownCallback);
     }
 
-    public AsyncServer(final IOReactorConfig ioReactorConfig, final ExceptionListener exceptionListener)
{
-        this(ioReactorConfig, exceptionListener, null);
-    }
-
     @Override
     DefaultListeningIOReactor createIOReactor(
             final IOEventHandlerFactory ioEventHandlerFactory,
             final IOReactorConfig ioReactorConfig,
-            final ThreadFactory threadFactory) throws IOException {
-        return new DefaultListeningIOReactor(ioEventHandlerFactory, ioReactorConfig, threadFactory);
+            final ThreadFactory threadFactory,
+            final Callback<IOSession> sessionShutdownCallback) throws IOException {
+        return new DefaultListeningIOReactor(
+                ioEventHandlerFactory, ioReactorConfig, threadFactory, sessionShutdownCallback);
     }
 
     public ListenerEndpoint listen(final InetSocketAddress address) {

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/IOReactorExecutor.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/IOReactorExecutor.java?rev=1775675&r1=1775674&r2=1775675&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/IOReactorExecutor.java
(original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/IOReactorExecutor.java
Thu Dec 22 15:32:35 2016
@@ -52,9 +52,9 @@ abstract class IOReactorExecutor<T exten
 
     private final IOReactorConfig ioReactorConfig;
     private final ExceptionListener exceptionListener;
-    private final Callback<IOSession> sessionShutdownCallback;
     private final ExecutorService executorService;
     private final ThreadFactory workerThreadFactory;
+    private final Callback<IOSession> sessionShutdownCallback;
     private final AtomicReference<T> ioReactorRef;
     private final AtomicReference<Status> status;
 
@@ -67,9 +67,9 @@ abstract class IOReactorExecutor<T exten
         super();
         this.ioReactorConfig = ioReactorConfig != null ? ioReactorConfig : IOReactorConfig.DEFAULT;
         this.exceptionListener = exceptionListener;
-        this.sessionShutdownCallback = sessionShutdownCallback;
         this.executorService = Executors.newSingleThreadExecutor(threadFactory);
         this.workerThreadFactory = workerThreadFactory;
+        this.sessionShutdownCallback = sessionShutdownCallback;
         this.ioReactorRef = new AtomicReference<>(null);
         this.status = new AtomicReference<>(Status.READY);
     }
@@ -77,14 +77,16 @@ abstract class IOReactorExecutor<T exten
     abstract T createIOReactor(
             IOEventHandlerFactory ioEventHandlerFactory,
             IOReactorConfig ioReactorConfig,
-            ThreadFactory threadFactory) throws IOException;
+            ThreadFactory threadFactory,
+            Callback<IOSession> sessionShutdownCallback) throws IOException;
 
     protected void execute(final IOEventHandlerFactory ioEventHandlerFactory) throws IOException
{
         Args.notNull(ioEventHandlerFactory, "Handler factory");
         if (ioReactorRef.compareAndSet(null, createIOReactor(
                 ioEventHandlerFactory,
                 ioReactorConfig,
-                workerThreadFactory != null ? workerThreadFactory : new ThreadFactoryImpl("i/o
dispatch")))) {
+                workerThreadFactory != null ? workerThreadFactory : new ThreadFactoryImpl("i/o
dispatch"),
+                sessionShutdownCallback))) {
             if (status.compareAndSet(Status.READY, Status.RUNNING)) {
                 executorService.execute(new Runnable() {
 
@@ -135,9 +137,6 @@ abstract class IOReactorExecutor<T exten
     private void initiateShutdown(final T ioReactor) {
         if (status.compareAndSet(Status.RUNNING, Status.TERMINATED)) {
             ioReactor.initiateShutdown();
-            if (sessionShutdownCallback != null) {
-                ioReactor.enumSessions(sessionShutdownCallback);
-            }
         }
     }
 

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractMultiworkerIOReactor.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractMultiworkerIOReactor.java?rev=1775675&r1=1775674&r2=1775675&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractMultiworkerIOReactor.java
(original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractMultiworkerIOReactor.java
Thu Dec 22 15:32:35 2016
@@ -95,6 +95,7 @@ public abstract class AbstractMultiworke
     private final int workerCount;
     private final IOEventHandlerFactory eventHandlerFactory;
     private final ThreadFactory threadFactory;
+    private final Callback<IOSession> sessionShutdownCallback;
     private final IOReactorImpl[] dispatchers;
     private final Worker[] workers;
     private final Thread[] threads;
@@ -121,7 +122,8 @@ public abstract class AbstractMultiworke
     public AbstractMultiworkerIOReactor(
             final IOEventHandlerFactory eventHandlerFactory,
             final IOReactorConfig reactorConfig,
-            final ThreadFactory threadFactory) throws IOReactorException {
+            final ThreadFactory threadFactory,
+            final Callback<IOSession> sessionShutdownCallback) throws IOReactorException
{
         super();
         this.eventHandlerFactory = Args.notNull(eventHandlerFactory, "Event handler factory");
         this.reactorConfig = reactorConfig != null ? reactorConfig : IOReactorConfig.DEFAULT;
@@ -130,11 +132,8 @@ public abstract class AbstractMultiworke
         } catch (final IOException ex) {
             throw new IOReactorException("Failure opening selector", ex);
         }
-        if (threadFactory != null) {
-            this.threadFactory = threadFactory;
-        } else {
-            this.threadFactory = new DefaultThreadFactory();
-        }
+        this.threadFactory = threadFactory != null ? threadFactory : new DefaultThreadFactory();
+        this.sessionShutdownCallback = sessionShutdownCallback;
         this.auditLog = new ArrayList<>();
         this.workerCount = this.reactorConfig.getIoThreadCount();
         this.dispatchers = new IOReactorImpl[workerCount];
@@ -144,16 +143,10 @@ public abstract class AbstractMultiworke
         this.shutdownMutex = new Object();
     }
 
-    /**
-     * Creates an instance of AbstractMultiworkerIOReactor with default configuration.
-     *
-     * @throws IOReactorException in case if a non-recoverable I/O error.
-     *
-     * @since 4.2
-     */
     public AbstractMultiworkerIOReactor(
-            final IOEventHandlerFactory eventHandlerFactory) throws IOReactorException {
-        this(eventHandlerFactory, null, null);
+            final IOEventHandlerFactory eventHandlerFactory,
+            final Callback<IOSession> sessionShutdownCallback) throws IOReactorException
{
+        this(eventHandlerFactory, null, null, sessionShutdownCallback);
     }
 
     @Override
@@ -258,7 +251,11 @@ public abstract class AbstractMultiworke
         try {
             // Start I/O dispatchers
             for (int i = 0; i < this.dispatchers.length; i++) {
-                final IOReactorImpl dispatcher = new IOReactorImpl(this.eventHandlerFactory,
this.reactorConfig, this.exceptionHandler);
+                final IOReactorImpl dispatcher = new IOReactorImpl(
+                        this.eventHandlerFactory,
+                        this.reactorConfig,
+                        this.exceptionHandler,
+                        this.sessionShutdownCallback);
                 this.dispatchers[i] = dispatcher;
             }
             for (int i = 0; i < this.workerCount; i++) {
@@ -392,22 +389,6 @@ public abstract class AbstractMultiworke
         }
     }
 
-    /**
-     * Enumerates all active sessions
-     *
-     * @since 5.0
-     */
-    public void enumSessions(final Callback<IOSession> callback) {
-        if (callback == null) {
-            return;
-        }
-        for (IOReactorImpl dispatcher: dispatchers) {
-            if (dispatcher != null) {
-                dispatcher.enumSessions(callback);
-            }
-        }
-    }
-
     @Override
     public void initiateShutdown() {
         if (this.status.compareAndSet(IOReactorStatus.ACTIVE, IOReactorStatus.SHUTTING_DOWN))
{

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java?rev=1775675&r1=1775674&r2=1775675&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java
(original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java
Thu Dec 22 15:32:35 2016
@@ -40,6 +40,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ThreadFactory;
 
+import org.apache.hc.core5.function.Callback;
 import org.apache.hc.core5.net.NamedEndpoint;
 import org.apache.hc.core5.util.Asserts;
 
@@ -58,41 +59,22 @@ public class DefaultConnectingIOReactor
     private final long selectInterval;
     private long lastTimeoutCheck;
 
-    /**
-     * Creates an instance of DefaultConnectingIOReactor with the given configuration.
-     *
-     * @param eventHandlerFactory the factory to create I/O event handlers.
-     * @param reactorConfig I/O reactor configuration.
-     * @param threadFactory the factory to create threads.
-     *   Can be {@code null}.
-     * @throws IOReactorException in case if a non-recoverable I/O error.
-     *
-     * @since 5.0
-     */
     public DefaultConnectingIOReactor(
             final IOEventHandlerFactory eventHandlerFactory,
             final IOReactorConfig reactorConfig,
-            final ThreadFactory threadFactory) throws IOReactorException {
-        super(eventHandlerFactory, reactorConfig, threadFactory);
+            final ThreadFactory threadFactory,
+            final Callback<IOSession> sessionShutdownCallback) throws IOReactorException
{
+        super(eventHandlerFactory, reactorConfig, threadFactory, sessionShutdownCallback);
         this.requestQueue = new ConcurrentLinkedQueue<>();
         this.selectInterval = this.reactorConfig.getSelectInterval();
         this.lastTimeoutCheck = System.currentTimeMillis();
     }
 
-    /**
-     * Creates an instance of DefaultConnectingIOReactor with the given configuration.
-     *
-     * @param eventHandlerFactory the factory to create I/O event handlers.
-     * @param config I/O reactor configuration.
-     *   Can be {@code null}.
-     * @throws IOReactorException in case if a non-recoverable I/O error.
-     *
-     * @since 5.0
-     */
     public DefaultConnectingIOReactor(
             final IOEventHandlerFactory eventHandlerFactory,
-            final IOReactorConfig config) throws IOReactorException {
-        this(eventHandlerFactory, config, null);
+            final IOReactorConfig config,
+            final Callback<IOSession> sessionShutdownCallback) throws IOReactorException
{
+        this(eventHandlerFactory, config, null, sessionShutdownCallback);
     }
 
     /**

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java?rev=1775675&r1=1775674&r2=1775675&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java
(original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java
Thu Dec 22 15:32:35 2016
@@ -42,6 +42,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ThreadFactory;
 
+import org.apache.hc.core5.function.Callback;
 import org.apache.hc.core5.util.Asserts;
 
 /**
@@ -74,8 +75,9 @@ public class DefaultListeningIOReactor e
     public DefaultListeningIOReactor(
             final IOEventHandlerFactory eventHandlerFactory,
             final IOReactorConfig config,
-            final ThreadFactory threadFactory) throws IOReactorException {
-        super(eventHandlerFactory, config, threadFactory);
+            final ThreadFactory threadFactory,
+            final Callback<IOSession> sessionShutdownCallback) throws IOReactorException
{
+        super(eventHandlerFactory, config, threadFactory, sessionShutdownCallback);
         this.requestQueue = new ConcurrentLinkedQueue<>();
         this.endpoints = Collections.synchronizedSet(new HashSet<ListenerEndpointImpl>());
         this.pausedEndpoints = new HashSet<>();
@@ -93,8 +95,9 @@ public class DefaultListeningIOReactor e
      */
     public DefaultListeningIOReactor(
             final IOEventHandlerFactory eventHandlerFactory,
-            final IOReactorConfig config) throws IOReactorException {
-        this(eventHandlerFactory, config, null);
+            final IOReactorConfig config,
+            final Callback<IOSession> sessionShutdownCallback) throws IOReactorException
{
+        this(eventHandlerFactory, config, null, sessionShutdownCallback);
     }
 
     /**

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorImpl.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorImpl.java?rev=1775675&r1=1775674&r2=1775675&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorImpl.java
(original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorImpl.java
Thu Dec 22 15:32:35 2016
@@ -27,6 +27,7 @@
 
 package org.apache.hc.core5.reactor;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.nio.channels.CancelledKeyException;
@@ -39,8 +40,10 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.hc.core5.concurrent.Cancellable;
 import org.apache.hc.core5.function.Callback;
 import org.apache.hc.core5.util.Args;
 
@@ -57,19 +60,24 @@ class IOReactorImpl implements IOReactor
     private final Queue<ManagedIOSession> closedSessions;
     private final Queue<PendingSession> pendingSessions;
     private final AtomicReference<IOReactorStatus> status;
+    private final AtomicBoolean shutdownInitiated;
     private final Object shutdownMutex;
     private final IOReactorExceptionHandler exceptionHandler;
+    private final Callback<IOSession> sessionShutdownCallback;
 
     private volatile long lastTimeoutCheck;
 
     IOReactorImpl(
             final IOEventHandlerFactory eventHandlerFactory,
             final IOReactorConfig reactorConfig,
-            final IOReactorExceptionHandler exceptionHandler) {
+            final IOReactorExceptionHandler exceptionHandler,
+            final Callback<IOSession> sessionShutdownCallback) {
         super();
         this.reactorConfig = Args.notNull(reactorConfig, "I/O reactor config");
         this.eventHandlerFactory = Args.notNull(eventHandlerFactory, "Event handler factory");
         this.exceptionHandler = exceptionHandler;
+        this.sessionShutdownCallback = sessionShutdownCallback;
+        this.shutdownInitiated = new AtomicBoolean(false);
         this.closedSessions = new ConcurrentLinkedQueue<>();
         this.pendingSessions = new ConcurrentLinkedQueue<>();
         try {
@@ -86,6 +94,21 @@ class IOReactorImpl implements IOReactor
         return this.status.get();
     }
 
+    private void closeQuietly(final Closeable closeable) {
+        if (closeable != null) {
+            try {
+                closeable.close();
+            } catch (final IOException ignore) {
+            }
+        }
+    }
+
+    private void cancelQuietly(final Cancellable cancellable) {
+        if (cancellable != null) {
+            cancellable.cancel();
+        }
+    }
+
     void enqueuePendingSession(final SocketChannel socketChannel, final SessionRequestImpl
sessionRequest) {
         Args.notNull(socketChannel, "SocketChannel");
         this.pendingSessions.add(new PendingSession(socketChannel, sessionRequest));
@@ -130,6 +153,9 @@ class IOReactorImpl implements IOReactor
                 }
 
                 if (this.status.get().compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0) {
+                    if (this.shutdownInitiated.compareAndSet(false, true)) {
+                        initiateSessionShutdown();
+                    }
                     closePendingSessions();
                 }
                 if (this.status.get().compareTo(IOReactorStatus.SHUT_DOWN) == 0) {
@@ -141,8 +167,7 @@ class IOReactorImpl implements IOReactor
                     processEvents(this.selector.selectedKeys());
                 }
 
-                // Validate active channels
-                validate(this.selector.keys());
+                validateActiveChannels();
 
                 // Process closed sessions
                 processClosedSessions();
@@ -153,7 +178,11 @@ class IOReactorImpl implements IOReactor
                 }
 
                 // Exit select loop if graceful shutdown has been completed
-                if (this.status.get().compareTo(IOReactorStatus.SHUT_DOWN) > 0 &&
this.selector.keys().isEmpty()) {
+                if (this.status.get().compareTo(IOReactorStatus.SHUTTING_DOWN) == 0
+                        && this.selector.keys().isEmpty()) {
+                    this.status.set(IOReactorStatus.SHUT_DOWN);
+                }
+                if (this.status.get().compareTo(IOReactorStatus.SHUT_DOWN) == 0) {
                     break;
                 }
             }
@@ -173,14 +202,24 @@ class IOReactorImpl implements IOReactor
         }
     }
 
-    private void validate(final Set<SelectionKey> keys) {
+    private void initiateSessionShutdown() {
+        if (this.sessionShutdownCallback != null) {
+            final Set<SelectionKey> keys = this.selector.keys();
+            for (final SelectionKey key : keys) {
+                final ManagedIOSession session = (ManagedIOSession) key.attachment();
+                if (session != null) {
+                    this.sessionShutdownCallback.execute(session);
+                }
+            }
+        }
+    }
+
+    private void validateActiveChannels() {
         final long currentTime = System.currentTimeMillis();
         if( (currentTime - this.lastTimeoutCheck) >= this.reactorConfig.getSelectInterval())
{
             this.lastTimeoutCheck = currentTime;
-            if (keys != null) {
-                for (final SelectionKey key : keys) {
-                    timeoutCheck(key, currentTime);
-                }
+            for (final SelectionKey key : this.selector.keys()) {
+                timeoutCheck(key, currentTime);
             }
         }
     }
@@ -294,51 +333,24 @@ class IOReactorImpl implements IOReactor
     }
 
     private void closePendingSessions() {
-        PendingSession pendingSession;
-        while ((pendingSession = this.pendingSessions.poll()) != null) {
-            final SessionRequestImpl sessionRequest = pendingSession.sessionRequest;
-            if (sessionRequest != null) {
-                sessionRequest.cancel();
-            }
-            final SocketChannel channel = pendingSession.socketChannel;
-            try {
-                channel.close();
-            } catch (final IOException ignore) {
+        for (;;) {
+            final PendingSession pendingSession = this.pendingSessions.poll();
+            if (pendingSession == null) {
+                break;
+            } else {
+                cancelQuietly(pendingSession.sessionRequest);
+                closeQuietly(pendingSession.socketChannel);
             }
         }
     }
 
     private void closeActiveChannels() {
-        try {
-            final Set<SelectionKey> keys = this.selector.keys();
-            for (final SelectionKey key : keys) {
-                final ManagedIOSession session = (ManagedIOSession) key.attachment();
-                if (session != null) {
-                    session.close();
-                }
-            }
-            this.selector.close();
-        } catch (final IOException ignore) {
-        }
-    }
-
-    void enumSessions(final Callback<IOSession> callback) {
-        if (this.selector.isOpen()) {
-            try {
-                final Set<SelectionKey> keys = this.selector.keys();
-                for (final SelectionKey key : keys) {
-                    final ManagedIOSession session = (ManagedIOSession) key.attachment();
-                    if (session != null) {
-                        try {
-                            callback.execute(session);
-                        } catch (CancelledKeyException ex) {
-                            session.close();
-                        }
-                    }
-                }
-            } catch (ClosedSelectorException ignore) {
-            }
+        final Set<SelectionKey> keys = this.selector.keys();
+        for (final SelectionKey key : keys) {
+            final ManagedIOSession session = (ManagedIOSession) key.attachment();
+            closeQuietly(session);
         }
+        closeQuietly(this.selector);
     }
 
     @Override
@@ -360,7 +372,7 @@ class IOReactorImpl implements IOReactor
 
     @Override
     public void initiateShutdown() {
-        if (this.status.compareAndSet(IOReactorStatus.ACTIVE, IOReactorStatus.SHUT_DOWN))
{
+        if (this.status.compareAndSet(IOReactorStatus.ACTIVE, IOReactorStatus.SHUTTING_DOWN))
{
             selector.wakeup();
         }
     }

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/SessionRequest.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/SessionRequest.java?rev=1775675&r1=1775674&r2=1775675&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/SessionRequest.java
(original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/SessionRequest.java
Thu Dec 22 15:32:35 2016
@@ -30,6 +30,7 @@ package org.apache.hc.core5.reactor;
 import java.io.IOException;
 import java.net.SocketAddress;
 
+import org.apache.hc.core5.concurrent.Cancellable;
 import org.apache.hc.core5.net.NamedEndpoint;
 
 /**
@@ -41,7 +42,7 @@ import org.apache.hc.core5.net.NamedEndp
  *
  * @since 4.0
  */
-public interface SessionRequest {
+public interface SessionRequest extends Cancellable {
 
     /**
      * Returns remote endpoint.
@@ -122,11 +123,4 @@ public interface SessionRequest {
      */
     int getConnectTimeout();
 
-    /**
-     * Cancels the request. Invocation of this method will set the status of
-     * the request to completed and will unblock threads blocked in
-     * the {{@link #waitFor()}} method.
-     */
-    void cancel();
-
 }

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/SessionRequestImpl.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/SessionRequestImpl.java?rev=1775675&r1=1775674&r2=1775675&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/SessionRequestImpl.java
(original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/SessionRequestImpl.java
Thu Dec 22 15:32:35 2016
@@ -207,9 +207,9 @@ public class SessionRequestImpl implemen
     }
 
     @Override
-    public void cancel() {
+    public boolean cancel() {
         if (this.completed) {
-            return;
+            return false;
         }
         this.completed = true;
         final SelectionKey key = this.key;
@@ -228,6 +228,7 @@ public class SessionRequestImpl implemen
             }
             notifyAll();
         }
+        return true;
     }
 
 }



Mime
View raw message