hc-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ol...@apache.org
Subject svn commit: r1767954 - in /httpcomponents/httpcore/trunk: httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/ httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/ht...
Date Thu, 03 Nov 2016 19:28:18 GMT
Author: olegk
Date: Thu Nov  3 19:28:18 2016
New Revision: 1767954

URL: http://svn.apache.org/viewvc?rev=1767954&view=rev
Log:
Redesign of non-blocking I/O session APIs

Added:
    httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorImpl.java   (contents, props changed)
      - copied, changed from r1767392, httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOReactor.java
    httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/ManagedIOSession.java   (with props)
Removed:
    httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOReactor.java
    httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/BaseIOReactor.java
Modified:
    httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2IOEventHandler.java
    httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
    httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
    httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java
    httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOEventHandler.java
    httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSession.java
    httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http/Http1TestClient.java
    httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http/Http1TestServer.java
    httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http2/Http2TestClient.java
    httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http2/Http2TestServer.java
    httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java
    httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1IOEventHandler.java
    httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
    httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/ClientEndpointImpl.java
    httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/HttpAsyncRequester.java
    httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/HttpAsyncServer.java
    httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractMultiworkerIOReactor.java
    httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOEventHandler.java
    httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSession.java
    httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java
    httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java

Modified: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2IOEventHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2IOEventHandler.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2IOEventHandler.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2IOEventHandler.java Thu Nov  3 19:28:18 2016
@@ -81,6 +81,11 @@ class AbstractHttp2IOEventHandler implem
     }
 
     @Override
+    public void exception(final IOSession session, final Exception cause) {
+        streamMultiplexer.onException(cause);
+    }
+
+    @Override
     public void disconnected(final IOSession session) {
         streamMultiplexer.onDisconnect();
     }

Modified: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java Thu Nov  3 19:28:18 2016
@@ -36,7 +36,6 @@ import java.util.Deque;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -506,9 +505,11 @@ abstract class AbstractHttp2StreamMultip
     }
 
     private void processPendingCommands() throws IOException, HttpException {
-        final Queue<Command> commandQueue = ioSession.getCommandQueue();
-        while (!commandQueue.isEmpty() && streamMap.size() < remoteConfig.getMaxConcurrentStreams()) {
-            final Command command = commandQueue.remove();
+        while (streamMap.size() < remoteConfig.getMaxConcurrentStreams()) {
+            final Command command = ioSession.getCommand();
+            if (command == null) {
+                break;
+            }
             if (command instanceof ShutdownCommand) {
                 final ShutdownCommand shutdownCommand = (ShutdownCommand) command;
                 if (shutdownCommand.getType() == ShutdownType.IMMEDIATE) {
@@ -558,9 +559,8 @@ abstract class AbstractHttp2StreamMultip
     }
 
     private void cancelPendingCommands() {
-        final Deque<Command> commandQueue = ioSession.getCommandQueue();
         for (;;) {
-            final Command command = commandQueue.poll();
+            final Command command = ioSession.getCommand();
             if (command != null) {
                 command.cancel();
             } else {
@@ -1063,14 +1063,12 @@ abstract class AbstractHttp2StreamMultip
 
     @Override
     public void close() throws IOException {
-        ioSession.getCommandQueue().addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
-        ioSession.setEvent(SelectionKey.OP_WRITE);
+        ioSession.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
     }
 
     @Override
     public void shutdown() throws IOException {
-        ioSession.getCommandQueue().addFirst(new ShutdownCommand(ShutdownType.IMMEDIATE));
-        ioSession.setEvent(SelectionKey.OP_WRITE);
+        ioSession.addFirst(new ShutdownCommand(ShutdownType.IMMEDIATE));
     }
 
     @Override

Modified: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java Thu Nov  3 19:28:18 2016
@@ -29,6 +29,7 @@ package org.apache.hc.core5.http2.impl.n
 
 import java.io.IOException;
 import java.net.SocketAddress;
+import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ByteChannel;
 import java.nio.charset.Charset;
@@ -125,6 +126,15 @@ public class ClientHttpProtocolNegotiato
 
     @Override
     public void timeout(final IOSession session) {
+        exception(session, new SocketTimeoutException());
+    }
+
+    @Override
+    public void exception(final IOSession session, final Exception cause) {
+        session.close();
+        if (connectionListener != null) {
+            connectionListener.onError(this, new SocketTimeoutException());
+        }
     }
 
     @Override

Modified: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java Thu Nov  3 19:28:18 2016
@@ -126,6 +126,11 @@ public class ServerHttpProtocolNegotiato
 
     @Override
     public void timeout(final IOSession session) {
+        exception(session, new SocketTimeoutException());
+    }
+
+    @Override
+    public void exception(final IOSession session, final Exception cause) {
         session.close();
         if (connectionListener != null) {
             connectionListener.onError(this, new SocketTimeoutException());

Modified: httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOEventHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOEventHandler.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOEventHandler.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOEventHandler.java Thu Nov  3 19:28:18 2016
@@ -84,6 +84,11 @@ public class LoggingIOEventHandler imple
     }
 
     @Override
+    public void exception(final IOSession session, final Exception cause) {
+        handler.exception(session, cause);
+    }
+
+    @Override
     public void disconnected(final IOSession session) {
         if (log.isDebugEnabled()) {
             log.debug(id + " " + session + " disconnected");

Modified: httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSession.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSession.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSession.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSession.java Thu Nov  3 19:28:18 2016
@@ -32,7 +32,6 @@ import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.ByteChannel;
 import java.nio.channels.SelectionKey;
-import java.util.Deque;
 
 import org.apache.hc.core5.reactor.Command;
 import org.apache.hc.core5.reactor.IOEventHandler;
@@ -61,8 +60,18 @@ public class LoggingIOSession implements
     }
 
     @Override
-    public Deque<Command> getCommandQueue() {
-        return this.session.getCommandQueue();
+    public void addLast(final Command command) {
+        this.session.addLast(command);
+    }
+
+    @Override
+    public void addFirst(final Command command) {
+        this.session.addFirst(command);
+    }
+
+    @Override
+    public Command getCommand() {
+        return this.session.getCommand();
     }
 
     @Override

Modified: httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http/Http1TestClient.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http/Http1TestClient.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http/Http1TestClient.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http/Http1TestClient.java Thu Nov  3 19:28:18 2016
@@ -30,7 +30,6 @@ package org.apache.hc.core5.testing.nio.
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
-import java.nio.channels.SelectionKey;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
@@ -69,8 +68,7 @@ public class Http1TestClient extends Asy
         }, new IOSessionCallback() {
             @Override
             public void execute(final IOSession session) throws IOException {
-                session.getCommandQueue().addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
-                session.setEvent(SelectionKey.OP_WRITE);
+                session.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
             }
         });
     }

Modified: httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http/Http1TestServer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http/Http1TestServer.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http/Http1TestServer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http/Http1TestServer.java Thu Nov  3 19:28:18 2016
@@ -29,7 +29,6 @@ package org.apache.hc.core5.testing.nio.
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.nio.channels.SelectionKey;
 
 import org.apache.hc.core5.http.ExceptionListener;
 import org.apache.hc.core5.http.config.ConnectionConfig;
@@ -71,8 +70,7 @@ public class Http1TestServer extends Asy
 
             @Override
             public void execute(final IOSession session) throws IOException {
-                session.getCommandQueue().addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
-                session.setEvent(SelectionKey.OP_WRITE);
+                session.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
             }
 
         });

Modified: httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http2/Http2TestClient.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http2/Http2TestClient.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http2/Http2TestClient.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http2/Http2TestClient.java Thu Nov  3 19:28:18 2016
@@ -30,7 +30,6 @@ package org.apache.hc.core5.testing.nio.
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
-import java.nio.channels.SelectionKey;
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -82,8 +81,7 @@ public class Http2TestClient extends Asy
 
             @Override
             public void execute(final IOSession session) throws IOException {
-                session.getCommandQueue().addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
-                session.setEvent(SelectionKey.OP_WRITE);
+                session.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
             }
 
         });

Modified: httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http2/Http2TestServer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http2/Http2TestServer.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http2/Http2TestServer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http2/Http2TestServer.java Thu Nov  3 19:28:18 2016
@@ -29,7 +29,6 @@ package org.apache.hc.core5.testing.nio.
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.nio.channels.SelectionKey;
 import java.nio.charset.StandardCharsets;
 
 import org.apache.hc.core5.http.ExceptionListener;
@@ -70,8 +69,7 @@ public class Http2TestServer extends Asy
 
             @Override
             public void execute(final IOSession session) throws IOException {
-                session.getCommandQueue().addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
-                session.setEvent(SelectionKey.OP_WRITE);
+                session.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
             }
 
         });

Modified: httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java Thu Nov  3 19:28:18 2016
@@ -82,6 +82,10 @@ public class TestDefaultListeningIOReact
                     }
 
                     @Override
+                    public void exception(final IOSession session, final Exception cause) {
+                    }
+
+                    @Override
                     public void disconnected(final IOSession session) {
                     }
                 };

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1IOEventHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1IOEventHandler.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1IOEventHandler.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1IOEventHandler.java Thu Nov  3 19:28:18 2016
@@ -80,6 +80,11 @@ class AbstractHttp1IOEventHandler implem
     }
 
     @Override
+    public void exception(final IOSession session, final Exception cause) {
+        streamDuplexer.onException(cause);
+    }
+
+    @Override
     public void disconnected(final IOSession session) {
         streamDuplexer.onDisconnect();
     }

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java Thu Nov  3 19:28:18 2016
@@ -35,7 +35,6 @@ import java.nio.channels.ClosedChannelEx
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.WritableByteChannel;
-import java.util.Deque;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -164,7 +163,7 @@ abstract class AbstractHttp1StreamDuplex
 
     private void processCommands() throws HttpException, IOException {
         for (;;) {
-            final Command command = ioSession.getCommandQueue().poll();
+            final Command command = ioSession.getCommand();
             if (command == null) {
                 return;
             }
@@ -320,9 +319,8 @@ abstract class AbstractHttp1StreamDuplex
     }
 
     private void cancelPendingCommands() {
-        final Deque<Command> commandQueue = ioSession.getCommandQueue();
         for (;;) {
-            final Command command = commandQueue.poll();
+            final Command command = ioSession.getCommand();
             if (command != null) {
                 command.cancel();
             } else {
@@ -433,14 +431,12 @@ abstract class AbstractHttp1StreamDuplex
 
     @Override
     public void close() throws IOException {
-        ioSession.getCommandQueue().addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
-        ioSession.setEvent(SelectionKey.OP_WRITE);
+        ioSession.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
     }
 
     @Override
     public void shutdown() throws IOException {
-        ioSession.getCommandQueue().addFirst(new ShutdownCommand(ShutdownType.IMMEDIATE));
-        ioSession.setEvent(SelectionKey.OP_WRITE);
+        ioSession.addFirst(new ShutdownCommand(ShutdownType.IMMEDIATE));
     }
 
     @Override

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/ClientEndpointImpl.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/ClientEndpointImpl.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/ClientEndpointImpl.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/ClientEndpointImpl.java Thu Nov  3 19:28:18 2016
@@ -27,7 +27,6 @@
 
 package org.apache.hc.core5.http.impl.nio.bootstrap;
 
-import java.nio.channels.SelectionKey;
 import java.util.concurrent.Future;
 
 import org.apache.hc.core5.annotation.Contract;
@@ -68,8 +67,7 @@ public final class ClientEndpointImpl im
         final Command executionCommand = new ExecutionCommand(
                 exchangeHandler,
                 context != null ? context : HttpCoreContext.create());
-        ioSession.getCommandQueue().add(executionCommand);
-        ioSession.setEvent(SelectionKey.OP_WRITE);
+        ioSession.addLast(executionCommand);
         if (ioSession.isClosed()) {
             executionCommand.cancel();
         }
@@ -120,8 +118,7 @@ public final class ClientEndpointImpl im
 
     @Override
     public void shutdown(final ShutdownType type) {
-        ioSession.getCommandQueue().addFirst(new ShutdownCommand(type));
-        ioSession.setEvent(SelectionKey.OP_WRITE);
+        ioSession.addFirst(new ShutdownCommand(type));
     }
 
 }

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/HttpAsyncRequester.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/HttpAsyncRequester.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/HttpAsyncRequester.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/HttpAsyncRequester.java Thu Nov  3 19:28:18 2016
@@ -30,7 +30,6 @@ package org.apache.hc.core5.http.impl.ni
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
-import java.nio.channels.SelectionKey;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
@@ -63,8 +62,7 @@ public class HttpAsyncRequester extends
 
             @Override
             public void execute(final IOSession session) throws IOException {
-                session.getCommandQueue().addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
-                session.setEvent(SelectionKey.OP_WRITE);
+                session.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
             }
 
         });

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/HttpAsyncServer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/HttpAsyncServer.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/HttpAsyncServer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/HttpAsyncServer.java Thu Nov  3 19:28:18 2016
@@ -27,7 +27,6 @@
 package org.apache.hc.core5.http.impl.nio.bootstrap;
 
 import java.io.IOException;
-import java.nio.channels.SelectionKey;
 
 import org.apache.hc.core5.http.ExceptionListener;
 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
@@ -53,8 +52,7 @@ public class HttpAsyncServer extends Asy
 
             @Override
             public void execute(final IOSession session) throws IOException {
-                session.getCommandQueue().addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
-                session.setEvent(SelectionKey.OP_WRITE);
+                session.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
             }
 
         });

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractMultiworkerIOReactor.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractMultiworkerIOReactor.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractMultiworkerIOReactor.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractMultiworkerIOReactor.java Thu Nov  3 19:28:18 2016
@@ -49,7 +49,7 @@ import org.apache.hc.core5.util.Args;
 
 /**
  * Generic implementation of {@link IOReactor} that can run multiple
- * {@link BaseIOReactor} instance in separate worker threads and distribute
+ * {@link IOReactorImpl} instances in separate worker threads and distribute
  * newly created I/O session equally across those I/O reactors for a more
  * optimal resource utilization and a better I/O performance. Usually it is
  * recommended to have one worker I/O reactor per physical CPU core.
@@ -94,7 +94,7 @@ public abstract class AbstractMultiworke
     private final int workerCount;
     private final IOEventHandlerFactory eventHandlerFactory;
     private final ThreadFactory threadFactory;
-    private final BaseIOReactor[] dispatchers;
+    private final IOReactorImpl[] dispatchers;
     private final Worker[] workers;
     private final Thread[] threads;
     private final AtomicReference<IOReactorStatus> status;
@@ -136,7 +136,7 @@ public abstract class AbstractMultiworke
         }
         this.auditLog = new ArrayList<>();
         this.workerCount = this.reactorConfig.getIoThreadCount();
-        this.dispatchers = new BaseIOReactor[workerCount];
+        this.dispatchers = new IOReactorImpl[workerCount];
         this.workers = new Worker[workerCount];
         this.threads = new Thread[workerCount];
         this.status = new AtomicReference<>(IOReactorStatus.INACTIVE);
@@ -257,11 +257,11 @@ public abstract class AbstractMultiworke
         try {
             // Start I/O dispatchers
             for (int i = 0; i < this.dispatchers.length; i++) {
-                final BaseIOReactor dispatcher = new BaseIOReactor(this.eventHandlerFactory, this.reactorConfig, this.exceptionHandler);
+                final IOReactorImpl dispatcher = new IOReactorImpl(this.eventHandlerFactory, this.reactorConfig, this.exceptionHandler);
                 this.dispatchers[i] = dispatcher;
             }
             for (int i = 0; i < this.workerCount; i++) {
-                final BaseIOReactor dispatcher = this.dispatchers[i];
+                final IOReactorImpl dispatcher = this.dispatchers[i];
                 this.workers[i] = new Worker(dispatcher);
                 this.threads[i] = this.threadFactory.newThread(this.workers[i]);
             }
@@ -400,7 +400,7 @@ public abstract class AbstractMultiworke
         if (callback == null) {
             return;
         }
-        for (BaseIOReactor dispatcher: dispatchers) {
+        for (IOReactorImpl dispatcher: dispatchers) {
             if (dispatcher != null) {
                 dispatcher.enumSessions(callback);
             }
@@ -412,7 +412,7 @@ public abstract class AbstractMultiworke
         if (this.status.compareAndSet(IOReactorStatus.ACTIVE, IOReactorStatus.SHUTTING_DOWN)) {
             selector.wakeup();
             for (int i = 0; i < this.workerCount; i++) {
-                final BaseIOReactor dispatcher = this.dispatchers[i];
+                final IOReactorImpl dispatcher = this.dispatchers[i];
                 if (dispatcher != null) {
                     dispatcher.initiateShutdown();
                 }
@@ -439,7 +439,7 @@ public abstract class AbstractMultiworke
             }
         }
         for (int i = 0; i < this.dispatchers.length; i++) {
-            final BaseIOReactor dispatcher = this.dispatchers[i];
+            final IOReactorImpl dispatcher = this.dispatchers[i];
             if (dispatcher != null) {
                 if (dispatcher.getStatus().compareTo(IOReactorStatus.SHUT_DOWN) < 0) {
                     dispatcher.awaitShutdown(remaining, TimeUnit.MILLISECONDS);
@@ -464,7 +464,7 @@ public abstract class AbstractMultiworke
         this.status.set(IOReactorStatus.SHUT_DOWN);
         this.selector.wakeup();
         for (int i = 0; i < this.dispatchers.length; i++) {
-            final BaseIOReactor dispatcher = this.dispatchers[i];
+            final IOReactorImpl dispatcher = this.dispatchers[i];
             if (dispatcher != null) {
                 dispatcher.forceShutdown();
             }
@@ -503,11 +503,11 @@ public abstract class AbstractMultiworke
 
     static class Worker implements Runnable {
 
-        final BaseIOReactor dispatcher;
+        final IOReactorImpl dispatcher;
 
         private volatile Exception exception;
 
-        public Worker(final BaseIOReactor dispatcher) {
+        public Worker(final IOReactorImpl dispatcher) {
             super();
             this.dispatcher = dispatcher;
         }

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOEventHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOEventHandler.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOEventHandler.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOEventHandler.java Thu Nov  3 19:28:18 2016
@@ -67,6 +67,13 @@ public interface IOEventHandler {
     void timeout(IOSession session);
 
     /**
+     * Triggered when the given session throws an exception.
+     *
+     * @param session the I/O session.
+     */
+    void exception(IOSession session, Exception cause);
+
+    /**
      * Triggered when the given session has been terminated.
      *
      * @param session the I/O session.

Copied: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorImpl.java (from r1767392, httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOReactor.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorImpl.java?p2=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorImpl.java&p1=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOReactor.java&r1=1767392&r2=1767954&rev=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOReactor.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorImpl.java Thu Nov  3 19:28:18 2016
@@ -42,36 +42,33 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hc.core5.util.Args;
-import org.apache.hc.core5.util.Asserts;
 
 /**
- * Generic implementation of {@link IOReactor} that can used as a subclass
- * for more specialized I/O reactors. It is based on a single {@link Selector}
- * instance.
+ * {@link IOReactor} implementation.
  *
  * @since 4.0
  */
-abstract class AbstractIOReactor implements IOReactor {
+class IOReactorImpl implements IOReactor {
 
     private final IOReactorConfig reactorConfig;
     private final IOEventHandlerFactory eventHandlerFactory;
     private final Selector selector;
-    private final Queue<IOSession> closedSessions;
+    private final Queue<ManagedIOSession> closedSessions;
     private final Queue<PendingSession> pendingSessions;
+    private final AtomicReference<IOReactorStatus> status;
     private final Object shutdownMutex;
+    private final IOReactorExceptionHandler exceptionHandler;
 
-    private final AtomicReference<IOReactorStatus> status;
+    private volatile long lastTimeoutCheck;
 
-    /**
-     * Creates new AbstractIOReactor instance.
-     *
-     * @param eventHandlerFactory the event handler factory.
-     * @param reactorConfig the reactor configuration.
-     */
-    public AbstractIOReactor(final IOEventHandlerFactory eventHandlerFactory, final IOReactorConfig reactorConfig) {
+    IOReactorImpl(
+            final IOEventHandlerFactory eventHandlerFactory,
+            final IOReactorConfig reactorConfig,
+            final IOReactorExceptionHandler exceptionHandler) {
         super();
         this.reactorConfig = Args.notNull(reactorConfig, "I/O reactor config");
         this.eventHandlerFactory = Args.notNull(eventHandlerFactory, "Event handler factory");
+        this.exceptionHandler = exceptionHandler;
         this.closedSessions = new ConcurrentLinkedQueue<>();
         this.pendingSessions = new ConcurrentLinkedQueue<>();
         try {
@@ -83,112 +80,12 @@ abstract class AbstractIOReactor impleme
         this.status = new AtomicReference<>(IOReactorStatus.INACTIVE);
     }
 
-    /**
-     * Triggered when the key signals {@link SelectionKey#OP_ACCEPT} readiness.
-     * <p>
-     * Super-classes can implement this method to react to the event.
-     *
-     * @param key the selection key.
-     */
-    protected abstract void acceptable(SelectionKey key);
-
-    /**
-     * Triggered when the key signals {@link SelectionKey#OP_CONNECT} readiness.
-     * <p>
-     * Super-classes can implement this method to react to the event.
-     *
-     * @param key the selection key.
-     */
-    protected abstract void connectable(SelectionKey key);
-
-    /**
-     * Triggered when the key signals {@link SelectionKey#OP_READ} readiness.
-     * <p>
-     * Super-classes can implement this method to react to the event.
-     *
-     * @param key the selection key.
-     */
-    protected abstract void readable(SelectionKey key);
-
-    /**
-     * Triggered when the key signals {@link SelectionKey#OP_WRITE} readiness.
-     * <p>
-     * Super-classes can implement this method to react to the event.
-     *
-     * @param key the selection key.
-     */
-    protected abstract void writable(SelectionKey key);
-
-    /**
-     * Triggered to validate keys currently registered with the selector. This
-     * method is called after each I/O select loop.
-     * <p>
-     * Super-classes can implement this method to run validity checks on
-     * active sessions and include additional processing that needs to be
-     * executed after each I/O select loop.
-     *
-     * @param keys all selection keys registered with the selector.
-     */
-    protected abstract void validate(Set<SelectionKey> keys);
-
-    /**
-     * Triggered when new session has been created.
-     * <p>
-     * Super-classes can implement this method to react to the event.
-     *
-     * @param session new I/O session.
-     */
-    protected abstract void sessionCreated(final IOSession session);
-
-    /**
-     * Triggered when a session has been closed.
-     * <p>
-     * Super-classes can implement this method to react to the event.
-     *
-     * @param session closed I/O session.
-     */
-    protected abstract void sessionClosed(final IOSession session);
-
-    /**
-     * Triggered when a session has timed out.
-     * <p>
-     * Super-classes can implement this method to react to the event.
-     *
-     * @param session timed out I/O session.
-     */
-    protected abstract void sessionTimedOut(final IOSession session);
-
-    /**
-     * Obtains {@link IOSession} instance associated with the given selection
-     * key.
-     *
-     * @param key the selection key.
-     * @return I/O session.
-     */
-    protected IOSession getSession(final SelectionKey key) {
-        return (IOSession) key.attachment();
-    }
-
-    protected IOEventHandler ensureEventHandler(final IOSession ioSession) {
-        Asserts.notNull(ioSession, "IO session");
-        final IOEventHandler handler = ioSession.getHandler();
-        Asserts.notNull(handler, "IO event handler");
-        return handler;
-    }
-
     @Override
     public IOReactorStatus getStatus() {
         return this.status.get();
     }
 
-    /**
-     * Enqueues pending session. The socket channel will be asynchronously registered
-     * with the selector.
-     *
-     * @param socketChannel the new socketChannel.
-     * @param sessionRequest the session request if applicable.
-     */
-    public void enqueuePendingSession(final SocketChannel socketChannel, final SessionRequestImpl sessionRequest) {
+    void enqueuePendingSession(final SocketChannel socketChannel, final SessionRequestImpl sessionRequest) {
         Args.notNull(socketChannel, "SocketChannel");
         this.pendingSessions.add(new PendingSession(socketChannel, sessionRequest));
         this.selector.wakeup();
@@ -205,15 +102,6 @@ abstract class AbstractIOReactor impleme
      * The method will remain blocked unto the I/O reactor is shut down or the
      * execution thread is interrupted.
      *
-     * @see #acceptable(SelectionKey)
-     * @see #connectable(SelectionKey)
-     * @see #readable(SelectionKey)
-     * @see #writable(SelectionKey)
-     * @see #timeoutCheck(SelectionKey, long)
-     * @see #validate(Set)
-     * @see #sessionCreated(IOSession)
-     * @see #sessionClosed(IOSession)
-     *
      * @throws InterruptedIOException if the dispatch thread is interrupted.
      * @throws IOReactorException in case if a non-recoverable I/O error.
      */
@@ -284,51 +172,59 @@ abstract class AbstractIOReactor impleme
         }
     }
 
+    private void validate(final Set<SelectionKey> keys) {
+        final long currentTime = System.currentTimeMillis();
+        if( (currentTime - this.lastTimeoutCheck) >= this.reactorConfig.getSelectInterval()) {
+            this.lastTimeoutCheck = currentTime;
+            if (keys != null) {
+                for (final SelectionKey key : keys) {
+                    timeoutCheck(key, currentTime);
+                }
+            }
+        }
+    }
+
     private void processEvents(final Set<SelectionKey> selectedKeys) {
         for (final SelectionKey key : selectedKeys) {
-
             processEvent(key);
-
         }
         selectedKeys.clear();
     }
 
-    /**
-     * Processes new event on the given selection key.
-     *
-     * @param key the selection key that triggered an event.
-     */
-    protected void processEvent(final SelectionKey key) {
-        final IOSessionImpl session = (IOSessionImpl) key.attachment();
+    private void handleRuntimeException(final RuntimeException ex) {
+        if (this.exceptionHandler == null || !this.exceptionHandler.handle(ex)) {
+            throw ex;
+        }
+    }
+
+    private void processEvent(final SelectionKey key) {
+        final ManagedIOSession session = (ManagedIOSession) key.attachment();
         try {
-            if (key.isAcceptable()) {
-                acceptable(key);
-            }
-            if (key.isConnectable()) {
-                connectable(key);
-            }
             if (key.isReadable()) {
-                session.resetLastRead();
-                readable(key);
+                session.updateAccessTime();
+                session.onInputReady();
             }
             if (key.isWritable()) {
-                session.resetLastWrite();
-                writable(key);
+                session.updateAccessTime();
+                session.onOutputReady();
             }
         } catch (final CancelledKeyException ex) {
             session.shutdown();
+        } catch (final RuntimeException ex) {
+            session.shutdown();
+            handleRuntimeException(ex);
         }
     }
 
     private void processPendingSessions() throws IOReactorException {
         PendingSession pendingSession;
         while ((pendingSession = this.pendingSessions.poll()) != null) {
-            final IOSession session;
+            final ManagedIOSession session;
             try {
                 final SocketChannel socketChannel = pendingSession.socketChannel;
                 socketChannel.configureBlocking(false);
                 final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_READ);
-                session = new IOSessionImpl(key, socketChannel, this.closedSessions);
+                session = new ManagedIOSession(new IOSessionImpl(key, socketChannel), closedSessions);
                 session.setHandler(this.eventHandlerFactory.createHandler(session));
                 session.setSocketTimeout(this.reactorConfig.getSoTimeout());
                 key.attach(session);
@@ -346,7 +242,11 @@ abstract class AbstractIOReactor impleme
                 if (sessionRequest != null) {
                     sessionRequest.completed(session);
                 }
-                sessionCreated(session);
+                try {
+                    session.onConnected();
+                } catch (final RuntimeException ex) {
+                    handleRuntimeException(ex);
+                }
             } catch (final CancelledKeyException ex) {
                 session.shutdown();
             }
@@ -354,33 +254,36 @@ abstract class AbstractIOReactor impleme
     }
 
     private void processClosedSessions() {
-        IOSession session;
-        while ((session = this.closedSessions.poll()) != null) {
+        for (;;) {
+            final ManagedIOSession session = this.closedSessions.poll();
+            if (session == null) {
+                break;
+            }
             try {
-                sessionClosed(session);
+                session.onDisconnected();
             } catch (final CancelledKeyException ex) {
                 // ignore and move on
+            } catch (final RuntimeException ex) {
+                handleRuntimeException(ex);
             }
         }
     }
 
-    /**
-     * Triggered to verify whether the I/O session associated with the
-     * given selection key has not timed out.
-     * <p>
-     * Super-classes can implement this method to react to the event.
-     *
-     * @param key the selection key.
-     * @param now current time as long value.
-     */
-    protected void timeoutCheck(final SelectionKey key, final long now) {
-        final IOSessionImpl session = (IOSessionImpl) key.attachment();
+    private void timeoutCheck(final SelectionKey key, final long now) {
+        final ManagedIOSession session = (ManagedIOSession) key.attachment();
         if (session != null) {
-            final int timeout = session.getSocketTimeout();
-            if (timeout > 0) {
-                if (session.getLastAccessTime() + timeout < now) {
-                    sessionTimedOut(session);
+            try {
+                final int timeout = session.getSocketTimeout();
+                if (timeout > 0) {
+                    if (session.getLastAccessTime() + timeout < now) {
+                        session.onTimeout();
+                    }
                 }
+            } catch (final CancelledKeyException ex) {
+                session.shutdown();
+            } catch (final RuntimeException ex) {
+                session.shutdown();
+                handleRuntimeException(ex);
             }
         }
     }
@@ -400,15 +303,11 @@ abstract class AbstractIOReactor impleme
         }
     }
 
-    /**
-     * Closes out all active channels registered with the selector of
-     * this I/O reactor.
-     */
-    protected void closeActiveChannels() {
+    private void closeActiveChannels() {
         try {
             final Set<SelectionKey> keys = this.selector.keys();
             for (final SelectionKey key : keys) {
-                final IOSession session = getSession(key);
+                final ManagedIOSession session = (ManagedIOSession) key.attachment();
                 if (session != null) {
                     session.close();
                 }
@@ -418,19 +317,18 @@ abstract class AbstractIOReactor impleme
         }
     }
 
-    /**
-     * Enumerates all active sessions
-     *
-     * @since 5.0
-     */
-    protected void enumSessions(final IOSessionCallback callback) throws IOException {
+    void enumSessions(final IOSessionCallback callback) throws IOException {
         if (this.selector.isOpen()) {
             try {
                 final Set<SelectionKey> keys = this.selector.keys();
                 for (final SelectionKey key : keys) {
-                    final IOSession session = getSession(key);
+                    final ManagedIOSession session = (ManagedIOSession) key.attachment();
                     if (session != null) {
-                        callback.execute(session);
+                        try {
+                            callback.execute(session);
+                        } catch (CancelledKeyException ex) {
+                            session.close();
+                        }
                     }
                 }
             } catch (ClosedSelectorException ignore) {

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorImpl.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorImpl.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSession.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSession.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSession.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSession.java Thu Nov  3 19:28:18 2016
@@ -29,7 +29,6 @@ package org.apache.hc.core5.reactor;
 
 import java.net.SocketAddress;
 import java.nio.channels.ByteChannel;
-import java.util.Deque;
 
 /**
  * IOSession interface represents a sequence of logically related data exchanges
@@ -49,12 +48,6 @@ import java.util.Deque;
  */
 public interface IOSession {
 
-    /**
-     * Name of the context attribute key, which can be used to obtain the
-     * session attachment object.
-     */
-    String ATTACHMENT_KEY = "http.session.attachment";
-
     int ACTIVE       = 0;
     int CLOSING      = 1;
     int CLOSED       = Integer.MAX_VALUE;
@@ -74,11 +67,24 @@ public interface IOSession {
     void setHandler(IOEventHandler handler);
 
     /**
-     * Returns the command queue for this session.
+     * Inserts {@link Command} at the end of the command queue.
+     *
+     * @since 5.0
+     */
+    void addLast(Command command);
+
+    /**
+     * Inserts {@link Command} at the front of the command queue.
+     *
+     * @since 5.0
+     */
+    void addFirst(Command command);
+    /**
+     * Retrieves and removes first {@link Command} from the command queue.
      *
      * @since 5.0
      */
-    Deque<Command> getCommandQueue();
+    Command getCommand();
 
     /**
      * Returns the underlying I/O channel associated with this session.

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java Thu Nov  3 19:28:18 2016
@@ -34,9 +34,6 @@ import java.nio.channels.ByteChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 import java.util.Deque;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -54,10 +51,6 @@ class IOSessionImpl implements IOSession
 
     private final SelectionKey key;
     private final SocketChannel channel;
-    private final Map<String, Object> attributes;
-    private final Queue<IOSession> closedSessions;
-
-    private final long startedTime;
     private final AtomicInteger status;
     private final AtomicInteger eventMask;
     private final Deque<Command> commandQueue;
@@ -65,37 +58,22 @@ class IOSessionImpl implements IOSession
     private volatile IOEventHandler eventHandler;
     private volatile int socketTimeout;
 
-    private volatile long lastReadTime;
-    private volatile long lastWriteTime;
-    private volatile long lastAccessTime;
-
     /**
      * Creates new instance of IOSessionImpl.
      *
      * @param key the selection key.
      * @param socketChannel the socket channel
-     * @param closedSessions the queue containing closed sessions
      *
      * @since 4.1
      */
-    public IOSessionImpl(
-            final SelectionKey key,
-            final SocketChannel socketChannel,
-            final Queue<IOSession> closedSessions) {
+    public IOSessionImpl(final SelectionKey key, final SocketChannel socketChannel) {
         super();
         this.key = Args.notNull(key, "Selection key");
         this.channel = Args.notNull(socketChannel, "Socket channel");
-        this.closedSessions = closedSessions;
-        this.attributes = new ConcurrentHashMap<>();
         this.commandQueue = new ConcurrentLinkedDeque<>();
         this.socketTimeout = 0;
         this.eventMask = new AtomicInteger(key.interestOps());
         this.status = new AtomicInteger(ACTIVE);
-        final long now = System.currentTimeMillis();
-        this.startedTime = now;
-        this.lastReadTime = now;
-        this.lastWriteTime = now;
-        this.lastAccessTime = now;
     }
 
     @Override
@@ -109,8 +87,20 @@ class IOSessionImpl implements IOSession
     }
 
     @Override
-    public Deque<Command> getCommandQueue() {
-        return commandQueue;
+    public void addLast(final Command command) {
+        commandQueue.addLast(command);
+        setEvent(SelectionKey.OP_WRITE);
+    }
+
+    @Override
+    public void addFirst(final Command command) {
+        commandQueue.addFirst(command);
+        setEvent(SelectionKey.OP_WRITE);
+    }
+
+    @Override
+    public Command getCommand() {
+        return commandQueue.poll();
     }
 
     @Override
@@ -188,15 +178,11 @@ class IOSessionImpl implements IOSession
     @Override
     public void setSocketTimeout(final int timeout) {
         this.socketTimeout = timeout;
-        this.lastAccessTime = System.currentTimeMillis();
     }
 
     @Override
     public void close() {
         if (this.status.compareAndSet(ACTIVE, CLOSED)) {
-            if (this.closedSessions != null) {
-                this.closedSessions.add(this);
-            }
             this.key.cancel();
             this.key.attach(null);
             try {
@@ -226,34 +212,6 @@ class IOSessionImpl implements IOSession
         close();
     }
 
-    public long getStartedTime() {
-        return this.startedTime;
-    }
-
-    public long getLastReadTime() {
-        return this.lastReadTime;
-    }
-
-    public long getLastWriteTime() {
-        return this.lastWriteTime;
-    }
-
-    public long getLastAccessTime() {
-        return this.lastAccessTime;
-    }
-
-    void resetLastRead() {
-        final long now = System.currentTimeMillis();
-        this.lastReadTime = now;
-        this.lastAccessTime = now;
-    }
-
-    void resetLastWrite() {
-        final long now = System.currentTimeMillis();
-        this.lastWriteTime = now;
-        this.lastAccessTime = now;
-    }
-
     private static void formatOps(final StringBuilder buffer, final int ops) {
         if ((ops & SelectionKey.OP_READ) > 0) {
             buffer.append('r');

Added: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/ManagedIOSession.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/ManagedIOSession.java?rev=1767954&view=auto
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/ManagedIOSession.java (added)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/ManagedIOSession.java Thu Nov  3 19:28:18 2016
@@ -0,0 +1,288 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.reactor;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.channels.ByteChannel;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.reactor.ssl.SSLIOSession;
+import org.apache.hc.core5.util.Asserts;
+
+/**
+ * @since 5.0
+ */
+@Contract(threading = ThreadingBehavior.SAFE)
+class ManagedIOSession implements IOSession {
+
+    private final IOSession ioSession;
+    private final AtomicReference<SSLIOSession> tlsSessionRef;
+    private final Queue<ManagedIOSession> closedSessions;
+    private final AtomicBoolean closed;
+
+    private volatile long lastAccessTime;
+
+    ManagedIOSession(final IOSession ioSession, final Queue<ManagedIOSession> closedSessions) {
+        this.ioSession = ioSession;
+        this.closedSessions = closedSessions;
+        this.tlsSessionRef = new AtomicReference<>(null);
+        this.closed = new AtomicBoolean(false);
+        updateAccessTime();
+    }
+
+    void updateAccessTime() {
+        this.lastAccessTime = System.currentTimeMillis();
+    }
+
+    long getLastAccessTime() {
+        return lastAccessTime;
+    }
+
+    private IOSession getSessionImpl() {
+        final SSLIOSession tlsSession = tlsSessionRef.get();
+        if (tlsSession != null) {
+            return tlsSession;
+        } else {
+            return ioSession;
+        }
+    }
+
+    IOEventHandler getEventHandler() {
+        final IOEventHandler handler = ioSession.getHandler();
+        Asserts.notNull(handler, "IO event handler");
+        return handler;
+    }
+
+    void onConnected() {
+        try {
+            final IOEventHandler handler = getEventHandler();
+            final SSLIOSession tlsSession = tlsSessionRef.get();
+            if (tlsSession != null) {
+                try {
+                    if (!tlsSession.isInitialized()) { // initialize only once, like onInputReady()/onOutputReady()
+                        tlsSession.initialize();
+                    }
+                    handler.connected(this);
+                } catch (Exception ex) {
+                    handler.exception(tlsSession, ex);
+                }
+            } else {
+                handler.connected(this);
+            }
+        } catch (RuntimeException ex) {
+            shutdown(); // tear the session down before letting the reactor see the failure
+            throw ex;
+        }
+    }
+
+    void onInputReady() {
+        try {
+            final IOEventHandler handler = getEventHandler();
+            final SSLIOSession tlsSession = tlsSessionRef.get();
+            if (tlsSession != null) {
+                try {
+                    if (!tlsSession.isInitialized()) {
+                        tlsSession.initialize();
+                    }
+                    if (tlsSession.isAppInputReady()) {
+                        handler.inputReady(this);
+                    }
+                    tlsSession.inboundTransport();
+                } catch (final IOException ex) {
+                    handler.exception(tlsSession, ex);
+                    tlsSession.shutdown();
+                }
+            } else {
+                handler.inputReady(this);
+            }
+        } catch (RuntimeException ex) {
+            shutdown();
+            throw ex;
+        }
+    }
+
+    void onOutputReady() {
+        try {
+            final IOEventHandler handler = getEventHandler();
+            final SSLIOSession tlsSession = tlsSessionRef.get();
+            if (tlsSession != null) {
+                try {
+                    if (!tlsSession.isInitialized()) {
+                        tlsSession.initialize();
+                    }
+                    if (tlsSession.isAppOutputReady()) {
+                        handler.outputReady(this);
+                    }
+                    tlsSession.outboundTransport();
+                } catch (final IOException ex) {
+                    handler.exception(tlsSession, ex);
+                    tlsSession.shutdown();
+                }
+            } else {
+                handler.outputReady(this);
+            }
+        } catch (RuntimeException ex) {
+            shutdown();
+            throw ex;
+        }
+    }
+
+    void onTimeout() {
+        try {
+            final IOEventHandler handler = getEventHandler();
+            handler.timeout(this);
+            final SSLIOSession tlsSession = tlsSessionRef.get();
+            if (tlsSession != null) {
+                if (tlsSession.isOutboundDone() && !tlsSession.isInboundDone()) {
+                    // The session failed to terminate cleanly
+                    tlsSession.shutdown();
+                }
+            }
+        } catch (RuntimeException ex) {
+            shutdown();
+            throw ex;
+        }
+    }
+
+    void onDisconnected() {
+        final IOEventHandler handler = getEventHandler();
+        handler.disconnected(this);
+    }
+
+    @Override
+    public void close() {
+        if (closed.compareAndSet(false, true)) { // flag is shared with shutdown(): only the first call proceeds
+            try {
+                getSessionImpl().close();
+            } finally {
+                closedSessions.add(this); // enqueued even if the delegate's close() throws
+            }
+        }
+    }
+
+    @Override
+    public void shutdown() {
+        if (closed.compareAndSet(false, true)) { // flag is shared with close(): only the first call proceeds
+            try {
+                getSessionImpl().shutdown();
+            } finally {
+                closedSessions.add(this); // enqueued even if the delegate's shutdown() throws
+            }
+        }
+    }
+
+    @Override
+    public int getStatus() {
+        return getSessionImpl().getStatus();
+    }
+
+    @Override
+    public boolean isClosed() {
+        return getSessionImpl().isClosed();
+    }
+
+    @Override
+    public IOEventHandler getHandler() {
+        return ioSession.getHandler();
+    }
+
+    public void setHandler(final IOEventHandler eventHandler) {
+        ioSession.setHandler(eventHandler);
+    }
+
+    @Override
+    public void addLast(final Command command) {
+        ioSession.addLast(command);
+    }
+
+    @Override
+    public void addFirst(final Command command) {
+        ioSession.addFirst(command);
+    }
+
+    @Override
+    public Command getCommand() {
+        return ioSession.getCommand();
+    }
+
+    @Override
+    public ByteChannel channel() {
+        return getSessionImpl().channel();
+    }
+
+    @Override
+    public SocketAddress getRemoteAddress() {
+        return ioSession.getRemoteAddress();
+    }
+
+    @Override
+    public SocketAddress getLocalAddress() {
+        return ioSession.getLocalAddress();
+    }
+
+    @Override
+    public int getEventMask() {
+        return getSessionImpl().getEventMask();
+    }
+
+    @Override
+    public void setEventMask(final int ops) {
+        getSessionImpl().setEventMask(ops);
+    }
+
+    @Override
+    public void setEvent(final int op) {
+        getSessionImpl().setEvent(op);
+    }
+
+    @Override
+    public void clearEvent(final int op) {
+        getSessionImpl().clearEvent(op);
+    }
+
+    @Override
+    public int getSocketTimeout() {
+        return ioSession.getSocketTimeout();
+    }
+
+    @Override
+    public void setSocketTimeout(final int timeout) {
+        ioSession.setSocketTimeout(timeout);
+    }
+
+    @Override
+    public String toString() {
+        return getSessionImpl().toString();
+    }
+
+}

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/ManagedIOSession.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/ManagedIOSession.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/ManagedIOSession.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java Thu Nov  3 19:28:18 2016
@@ -34,7 +34,6 @@ import java.nio.channels.ByteChannel;
 import java.nio.channels.CancelledKeyException;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SelectionKey;
-import java.util.Deque;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
@@ -58,28 +57,12 @@ import org.apache.hc.core5.util.Asserts;
  * {@code SSLIOSession} is a decorator class intended to transparently extend
  * an {@link IOSession} with transport layer security capabilities based on
  * the SSL/TLS protocol.
- * <p>
- * The resultant instance of {@code SSLIOSession} must be added to the original
- * I/O session as an attribute with the {@link #SESSION_KEY} key.
- * <pre>
- *  SSLContext sslcontext = SSLContext.getInstance("SSL");
- *  sslcontext.init(null, null, null);
- *  SSLIOSession sslsession = new SSLIOSession(
- *      iosession, SSLMode.CLIENT, sslcontext, null);
- *  iosession.setAttribute(SSLIOSession.SESSION_KEY, sslsession);
- * </pre>
  *
  * @since 4.2
  */
 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
 public class SSLIOSession implements IOSession {
 
-    /**
-     * Name of the context attribute key, which can be used to obtain the
-     * SSL session.
-     */
-    public static final String SESSION_KEY = "http.session.ssl";
-
     private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
 
     private final IOSession session;
@@ -179,10 +162,6 @@ public class SSLIOSession implements IOS
         this(session, sslMode, null, sslContext, handler);
     }
 
-    protected SSLSetupHandler getSSLSetupHandler() {
-        return this.handler;
-    }
-
     /**
      * Returns {@code true} if the session has been fully initialized,
      * {@code false} otherwise.
@@ -644,8 +623,18 @@ public class SSLIOSession implements IOS
     }
 
     @Override
-    public Deque<Command> getCommandQueue() {
-        return this.session.getCommandQueue();
+    public void addLast(final Command command) {
+        this.session.addLast(command);
+    }
+
+    @Override
+    public void addFirst(final Command command) {
+        this.session.addFirst(command);
+    }
+
+    @Override
+    public Command getCommand() {
+        return this.session.getCommand();
     }
 
     @Override



Mime
View raw message