hc-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From: ol...@apache.org
Subject: [3/7] httpcomponents-core git commit: Rewrite of I/O reactor internal channel management; more efficient handling of outgoing connection requests
Date: Thu, 15 Jun 2017 05:59:34 GMT
Rewrite of I/O reactor internal channel management; more efficient handling of outgoing connection requests


Project: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/commit/a7f77921
Tree: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/tree/a7f77921
Diff: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/diff/a7f77921

Branch: refs/heads/master
Commit: a7f77921627552a5a1b0c2e6e0d7a0b4dba07a69
Parents: 43714ad
Author: Oleg Kalnichevski <olegk@apache.org>
Authored: Tue Jun 13 14:27:12 2017 +0200
Committer: Oleg Kalnichevski <olegk@apache.org>
Committed: Wed Jun 14 19:07:19 2017 +0200

----------------------------------------------------------------------
 .../http/examples/Http2FileServerExample.java   |  19 -
 .../nio/bootstrap/H2RequesterBootstrap.java     |  18 +-
 .../impl/nio/bootstrap/H2ServerBootstrap.java   |  20 +-
 .../impl/nio/bootstrap/Http2AsyncRequester.java |   7 +-
 .../hc/core5/testing/nio/AsyncRequester.java    |   5 +-
 .../hc/core5/testing/nio/AsyncServer.java       |   6 +-
 .../hc/core5/testing/nio/IOReactorExecutor.java |  57 +--
 .../nio/TestDefaultListeningIOReactor.java      |  80 +--
 .../http/examples/AsyncFileServerExample.java   |  19 -
 .../http/impl/bootstrap/AsyncRequester.java     |  26 +-
 .../impl/bootstrap/AsyncRequesterBootstrap.java |  29 +-
 .../core5/http/impl/bootstrap/AsyncServer.java  |  24 +-
 .../impl/bootstrap/AsyncServerBootstrap.java    |  20 +-
 .../http/impl/bootstrap/HttpAsyncRequester.java |  11 +-
 .../http/impl/bootstrap/HttpAsyncServer.java    |  13 +-
 .../http/impl/bootstrap/IOReactorExecutor.java  | 112 ----
 .../reactor/AbstractMultiworkerIOReactor.java   | 508 -------------------
 .../reactor/AbstractSingleCoreIOReactor.java    | 168 ++++++
 .../hc/core5/reactor/ConnectingIOReactor.java   |   2 +-
 .../hc/core5/reactor/ConnectionAcceptor.java    |  81 +++
 .../hc/core5/reactor/ConnectionInitiator.java   |   2 +-
 .../reactor/DefaultConnectingIOReactor.java     | 244 +++------
 .../reactor/DefaultListeningIOReactor.java      | 289 ++++-------
 .../org/apache/hc/core5/reactor/IOReactor.java  |  10 -
 .../hc/core5/reactor/IOReactorException.java    |  53 --
 .../reactor/IOReactorExceptionHandler.java      |  52 --
 .../apache/hc/core5/reactor/IOReactorImpl.java  | 418 ---------------
 .../hc/core5/reactor/IOReactorService.java      |  39 ++
 .../reactor/IOReactorShutdownException.java     |  41 ++
 .../hc/core5/reactor/IOReactorStatus.java       |  12 +-
 .../hc/core5/reactor/IOReactorWorker.java       |  57 +++
 .../apache/hc/core5/reactor/IOSessionImpl.java  |   8 -
 .../hc/core5/reactor/InternalChannel.java       |  81 +++
 .../core5/reactor/InternalConnectChannel.java   | 101 ++++
 .../hc/core5/reactor/InternalDataChannel.java   | 310 +++++++++++
 .../reactor/InternalDataChannelFactory.java     |  43 ++
 .../hc/core5/reactor/InternalIOSession.java     | 330 ------------
 .../hc/core5/reactor/ListeningIOReactor.java    |  48 +-
 .../hc/core5/reactor/MultiCoreIOReactor.java    | 137 +++++
 .../apache/hc/core5/reactor/SessionRequest.java |  10 +-
 .../hc/core5/reactor/SessionRequestHandle.java  |  59 ---
 .../hc/core5/reactor/SessionRequestImpl.java    |   6 +-
 .../hc/core5/reactor/SingleCoreIOReactor.java   | 337 ++++++++++++
 .../reactor/SingleCoreListeningIOReactor.java   | 235 +++++++++
 44 files changed, 1857 insertions(+), 2290 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2FileServerExample.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2FileServerExample.java b/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2FileServerExample.java
index d95afbb..5f5fe03 100644
--- a/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2FileServerExample.java
+++ b/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2FileServerExample.java
@@ -29,17 +29,14 @@ package org.apache.hc.core5.http.examples;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.List;
 import java.util.Locale;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.hc.core5.http.ConnectionClosedException;
 import org.apache.hc.core5.http.ContentType;
 import org.apache.hc.core5.http.EndpointDetails;
-import org.apache.hc.core5.http.ExceptionListener;
 import org.apache.hc.core5.http.Header;
 import org.apache.hc.core5.http.HttpConnection;
 import org.apache.hc.core5.http.HttpException;
@@ -91,22 +88,6 @@ public class Http2FileServerExample {
 
         final HttpAsyncServer server = H2ServerBootstrap.bootstrap()
                 .setIOReactorConfig(config)
-                .setExceptionListener(new ExceptionListener() {
-
-                    @Override
-                    public void onError(final Exception ex) {
-                        if (ex instanceof ConnectionClosedException) {
-                            return;
-                        }
-                        if (ex instanceof SocketTimeoutException) {
-                            System.out.println("Timeout");
-                        } else if (ex instanceof IOException) {
-                            System.out.println("I/O error: " + ex.getMessage());
-                        } else {
-                            ex.printStackTrace();
-                        }
-                    }
-                })
                 .setConnectionListener(new ConnectionListener() {
 
                     @Override

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequesterBootstrap.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequesterBootstrap.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequesterBootstrap.java
index e856800..61e86a8 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequesterBootstrap.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequesterBootstrap.java
@@ -52,7 +52,6 @@ import org.apache.hc.core5.pool.ConnPoolListener;
 import org.apache.hc.core5.pool.ConnPoolPolicy;
 import org.apache.hc.core5.pool.StrictConnPool;
 import org.apache.hc.core5.reactor.IOReactorConfig;
-import org.apache.hc.core5.reactor.IOReactorException;
 import org.apache.hc.core5.reactor.IOSession;
 import org.apache.hc.core5.util.Args;
 import org.apache.hc.core5.util.TimeValue;
@@ -252,17 +251,12 @@ public class H2RequesterBootstrap {
                 http2StreamHandlerFactory,
                 versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE,
                 connectionListener);
-        try {
-            return new Http2AsyncRequester(
-                    versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE,
-                    ioReactorConfig,
-                    ioEventHandlerFactory,
-                    connPool,
-                    tlsStrategy != null ? tlsStrategy : new H2ClientTlsStrategy(),
-                    exceptionListener);
-        } catch (final IOReactorException ex) {
-            throw new IllegalStateException(ex);
-        }
+        return new Http2AsyncRequester(
+                versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE,
+                ioReactorConfig,
+                ioEventHandlerFactory,
+                connPool,
+                tlsStrategy != null ? tlsStrategy : new H2ClientTlsStrategy());
     }
 
     private static class PushConsumerEntry {

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2ServerBootstrap.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2ServerBootstrap.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2ServerBootstrap.java
index 60eff6f..8842d7b 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2ServerBootstrap.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2ServerBootstrap.java
@@ -30,7 +30,6 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hc.core5.function.Supplier;
-import org.apache.hc.core5.http.ExceptionListener;
 import org.apache.hc.core5.http.config.CharCodingConfig;
 import org.apache.hc.core5.http.config.H1Config;
 import org.apache.hc.core5.http.impl.ConnectionListener;
@@ -59,7 +58,6 @@ import org.apache.hc.core5.http2.ssl.H2ServerTlsStrategy;
 import org.apache.hc.core5.net.InetAddressUtils;
 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
 import org.apache.hc.core5.reactor.IOReactorConfig;
-import org.apache.hc.core5.reactor.IOReactorException;
 import org.apache.hc.core5.util.Args;
 
 /**
@@ -76,7 +74,6 @@ public class H2ServerBootstrap {
     private H2Config h2Config;
     private H1Config h1Config;
     private TlsStrategy tlsStrategy;
-    private ExceptionListener exceptionListener;
     private ConnectionListener connectionListener;
     private Http2StreamListener http2StreamListener;
     private Http1StreamListener http1StreamListener;
@@ -156,14 +153,6 @@ public class H2ServerBootstrap {
     }
 
     /**
-     * Assigns {@link ExceptionListener} instance.
-     */
-    public final H2ServerBootstrap setExceptionListener(final ExceptionListener exceptionListener) {
-        this.exceptionListener = exceptionListener;
-        return this;
-    }
-
-    /**
      * Assigns {@link ConnectionListener} instance.
      */
     public final H2ServerBootstrap setConnectionListener(final ConnectionListener connectionListener) {
@@ -264,14 +253,7 @@ public class H2ServerBootstrap {
                 versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE,
                 tlsStrategy != null ? tlsStrategy : new H2ServerTlsStrategy(new int[] {443, 8443}),
                 connectionListener);
-        try {
-            return new HttpAsyncServer(
-                    ioEventHandlerFactory,
-                    ioReactorConfig,
-                    exceptionListener);
-        } catch (final IOReactorException ex) {
-            throw new IllegalStateException(ex);
-        }
+        return new HttpAsyncServer(ioEventHandlerFactory, ioReactorConfig);
     }
 
     private static class HandlerEntry {

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2AsyncRequester.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2AsyncRequester.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2AsyncRequester.java
index 5594fea..ef24e38 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2AsyncRequester.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2AsyncRequester.java
@@ -30,7 +30,6 @@ package org.apache.hc.core5.http2.impl.nio.bootstrap;
 import java.util.concurrent.Future;
 
 import org.apache.hc.core5.concurrent.FutureCallback;
-import org.apache.hc.core5.http.ExceptionListener;
 import org.apache.hc.core5.http.HttpHost;
 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
 import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
@@ -39,7 +38,6 @@ import org.apache.hc.core5.http2.HttpVersionPolicy;
 import org.apache.hc.core5.pool.ControlledConnPool;
 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
 import org.apache.hc.core5.reactor.IOReactorConfig;
-import org.apache.hc.core5.reactor.IOReactorException;
 import org.apache.hc.core5.reactor.IOSession;
 import org.apache.hc.core5.util.TimeValue;
 
@@ -55,9 +53,8 @@ public class Http2AsyncRequester extends HttpAsyncRequester {
             final IOReactorConfig ioReactorConfig,
             final IOEventHandlerFactory eventHandlerFactory,
             final ControlledConnPool<HttpHost, IOSession> connPool,
-            final TlsStrategy tlsStrategy,
-            final ExceptionListener exceptionListener) throws IOReactorException {
-        super(ioReactorConfig, eventHandlerFactory, connPool, tlsStrategy, exceptionListener);
+            final TlsStrategy tlsStrategy) {
+        super(ioReactorConfig, eventHandlerFactory, connPool, tlsStrategy);
         this.versionPolicy = versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE;
     }
 

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncRequester.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncRequester.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncRequester.java
index 02be34c..dca65f8 100644
--- a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncRequester.java
+++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncRequester.java
@@ -32,7 +32,6 @@ import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.concurrent.ThreadFactory;
 
-import org.apache.hc.core5.concurrent.DefaultThreadFactory;
 import org.apache.hc.core5.function.Callback;
 import org.apache.hc.core5.http.HttpHost;
 import org.apache.hc.core5.http.URIScheme;
@@ -50,9 +49,7 @@ import org.apache.hc.core5.util.TimeValue;
 public class AsyncRequester extends IOReactorExecutor<DefaultConnectingIOReactor> implements ConnectionInitiator {
 
     public AsyncRequester(final IOReactorConfig ioReactorConfig) {
-        super(ioReactorConfig,
-                new DefaultThreadFactory("connector", true),
-                new DefaultThreadFactory("requester-dispatch", true));
+        super(ioReactorConfig, null);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncServer.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncServer.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncServer.java
index 8698134..2f5d24f 100644
--- a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncServer.java
+++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncServer.java
@@ -32,7 +32,6 @@ import java.net.InetSocketAddress;
 import java.util.Set;
 import java.util.concurrent.ThreadFactory;
 
-import org.apache.hc.core5.concurrent.DefaultThreadFactory;
 import org.apache.hc.core5.function.Callback;
 import org.apache.hc.core5.reactor.DefaultListeningIOReactor;
 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
@@ -43,7 +42,7 @@ import org.apache.hc.core5.reactor.ListenerEndpoint;
 public class AsyncServer extends IOReactorExecutor<DefaultListeningIOReactor> {
 
     public AsyncServer(final IOReactorConfig ioReactorConfig) {
-        super(ioReactorConfig, new DefaultThreadFactory("listener", true), new DefaultThreadFactory("server-dispatch", true));
+        super(ioReactorConfig, null);
     }
 
     @Override
@@ -52,8 +51,7 @@ public class AsyncServer extends IOReactorExecutor<DefaultListeningIOReactor> {
             final IOReactorConfig ioReactorConfig,
             final ThreadFactory threadFactory,
             final Callback<IOSession> sessionShutdownCallback) throws IOException {
-        return new DefaultListeningIOReactor(
-                ioEventHandlerFactory, ioReactorConfig, threadFactory, null, sessionShutdownCallback);
+        return new DefaultListeningIOReactor(ioEventHandlerFactory, ioReactorConfig, threadFactory, sessionShutdownCallback);
     }
 
     public ListenerEndpoint listen(final InetSocketAddress address) {

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/IOReactorExecutor.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/IOReactorExecutor.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/IOReactorExecutor.java
index e0ed7e4..e686251 100644
--- a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/IOReactorExecutor.java
+++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/IOReactorExecutor.java
@@ -28,57 +28,35 @@
 package org.apache.hc.core5.testing.nio;
 
 import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hc.core5.concurrent.DefaultThreadFactory;
 import org.apache.hc.core5.function.Callback;
-import org.apache.hc.core5.http.ExceptionListener;
 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
 import org.apache.hc.core5.io.ShutdownType;
-import org.apache.hc.core5.reactor.AbstractMultiworkerIOReactor;
-import org.apache.hc.core5.reactor.ExceptionEvent;
 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
+import org.apache.hc.core5.reactor.IOReactor;
 import org.apache.hc.core5.reactor.IOReactorConfig;
+import org.apache.hc.core5.reactor.IOReactorService;
 import org.apache.hc.core5.reactor.IOReactorStatus;
 import org.apache.hc.core5.reactor.IOSession;
 import org.apache.hc.core5.util.Args;
 import org.apache.hc.core5.util.Asserts;
 import org.apache.hc.core5.util.TimeValue;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 
-abstract class IOReactorExecutor<T extends AbstractMultiworkerIOReactor> implements AutoCloseable {
+abstract class IOReactorExecutor<T extends IOReactorService> implements AutoCloseable {
 
     enum Status { READY, RUNNING, TERMINATED }
 
-    private final Logger log = LogManager.getLogger(Http1TestClient.class);
-
     private final IOReactorConfig ioReactorConfig;
-    private final ExceptionListener exceptionListener;
-    private final ExecutorService executorService;
     private final ThreadFactory workerThreadFactory;
     private final AtomicReference<T> ioReactorRef;
     private final AtomicReference<Status> status;
 
-    IOReactorExecutor(
-            final IOReactorConfig ioReactorConfig,
-            final ThreadFactory threadFactory,
-            final ThreadFactory workerThreadFactory) {
+    IOReactorExecutor(final IOReactorConfig ioReactorConfig, final ThreadFactory workerThreadFactory) {
         super();
         this.ioReactorConfig = ioReactorConfig != null ? ioReactorConfig : IOReactorConfig.DEFAULT;
-        this.exceptionListener = new ExceptionListener() {
-
-            @Override
-            public void onError(final Exception ex) {
-                log.error(ex.getMessage(), ex);
-            }
-
-        };
-        this.executorService = Executors.newSingleThreadExecutor(threadFactory);
         this.workerThreadFactory = workerThreadFactory;
         this.ioReactorRef = new AtomicReference<>(null);
         this.status = new AtomicReference<>(Status.READY);
@@ -105,19 +83,7 @@ abstract class IOReactorExecutor<T extends AbstractMultiworkerIOReactor> impleme
 
                 }))) {
             if (status.compareAndSet(Status.READY, Status.RUNNING)) {
-                executorService.execute(new Runnable() {
-
-                    @Override
-                    public void run() {
-                        try {
-                            ioReactorRef.get().execute();
-                        } catch (final Exception ex) {
-                            if (exceptionListener != null) {
-                                exceptionListener.onError(ex);
-                            }
-                        }
-                    }
-                });
+                ioReactorRef.get().start();
             }
         } else {
             throw new IllegalStateException("I/O reactor has already been started");
@@ -135,25 +101,20 @@ abstract class IOReactorExecutor<T extends AbstractMultiworkerIOReactor> impleme
     }
 
     public IOReactorStatus getStatus() {
-        final T ioReactor = ioReactorRef.get();
+        final IOReactor ioReactor = ioReactorRef.get();
         return ioReactor != null ? ioReactor.getStatus() : IOReactorStatus.INACTIVE;
     }
 
-    public List<ExceptionEvent> getAuditLog() {
-        final T ioReactor = ensureRunning();
-        return ioReactor.getAuditLog();
-    }
-
     public void awaitShutdown(final TimeValue waitTime) throws InterruptedException {
         Args.notNull(waitTime, "Wait time");
-        final T ioReactor = ioReactorRef.get();
+        final IOReactor ioReactor = ioReactorRef.get();
         if (ioReactor != null) {
             ioReactor.awaitShutdown(waitTime);
         }
     }
 
     public void initiateShutdown() {
-        final T ioReactor = ioReactorRef.get();
+        final IOReactor ioReactor = ioReactorRef.get();
         if (ioReactor != null) {
             if (status.compareAndSet(Status.RUNNING, Status.TERMINATED)) {
                 ioReactor.initiateShutdown();
@@ -163,7 +124,7 @@ abstract class IOReactorExecutor<T extends AbstractMultiworkerIOReactor> impleme
 
     public void shutdown(final TimeValue graceTime) {
         Args.notNull(graceTime, "Grace time");
-        final T ioReactor = ioReactorRef.get();
+        final IOReactor ioReactor = ioReactorRef.get();
         if (ioReactor != null) {
             if (status.compareAndSet(Status.RUNNING, Status.TERMINATED)) {
                 ioReactor.initiateShutdown();

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java
index 6c7ff90..3598677 100644
--- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java
@@ -27,23 +27,19 @@
 
 package org.apache.hc.core5.testing.nio;
 
-import java.io.IOException;
-import java.net.BindException;
 import java.net.InetSocketAddress;
 import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.hc.core5.io.ShutdownType;
 import org.apache.hc.core5.reactor.DefaultListeningIOReactor;
 import org.apache.hc.core5.reactor.IOEventHandler;
 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
 import org.apache.hc.core5.reactor.IOReactorConfig;
-import org.apache.hc.core5.reactor.IOReactorExceptionHandler;
 import org.apache.hc.core5.reactor.IOReactorStatus;
 import org.apache.hc.core5.reactor.IOSession;
 import org.apache.hc.core5.reactor.ListenerEndpoint;
 import org.apache.hc.core5.reactor.TlsCapableIOSession;
+import org.apache.hc.core5.util.TimeValue;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -106,20 +102,7 @@ public class TestDefaultListeningIOReactor {
 
     @Test
     public void testEndpointUpAndDown() throws Exception {
-
-        final Thread t = new Thread(new Runnable() {
-
-            @Override
-            public void run() {
-                try {
-                    ioreactor.execute();
-                } catch (final IOException ex) {
-                }
-            }
-
-        });
-
-        t.start();
+        ioreactor.start();
 
         Set<ListenerEndpoint> endpoints = ioreactor.getEndpoints();
         Assert.assertNotNull(endpoints);
@@ -147,30 +130,13 @@ public class TestDefaultListeningIOReactor {
         Assert.assertEquals(port, ((InetSocketAddress) endpoint.getAddress()).getPort());
 
         ioreactor.shutdown(ShutdownType.GRACEFUL);
-        t.join(1000);
-
+        ioreactor.awaitShutdown(TimeValue.ofSeconds(5));
         Assert.assertEquals(IOReactorStatus.SHUT_DOWN, ioreactor.getStatus());
     }
 
     @Test
     public void testEndpointAlreadyBoundFatal() throws Exception {
-        final CountDownLatch latch = new CountDownLatch(1);
-
-        final Thread t = new Thread(new Runnable() {
-
-            @Override
-            public void run() {
-                try {
-                    ioreactor.execute();
-                    Assert.fail("IOException should have been thrown");
-                } catch (final IOException ex) {
-                    latch.countDown();
-                }
-            }
-
-        });
-
-        t.start();
+        ioreactor.start();
 
         final ListenerEndpoint endpoint1 = ioreactor.listen(new InetSocketAddress(0));
         endpoint1.waitFor();
@@ -180,16 +146,8 @@ public class TestDefaultListeningIOReactor {
         endpoint2.waitFor();
         Assert.assertNotNull(endpoint2.getException());
 
-        // I/O reactor is now expected to be shutting down
-        latch.await(2000, TimeUnit.MILLISECONDS);
-        Assert.assertTrue(ioreactor.getStatus().compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0);
-
-        final Set<ListenerEndpoint> endpoints = ioreactor.getEndpoints();
-        Assert.assertNotNull(endpoints);
-        Assert.assertEquals(0, endpoints.size());
-
         ioreactor.shutdown(ShutdownType.GRACEFUL);
-        t.join(1000);
+        ioreactor.awaitShutdown(TimeValue.ofSeconds(5));
 
         Assert.assertEquals(IOReactorStatus.SHUT_DOWN, ioreactor.getStatus());
     }
@@ -199,30 +157,8 @@ public class TestDefaultListeningIOReactor {
         final IOReactorConfig reactorConfig = IOReactorConfig.custom()
                 .setIoThreadCount(1)
                 .build();
-        ioreactor = new DefaultListeningIOReactor(
-                new NoopIOEventHandlerFactory(),
-                reactorConfig,
-                new IOReactorExceptionHandler() {
-
-                    @Override
-                    public boolean handle(final IOException ex) {
-                        return (ex instanceof BindException);
-                    }
-
-                }, null);
-        final Thread t = new Thread(new Runnable() {
-
-            @Override
-            public void run() {
-                try {
-                    ioreactor.execute();
-                } catch (final IOException ex) {
-                }
-            }
-
-        });
-
-        t.start();
+        ioreactor = new DefaultListeningIOReactor(new NoopIOEventHandlerFactory(), reactorConfig, null);
+        ioreactor.start();
 
         final ListenerEndpoint endpoint1 = ioreactor.listen(new InetSocketAddress(9999));
         endpoint1.waitFor();
@@ -237,7 +173,7 @@ public class TestDefaultListeningIOReactor {
         Assert.assertEquals(IOReactorStatus.ACTIVE, ioreactor.getStatus());
 
         ioreactor.shutdown(ShutdownType.GRACEFUL);
-        t.join(1000);
+        ioreactor.awaitShutdown(TimeValue.ofSeconds(5));
 
         Assert.assertEquals(IOReactorStatus.SHUT_DOWN, ioreactor.getStatus());
     }

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncFileServerExample.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncFileServerExample.java b/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncFileServerExample.java
index b6d74ab..97bcf15 100644
--- a/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncFileServerExample.java
+++ b/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncFileServerExample.java
@@ -29,16 +29,13 @@ package org.apache.hc.core5.http.examples;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Locale;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.hc.core5.http.ConnectionClosedException;
 import org.apache.hc.core5.http.ContentType;
 import org.apache.hc.core5.http.EndpointDetails;
-import org.apache.hc.core5.http.ExceptionListener;
 import org.apache.hc.core5.http.HttpConnection;
 import org.apache.hc.core5.http.HttpException;
 import org.apache.hc.core5.http.HttpRequest;
@@ -87,22 +84,6 @@ public class AsyncFileServerExample {
 
         final HttpAsyncServer server = AsyncServerBootstrap.bootstrap()
                 .setIOReactorConfig(config)
-                .setExceptionListener(new ExceptionListener() {
-
-                    @Override
-                    public void onError(final Exception ex) {
-                        if (ex instanceof ConnectionClosedException) {
-                            return;
-                        }
-                        if (ex instanceof SocketTimeoutException) {
-                            System.out.println("Timeout");
-                        } else if (ex instanceof IOException) {
-                            System.out.println("I/O error: " + ex.getMessage());
-                        } else {
-                            ex.printStackTrace();
-                        }
-                    }
-                })
                 .setConnectionListener(new ConnectionListener() {
 
                     @Override

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequester.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequester.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequester.java
index 9a74f28..b190fcf 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequester.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequester.java
@@ -28,36 +28,28 @@
 package org.apache.hc.core5.http.impl.bootstrap;
 
 import java.net.InetSocketAddress;
-import java.net.SocketAddress;
 
 import org.apache.hc.core5.concurrent.DefaultThreadFactory;
 import org.apache.hc.core5.function.Callback;
-import org.apache.hc.core5.http.ExceptionListener;
 import org.apache.hc.core5.http.HttpHost;
 import org.apache.hc.core5.http.URIScheme;
-import org.apache.hc.core5.net.NamedEndpoint;
 import org.apache.hc.core5.reactor.ConnectionInitiator;
 import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
 import org.apache.hc.core5.reactor.IOReactorConfig;
-import org.apache.hc.core5.reactor.IOReactorException;
 import org.apache.hc.core5.reactor.IOSession;
 import org.apache.hc.core5.reactor.SessionRequest;
 import org.apache.hc.core5.reactor.SessionRequestCallback;
 import org.apache.hc.core5.util.Args;
 import org.apache.hc.core5.util.TimeValue;
 
-public class AsyncRequester extends IOReactorExecutor<DefaultConnectingIOReactor> implements ConnectionInitiator {
+public class AsyncRequester extends DefaultConnectingIOReactor implements ConnectionInitiator {
 
     public AsyncRequester(
             final IOEventHandlerFactory eventHandlerFactory,
             final IOReactorConfig ioReactorConfig,
-            final ExceptionListener exceptionListener,
-            final Callback<IOSession> sessionShutdownCallback) throws IOReactorException {
-        super(new DefaultConnectingIOReactor(
-                        eventHandlerFactory, ioReactorConfig, new DefaultThreadFactory("requester-dispatch", true), sessionShutdownCallback),
-                exceptionListener,
-                new DefaultThreadFactory("connector", true));
+            final Callback<IOSession> sessionShutdownCallback) {
+        super(eventHandlerFactory, ioReactorConfig, new DefaultThreadFactory("requester-dispatch", true), sessionShutdownCallback);
     }
 
     private InetSocketAddress toSocketAddress(final HttpHost host) {
@@ -81,19 +73,9 @@ public class AsyncRequester extends IOReactorExecutor<DefaultConnectingIOReactor
             final SessionRequestCallback callback) {
         Args.notNull(host, "Host");
         Args.notNull(timeout, "Timeout");
-        final SessionRequest  sessionRequest = reactor().connect(host, toSocketAddress(host), null, attachment, callback);
+        final SessionRequest  sessionRequest = connect(host, toSocketAddress(host), null, attachment, callback);
         sessionRequest.setConnectTimeout(timeout.toMillisIntBound());
         return sessionRequest;
     }
 
-    @Override
-    public SessionRequest connect(
-            final NamedEndpoint remoteEndpoint,
-            final SocketAddress remoteAddress,
-            final SocketAddress localAddress,
-            final Object attachment,
-            final SessionRequestCallback callback) {
-        return reactor().connect(remoteEndpoint, remoteAddress, localAddress, attachment, callback);
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequesterBootstrap.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequesterBootstrap.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequesterBootstrap.java
index d890360..a728d43 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequesterBootstrap.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequesterBootstrap.java
@@ -27,7 +27,6 @@
 package org.apache.hc.core5.http.impl.bootstrap;
 
 import org.apache.hc.core5.http.ConnectionReuseStrategy;
-import org.apache.hc.core5.http.ExceptionListener;
 import org.apache.hc.core5.http.HttpHost;
 import org.apache.hc.core5.http.config.CharCodingConfig;
 import org.apache.hc.core5.http.config.H1Config;
@@ -44,7 +43,6 @@ import org.apache.hc.core5.pool.ConnPoolPolicy;
 import org.apache.hc.core5.pool.StrictConnPool;
 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
 import org.apache.hc.core5.reactor.IOReactorConfig;
-import org.apache.hc.core5.reactor.IOReactorException;
 import org.apache.hc.core5.reactor.IOSession;
 import org.apache.hc.core5.util.TimeValue;
 
@@ -63,7 +61,6 @@ public class AsyncRequesterBootstrap {
     private TimeValue timeToLive;
     private ConnPoolPolicy connPoolPolicy;
     private TlsStrategy tlsStrategy;
-    private ExceptionListener exceptionListener;
     private ConnectionListener connectionListener;
     private Http1StreamListener streamListener;
     private ConnPoolListener<HttpHost> connPoolListener;
@@ -147,14 +144,6 @@ public class AsyncRequesterBootstrap {
     }
 
     /**
-     * Assigns {@link ExceptionListener} instance.
-     */
-    public final AsyncRequesterBootstrap setExceptionListener(final ExceptionListener exceptionListener) {
-        this.exceptionListener = exceptionListener;
-        return this;
-    }
-
-    /**
      * Assigns {@link ConnectionListener} instance.
      */
     public final AsyncRequesterBootstrap setConnectionListener(final ConnectionListener connectionListener) {
@@ -189,20 +178,18 @@ public class AsyncRequesterBootstrap {
                 httpProcessor != null ? httpProcessor : HttpProcessors.client(),
                 h1Config != null ? h1Config : H1Config.DEFAULT,
                 charCodingConfig != null ? charCodingConfig : CharCodingConfig.DEFAULT,
+                connStrategy,
+                null,
+                null,
                 connectionListener,
                 streamListener);
         final IOEventHandlerFactory ioEventHandlerFactory = new ClientHttp1IOEventHandlerFactory(
                 streamDuplexerFactory);
-        try {
-            return new HttpAsyncRequester(
-                    ioReactorConfig,
-                    ioEventHandlerFactory,
-                    connPool,
-                    tlsStrategy != null ? tlsStrategy : new BasicClientTlsStrategy(),
-                    exceptionListener);
-        } catch (final IOReactorException ex) {
-            throw new IllegalStateException(ex);
-        }
+        return new HttpAsyncRequester(
+                ioReactorConfig,
+                ioEventHandlerFactory,
+                connPool,
+                tlsStrategy != null ? tlsStrategy : new BasicClientTlsStrategy());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java
index 0e0ed5e..bbd1551 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java
@@ -27,38 +27,20 @@
 
 package org.apache.hc.core5.http.impl.bootstrap;
 
-import java.net.InetSocketAddress;
-import java.util.Set;
-
 import org.apache.hc.core5.concurrent.DefaultThreadFactory;
 import org.apache.hc.core5.function.Callback;
-import org.apache.hc.core5.http.ExceptionListener;
 import org.apache.hc.core5.reactor.DefaultListeningIOReactor;
 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
 import org.apache.hc.core5.reactor.IOReactorConfig;
-import org.apache.hc.core5.reactor.IOReactorException;
 import org.apache.hc.core5.reactor.IOSession;
-import org.apache.hc.core5.reactor.ListenerEndpoint;
 
-public class AsyncServer extends IOReactorExecutor<DefaultListeningIOReactor> {
+public class AsyncServer extends DefaultListeningIOReactor {
 
     public AsyncServer(
             final IOEventHandlerFactory eventHandlerFactory,
             final IOReactorConfig ioReactorConfig,
-            final ExceptionListener exceptionListener,
-            final Callback<IOSession> sessionShutdownCallback) throws IOReactorException {
-        super(new DefaultListeningIOReactor(
-                    eventHandlerFactory, ioReactorConfig, new DefaultThreadFactory("server-dispatch", true), null, sessionShutdownCallback),
-                exceptionListener,
-                new DefaultThreadFactory("listener", true));
-    }
-
-    public ListenerEndpoint listen(final InetSocketAddress address) {
-        return reactor().listen(address);
-    }
-
-    public Set<ListenerEndpoint> getEndpoints() {
-        return reactor().getEndpoints();
+            final Callback<IOSession> sessionShutdownCallback) {
+        super(eventHandlerFactory, ioReactorConfig, new DefaultThreadFactory("server-dispatch", true), sessionShutdownCallback);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServerBootstrap.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServerBootstrap.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServerBootstrap.java
index 7cf6cd9..213fb73 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServerBootstrap.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServerBootstrap.java
@@ -31,7 +31,6 @@ import java.util.List;
 
 import org.apache.hc.core5.function.Supplier;
 import org.apache.hc.core5.http.ConnectionReuseStrategy;
-import org.apache.hc.core5.http.ExceptionListener;
 import org.apache.hc.core5.http.config.CharCodingConfig;
 import org.apache.hc.core5.http.config.H1Config;
 import org.apache.hc.core5.http.impl.ConnectionListener;
@@ -53,7 +52,6 @@ import org.apache.hc.core5.http.protocol.HttpProcessor;
 import org.apache.hc.core5.net.InetAddressUtils;
 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
 import org.apache.hc.core5.reactor.IOReactorConfig;
-import org.apache.hc.core5.reactor.IOReactorException;
 import org.apache.hc.core5.util.Args;
 
 /**
@@ -69,7 +67,6 @@ public class AsyncServerBootstrap {
     private HttpProcessor httpProcessor;
     private ConnectionReuseStrategy connStrategy;
     private TlsStrategy tlsStrategy;
-    private ExceptionListener exceptionListener;
     private ConnectionListener connectionListener;
     private Http1StreamListener streamListener;
 
@@ -140,14 +137,6 @@ public class AsyncServerBootstrap {
     }
 
     /**
-     * Assigns {@link ExceptionListener} instance.
-     */
-    public final AsyncServerBootstrap setExceptionListener(final ExceptionListener exceptionListener) {
-        this.exceptionListener = exceptionListener;
-        return this;
-    }
-
-    /**
      * Assigns {@link ConnectionListener} instance.
      */
     public final AsyncServerBootstrap setConnectionListener(final ConnectionListener connectionListener) {
@@ -232,14 +221,7 @@ public class AsyncServerBootstrap {
         final IOEventHandlerFactory ioEventHandlerFactory = new ServerHttp1IOEventHandlerFactory(
                 streamHandlerFactory,
                 tlsStrategy != null ? tlsStrategy : new BasicServerTlsStrategy(new int[] {443, 8443}));
-        try {
-            return new HttpAsyncServer(
-                    ioEventHandlerFactory,
-                    ioReactorConfig,
-                    exceptionListener);
-        } catch (final IOReactorException ex) {
-            throw new IllegalStateException(ex);
-        }
+        return new HttpAsyncServer(ioEventHandlerFactory, ioReactorConfig);
     }
 
     private static class HandlerEntry {

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java
index dd4635d..de2c2ce 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java
@@ -39,7 +39,6 @@ import org.apache.hc.core5.concurrent.ComplexFuture;
 import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.function.Callback;
 import org.apache.hc.core5.http.EntityDetails;
-import org.apache.hc.core5.http.ExceptionListener;
 import org.apache.hc.core5.http.Header;
 import org.apache.hc.core5.http.HttpException;
 import org.apache.hc.core5.http.HttpHost;
@@ -66,7 +65,6 @@ import org.apache.hc.core5.pool.ControlledConnPool;
 import org.apache.hc.core5.pool.PoolEntry;
 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
 import org.apache.hc.core5.reactor.IOReactorConfig;
-import org.apache.hc.core5.reactor.IOReactorException;
 import org.apache.hc.core5.reactor.IOSession;
 import org.apache.hc.core5.reactor.SessionRequest;
 import org.apache.hc.core5.reactor.SessionRequestCallback;
@@ -86,9 +84,8 @@ public class HttpAsyncRequester extends AsyncRequester {
             final IOReactorConfig ioReactorConfig,
             final IOEventHandlerFactory eventHandlerFactory,
             final ControlledConnPool<HttpHost, IOSession> connPool,
-            final TlsStrategy tlsStrategy,
-            final ExceptionListener exceptionListener) throws IOReactorException {
-        super(eventHandlerFactory, ioReactorConfig, exceptionListener, new Callback<IOSession>() {
+            final TlsStrategy tlsStrategy) {
+        super(eventHandlerFactory, ioReactorConfig, new Callback<IOSession>() {
 
             @Override
             public void execute(final IOSession session) {
@@ -100,10 +97,6 @@ public class HttpAsyncRequester extends AsyncRequester {
         this.tlsStrategy = tlsStrategy;
     }
 
-    public void start() throws IOException {
-        execute();
-    }
-
     public Future<AsyncClientEndpoint> connect(
             final HttpHost host,
             final TimeValue timeout,

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncServer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncServer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncServer.java
index 81e8c4a..2475d59 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncServer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncServer.java
@@ -26,15 +26,11 @@
  */
 package org.apache.hc.core5.http.impl.bootstrap;
 
-import java.io.IOException;
-
 import org.apache.hc.core5.function.Callback;
-import org.apache.hc.core5.http.ExceptionListener;
 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
 import org.apache.hc.core5.io.ShutdownType;
 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
 import org.apache.hc.core5.reactor.IOReactorConfig;
-import org.apache.hc.core5.reactor.IOReactorException;
 import org.apache.hc.core5.reactor.IOSession;
 
 /**
@@ -44,9 +40,8 @@ public class HttpAsyncServer extends AsyncServer {
 
     public HttpAsyncServer(
             final IOEventHandlerFactory eventHandlerFactory,
-            final IOReactorConfig ioReactorConfig,
-            final ExceptionListener exceptionListener) throws IOReactorException {
-        super(eventHandlerFactory, ioReactorConfig, exceptionListener, new Callback<IOSession>() {
+            final IOReactorConfig ioReactorConfig) {
+        super(eventHandlerFactory, ioReactorConfig, new Callback<IOSession>() {
 
             @Override
             public void execute(final IOSession session) {
@@ -56,8 +51,4 @@ public class HttpAsyncServer extends AsyncServer {
         });
     }
 
-    public void start() throws IOException {
-        execute();
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/IOReactorExecutor.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/IOReactorExecutor.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/IOReactorExecutor.java
deleted file mode 100644
index 2b58ec5..0000000
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/IOReactorExecutor.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation.  For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-
-package org.apache.hc.core5.http.impl.bootstrap;
-
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.hc.core5.http.ExceptionListener;
-import org.apache.hc.core5.io.GracefullyCloseable;
-import org.apache.hc.core5.io.ShutdownType;
-import org.apache.hc.core5.reactor.AbstractMultiworkerIOReactor;
-import org.apache.hc.core5.reactor.ExceptionEvent;
-import org.apache.hc.core5.reactor.IOReactorStatus;
-import org.apache.hc.core5.util.TimeValue;
-
-abstract class IOReactorExecutor<T extends AbstractMultiworkerIOReactor> implements GracefullyCloseable {
-
-    enum Status { READY, RUNNING, TERMINATED }
-
-    private final T ioReactor;
-    private final ExceptionListener exceptionListener;
-    private final ExecutorService executorService;
-    private final AtomicReference<Status> status;
-
-    IOReactorExecutor(
-            final T ioReactor,
-            final ExceptionListener exceptionListener,
-            final ThreadFactory threadFactory) {
-        super();
-        this.ioReactor = ioReactor;
-        this.exceptionListener = exceptionListener;
-        this.executorService = Executors.newSingleThreadExecutor(threadFactory);
-        this.status = new AtomicReference<>(Status.READY);
-    }
-
-    protected void execute() {
-        if (status.compareAndSet(Status.READY, Status.RUNNING)) {
-            executorService.execute(new Runnable() {
-
-                @Override
-                public void run() {
-                    try {
-                        ioReactor.execute();
-                    } catch (final Exception ex) {
-                        if (exceptionListener != null) {
-                            exceptionListener.onError(ex);
-                        }
-                    }
-                }
-            });
-        }
-    }
-
-    T reactor() {
-        return ioReactor;
-    }
-
-    public IOReactorStatus getStatus() {
-        return ioReactor.getStatus();
-    }
-
-    public List<ExceptionEvent> getAuditLog() {
-        return ioReactor.getAuditLog();
-    }
-
-    public void awaitShutdown(final TimeValue waitTime) throws InterruptedException {
-        ioReactor.awaitShutdown(waitTime);
-    }
-
-    public void initiateShutdown() {
-        ioReactor.initiateShutdown();
-    }
-
-    @Override
-    public void shutdown(final ShutdownType shutdownType) {
-        ioReactor.shutdown(shutdownType);
-    }
-
-    @Override
-    public void close() {
-        shutdown(ShutdownType.GRACEFUL);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractMultiworkerIOReactor.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractMultiworkerIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractMultiworkerIOReactor.java
deleted file mode 100644
index 76c4542..0000000
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractMultiworkerIOReactor.java
+++ /dev/null
@@ -1,508 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation.  For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-
-package org.apache.hc.core5.reactor;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.net.Socket;
-import java.nio.channels.Channel;
-import java.nio.channels.ClosedSelectorException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.hc.core5.concurrent.DefaultThreadFactory;
-import org.apache.hc.core5.function.Callback;
-import org.apache.hc.core5.io.ShutdownType;
-import org.apache.hc.core5.util.Args;
-import org.apache.hc.core5.util.TimeValue;
-
-/**
- * Generic implementation of {@link IOReactor} that can run multiple
- * {@link IOReactorImpl} instances in separate worker threads and distribute
- * newly created I/O sessions equally across those I/O reactors for a more
- * optimal resource utilization and a better I/O performance. Usually it is
- * recommended to have one worker I/O reactor per physical CPU core.
- * <p>
- * <strong>Important note about exception handling</strong>
- * <p>
- * Protocol specific exceptions as well as those I/O exceptions thrown in the
- * course of interaction with the session's channel are to be expected and are
- * to be dealt with by specific protocol handlers. These exceptions may result in
- * termination of an individual session but should not affect the I/O reactor
- * and all other active sessions. There are situations, however, when the I/O
- * reactor itself encounters an internal problem such as an I/O exception in
- * the underlying NIO classes or an unhandled runtime exception. Those types of
- * exceptions are usually fatal and will cause the I/O reactor to shut down
- * automatically.
- * <p>
- * There is a possibility to override this behavior and prevent I/O reactors
- * from shutting down automatically in case of a runtime exception or an I/O
- * exception in internal classes. This can be accomplished by providing a custom
- * implementation of the {@link IOReactorExceptionHandler} interface.
- * <p>
- * If an I/O reactor is unable to automatically recover from an I/O or a runtime
- * exception it will enter the shutdown mode. First off, it cancels all pending
- * new session requests. Then it will attempt to close all active I/O sessions
- * gracefully giving them some time to flush pending output data and terminate
- * cleanly. Lastly, it will forcibly shut down those I/O sessions that still
- * remain active after the grace period. This is a fairly complex process, where
- * many things can fail at the same time and many different exceptions can be
- * thrown in the course of the shutdown process. The I/O reactor will record all
- * exceptions thrown during the shutdown process, including the original one
- * that actually caused the shutdown in the first place, in an audit log. One
- * can obtain the audit log using {@link #getAuditLog()}, examine exceptions
- * thrown by the I/O reactor prior to and in the course of the reactor shutdown
- * and decide whether it is safe to restart the I/O reactor.
- *
- * @since 4.0
- */
-public abstract class AbstractMultiworkerIOReactor implements IOReactor {
-
-    private final IOReactorConfig reactorConfig;
-    private final Selector selector;
-    private final int workerCount;
-    private final IOEventHandlerFactory eventHandlerFactory;
-    private final ThreadFactory threadFactory;
-    private final Callback<IOSession> sessionShutdownCallback;
-    private final IOReactorImpl[] dispatchers;
-    private final IODispatchWorker[] workers;
-    private final Thread[] threads;
-    private final List<ExceptionEvent> auditLog;
-    private final AtomicReference<IOReactorStatus> status;
-    private final Object shutdownMutex;
-
-    private int currentWorker = 0;
-
-    /**
-     * Creates an instance of AbstractMultiworkerIOReactor with the given configuration.
-     *
-     * @param eventHandlerFactory the factory to create I/O event handlers.
-     * @param reactorConfig I/O reactor configuration.
-     * @param threadFactory the factory to create threads.
-     *   Can be {@code null}.
-     * @throws IOReactorException in case of a non-recoverable I/O error.
-     *
-     * @since 5.0
-     */
-    public AbstractMultiworkerIOReactor(
-            final IOEventHandlerFactory eventHandlerFactory,
-            final IOReactorConfig reactorConfig,
-            final ThreadFactory threadFactory,
-            final Callback<IOSession> sessionShutdownCallback) throws IOReactorException {
-        super();
-        this.eventHandlerFactory = Args.notNull(eventHandlerFactory, "Event handler factory");
-        this.reactorConfig = reactorConfig != null ? reactorConfig : IOReactorConfig.DEFAULT;
-        try {
-            this.selector = Selector.open();
-        } catch (final IOException ex) {
-            throw new IOReactorException("Failure opening selector", ex);
-        }
-        this.threadFactory = threadFactory != null ? threadFactory : new DefaultThreadFactory("I/O dispatcher", true);
-        this.sessionShutdownCallback = sessionShutdownCallback;
-        this.auditLog = new ArrayList<>();
-        this.workerCount = this.reactorConfig.getIoThreadCount();
-        this.dispatchers = new IOReactorImpl[workerCount];
-        this.workers = new IODispatchWorker[workerCount];
-        this.threads = new Thread[workerCount];
-        this.status = new AtomicReference<>(IOReactorStatus.INACTIVE);
-        this.shutdownMutex = new Object();
-    }
-
-    public AbstractMultiworkerIOReactor(
-            final IOEventHandlerFactory eventHandlerFactory,
-            final Callback<IOSession> sessionShutdownCallback) throws IOReactorException {
-        this(eventHandlerFactory, null, null, sessionShutdownCallback);
-    }
-
-    Selector selector() {
-        return selector;
-    }
-
-    @Override
-    public IOReactorStatus getStatus() {
-        return this.status.get();
-    }
-
-    /**
-     * Returns the audit log containing exceptions thrown by the I/O reactor
-     * prior to and in the course of the reactor shutdown.
-     *
-     * @return audit log.
-     */
-    public List<ExceptionEvent> getAuditLog() {
-        synchronized (this.auditLog) {
-            return new ArrayList<>(this.auditLog);
-        }
-    }
-
-    /**
-     * Adds the given {@link Throwable} object with the given time stamp
-     * to the audit log.
-     *
-     * @param ex the exception thrown by the I/O reactor.
-     * @param timestamp the time stamp of the exception. Can be
-     * {@code null} in which case the current date / time will be used.
-     */
-    void addExceptionEvent(final Throwable ex, final Date timestamp) {
-        if (ex == null) {
-            return;
-        }
-        synchronized (this.auditLog) {
-            this.auditLog.add(new ExceptionEvent(ex, timestamp != null ? timestamp : new Date()));
-        }
-    }
-
-    /**
-     * Adds the given {@link Throwable} object to the audit log.
-     *
-     * @param ex the exception thrown by the I/O reactor.
-     */
-    void addExceptionEvent(final Throwable ex) {
-        addExceptionEvent(ex, null);
-    }
-
-    /**
-     * Triggered to process I/O events registered by the main {@link Selector}.
-     * <p>
-     * Subclasses can implement this method to react to the event.
-     *
-     * @param count event count.
-     * @throws IOReactorException in case of a non-recoverable I/O error.
-     */
-    protected abstract void processEvents(int count) throws IOReactorException;
-
-    /**
-     * Triggered to cancel pending session requests.
-     * <p>
-     * Subclasses can implement this method to react to the event.
-     */
-    protected abstract void cancelRequests();
-
-    /**
-     * Activates the main I/O reactor as well as all worker I/O reactors.
-     * The main I/O reactor will start reacting to I/O events and triggering
-     * notification methods. The worker I/O reactors in their turn will start
-     * reacting to I/O events and dispatch I/O event notifications to the
-     * {@link IOEventHandler} associated with the given I/O session.
-     * <p>
-     * This method will enter the infinite I/O select loop on
-     * the {@link Selector} instance associated with this I/O reactor and used
-     * to manage creation of new I/O channels. Once a new I/O channel has been
-     * created the processing of I/O events on that channel will be delegated
-     * to one of the worker I/O reactors.
-     * <p>
-     * The method will remain blocked until the I/O reactor is shut down or the
-     * execution thread is interrupted.
-     *
-     * @see #processEvents(int)
-     * @see #cancelRequests()
-     *
-     * @throws InterruptedIOException if the dispatch thread is interrupted.
-     * @throws IOReactorException in case of a non-recoverable I/O error.
-     */
-    @Override
-    public void execute() throws InterruptedIOException, IOReactorException {
-        if (this.status.compareAndSet(IOReactorStatus.INACTIVE, IOReactorStatus.ACTIVE)) {
-            doExecute();
-        }
-    }
-
-    private void doExecute() throws InterruptedIOException, IOReactorException {
-        try {
-            // Start I/O dispatchers
-            for (int i = 0; i < this.dispatchers.length; i++) {
-                final IOReactorImpl dispatcher = new IOReactorImpl(
-                        this.eventHandlerFactory,
-                        this.reactorConfig,
-                        this.sessionShutdownCallback);
-                this.dispatchers[i] = dispatcher;
-            }
-            for (int i = 0; i < this.workerCount; i++) {
-                final IOReactorImpl dispatcher = this.dispatchers[i];
-                this.workers[i] = new IODispatchWorker(dispatcher);
-                this.threads[i] = this.threadFactory.newThread(this.workers[i]);
-            }
-
-            for (int i = 0; i < this.workerCount; i++) {
-                if (this.status.get().compareTo(IOReactorStatus.ACTIVE) != 0)  {
-                    return;
-                }
-                this.threads[i].start();
-            }
-
-            final long selectTimeout = this.reactorConfig.getSelectInterval();
-            while (!Thread.currentThread().isInterrupted()) {
-                if (this.status.get().compareTo(IOReactorStatus.ACTIVE) != 0) {
-                    break;
-                }
-
-                final int readyCount;
-                try {
-                    readyCount = this.selector.select(selectTimeout);
-                } catch (final InterruptedIOException ex) {
-                    throw ex;
-                } catch (final IOException ex) {
-                    throw new IOReactorException("Unexpected selector failure", ex);
-                }
-
-                if (this.status.get().compareTo(IOReactorStatus.ACTIVE) != 0) {
-                    break;
-                }
-
-                processEvents(readyCount);
-
-                // Verify I/O dispatchers
-                for (int i = 0; i < this.workerCount; i++) {
-                    final IODispatchWorker worker = this.workers[i];
-                    final Throwable ex = worker.getThrowable();
-                    if (ex != null) {
-                        throw new IOReactorException("I/O dispatch worker terminated abnormally", ex);
-                    }
-                }
-            }
-
-        } catch (final ClosedSelectorException ex) {
-            addExceptionEvent(ex);
-        } catch (final IOReactorException ex) {
-            if (ex.getCause() != null) {
-                addExceptionEvent(ex.getCause());
-            }
-            throw ex;
-        } finally {
-            try {
-                cancelRequests();
-                closeActiveChannels();
-                try {
-                    this.selector.close();
-                } catch (final IOException ex) {
-                    addExceptionEvent(ex);
-                }
-            } finally {
-                synchronized (this.shutdownMutex) {
-                    this.status.set(IOReactorStatus.SHUT_DOWN);
-                    this.shutdownMutex.notifyAll();
-                }
-            }
-        }
-    }
-
-    private void closeActiveChannels() {
-        for (final SelectionKey key : this.selector.keys()) {
-            try {
-                final Channel channel = key.channel();
-                if (channel != null) {
-                    channel.close();
-                }
-            } catch (final IOException ex) {
-                addExceptionEvent(ex);
-            }
-        }
-    }
-
-    /**
-     * Assigns the given channel entry to one of the worker I/O reactors.
-     *
-     * @param channel the new channel.
-     * @param sessionRequest the session request if applicable.
-     */
-    protected void enqueuePendingSession(final SocketChannel channel, final SessionRequestImpl sessionRequest) {
-        // Distribute new channels among the workers
-        final int i = Math.abs(this.currentWorker++ % this.workerCount);
-        this.dispatchers[i].enqueuePendingSession(channel, sessionRequest);
-    }
-
-    /**
-     * Registers the given channel with the main {@link Selector}.
-     *
-     * @param channel the channel.
-     * @param ops interest ops.
-     * @return  selection key.
-     * @throws ClosedChannelException if the channel has been already closed.
-     */
-    protected SelectionKey registerChannel(
-            final SelectableChannel channel, final int ops) throws ClosedChannelException {
-        return channel.register(this.selector, ops);
-    }
-
-    /**
-     * Prepares the given {@link Socket} by resetting some of its properties.
-     *
-     * @param socket the socket
-     * @throws IOException in case of an I/O error.
-     */
-    protected void prepareSocket(final Socket socket) throws IOException {
-        socket.setTcpNoDelay(this.reactorConfig.isTcpNoDelay());
-        socket.setKeepAlive(this.reactorConfig.isSoKeepalive());
-        final int millis = this.reactorConfig.getSoTimeout().toMillisIntBound();
-        if (millis > 0) {
-            socket.setSoTimeout(millis);
-        }
-        if (this.reactorConfig.getSndBufSize() > 0) {
-            socket.setSendBufferSize(this.reactorConfig.getSndBufSize());
-        }
-        if (this.reactorConfig.getRcvBufSize() > 0) {
-            socket.setReceiveBufferSize(this.reactorConfig.getRcvBufSize());
-        }
-        final int linger = this.reactorConfig.getSoLinger().toSecondsIntBound();
-        if (linger >= 0) {
-            socket.setSoLinger(true, linger);
-        }
-    }
-
-    @Override
-    public void initiateShutdown() {
-        if (this.status.compareAndSet(IOReactorStatus.ACTIVE, IOReactorStatus.SHUTTING_DOWN)) {
-            selector.wakeup();
-            for (int i = 0; i < this.workerCount; i++) {
-                final IOReactorImpl dispatcher = this.dispatchers[i];
-                if (dispatcher != null) {
-                    dispatcher.initiateShutdown();
-                }
-            }
-        }
-    }
-
-    @Override
-    public void awaitShutdown(final TimeValue waitTime) throws InterruptedException {
-        Args.notNull(waitTime, "Wait time");
-        if (this.status.get() == IOReactorStatus.INACTIVE) {
-            return;
-        }
-        final long deadline = System.currentTimeMillis() + waitTime.toMillis();
-        long remaining = waitTime.toMillis();
-        synchronized (this.shutdownMutex) {
-            while (this.status.get().compareTo(IOReactorStatus.SHUT_DOWN) < 0) {
-                this.shutdownMutex.wait(remaining);
-                remaining = deadline - System.currentTimeMillis();
-                if (remaining <= 0) {
-                    return;
-                }
-            }
-        }
-        for (int i = 0; i < this.dispatchers.length; i++) {
-            final IOReactorImpl dispatcher = this.dispatchers[i];
-            if (dispatcher != null) {
-                if (dispatcher.getStatus().compareTo(IOReactorStatus.SHUT_DOWN) < 0) {
-                    dispatcher.awaitShutdown(TimeValue.of(remaining, TimeUnit.MILLISECONDS));
-                    remaining = deadline - System.currentTimeMillis();
-                    if (remaining <= 0) {
-                        return;
-                    }
-                }
-            }
-        }
-        for (int i = 0; i < this.threads.length; i++) {
-            final Thread thread = this.threads[i];
-            thread.join(remaining);
-            remaining = deadline - System.currentTimeMillis();
-            if (remaining <= 0) {
-                return;
-            }
-        }
-    }
-
-    private void forceShutdown() {
-        this.status.set(IOReactorStatus.SHUT_DOWN);
-        this.selector.wakeup();
-        for (int i = 0; i < this.dispatchers.length; i++) {
-            final IOReactorImpl dispatcher = this.dispatchers[i];
-            if (dispatcher != null) {
-                dispatcher.forceShutdown();
-            }
-        }
-        for (int i = 0; i < this.threads.length; i++) {
-            final Thread thread = this.threads[i];
-            thread.interrupt();
-        }
-    }
-
-    @Override
-    public void shutdown(final ShutdownType shutdownType) {
-        if (this.status.get() == IOReactorStatus.INACTIVE) {
-            return;
-        }
-        initiateShutdown();
-        try {
-            if (shutdownType == ShutdownType.GRACEFUL) {
-                awaitShutdown(TimeValue.ofSeconds(5));
-            }
-            forceShutdown();
-        } catch (final InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    @Override
-    public void close() {
-        shutdown(ShutdownType.GRACEFUL);
-    }
-
-    static void closeChannel(final Channel channel) {
-        try {
-            channel.close();
-        } catch (final IOException ignore) {
-        }
-    }
-
-    static class IODispatchWorker implements Runnable {
-
-        final IOReactorImpl dispatcher;
-
-        private volatile Throwable throwable;
-
-        public IODispatchWorker(final IOReactorImpl dispatcher) {
-            super();
-            this.dispatcher = dispatcher;
-        }
-
-        @Override
-        public void run() {
-            try {
-                this.dispatcher.execute();
-            } catch (final Error ex) {
-                this.throwable = ex;
-                throw ex;
-            } catch (final Exception ex) {
-                this.throwable = ex;
-            }
-        }
-
-        public Throwable getThrowable() {
-            return this.throwable;
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractSingleCoreIOReactor.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractSingleCoreIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractSingleCoreIOReactor.java
new file mode 100644
index 0000000..44c0e38
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractSingleCoreIOReactor.java
@@ -0,0 +1,168 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.reactor;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.channels.ClosedSelectorException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.Date;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hc.core5.io.ShutdownType;
+import org.apache.hc.core5.util.Args;
+import org.apache.hc.core5.util.TimeValue;
+
+abstract class AbstractSingleCoreIOReactor implements IOReactor {
+
+    private final Queue<ExceptionEvent> auditLog;
+    private final AtomicReference<IOReactorStatus> status;
+    private final Object shutdownMutex;
+
+    final Selector selector;
+
+    AbstractSingleCoreIOReactor(final Queue<ExceptionEvent> auditLog) {
+        super();
+        this.auditLog = auditLog;
+        this.shutdownMutex = new Object();
+        this.status = new AtomicReference<>(IOReactorStatus.INACTIVE);
+        try {
+            this.selector = Selector.open();
+        } catch (final IOException ex) {
+            throw new IllegalStateException("Unexpected failure opening I/O selector", ex);
+        }
+    }
+
+    @Override
+    public final IOReactorStatus getStatus() {
+        return this.status.get();
+    }
+
+    void addExceptionEvent(final Throwable ex) {
+        this.auditLog.add(new ExceptionEvent(ex, new Date()));
+    }
+
+    abstract void doExecute() throws IOException;
+
+    abstract void doTerminate() throws IOException;
+
+    public void execute() {
+        if (this.status.compareAndSet(IOReactorStatus.INACTIVE, IOReactorStatus.ACTIVE)) {
+            try {
+                doExecute();
+            } catch (final ClosedSelectorException ignore) {
+            } catch (final Exception ex) {
+                addExceptionEvent(ex);
+            } finally {
+                try {
+                    doTerminate();
+                    final Set<SelectionKey> keys = this.selector.keys();
+                    for (final SelectionKey key : keys) {
+                        final Closeable closeable = (Closeable) key.attachment();
+                        if (closeable != null) {
+                            try {
+                                closeable.close();
+                            } catch (final IOException ex) {
+                                addExceptionEvent(ex);
+                            }
+                        }
+                        key.channel().close();
+                    }
+                    try {
+                        this.selector.close();
+                    } catch (final IOException ex) {
+                        addExceptionEvent(ex);
+                    }
+                } catch (final Exception ex) {
+                    addExceptionEvent(ex);
+                } finally {
+                    this.status.set(IOReactorStatus.SHUT_DOWN);
+                    synchronized (this.shutdownMutex) {
+                        this.shutdownMutex.notifyAll();
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public final void awaitShutdown(final TimeValue waitTime) throws InterruptedException {
+        Args.notNull(waitTime, "Wait time");
+        final long deadline = System.currentTimeMillis() + waitTime.toMillis();
+        long remaining = waitTime.toMillis();
+        synchronized (this.shutdownMutex) {
+            while (this.status.get().compareTo(IOReactorStatus.SHUT_DOWN) < 0) {
+                this.shutdownMutex.wait(remaining);
+                remaining = deadline - System.currentTimeMillis();
+                if (remaining <= 0) {
+                    return;
+                }
+            }
+        }
+    }
+
+    @Override
+    public final void initiateShutdown() {
+        if (this.status.compareAndSet(IOReactorStatus.INACTIVE, IOReactorStatus.SHUT_DOWN)) {
+            synchronized (this.shutdownMutex) {
+                this.shutdownMutex.notifyAll();
+            }
+        } else if (this.status.compareAndSet(IOReactorStatus.ACTIVE, IOReactorStatus.SHUTTING_DOWN)) {
+            this.selector.wakeup();
+        }
+    }
+
+    @Override
+    public final void shutdown(final ShutdownType shutdownType) {
+        if (shutdownType == ShutdownType.GRACEFUL) {
+            initiateShutdown();
+            try {
+                awaitShutdown(TimeValue.ofSeconds(5));
+            } catch (final InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        } else {
+            final IOReactorStatus previousStatus = this.status.getAndSet(IOReactorStatus.SHUT_DOWN);
+            if (previousStatus.compareTo(IOReactorStatus.ACTIVE) == 0) {
+                this.selector.wakeup();
+            }
+            synchronized (this.shutdownMutex) {
+                this.shutdownMutex.notifyAll();
+            }
+        }
+    }
+
+    @Override
+    public final void close() {
+        shutdown(ShutdownType.GRACEFUL);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5/src/main/java/org/apache/hc/core5/reactor/ConnectingIOReactor.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ConnectingIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ConnectingIOReactor.java
index 30a1c99..e7727eb 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ConnectingIOReactor.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ConnectingIOReactor.java
@@ -33,6 +33,6 @@ package org.apache.hc.core5.reactor;
  *
  * @since 4.0
  */
-public interface ConnectingIOReactor extends IOReactor, ConnectionInitiator {
+public interface ConnectingIOReactor extends IOReactorService, ConnectionInitiator {
 
 }

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5/src/main/java/org/apache/hc/core5/reactor/ConnectionAcceptor.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ConnectionAcceptor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ConnectionAcceptor.java
new file mode 100644
index 0000000..43a4f84
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ConnectionAcceptor.java
@@ -0,0 +1,81 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.reactor;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.util.Set;
+
+/**
+ * Non-blocking connection acceptor.
+ *
+ * @since 5.0
+ */
+public interface ConnectionAcceptor {
+
+    /**
+     * Opens a new listener endpoint with the given socket address. Once
+     * the endpoint is fully initialized it starts accepting incoming
+     * connections and propagates I/O activity notifications to the I/O event
+     * dispatcher.
+     * <p>
+     * {@link ListenerEndpoint#waitFor()} can be used to wait for the
+     * listener to become ready to accept incoming connections.
+     * <p>
+     * {@link ListenerEndpoint#close()} can be used to shut down
+     * the listener even before it is fully initialized.
+     *
+     * @param address the socket address to listen on.
+     * @return listener endpoint.
+     */
+    ListenerEndpoint listen(SocketAddress address);
+
+    /**
+     * Suspends the I/O reactor preventing it from accepting new connections on
+     * all active endpoints.
+     *
+     * @throws IOException in case of an I/O error.
+     */
+    void pause() throws IOException;
+
+    /**
+     * Resumes the I/O reactor restoring its ability to accept incoming
+     * connections on all active endpoints.
+     *
+     * @throws IOException in case of an I/O error.
+     */
+    void resume() throws IOException;
+
+    /**
+     * Returns a set of endpoints for this I/O reactor.
+     *
+     * @return set of endpoints.
+     */
+    Set<ListenerEndpoint> getEndpoints();
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5/src/main/java/org/apache/hc/core5/reactor/ConnectionInitiator.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ConnectionInitiator.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ConnectionInitiator.java
index 98fd8fe..3dd00a4 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ConnectionInitiator.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ConnectionInitiator.java
@@ -34,7 +34,7 @@ import org.apache.hc.core5.net.NamedEndpoint;
 /**
  * Non-blocking connection initiator.
  *
- * @since 4.0
+ * @since 5.0
  */
 public interface ConnectionInitiator {
 


Mime
View raw message