hc-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ol...@apache.org
Subject [2/7] httpcomponents-core git commit: Rewrite of I/O reactor internal channel management; more efficient handling of outgoing connection requests
Date Thu, 15 Jun 2017 05:59:33 GMT
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java
index bd914ae..d863d54 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java
@@ -28,240 +28,118 @@
 package org.apache.hc.core5.reactor;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
 import java.net.SocketAddress;
-import java.net.UnknownHostException;
-import java.nio.channels.CancelledKeyException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.Deque;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hc.core5.concurrent.DefaultThreadFactory;
 import org.apache.hc.core5.function.Callback;
+import org.apache.hc.core5.io.ShutdownType;
 import org.apache.hc.core5.net.NamedEndpoint;
 import org.apache.hc.core5.util.Args;
-import org.apache.hc.core5.util.Asserts;
+import org.apache.hc.core5.util.TimeValue;
 
 /**
- * Default implementation of {@link ConnectingIOReactor}. This class extends
- * {@link AbstractMultiworkerIOReactor} with capability to connect to remote
- * hosts.
+ * Default implementation of {@link ConnectingIOReactor}.
  *
  * @since 4.0
  */
-public class DefaultConnectingIOReactor extends AbstractMultiworkerIOReactor
-        implements ConnectingIOReactor {
+public class DefaultConnectingIOReactor implements ConnectingIOReactor {
 
-    private final IOReactorConfig reactorConfig;
-    private final Queue<SessionRequestImpl> requestQueue;
-    private final long selectInterval;
-    private long lastTimeoutCheck;
+    private final int workerCount;
+    private final SingleCoreIOReactor[] dispatchers;
+    private final MultiCoreIOReactor ioReactor;
+    private final AtomicInteger currentWorker;
+
+    private final static ThreadFactory THREAD_FACTORY = new DefaultThreadFactory("I/O dispatch", true);
 
     public DefaultConnectingIOReactor(
             final IOEventHandlerFactory eventHandlerFactory,
             final IOReactorConfig ioReactorConfig,
             final ThreadFactory threadFactory,
-            final Callback<IOSession> sessionShutdownCallback) throws IOReactorException {
-        super(eventHandlerFactory, ioReactorConfig, threadFactory, sessionShutdownCallback);
-        this.reactorConfig = ioReactorConfig != null ? ioReactorConfig : IOReactorConfig.DEFAULT;
-        this.requestQueue = new ConcurrentLinkedQueue<>();
-        this.selectInterval = this.reactorConfig.getSelectInterval();
-        this.lastTimeoutCheck = System.currentTimeMillis();
+            final Callback<IOSession> sessionShutdownCallback) {
+        Args.notNull(eventHandlerFactory, "Event handler factory");
+        final Deque<ExceptionEvent> auditLog = new ConcurrentLinkedDeque<>();
+        this.workerCount = ioReactorConfig != null ? ioReactorConfig.getIoThreadCount() : IOReactorConfig.DEFAULT.getIoThreadCount();
+        this.dispatchers = new SingleCoreIOReactor[workerCount];
+        final Thread[] threads = new Thread[workerCount];
+        for (int i = 0; i < this.dispatchers.length; i++) {
+            final SingleCoreIOReactor dispatcher = new SingleCoreIOReactor(
+                    auditLog,
+                    eventHandlerFactory,
+                    ioReactorConfig,
+                    sessionShutdownCallback);
+            this.dispatchers[i] = dispatcher;
+            threads[i] = (threadFactory != null ? threadFactory : THREAD_FACTORY).newThread(new IOReactorWorker(dispatcher));
+        }
+        this.ioReactor = new MultiCoreIOReactor(this.dispatchers, threads);
+        this.currentWorker = new AtomicInteger(0);
     }
 
     public DefaultConnectingIOReactor(
             final IOEventHandlerFactory eventHandlerFactory,
             final IOReactorConfig config,
-            final Callback<IOSession> sessionShutdownCallback) throws IOReactorException {
+            final Callback<IOSession> sessionShutdownCallback) {
         this(eventHandlerFactory, config, null, sessionShutdownCallback);
     }
 
     /**
      * Creates an instance of DefaultConnectingIOReactor with default configuration.
      *
-     * @throws IOReactorException in case if a non-recoverable I/O error.
-     *
      * @since 5.0
      */
-    public DefaultConnectingIOReactor(
-            final IOEventHandlerFactory eventHandlerFactory) throws IOReactorException {
+    public DefaultConnectingIOReactor(final IOEventHandlerFactory eventHandlerFactory) {
         this(eventHandlerFactory, null, null);
     }
 
-    @Override
-    protected void cancelRequests() {
-        SessionRequestImpl request;
-        while ((request = this.requestQueue.poll()) != null) {
-            request.cancel();
-        }
+    public void start() {
+        ioReactor.start();
     }
 
     @Override
-    protected void processEvents(final int readyCount) throws IOReactorException {
-        processSessionRequests();
-
-        if (readyCount > 0) {
-            final Set<SelectionKey> selectedKeys = selector().selectedKeys();
-            for (final SelectionKey key : selectedKeys) {
-
-                processEvent(key);
-
-            }
-            selectedKeys.clear();
-        }
-
-        final long currentTime = System.currentTimeMillis();
-        if ((currentTime - this.lastTimeoutCheck) >= this.selectInterval) {
-            this.lastTimeoutCheck = currentTime;
-            final Set<SelectionKey> keys = selector().keys();
-            processTimeouts(keys);
-        }
-    }
-
-    private void processEvent(final SelectionKey key) {
-        try {
-
-            if (key.isConnectable()) {
-
-                final SocketChannel socketChannel = (SocketChannel) key.channel();
-                // Get request handle
-                final SessionRequestHandle requestHandle = (SessionRequestHandle) key.attachment();
-                final SessionRequestImpl sessionRequest = requestHandle.getSessionRequest();
-
-                // Finish connection process
-                try {
-                    socketChannel.finishConnect();
-                } catch (final IOException ex) {
-                    sessionRequest.failed(ex);
-                }
-                key.cancel();
-                key.attach(null);
-                if (!sessionRequest.isCompleted()) {
-                    enqueuePendingSession(socketChannel, sessionRequest);
-                } else {
-                    try {
-                        socketChannel.close();
-                    } catch (final IOException ignore) {
-                    }
-                }
-            }
-
-        } catch (final CancelledKeyException ex) {
-            final SessionRequestHandle requestHandle = (SessionRequestHandle) key.attachment();
-            key.attach(null);
-            if (requestHandle != null) {
-                final SessionRequestImpl sessionRequest = requestHandle.getSessionRequest();
-                if (sessionRequest != null) {
-                    sessionRequest.cancel();
-                }
-            }
-        }
+    public IOReactorStatus getStatus() {
+        return ioReactor.getStatus();
     }
 
-    private void processTimeouts(final Set<SelectionKey> keys) {
-        final long now = System.currentTimeMillis();
-        for (final SelectionKey key : keys) {
-            final Object attachment = key.attachment();
-
-            if (attachment instanceof SessionRequestHandle) {
-                final SessionRequestHandle handle = (SessionRequestHandle) key.attachment();
-                final SessionRequestImpl sessionRequest = handle.getSessionRequest();
-                final int timeout = sessionRequest.getConnectTimeout();
-                if (timeout > 0) {
-                    if (handle.getRequestTime() + timeout < now) {
-                        sessionRequest.timeout();
-                    }
-                }
-            }
-
-        }
-    }
-
-    @Override
     public SessionRequest connect(
             final NamedEndpoint remoteEndpoint,
             final SocketAddress remoteAddress,
             final SocketAddress localAddress,
             final Object attachment,
-            final SessionRequestCallback callback) {
+            final SessionRequestCallback callback) throws IOReactorShutdownException {
         Args.notNull(remoteEndpoint, "Remote endpoint");
-        final IOReactorStatus status = getStatus();
-        Asserts.check(status == IOReactorStatus.INACTIVE || status == IOReactorStatus.ACTIVE, "I/O reactor has been shut down");
-        final SessionRequestImpl sessionRequest = new SessionRequestImpl(
-                remoteEndpoint,
-                remoteAddress != null ? remoteAddress : new InetSocketAddress(remoteEndpoint.getHostName(), remoteEndpoint.getPort()),
-                localAddress,
-                attachment,
-                callback);
-
-        this.requestQueue.add(sessionRequest);
-        selector().wakeup();
-
-        return sessionRequest;
-    }
-
-    private void validateAddress(final SocketAddress address) throws UnknownHostException {
-        if (address == null) {
-            return;
+        if (getStatus().compareTo(IOReactorStatus.ACTIVE) > 0) {
+            throw new IOReactorShutdownException("I/O reactor has been shut down");
         }
-        if (address instanceof InetSocketAddress) {
-            final InetSocketAddress endpoint = (InetSocketAddress) address;
-            if (endpoint.isUnresolved()) {
-                throw new UnknownHostException(endpoint.getHostName());
-            }
+        final int i = Math.abs(currentWorker.incrementAndGet() % workerCount);
+        try {
+            return dispatchers[i].connect(remoteEndpoint, remoteAddress, localAddress, attachment, callback);
+        } catch (final IOReactorShutdownException ex) {
+            initiateShutdown();
+            throw ex;
         }
     }
 
-    private void processSessionRequests() throws IOReactorException {
-        SessionRequestImpl request;
-        while ((request = this.requestQueue.poll()) != null) {
-            if (request.isCompleted()) {
-                continue;
-            }
-            final SocketChannel socketChannel;
-            try {
-                socketChannel = SocketChannel.open();
-            } catch (final IOException ex) {
-                request.failed(ex);
-                return;
-            }
-            try {
-                validateAddress(request.getLocalAddress());
-                validateAddress(request.getRemoteAddress());
+    @Override
+    public void initiateShutdown() {
+        ioReactor.initiateShutdown();
+    }
 
-                socketChannel.configureBlocking(false);
-                prepareSocket(socketChannel.socket());
+    @Override
+    public void awaitShutdown(final TimeValue waitTime) throws InterruptedException {
+        ioReactor.awaitShutdown(waitTime);
+    }
 
-                if (request.getLocalAddress() != null) {
-                    final Socket sock = socketChannel.socket();
-                    sock.setReuseAddress(this.reactorConfig.isSoReuseAddress());
-                    sock.bind(request.getLocalAddress());
-                }
-                final boolean connected = socketChannel.connect(request.getRemoteAddress());
-                if (connected) {
-                    enqueuePendingSession(socketChannel, request);
-                    continue;
-                }
-            } catch (final IOException ex) {
-                closeChannel(socketChannel);
-                request.failed(ex);
-                return;
-            }
+    @Override
+    public void shutdown(final ShutdownType shutdownType) {
+        ioReactor.shutdown(shutdownType);
+    }
 
-            final SessionRequestHandle requestHandle = new SessionRequestHandle(request);
-            try {
-                final SelectionKey key = socketChannel.register(selector(), SelectionKey.OP_CONNECT,
-                        requestHandle);
-                request.setKey(key);
-            } catch (final IOException ex) {
-                closeChannel(socketChannel);
-                throw new IOReactorException("Failure registering channel " +
-                        "with the selector", ex);
-            }
-        }
+    @Override
+    public void close() throws IOException {
+        ioReactor.close();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java
index 1276435..05421b2 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java
@@ -28,40 +28,36 @@
 package org.apache.hc.core5.reactor;
 
 import java.io.IOException;
-import java.net.ServerSocket;
 import java.net.SocketAddress;
-import java.nio.channels.CancelledKeyException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Queue;
+import java.util.Deque;
 import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hc.core5.concurrent.DefaultThreadFactory;
 import org.apache.hc.core5.function.Callback;
-import org.apache.hc.core5.util.Asserts;
+import org.apache.hc.core5.io.ShutdownType;
+import org.apache.hc.core5.net.NamedEndpoint;
+import org.apache.hc.core5.util.Args;
+import org.apache.hc.core5.util.TimeValue;
 
 /**
- * Default implementation of {@link ListeningIOReactor}. This class extends
- * {@link AbstractMultiworkerIOReactor} with capability to listen for incoming
- * connections.
+ * Default implementation of {@link ListeningIOReactor}.
  *
  * @since 4.0
  */
-public class DefaultListeningIOReactor extends AbstractMultiworkerIOReactor
-        implements ListeningIOReactor {
+public class DefaultListeningIOReactor implements ListeningIOReactor {
 
-    private final IOReactorConfig reactorConfig;
-    private final IOReactorExceptionHandler exceptionHandler;
-    private final Queue<ListenerEndpointImpl> requestQueue;
-    private final Set<ListenerEndpointImpl> endpoints;
-    private final Set<SocketAddress> pausedEndpoints;
+    private final static ThreadFactory DISPATCH_THREAD_FACTORY = new DefaultThreadFactory("I/O dispatch", true);
+    private final static ThreadFactory LISTENER_THREAD_FACTORY = new DefaultThreadFactory("I/O listener", true);
 
-    private volatile boolean paused;
+    private final int workerCount;
+    private final SingleCoreIOReactor[] dispatchers;
+    private final SingleCoreListeningIOReactor listener;
+    private final MultiCoreIOReactor ioReactor;
+    private final AtomicInteger currentWorker;
 
     /**
      * Creates an instance of DefaultListeningIOReactor with the given configuration.
@@ -70,7 +66,6 @@ public class DefaultListeningIOReactor extends AbstractMultiworkerIOReactor
      * @param ioReactorConfig I/O reactor configuration.
      * @param threadFactory the factory to create threads.
      *   Can be {@code null}.
-     * @throws IOReactorException in case if a non-recoverable I/O error.
      *
      * @since 5.0
      */
@@ -78,14 +73,36 @@ public class DefaultListeningIOReactor extends AbstractMultiworkerIOReactor
             final IOEventHandlerFactory eventHandlerFactory,
             final IOReactorConfig ioReactorConfig,
             final ThreadFactory threadFactory,
-            final IOReactorExceptionHandler exceptionHandler,
-            final Callback<IOSession> sessionShutdownCallback) throws IOReactorException {
-        super(eventHandlerFactory, ioReactorConfig, threadFactory, sessionShutdownCallback);
-        this.reactorConfig = ioReactorConfig != null ? ioReactorConfig : IOReactorConfig.DEFAULT;
-        this.exceptionHandler = exceptionHandler;
-        this.requestQueue = new ConcurrentLinkedQueue<>();
-        this.endpoints = Collections.synchronizedSet(new HashSet<ListenerEndpointImpl>());
-        this.pausedEndpoints = new HashSet<>();
+            final Callback<IOSession> sessionShutdownCallback) {
+        Args.notNull(eventHandlerFactory, "Event handler factory");
+        final Deque<ExceptionEvent> auditLog = new ConcurrentLinkedDeque<>();
+        this.workerCount = ioReactorConfig != null ? ioReactorConfig.getIoThreadCount() : IOReactorConfig.DEFAULT.getIoThreadCount();
+        this.dispatchers = new SingleCoreIOReactor[workerCount];
+        final Thread[] threads = new Thread[workerCount + 1];
+        for (int i = 0; i < this.dispatchers.length; i++) {
+            final SingleCoreIOReactor dispatcher = new SingleCoreIOReactor(
+                    auditLog,
+                    eventHandlerFactory,
+                    ioReactorConfig,
+                    sessionShutdownCallback);
+            this.dispatchers[i] = dispatcher;
+            threads[i + 1] = (threadFactory != null ? threadFactory : DISPATCH_THREAD_FACTORY).newThread(new IOReactorWorker(dispatcher));
+        }
+        final IOReactor[] ioReactors = new IOReactor[this.workerCount + 1];
+        System.arraycopy(this.dispatchers, 0, ioReactors, 1, this.workerCount);
+        this.listener = new SingleCoreListeningIOReactor(auditLog, ioReactorConfig, new Callback<SocketChannel>() {
+
+            @Override
+            public void execute(final SocketChannel channel) {
+                enqueueChannel(channel);
+            }
+
+        });
+        ioReactors[0] = this.listener;
+        threads[0] = (threadFactory != null ? threadFactory : LISTENER_THREAD_FACTORY).newThread(new IOReactorWorker(listener));
+
+        this.ioReactor = new MultiCoreIOReactor(ioReactors, threads);
+        this.currentWorker = new AtomicInteger(0);
     }
 
     /**
@@ -94,207 +111,103 @@ public class DefaultListeningIOReactor extends AbstractMultiworkerIOReactor
      * @param eventHandlerFactory the factory to create I/O event handlers.
      * @param config I/O reactor configuration.
      *   Can be {@code null}.
-     * @throws IOReactorException in case if a non-recoverable I/O error.
      *
      * @since 5.0
      */
     public DefaultListeningIOReactor(
             final IOEventHandlerFactory eventHandlerFactory,
             final IOReactorConfig config,
-            final IOReactorExceptionHandler exceptionHandler,
-            final Callback<IOSession> sessionShutdownCallback) throws IOReactorException {
-        this(eventHandlerFactory, config, null, exceptionHandler, sessionShutdownCallback);
+            final Callback<IOSession> sessionShutdownCallback) {
+        this(eventHandlerFactory, config, null, sessionShutdownCallback);
     }
 
     /**
      * Creates an instance of DefaultListeningIOReactor with default configuration.
      *
      * @param eventHandlerFactory the factory to create I/O event handlers.
-     * @throws IOReactorException in case if a non-recoverable I/O error.
      *
      * @since 5.0
      */
-    public DefaultListeningIOReactor(final IOEventHandlerFactory eventHandlerFactory) throws IOReactorException {
+    public DefaultListeningIOReactor(final IOEventHandlerFactory eventHandlerFactory) {
         this(eventHandlerFactory, null, null, null);
     }
 
     @Override
-    protected void cancelRequests() {
-        ListenerEndpointImpl request;
-        while ((request = this.requestQueue.poll()) != null) {
-            request.cancel();
-        }
+    public void start() {
+        ioReactor.start();
     }
 
     @Override
-    protected void processEvents(final int readyCount) throws IOReactorException {
-        if (!this.paused) {
-            processSessionRequests();
-        }
-
-        if (readyCount > 0) {
-            final Set<SelectionKey> selectedKeys = selector().selectedKeys();
-            for (final SelectionKey key : selectedKeys) {
-
-                processEvent(key);
-
-            }
-            selectedKeys.clear();
-        }
+    public ListenerEndpoint listen(final SocketAddress address) {
+        return listener.listen(address);
     }
 
-    private void processEvent(final SelectionKey key)
-            throws IOReactorException {
-        try {
-
-            if (key.isAcceptable()) {
-
-                final ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
-                for (;;) {
-                    SocketChannel socketChannel = null;
-                    try {
-                        socketChannel = serverChannel.accept();
-                    } catch (final IOException ex) {
-                        if (this.exceptionHandler == null || !this.exceptionHandler.handle(ex)) {
-                            throw new IOReactorException("Failure accepting connection", ex);
-                        }
-                    }
-                    if (socketChannel == null) {
-                        break;
-                    }
-                    try {
-                        prepareSocket(socketChannel.socket());
-                    } catch (final IOException ex) {
-                        if (this.exceptionHandler == null || !this.exceptionHandler.handle(ex)) {
-                            throw new IOReactorException("Failure initalizing socket", ex);
-                        }
-                    }
-                    enqueuePendingSession(socketChannel, null);
-                }
-            }
-
-        } catch (final CancelledKeyException ex) {
-            final ListenerEndpoint endpoint = (ListenerEndpoint) key.attachment();
-            this.endpoints.remove(endpoint);
-            key.attach(null);
-        }
+    @Override
+    public Set<ListenerEndpoint> getEndpoints() {
+        return listener.getEndpoints();
     }
 
-    private ListenerEndpointImpl createEndpoint(final SocketAddress address) {
-        return new ListenerEndpointImpl(
-                address,
-                new ListenerEndpointClosedCallback() {
-
-                    @Override
-                    public void endpointClosed(final ListenerEndpoint endpoint) {
-                        endpoints.remove(endpoint);
-                    }
+    @Override
+    public void pause() throws IOException {
+        listener.pause();
+    }
 
-                });
+    @Override
+    public void resume() throws IOException {
+        listener.resume();
     }
 
     @Override
-    public ListenerEndpoint listen(final SocketAddress address) {
-        final IOReactorStatus status = getStatus();
-        Asserts.check(status == IOReactorStatus.INACTIVE || status == IOReactorStatus.ACTIVE, "I/O reactor has been shut down");
-        final ListenerEndpointImpl request = createEndpoint(address);
-        this.requestQueue.add(request);
-        selector().wakeup();
-        return request;
+    public IOReactorStatus getStatus() {
+        return ioReactor.getStatus();
     }
 
-    private void processSessionRequests() throws IOReactorException {
-        ListenerEndpointImpl request;
-        while ((request = this.requestQueue.poll()) != null) {
-            final SocketAddress address = request.getAddress();
-            final ServerSocketChannel serverChannel;
-            try {
-                serverChannel = ServerSocketChannel.open();
-            } catch (final IOException ex) {
-                throw new IOReactorException("Failure opening server socket", ex);
-            }
-            try {
-                final ServerSocket socket = serverChannel.socket();
-                socket.setReuseAddress(this.reactorConfig.isSoReuseAddress());
-                final int millis = this.reactorConfig.getSoTimeout().toMillisIntBound();
-                if (millis > 0) {
-                    socket.setSoTimeout(millis);
-                }
-                if (this.reactorConfig.getRcvBufSize() > 0) {
-                    socket.setReceiveBufferSize(this.reactorConfig.getRcvBufSize());
-                }
-                serverChannel.configureBlocking(false);
-                socket.bind(address, this.reactorConfig.getBacklogSize());
-            } catch (final IOException ex) {
-                closeChannel(serverChannel);
-                request.failed(ex);
-                if (this.exceptionHandler == null || !this.exceptionHandler.handle(ex)) {
-                    throw new IOReactorException("Failure binding socket to address "
-                            + address, ex);
-                }
-                return;
-            }
-            try {
-                final SelectionKey key = serverChannel.register(selector(), SelectionKey.OP_ACCEPT);
-                key.attach(request);
-                request.setKey(key);
-            } catch (final IOException ex) {
-                closeChannel(serverChannel);
-                throw new IOReactorException("Failure registering channel " +
-                        "with the selector", ex);
-            }
+    private void enqueueChannel(final SocketChannel socketChannel) {
+        final int i = Math.abs(currentWorker.incrementAndGet() % workerCount);
+        try {
+            dispatchers[i].enqueueChannel(socketChannel);
+        } catch (final IOReactorShutdownException ex) {
+            initiateShutdown();
+        }
+    }
 
-            this.endpoints.add(request);
-            request.completed(serverChannel.socket().getLocalSocketAddress());
+    public SessionRequest connect(
+            final NamedEndpoint remoteEndpoint,
+            final SocketAddress remoteAddress,
+            final SocketAddress localAddress,
+            final Object attachment,
+            final SessionRequestCallback callback) throws IOReactorShutdownException {
+        Args.notNull(remoteEndpoint, "Remote endpoint");
+        if (getStatus().compareTo(IOReactorStatus.ACTIVE) > 0) {
+            throw new IOReactorShutdownException("I/O reactor has been shut down");
+        }
+        final int i = Math.abs(currentWorker.incrementAndGet() % workerCount);
+        try {
+            return dispatchers[i].connect(remoteEndpoint, remoteAddress, localAddress, attachment, callback);
+        } catch (final IOReactorShutdownException ex) {
+            initiateShutdown();
+            throw ex;
         }
     }
 
     @Override
-    public Set<ListenerEndpoint> getEndpoints() {
-        final Set<ListenerEndpoint> set = new HashSet<>();
-        synchronized (this.endpoints) {
-            final Iterator<ListenerEndpointImpl> it = this.endpoints.iterator();
-            while (it.hasNext()) {
-                final ListenerEndpoint endpoint = it.next();
-                if (!endpoint.isClosed()) {
-                    set.add(endpoint);
-                } else {
-                    it.remove();
-                }
-            }
-        }
-        return set;
+    public void initiateShutdown() {
+        ioReactor.initiateShutdown();
     }
 
     @Override
-    public void pause() throws IOException {
-        if (this.paused) {
-            return;
-        }
-        this.paused = true;
-        synchronized (this.endpoints) {
-            for (final ListenerEndpointImpl endpoint : this.endpoints) {
-                if (!endpoint.isClosed()) {
-                    endpoint.close();
-                    this.pausedEndpoints.add(endpoint.getAddress());
-                }
-            }
-            this.endpoints.clear();
-        }
+    public void awaitShutdown(final TimeValue waitTime) throws InterruptedException {
+        ioReactor.awaitShutdown(waitTime);
     }
 
     @Override
-    public void resume() throws IOException {
-        if (!this.paused) {
-            return;
-        }
-        this.paused = false;
-        for (final SocketAddress address: this.pausedEndpoints) {
-            final ListenerEndpointImpl request = createEndpoint(address);
-            this.requestQueue.add(request);
-        }
-        this.pausedEndpoints.clear();
-        selector().wakeup();
+    public void shutdown(final ShutdownType shutdownType) {
+        ioReactor.shutdown(shutdownType);
+    }
+
+    @Override
+    public void close() throws IOException {
+        ioReactor.close();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactor.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactor.java
index 7b77012..06910ae 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactor.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactor.java
@@ -27,8 +27,6 @@
 
 package org.apache.hc.core5.reactor;
 
-import java.io.IOException;
-
 import org.apache.hc.core5.io.GracefullyCloseable;
 import org.apache.hc.core5.util.TimeValue;
 
@@ -59,14 +57,6 @@ public interface IOReactor extends GracefullyCloseable {
     IOReactorStatus getStatus();
 
     /**
-     * Starts the reactor and initiates the dispatch of I/O event to I/O session
-     * event handlers.
-     *
-     * @throws IOException in case of an I/O error.
-     */
-    void execute() throws IOException;
-
-    /**
      * Initiates shutdown of the reactor without blocking. The reactor is expected
      * to terminate all active connections, to shut down itself and to release
      * system resources it currently holds

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorException.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorException.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorException.java
deleted file mode 100644
index 27096a5..0000000
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorException.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation.  For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-
-package org.apache.hc.core5.reactor;
-
-import java.io.IOException;
-
-/**
- * I/O exception that can be thrown by an I/O reactor. Usually exceptions
- * of this type are fatal and are not recoverable.
- *
- * @since 4.0
- */
-public class IOReactorException extends IOException {
-
-    private static final long serialVersionUID = -4248110651729635749L;
-
-    public IOReactorException(final String message, final Throwable cause) {
-        super(message);
-        if (cause != null) {
-            initCause(cause);
-        }
-    }
-
-    public IOReactorException(final String message) {
-        super(message);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorExceptionHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorExceptionHandler.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorExceptionHandler.java
deleted file mode 100644
index 3327641..0000000
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorExceptionHandler.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation.  For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-
-package org.apache.hc.core5.reactor;
-
-import java.io.IOException;
-
-/**
- * Abstract exception handler intended to deal with potentially recoverable
- * I/O exceptions thrown by an I/O reactor.
- *
- * @since 4.0
- */
-public interface IOReactorExceptionHandler {
-
-    /**
-     * This method is expected to examine the I/O exception passed as
-     * a parameter and decide whether it is safe to continue execution of
-     * the I/O reactor.
-     *
-     * @param ex potentially recoverable I/O exception
-     * @return {@code true} if it is safe to ignore the exception
-     * and continue execution of the I/O reactor; {@code false} if the
-     * I/O reactor must throw {@link IOReactorException} and terminate
-     */
-    boolean handle(IOException ex);
-
-}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorImpl.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorImpl.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorImpl.java
deleted file mode 100644
index e0f0c94..0000000
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorImpl.java
+++ /dev/null
@@ -1,418 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation.  For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-
-package org.apache.hc.core5.reactor;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.nio.channels.CancelledKeyException;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.ClosedSelectorException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.hc.core5.concurrent.Cancellable;
-import org.apache.hc.core5.function.Callback;
-import org.apache.hc.core5.io.ShutdownType;
-import org.apache.hc.core5.util.Args;
-import org.apache.hc.core5.util.TimeValue;
-
-/**
- * {@link IOReactor} implementation.
- *
- * @since 4.0
- */
-class IOReactorImpl implements IOReactor {
-
-    private final IOReactorConfig reactorConfig;
-    private final IOEventHandlerFactory eventHandlerFactory;
-    private final Selector selector;
-    private final Queue<InternalIOSession> closedSessions;
-    private final Queue<PendingSession> pendingSessions;
-    private final AtomicReference<IOReactorStatus> status;
-    private final AtomicBoolean shutdownInitiated;
-    private final Object shutdownMutex;
-    private final Callback<IOSession> sessionShutdownCallback;
-
-    private volatile long lastTimeoutCheck;
-
-    IOReactorImpl(
-            final IOEventHandlerFactory eventHandlerFactory,
-            final IOReactorConfig reactorConfig,
-            final Callback<IOSession> sessionShutdownCallback) {
-        super();
-        this.reactorConfig = Args.notNull(reactorConfig, "I/O reactor config");
-        this.eventHandlerFactory = Args.notNull(eventHandlerFactory, "Event handler factory");
-        this.sessionShutdownCallback = sessionShutdownCallback;
-        this.shutdownInitiated = new AtomicBoolean(false);
-        this.closedSessions = new ConcurrentLinkedQueue<>();
-        this.pendingSessions = new ConcurrentLinkedQueue<>();
-        try {
-            this.selector = Selector.open();
-        } catch (final IOException ex) {
-            throw new IllegalStateException("Unexpected failure opening I/O selector", ex);
-        }
-        this.shutdownMutex = new Object();
-        this.status = new AtomicReference<>(IOReactorStatus.INACTIVE);
-    }
-
-    @Override
-    public IOReactorStatus getStatus() {
-        return this.status.get();
-    }
-
-    private void closeQuietly(final Closeable closeable) {
-        if (closeable != null) {
-            try {
-                closeable.close();
-            } catch (final IOException ignore) {
-            }
-        }
-    }
-
-    private void cancelQuietly(final Cancellable cancellable) {
-        if (cancellable != null) {
-            cancellable.cancel();
-        }
-    }
-
-    void enqueuePendingSession(final SocketChannel socketChannel, final SessionRequestImpl sessionRequest) {
-        Args.notNull(socketChannel, "SocketChannel");
-        this.pendingSessions.add(new PendingSession(socketChannel, sessionRequest));
-        this.selector.wakeup();
-    }
-
-    /**
-     * Activates the I/O reactor. The I/O reactor will start reacting to I/O
-     * events and and dispatch I/O event notifications to the {@link IOEventHandler}
-     * associated with the given I/O session.
-     * <p>
-     * This method will enter the infinite I/O select loop on
-     * the {@link Selector} instance associated with this I/O reactor.
-     * <p>
-     * The method will remain blocked unto the I/O reactor is shut down or the
-     * execution thread is interrupted.
-     *
-     * @throws InterruptedIOException if the dispatch thread is interrupted.
-     * @throws IOReactorException in case if a non-recoverable I/O error.
-     */
-
-    @Override
-    public void execute() throws InterruptedIOException, IOReactorException {
-        if (this.status.compareAndSet(IOReactorStatus.INACTIVE, IOReactorStatus.ACTIVE)) {
-            doExecute();
-        }
-    }
-
-    private void doExecute() throws InterruptedIOException, IOReactorException {
-
-        final long selectTimeout = this.reactorConfig.getSelectInterval();
-        try {
-            while (!Thread.currentThread().isInterrupted()) {
-
-                final int readyCount;
-                try {
-                    readyCount = this.selector.select(selectTimeout);
-                } catch (final InterruptedIOException ex) {
-                    throw ex;
-                } catch (final IOException ex) {
-                    throw new IOReactorException("Unexpected selector failure", ex);
-                }
-
-                if (this.status.get().compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0) {
-                    if (this.shutdownInitiated.compareAndSet(false, true)) {
-                        initiateSessionShutdown();
-                    }
-                    closePendingSessions();
-                }
-                if (this.status.get().compareTo(IOReactorStatus.SHUT_DOWN) == 0) {
-                    break;
-                }
-
-                // Process selected I/O events
-                if (readyCount > 0) {
-                    processEvents(this.selector.selectedKeys());
-                }
-
-                validateActiveChannels();
-
-                // Process closed sessions
-                processClosedSessions();
-
-                // If active process new channels
-                if (this.status.get().compareTo(IOReactorStatus.ACTIVE) == 0) {
-                    processPendingSessions();
-                }
-
-                // Exit select loop if graceful shutdown has been completed
-                if (this.status.get().compareTo(IOReactorStatus.SHUTTING_DOWN) == 0 && this.selector.keys().isEmpty()) {
-                    break;
-                }
-                if (this.status.get().compareTo(IOReactorStatus.SHUT_DOWN) == 0) {
-                    break;
-                }
-            }
-
-        } catch (final ClosedSelectorException ignore) {
-        } finally {
-            try {
-                closePendingSessions();
-                closeActiveChannels();
-                processClosedSessions();
-            } finally {
-                this.status.set(IOReactorStatus.SHUT_DOWN);
-                synchronized (this.shutdownMutex) {
-                    this.shutdownMutex.notifyAll();
-                }
-            }
-        }
-    }
-
-    private void initiateSessionShutdown() {
-        if (this.sessionShutdownCallback != null) {
-            final Set<SelectionKey> keys = this.selector.keys();
-            for (final SelectionKey key : keys) {
-                final InternalIOSession session = (InternalIOSession) key.attachment();
-                if (session != null) {
-                    this.sessionShutdownCallback.execute(session);
-                }
-            }
-        }
-    }
-
-    private void validateActiveChannels() {
-        final long currentTime = System.currentTimeMillis();
-        if( (currentTime - this.lastTimeoutCheck) >= this.reactorConfig.getSelectInterval()) {
-            this.lastTimeoutCheck = currentTime;
-            for (final SelectionKey key : this.selector.keys()) {
-                timeoutCheck(key, currentTime);
-            }
-        }
-    }
-
-    private void processEvents(final Set<SelectionKey> selectedKeys) {
-        for (final SelectionKey key : selectedKeys) {
-            processEvent(key);
-        }
-        selectedKeys.clear();
-    }
-
-    private void processEvent(final SelectionKey key) {
-        final InternalIOSession session = (InternalIOSession) key.attachment();
-        try {
-            if (key.isReadable()) {
-                session.updateAccessTime();
-                session.onInputReady();
-            }
-            if (key.isWritable()) {
-                session.updateAccessTime();
-                session.onOutputReady();
-            }
-        } catch (final CancelledKeyException ex) {
-            session.shutdown(ShutdownType.GRACEFUL);
-        } catch (final IOException ex) {
-            session.onException(ex);
-        } catch (final RuntimeException ex) {
-            session.onException(ex);
-            throw ex;
-        }
-    }
-
-    private void processPendingSessions() throws IOReactorException {
-        PendingSession pendingSession;
-        while ((pendingSession = this.pendingSessions.poll()) != null) {
-            final InternalIOSession session;
-            try {
-                final SocketChannel socketChannel = pendingSession.socketChannel;
-                final SessionRequestImpl sessionRequest = pendingSession.sessionRequest;
-                socketChannel.configureBlocking(false);
-                final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_READ);
-                session = new InternalIOSession(
-                        sessionRequest != null ?  sessionRequest.getRemoteEndpoint() : null,
-                        new IOSessionImpl(key, socketChannel),
-                        closedSessions);
-                session.setHandler(this.eventHandlerFactory.createHandler(session,
-                        sessionRequest != null ? sessionRequest.getAttachment() : null));
-                session.setSocketTimeout(this.reactorConfig.getSoTimeout().toMillisIntBound());
-                key.attach(session);
-            } catch (final ClosedChannelException ex) {
-                final SessionRequestImpl sessionRequest = pendingSession.sessionRequest;
-                if (sessionRequest != null) {
-                    sessionRequest.failed(ex);
-                }
-                return;
-            } catch (final IOException ex) {
-                throw new IOReactorException("Failure registering channel with the selector", ex);
-            }
-            try {
-                final SessionRequestImpl sessionRequest = pendingSession.sessionRequest;
-                if (sessionRequest != null) {
-                    sessionRequest.completed(session);
-                }
-                session.onConnected();
-            } catch (final CancelledKeyException ex) {
-                session.shutdown(ShutdownType.GRACEFUL);
-            } catch (final IOException ex) {
-                session.onException(ex);
-            } catch (final RuntimeException ex) {
-                session.onException(ex);
-                throw ex;
-            }
-        }
-    }
-
-    private void processClosedSessions() {
-        for (;;) {
-            final InternalIOSession session = this.closedSessions.poll();
-            if (session == null) {
-                break;
-            }
-            try {
-                session.onDisconnected();
-            } catch (final CancelledKeyException ex) {
-                // ignore and move on
-            }
-        }
-    }
-
-    private void timeoutCheck(final SelectionKey key, final long now) {
-        final InternalIOSession session = (InternalIOSession) key.attachment();
-        if (session != null) {
-            try {
-                final int timeout = session.getSocketTimeout();
-                if (timeout > 0) {
-                    if (session.getLastAccessTime() + timeout < now) {
-                        session.onTimeout();
-                    }
-                }
-            } catch (final CancelledKeyException ex) {
-                session.shutdown(ShutdownType.GRACEFUL);
-            } catch (final IOException ex) {
-                session.onException(ex);
-            } catch (final RuntimeException ex) {
-                session.onException(ex);
-                throw ex;
-            }
-        }
-    }
-
-    private void closePendingSessions() {
-        for (;;) {
-            final PendingSession pendingSession = this.pendingSessions.poll();
-            if (pendingSession == null) {
-                break;
-            } else {
-                cancelQuietly(pendingSession.sessionRequest);
-                closeQuietly(pendingSession.socketChannel);
-            }
-        }
-    }
-
-    private void closeActiveChannels() {
-        final Set<SelectionKey> keys = this.selector.keys();
-        for (final SelectionKey key : keys) {
-            final InternalIOSession session = (InternalIOSession) key.attachment();
-            closeQuietly(session);
-        }
-        closeQuietly(this.selector);
-    }
-
-    @Override
-    public void awaitShutdown(final TimeValue waitTime) throws InterruptedException {
-        Args.notNull(waitTime, "Wait time");
-        final long deadline = System.currentTimeMillis() + waitTime.toMillis();
-        long remaining = waitTime.toMillis();
-        synchronized (this.shutdownMutex) {
-            while (this.status.get().compareTo(IOReactorStatus.SHUT_DOWN) < 0) {
-                this.shutdownMutex.wait(remaining);
-                remaining = deadline - System.currentTimeMillis();
-                if (remaining <= 0) {
-                    return;
-                }
-            }
-        }
-    }
-
-    @Override
-    public void initiateShutdown() {
-        if (this.status.compareAndSet(IOReactorStatus.INACTIVE, IOReactorStatus.SHUT_DOWN)) {
-            synchronized (this.shutdownMutex) {
-                this.shutdownMutex.notifyAll();
-            }
-        } else if (this.status.compareAndSet(IOReactorStatus.ACTIVE, IOReactorStatus.SHUTTING_DOWN)) {
-            this.selector.wakeup();
-        }
-    }
-
-    void forceShutdown() {
-        final IOReactorStatus previousStatus = this.status.getAndSet(IOReactorStatus.SHUT_DOWN);
-        if (previousStatus.compareTo(IOReactorStatus.ACTIVE) == 0) {
-            this.selector.wakeup();
-        }
-        synchronized (this.shutdownMutex) {
-            this.shutdownMutex.notifyAll();
-        }
-    }
-
-    @Override
-    public void shutdown(final ShutdownType shutdownType) {
-        initiateShutdown();
-        try {
-            if (shutdownType == ShutdownType.GRACEFUL) {
-                awaitShutdown(TimeValue.ofSeconds(5));
-            }
-            forceShutdown();
-        } catch (final InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    @Override
-    public void close() {
-        shutdown(ShutdownType.GRACEFUL);
-    }
-
-    private static class PendingSession {
-
-        final SocketChannel socketChannel;
-        final SessionRequestImpl sessionRequest;
-
-        private PendingSession(final SocketChannel socketChannel, final SessionRequestImpl sessionRequest) {
-            this.socketChannel = socketChannel;
-            this.sessionRequest = sessionRequest;
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorService.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorService.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorService.java
new file mode 100644
index 0000000..965791d
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorService.java
@@ -0,0 +1,39 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.reactor;
+
+/**
+ * {@link IOReactor} running as a service.
+ *
+ * @since 5.0
+ */
+public interface IOReactorService extends IOReactor {
+
+    void start();
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorShutdownException.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorShutdownException.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorShutdownException.java
new file mode 100644
index 0000000..3e3854e
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorShutdownException.java
@@ -0,0 +1,41 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.reactor;
+
+/**
+ * Signals the I/O reator has been shut down or is in the process of shutting down.
+ *
+ * @since 5.0
+ */
+public class IOReactorShutdownException extends IllegalStateException {
+
+    public IOReactorShutdownException(final String message) {
+        super(message);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorStatus.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorStatus.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorStatus.java
index dcec4cc..306885d 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorStatus.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorStatus.java
@@ -35,6 +35,11 @@ package org.apache.hc.core5.reactor;
 public enum IOReactorStatus {
 
     /**
+     * The reactor is inactive / has not been started
+     */
+    INACTIVE,
+
+    /**
      * The reactor is active / processing I/O events.
      */
     ACTIVE,
@@ -47,11 +52,6 @@ public enum IOReactorStatus {
     /**
      * The reactor has shut down.
      */
-    SHUT_DOWN,
-
-    /**
-     * The reactor is inactive / has not been started
-     */
-    INACTIVE
+    SHUT_DOWN
 
 }

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorWorker.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorWorker.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorWorker.java
new file mode 100644
index 0000000..4992f67
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorWorker.java
@@ -0,0 +1,57 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.reactor;
+
+final class IOReactorWorker implements Runnable {
+
+    private final AbstractSingleCoreIOReactor ioReactor;
+
+    private volatile Throwable throwable;
+
+    public IOReactorWorker(final AbstractSingleCoreIOReactor ioReactor) {
+        super();
+        this.ioReactor = ioReactor;
+    }
+
+    @Override
+    public void run() {
+        try {
+            this.ioReactor.execute();
+        } catch (final Error ex) {
+            this.throwable = ex;
+            throw ex;
+        } catch (final Exception ex) {
+            this.throwable = ex;
+        }
+    }
+
+    public Throwable getThrowable() {
+        return this.throwable;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java
index 7268a87..1188d07 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java
@@ -37,17 +37,9 @@ import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.hc.core5.annotation.Contract;
-import org.apache.hc.core5.annotation.ThreadingBehavior;
 import org.apache.hc.core5.io.ShutdownType;
 import org.apache.hc.core5.util.Args;
 
-/**
- * Default implementation of {@link IOSession}.
- *
- * @since 4.0
- */
-@Contract(threading = ThreadingBehavior.SAFE)
 class IOSessionImpl implements IOSession {
 
     private final static AtomicLong COUNT = new AtomicLong(0);

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalChannel.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalChannel.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalChannel.java
new file mode 100644
index 0000000..47b6d6d
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalChannel.java
@@ -0,0 +1,81 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.reactor;
+
+import java.io.IOException;
+import java.nio.channels.CancelledKeyException;
+
+import org.apache.hc.core5.io.GracefullyCloseable;
+import org.apache.hc.core5.io.ShutdownType;
+
+abstract class InternalChannel implements GracefullyCloseable {
+
+    private volatile long lastEventTime;
+
+    InternalChannel() {
+        this.lastEventTime = System.currentTimeMillis();
+    }
+
+    abstract void onIOEvent(final int ops) throws IOException;
+
+    abstract void onTimeout() throws IOException;
+
+    abstract void onException(final Exception cause);
+
+    abstract int getTimeout();
+
+    final void handleIOEvent(final int ops) {
+        lastEventTime = System.currentTimeMillis();
+        try {
+            onIOEvent(ops);
+        } catch (final CancelledKeyException ex) {
+            shutdown(ShutdownType.GRACEFUL);
+        } catch (final Exception ex) {
+            onException(ex);
+            shutdown(ShutdownType.IMMEDIATE);
+        }
+    }
+
+    final void checkTimeout(final long currentTime) {
+        final int timeout = getTimeout();
+        if (timeout > 0) {
+            final long deadline = lastEventTime + timeout;
+            if (currentTime > deadline) {
+                try {
+                    onTimeout();
+                } catch (final CancelledKeyException ex) {
+                    shutdown(ShutdownType.GRACEFUL);
+                } catch (final Exception ex) {
+                    onException(ex);
+                    shutdown(ShutdownType.IMMEDIATE);
+                }
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalConnectChannel.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalConnectChannel.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalConnectChannel.java
new file mode 100644
index 0000000..56142f9
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalConnectChannel.java
@@ -0,0 +1,101 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.reactor;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+import org.apache.hc.core5.io.ShutdownType;
+
+final class InternalConnectChannel extends InternalChannel {
+
+    private final SelectionKey key;
+    private final SocketChannel socketChannel;
+    private final SessionRequestImpl sessionRequest;
+    private final InternalDataChannelFactory dataChannelFactory;
+
+    InternalConnectChannel(
+            final SelectionKey key,
+            final SocketChannel socketChannel,
+            final SessionRequestImpl sessionRequest,
+            final InternalDataChannelFactory dataChannelFactory) {
+        super();
+        this.key = key;
+        this.socketChannel = socketChannel;
+        this.sessionRequest = sessionRequest;
+        this.dataChannelFactory = dataChannelFactory;
+    }
+
+    @Override
+    void onIOEvent(final int readyOps) throws IOException {
+        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
+            if (socketChannel.isConnectionPending()) {
+                socketChannel.finishConnect();
+            }
+            final InternalDataChannel dataChannel = dataChannelFactory.create(
+                    key,
+                    socketChannel,
+                    sessionRequest.getRemoteEndpoint(),
+                    sessionRequest.getAttachment());
+            key.attach(dataChannel);
+            sessionRequest.completed(dataChannel);
+            dataChannel.handleIOEvent(SelectionKey.OP_CONNECT);
+        }
+    }
+
+    @Override
+    int getTimeout() {
+        return sessionRequest.getConnectTimeout();
+    }
+
+    @Override
+    void onTimeout() throws IOException {
+        sessionRequest.timeout();
+    }
+
+    @Override
+    void onException(final Exception cause) {
+        sessionRequest.failed(cause);
+    }
+
+    @Override
+    public void close() throws IOException {
+        key.cancel();
+        socketChannel.close();
+    }
+
+    @Override
+    public void shutdown(final ShutdownType shutdownType) {
+        try {
+            close();
+        } catch (final IOException ignore) {
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java
new file mode 100644
index 0000000..def406d
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java
@@ -0,0 +1,310 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.reactor;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.hc.core5.function.Callback;
+import org.apache.hc.core5.io.ShutdownType;
+import org.apache.hc.core5.net.NamedEndpoint;
+import org.apache.hc.core5.reactor.ssl.SSLBufferManagement;
+import org.apache.hc.core5.reactor.ssl.SSLIOSession;
+import org.apache.hc.core5.reactor.ssl.SSLMode;
+import org.apache.hc.core5.reactor.ssl.SSLSessionInitializer;
+import org.apache.hc.core5.reactor.ssl.SSLSessionVerifier;
+import org.apache.hc.core5.reactor.ssl.TlsDetails;
+import org.apache.hc.core5.util.Asserts;
+
+final class InternalDataChannel extends InternalChannel implements TlsCapableIOSession {
+
+    private final IOSession ioSession;
+    private final NamedEndpoint namedEndpoint;
+    private final AtomicReference<SSLIOSession> tlsSessionRef;
+    private final Queue<InternalDataChannel> closedSessions;
+    private final AtomicBoolean connected;
+    private final AtomicBoolean closed;
+
+    InternalDataChannel(
+            final SelectionKey key,
+            final SocketChannel socketChannel,
+            final NamedEndpoint namedEndpoint,
+            final Queue<InternalDataChannel> closedSessions) {
+        this.ioSession = new IOSessionImpl(key, socketChannel);
+        this.namedEndpoint = namedEndpoint;
+        this.closedSessions = closedSessions;
+        this.tlsSessionRef = new AtomicReference<>(null);
+        this.connected = new AtomicBoolean(false);
+        this.closed = new AtomicBoolean(false);
+    }
+
+    @Override
+    public String getId() {
+        return ioSession.getId();
+    }
+
+    private IOSession getSessionImpl() {
+        final SSLIOSession tlsSession = tlsSessionRef.get();
+        if (tlsSession != null) {
+            return tlsSession;
+        } else {
+            return ioSession;
+        }
+    }
+
+    private IOEventHandler getEventHandler() {
+        final IOEventHandler handler = ioSession.getHandler();
+        Asserts.notNull(handler, "IO event handler");
+        return handler;
+    }
+
+    @Override
+    void onIOEvent(final int readyOps) throws IOException {
+        final SSLIOSession tlsSession = tlsSessionRef.get();
+        if (tlsSession != null) {
+            if (!tlsSession.isInitialized()) {
+                tlsSession.initialize();
+            }
+            if ((readyOps & SelectionKey.OP_READ) != 0) {
+                if (tlsSession.isAppInputReady()) {
+                    do {
+                        final IOEventHandler handler = getEventHandler();
+                        handler.inputReady(this);
+                    } while (tlsSession.hasInputDate());
+                }
+                tlsSession.inboundTransport();
+            }
+            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
+                if (tlsSession.isAppOutputReady()) {
+                    final IOEventHandler handler = getEventHandler();
+                    handler.outputReady(this);
+                }
+                tlsSession.outboundTransport();
+            }
+        } else {
+            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
+                if (connected.compareAndSet(false, true)) {
+                    final IOEventHandler handler = getEventHandler();
+                    handler.connected(this);
+                }
+            }
+            if ((readyOps & SelectionKey.OP_READ) != 0) {
+                final IOEventHandler handler = getEventHandler();
+                handler.inputReady(this);
+            }
+            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
+                final IOEventHandler handler = getEventHandler();
+                handler.outputReady(this);
+            }
+        }
+    }
+
+    @Override
+    int getTimeout() {
+        return ioSession.getSocketTimeout();
+    }
+
+    @Override
+    void onTimeout() throws IOException {
+        final IOEventHandler handler = getEventHandler();
+        handler.timeout(this);
+        final SSLIOSession tlsSession = tlsSessionRef.get();
+        if (tlsSession != null) {
+            if (tlsSession.isOutboundDone() && !tlsSession.isInboundDone()) {
+                // The session failed to terminate cleanly
+                tlsSession.shutdown(ShutdownType.IMMEDIATE);
+            }
+        }
+    }
+
+    @Override
+    void onException(final Exception cause) {
+        final IOEventHandler handler = getEventHandler();
+        handler.exception(this, cause);
+    }
+
+    void disconnected() {
+        final IOEventHandler handler = getEventHandler();
+        handler.disconnected(this);
+    }
+
+    @Override
+    public void startTls(
+            final SSLContext sslContext,
+            final SSLBufferManagement sslBufferManagement,
+            final SSLSessionInitializer initializer,
+            final SSLSessionVerifier verifier) {
+        if (!tlsSessionRef.compareAndSet(null, new SSLIOSession(
+                namedEndpoint,
+                ioSession,
+                namedEndpoint != null ? SSLMode.CLIENT : SSLMode.SERVER,
+                sslContext,
+                sslBufferManagement,
+                initializer,
+                verifier,
+                new Callback<SSLIOSession>() {
+
+                    @Override
+                    public void execute(final SSLIOSession sslSession) {
+                        if (connected.compareAndSet(false, true)) {
+                            final IOEventHandler handler = getEventHandler();
+                            try {
+                                handler.connected(InternalDataChannel.this);
+                            } catch (final Exception ex) {
+                                handler.exception(InternalDataChannel.this, ex);
+                            }
+                        }
+                    }
+
+                }))) {
+            throw new IllegalStateException("TLS already activated");
+        }
+    }
+
+    @Override
+    public TlsDetails getTlsDetails() {
+        final SSLIOSession sslIoSession = tlsSessionRef.get();
+        return sslIoSession != null ? sslIoSession.getTlsDetails() : null;
+    }
+
+    @Override
+    public void close() {
+        if (closed.compareAndSet(false, true)) {
+            try {
+                getSessionImpl().close();
+            } finally {
+                closedSessions.add(this);
+            }
+        }
+    }
+
+    @Override
+    public void shutdown(final ShutdownType shutdownType) {
+        if (closed.compareAndSet(false, true)) {
+            try {
+                getSessionImpl().shutdown(shutdownType);
+            } finally {
+                closedSessions.add(this);
+            }
+        }
+    }
+
+    @Override
+    public int getStatus() {
+        return getSessionImpl().getStatus();
+    }
+
+    @Override
+    public boolean isClosed() {
+        return getSessionImpl().isClosed();
+    }
+
+    @Override
+    public IOEventHandler getHandler() {
+        return ioSession.getHandler();
+    }
+
+    @Override
+    public void setHandler(final IOEventHandler eventHandler) {
+        ioSession.setHandler(eventHandler);
+    }
+
+    @Override
+    public void addLast(final Command command) {
+        getSessionImpl().addLast(command);
+    }
+
+    @Override
+    public void addFirst(final Command command) {
+        getSessionImpl().addFirst(command);
+    }
+
+    @Override
+    public Command getCommand() {
+        return getSessionImpl().getCommand();
+    }
+
+    @Override
+    public ByteChannel channel() {
+        return getSessionImpl().channel();
+    }
+
+    @Override
+    public SocketAddress getRemoteAddress() {
+        return ioSession.getRemoteAddress();
+    }
+
+    @Override
+    public SocketAddress getLocalAddress() {
+        return ioSession.getLocalAddress();
+    }
+
+    @Override
+    public int getEventMask() {
+        return getSessionImpl().getEventMask();
+    }
+
+    @Override
+    public void setEventMask(final int ops) {
+        getSessionImpl().setEventMask(ops);
+    }
+
+    @Override
+    public void setEvent(final int op) {
+        getSessionImpl().setEvent(op);
+    }
+
+    @Override
+    public void clearEvent(final int op) {
+        getSessionImpl().clearEvent(op);
+    }
+
+    @Override
+    public int getSocketTimeout() {
+        return ioSession.getSocketTimeout();
+    }
+
+    @Override
+    public void setSocketTimeout(final int timeout) {
+        ioSession.setSocketTimeout(timeout);
+    }
+
+    @Override
+    public String toString() {
+        return getSessionImpl().toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannelFactory.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannelFactory.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannelFactory.java
new file mode 100644
index 0000000..5f008aa
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannelFactory.java
@@ -0,0 +1,43 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.reactor;
+
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+import org.apache.hc.core5.net.NamedEndpoint;
+
+interface InternalDataChannelFactory {
+
+    InternalDataChannel create(
+            SelectionKey key,
+            SocketChannel socketChannel,
+            NamedEndpoint namedEndpoint,
+            Object attachment);
+
+}


Mime
View raw message