hc-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ol...@apache.org
Subject httpcomponents-core git commit: Improved non-blocking i/o session command API
Date Sun, 11 Feb 2018 14:05:03 GMT
Repository: httpcomponents-core
Updated Branches:
  refs/heads/master 65e51b1a5 -> 7bcb19686


Improved non-blocking i/o session command API


Project: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/commit/7bcb1968
Tree: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/tree/7bcb1968
Diff: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/diff/7bcb1968

Branch: refs/heads/master
Commit: 7bcb1968665c70f892922312f4192c3177b11b8d
Parents: 65e51b1
Author: Oleg Kalnichevski <olegk@apache.org>
Authored: Sat Feb 10 20:04:05 2018 +0100
Committer: Oleg Kalnichevski <olegk@apache.org>
Committed: Sun Feb 11 14:51:35 2018 +0100

----------------------------------------------------------------------
 .../impl/nio/AbstractHttp2StreamMultiplexer.java  |  8 ++++----
 .../impl/nio/ClientHttpProtocolNegotiator.java    |  4 ++--
 .../nio/Http2OnlyClientProtocolNegotiator.java    |  4 ++--
 .../nio/bootstrap/Http2MultiplexingRequester.java |  7 ++++---
 .../hc/core5/http2/nio/pool/H2ConnPool.java       |  7 ++++---
 .../core5/testing/nio/ClientSessionEndpoint.java  | 10 +++++-----
 .../hc/core5/testing/nio/IOReactorExecutor.java   |  3 ++-
 .../hc/core5/testing/nio/LoggingIOSession.java    | 17 +++++++----------
 .../core5/testing/nio/Http2IntegrationTest.java   |  3 ++-
 .../http/impl/bootstrap/HttpAsyncRequester.java   |  5 +++--
 .../http/impl/bootstrap/HttpAsyncServer.java      |  3 ++-
 .../impl/nio/AbstractHttp1StreamDuplexer.java     | 14 +++++++-------
 .../java/org/apache/hc/core5/reactor/Command.java |  2 ++
 .../org/apache/hc/core5/reactor/IOSession.java    | 10 +++++-----
 .../apache/hc/core5/reactor/IOSessionImpl.java    | 15 +++++++++------
 .../hc/core5/reactor/InternalDataChannel.java     | 12 ++++++------
 .../apache/hc/core5/reactor/ssl/SSLIOSession.java | 18 ++++++------------
 17 files changed, 72 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/7bcb1968/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
index d5a9b64..ac8203d 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
@@ -549,7 +549,7 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable,
HttpConne
             stream.cancel();
         }
         for (;;) {
-            final Command command = ioSession.getCommand();
+            final Command command = ioSession.poll();
             if (command != null) {
                 if (command instanceof ExecutionCommand) {
                     final AsyncClientExchangeHandler exchangeHandler = ((ExecutionCommand)
command).getExchangeHandler();
@@ -566,7 +566,7 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable,
HttpConne
 
     private void processPendingCommands() throws IOException, HttpException {
         while (streamMap.size() < remoteConfig.getMaxConcurrentStreams()) {
-            final Command command = ioSession.getCommand();
+            final Command command = ioSession.poll();
             if (command == null) {
                 break;
             }
@@ -650,7 +650,7 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable,
HttpConne
                 }
             }
             for (;;) {
-                final Command command = ioSession.getCommand();
+                final Command command = ioSession.poll();
                 if (command != null) {
                     if (command instanceof ExecutionCommand) {
                         final ExecutionCommand executionCommand = (ExecutionCommand) command;
@@ -1189,7 +1189,7 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable,
HttpConne
 
     @Override
     public void close() throws IOException {
-        ioSession.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
+        ioSession.enqueue(new ShutdownCommand(ShutdownType.GRACEFUL), Command.Priority.IMMEDIATE);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/7bcb1968/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
index 313222c..180e26a 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
@@ -157,7 +157,7 @@ public class ClientHttpProtocolNegotiator implements HttpConnectionEventHandler
     public void exception(final IOSession session, final Exception cause) {
         try {
             for (;;) {
-                final Command command = ioSession.getCommand();
+                final Command command = ioSession.poll();
                 if (command != null) {
                     if (command instanceof ExecutionCommand) {
                         final ExecutionCommand executionCommand = (ExecutionCommand) command;
@@ -179,7 +179,7 @@ public class ClientHttpProtocolNegotiator implements HttpConnectionEventHandler
     @Override
     public void disconnected(final IOSession session) {
         for (;;) {
-            final Command command = ioSession.getCommand();
+            final Command command = ioSession.poll();
             if (command != null) {
                 if (command instanceof ExecutionCommand) {
                     final ExecutionCommand executionCommand = (ExecutionCommand) command;

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/7bcb1968/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/Http2OnlyClientProtocolNegotiator.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/Http2OnlyClientProtocolNegotiator.java
b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/Http2OnlyClientProtocolNegotiator.java
index c80d8b3..d8cbbcf 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/Http2OnlyClientProtocolNegotiator.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/Http2OnlyClientProtocolNegotiator.java
@@ -140,7 +140,7 @@ public class Http2OnlyClientProtocolNegotiator implements HttpConnectionEventHan
     public void exception(final IOSession session, final Exception cause) {
         try {
             for (;;) {
-                final Command command = ioSession.getCommand();
+                final Command command = ioSession.poll();
                 if (command != null) {
                     if (command instanceof ExecutionCommand) {
                         final ExecutionCommand executionCommand = (ExecutionCommand) command;
@@ -162,7 +162,7 @@ public class Http2OnlyClientProtocolNegotiator implements HttpConnectionEventHan
     @Override
     public void disconnected(final IOSession session) {
         for (;;) {
-            final Command command = ioSession.getCommand();
+            final Command command = ioSession.poll();
             if (command != null) {
                 if (command instanceof ExecutionCommand) {
                     final ExecutionCommand executionCommand = (ExecutionCommand) command;

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/7bcb1968/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java
b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java
index 8fab172..1972a58 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java
@@ -65,6 +65,7 @@ import org.apache.hc.core5.http.protocol.HttpCoreContext;
 import org.apache.hc.core5.http2.nio.pool.H2ConnPool;
 import org.apache.hc.core5.io.ShutdownType;
 import org.apache.hc.core5.net.URIAuthority;
+import org.apache.hc.core5.reactor.Command;
 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
 import org.apache.hc.core5.reactor.IOReactorConfig;
 import org.apache.hc.core5.reactor.IOSession;
@@ -91,7 +92,7 @@ public class Http2MultiplexingRequester extends AsyncRequester{
 
             @Override
             public void execute(final IOSession session) {
-                session.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
+                session.enqueue(new ShutdownCommand(ShutdownType.GRACEFUL), Command.Priority.IMMEDIATE);
             }
 
         }, DefaultAddressResolver.INSTANCE);
@@ -151,7 +152,7 @@ public class Http2MultiplexingRequester extends AsyncRequester{
 
                         @Override
                         public void completed(final IOSession ioSession) {
-                            ioSession.addLast(new ExecutionCommand(new AsyncClientExchangeHandler()
{
+                            ioSession.enqueue(new ExecutionCommand(new AsyncClientExchangeHandler()
{
 
                                 @Override
                                 public void releaseResources() {
@@ -209,7 +210,7 @@ public class Http2MultiplexingRequester extends AsyncRequester{
                                     exchangeHandler.failed(cause);
                                 }
 
-                            }, cancellableDependency, context));
+                            }, cancellableDependency, context), Command.Priority.NORMAL);
                         }
 
                         @Override

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/7bcb1968/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/pool/H2ConnPool.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/pool/H2ConnPool.java
b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/pool/H2ConnPool.java
index 700698a..3f9d0cf 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/pool/H2ConnPool.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/pool/H2ConnPool.java
@@ -43,6 +43,7 @@ import org.apache.hc.core5.http2.nio.command.PingCommand;
 import org.apache.hc.core5.http2.nio.support.BasicPingHandler;
 import org.apache.hc.core5.io.ShutdownType;
 import org.apache.hc.core5.reactor.AbstractIOSessionPool;
+import org.apache.hc.core5.reactor.Command;
 import org.apache.hc.core5.reactor.ConnectionInitiator;
 import org.apache.hc.core5.reactor.IOSession;
 import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
@@ -85,7 +86,7 @@ public final class H2ConnPool extends AbstractIOSessionPool<HttpHost>
{
             final IOSession ioSession,
             final ShutdownType shutdownType) {
         if (shutdownType == ShutdownType.GRACEFUL) {
-            ioSession.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
+            ioSession.enqueue(new ShutdownCommand(ShutdownType.GRACEFUL), Command.Priority.NORMAL);
         } else {
             ioSession.shutdown(shutdownType);
         }
@@ -138,7 +139,7 @@ public final class H2ConnPool extends AbstractIOSessionPool<HttpHost>
{
             final long deadline = lastAccessTime + timeValue.toMillis();
             if (deadline <= System.currentTimeMillis()) {
                 final int socketTimeout = ioSession.getSocketTimeout();
-                ioSession.addLast(new PingCommand(new BasicPingHandler(new Callback<Boolean>()
{
+                ioSession.enqueue(new PingCommand(new BasicPingHandler(new Callback<Boolean>()
{
 
                     @Override
                     public void execute(final Boolean result) {
@@ -146,7 +147,7 @@ public final class H2ConnPool extends AbstractIOSessionPool<HttpHost>
{
                         callback.execute(result);
                     }
 
-                })));
+                })), Command.Priority.NORMAL);
                 return;
             }
         }

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/7bcb1968/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/ClientSessionEndpoint.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/ClientSessionEndpoint.java
b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/ClientSessionEndpoint.java
index 8da31b5..bbca9c0 100644
--- a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/ClientSessionEndpoint.java
+++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/ClientSessionEndpoint.java
@@ -65,8 +65,8 @@ public final class ClientSessionEndpoint implements GracefullyCloseable
{
         this.closed = new AtomicBoolean(false);
     }
 
-    public void execute(final Command command) {
-        ioSession.addLast(command);
+    public void execute(final Command command, final Command.Priority priority) {
+        ioSession.enqueue(command, priority);
         if (ioSession.isClosed()) {
             command.cancel();
         }
@@ -77,7 +77,7 @@ public final class ClientSessionEndpoint implements GracefullyCloseable
{
             final HttpContext context) {
         Asserts.check(!closed.get(), "Connection is already closed");
         final Command executionCommand = new ExecutionCommand(exchangeHandler, context);
-        execute(executionCommand);
+        execute(executionCommand, Command.Priority.NORMAL);
     }
 
     public <T> Future<T> execute(
@@ -125,7 +125,7 @@ public final class ClientSessionEndpoint implements GracefullyCloseable
{
     public void shutdown(final ShutdownType shutdownType) {
         if (closed.compareAndSet(false, true)) {
             if (shutdownType == ShutdownType.GRACEFUL) {
-                ioSession.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
+                ioSession.enqueue(new ShutdownCommand(ShutdownType.GRACEFUL), Command.Priority.NORMAL);
             } else {
                 ioSession.shutdown(shutdownType);
             }
@@ -135,7 +135,7 @@ public final class ClientSessionEndpoint implements GracefullyCloseable
{
     @Override
     public void close() throws IOException {
         if (closed.compareAndSet(false, true)) {
-            ioSession.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
+            ioSession.enqueue(new ShutdownCommand(ShutdownType.GRACEFUL), Command.Priority.IMMEDIATE);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/7bcb1968/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/IOReactorExecutor.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/IOReactorExecutor.java
b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/IOReactorExecutor.java
index 04e1c99..988f5b2 100644
--- a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/IOReactorExecutor.java
+++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/IOReactorExecutor.java
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hc.core5.function.Callback;
 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
 import org.apache.hc.core5.io.ShutdownType;
+import org.apache.hc.core5.reactor.Command;
 import org.apache.hc.core5.reactor.ExceptionEvent;
 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
 import org.apache.hc.core5.reactor.IOReactorConfig;
@@ -79,7 +80,7 @@ abstract class IOReactorExecutor<T extends IOReactorService> implements
AutoClos
 
                     @Override
                     public void execute(final IOSession session) {
-                        session.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
+                        session.enqueue(new ShutdownCommand(ShutdownType.GRACEFUL), Command.Priority.NORMAL);
                     }
 
                 }))) {

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/7bcb1968/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSession.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSession.java
b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSession.java
index ac6c245..5bd07ac 100644
--- a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSession.java
+++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSession.java
@@ -70,24 +70,21 @@ public class LoggingIOSession implements IOSession {
     }
 
     @Override
-    public void addLast(final Command command) {
-        this.session.addLast(command);
+    public void enqueue(final Command command, final Command.Priority priority) {
+        this.session.enqueue(command, priority);
         if (this.log.isDebugEnabled()) {
-            this.log.debug(command.getClass().getSimpleName() + " added last");
+            this.log.debug("Enqueued " + command.getClass().getSimpleName() + " with priority
" + priority);
         }
     }
 
     @Override
-    public void addFirst(final Command command) {
-        this.session.addFirst(command);
-        if (this.log.isDebugEnabled()) {
-            this.log.debug(command.getClass().getSimpleName() + " added first");
-        }
+    public boolean hasCommands() {
+        return this.session.hasCommands();
     }
 
     @Override
-    public Command getCommand() {
-        return this.session.getCommand();
+    public Command poll() {
+        return this.session.poll();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/7bcb1968/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2IntegrationTest.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2IntegrationTest.java
b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2IntegrationTest.java
index 892f8c6..28fdbb6 100644
--- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2IntegrationTest.java
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2IntegrationTest.java
@@ -110,6 +110,7 @@ import org.apache.hc.core5.http2.H2StreamResetException;
 import org.apache.hc.core5.http2.config.H2Config;
 import org.apache.hc.core5.http2.nio.command.PingCommand;
 import org.apache.hc.core5.http2.nio.support.BasicPingHandler;
+import org.apache.hc.core5.reactor.Command;
 import org.apache.hc.core5.reactor.ExceptionEvent;
 import org.apache.hc.core5.reactor.IOReactorConfig;
 import org.apache.hc.core5.reactor.IOSession;
@@ -1004,7 +1005,7 @@ public class Http2IntegrationTest extends InternalHttp2ServerTestBase
{
                     latch.countDown();
                 }
 
-            })));
+            })), Command.Priority.NORMAL);
 
         }
         Assert.assertTrue(latch.await(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/7bcb1968/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java
b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java
index a5745fc..e55f5b6 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java
@@ -67,6 +67,7 @@ import org.apache.hc.core5.pool.ConnPoolControl;
 import org.apache.hc.core5.pool.ManagedConnPool;
 import org.apache.hc.core5.pool.PoolEntry;
 import org.apache.hc.core5.pool.PoolStats;
+import org.apache.hc.core5.reactor.Command;
 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
 import org.apache.hc.core5.reactor.IOReactorConfig;
 import org.apache.hc.core5.reactor.IOSession;
@@ -95,7 +96,7 @@ public class HttpAsyncRequester extends AsyncRequester implements ConnPoolContro
 
             @Override
             public void execute(final IOSession session) {
-                session.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
+                session.enqueue(new ShutdownCommand(ShutdownType.GRACEFUL), Command.Priority.IMMEDIATE);
             }
 
         }, DefaultAddressResolver.INSTANCE);
@@ -420,7 +421,7 @@ public class HttpAsyncRequester extends AsyncRequester implements ConnPoolContro
             if (ioSession == null) {
                 throw new IllegalStateException("I/O session is invalid");
             }
-            ioSession.addLast(new ExecutionCommand(exchangeHandler, context));
+            ioSession.enqueue(new ExecutionCommand(exchangeHandler, context), Command.Priority.NORMAL);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/7bcb1968/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncServer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncServer.java
b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncServer.java
index e1372c7..f8f62d7 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncServer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncServer.java
@@ -30,6 +30,7 @@ import org.apache.hc.core5.function.Callback;
 import org.apache.hc.core5.function.Decorator;
 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
 import org.apache.hc.core5.io.ShutdownType;
+import org.apache.hc.core5.reactor.Command;
 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
 import org.apache.hc.core5.reactor.IOReactorConfig;
 import org.apache.hc.core5.reactor.IOSession;
@@ -49,7 +50,7 @@ public class HttpAsyncServer extends AsyncServer {
 
             @Override
             public void execute(final IOSession session) {
-                session.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
+                session.enqueue(new ShutdownCommand(ShutdownType.GRACEFUL), Command.Priority.NORMAL);
             }
 
         });

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/7bcb1968/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
index db7fedd..f363212 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
@@ -76,8 +76,8 @@ import org.apache.hc.core5.io.ShutdownType;
 import org.apache.hc.core5.reactor.Command;
 import org.apache.hc.core5.reactor.EventMask;
 import org.apache.hc.core5.reactor.IOEventHandler;
-import org.apache.hc.core5.reactor.ProtocolLayer;
 import org.apache.hc.core5.reactor.ProtocolIOSession;
+import org.apache.hc.core5.reactor.ProtocolLayer;
 import org.apache.hc.core5.reactor.ssl.SSLBufferManagement;
 import org.apache.hc.core5.reactor.ssl.SSLSessionInitializer;
 import org.apache.hc.core5.reactor.ssl.SSLSessionVerifier;
@@ -152,7 +152,7 @@ abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends
HttpMessage,
     void shutdownSession(final ShutdownType shutdownType) {
         if (shutdownType == ShutdownType.GRACEFUL) {
             connState = ConnectionState.GRACEFUL_SHUTDOWN;
-            ioSession.addLast(new ShutdownCommand(ShutdownType.GRACEFUL));
+            ioSession.enqueue(new ShutdownCommand(ShutdownType.GRACEFUL), Command.Priority.NORMAL);
         } else {
             connState = ConnectionState.SHUTDOWN;
             ioSession.close();
@@ -218,7 +218,7 @@ abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends
HttpMessage,
 
     private void processCommands() throws HttpException, IOException {
         for (;;) {
-            final Command command = ioSession.getCommand();
+            final Command command = ioSession.poll();
             if (command == null) {
                 return;
             }
@@ -408,7 +408,7 @@ abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends
HttpMessage,
     public final void onException(final Exception ex) {
         shutdownSession(ex);
         for (;;) {
-            final Command command = ioSession.getCommand();
+            final Command command = ioSession.poll();
             if (command != null) {
                 if (command instanceof ExecutionCommand) {
                     final AsyncClientExchangeHandler exchangeHandler = ((ExecutionCommand)
command).getExchangeHandler();
@@ -426,7 +426,7 @@ abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends
HttpMessage,
     public final void onDisconnect() {
         disconnected();
         for (;;) {
-            final Command command = ioSession.getCommand();
+            final Command command = ioSession.poll();
             if (command != null) {
                 if (command instanceof ExecutionCommand) {
                     final AsyncClientExchangeHandler exchangeHandler = ((ExecutionCommand)
command).getExchangeHandler();
@@ -559,12 +559,12 @@ abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends
HttpMessage,
 
     @Override
     public void close() throws IOException {
-        ioSession.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
+        ioSession.enqueue(new ShutdownCommand(ShutdownType.GRACEFUL), Command.Priority.NORMAL);
     }
 
     @Override
     public void shutdown(final ShutdownType shutdownType) {
-        ioSession.addFirst(new ShutdownCommand(shutdownType));
+        ioSession.enqueue(new ShutdownCommand(shutdownType), Command.Priority.IMMEDIATE);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/7bcb1968/httpcore5/src/main/java/org/apache/hc/core5/reactor/Command.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/Command.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/Command.java
index f2c1674..62c945e 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/Command.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/Command.java
@@ -36,4 +36,6 @@ import org.apache.hc.core5.concurrent.Cancellable;
  */
 public interface Command extends Cancellable {
 
+    enum Priority { NORMAL, IMMEDIATE }
+
 }

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/7bcb1968/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSession.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSession.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSession.java
index 19b48e5..28a222a 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSession.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSession.java
@@ -69,21 +69,21 @@ public interface IOSession extends GracefullyCloseable, Identifiable {
      *
      * @since 5.0
      */
-    void addLast(Command command);
+    void enqueue(Command command, Command.Priority priority);
 
     /**
-     * Inserts {@link Command} at the front of the command queue.
+     * Tests if there enqueued commands pending execution.
      *
      * @since 5.0
      */
-    void addFirst(Command command);
+    boolean hasCommands();
 
     /**
-     * Retrieves and removes first {@link Command} from the command queue.
+     * Removes first {@link Command} from the command queue if available.
      *
      * @since 5.0
      */
-    Command getCommand();
+    Command poll();
 
     /**
      * Returns the underlying I/O channel associated with this session.

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/7bcb1968/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java
index b486ace..46b43b6 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java
@@ -88,19 +88,22 @@ class IOSessionImpl implements IOSession {
     }
 
     @Override
-    public void addLast(final Command command) {
-        commandQueue.addLast(command);
+    public void enqueue(final Command command, final Command.Priority priority) {
+        if (priority == Command.Priority.IMMEDIATE) {
+            commandQueue.addFirst(command);
+        } else {
+            commandQueue.add(command);
+        }
         setEvent(SelectionKey.OP_WRITE);
     }
 
     @Override
-    public void addFirst(final Command command) {
-        commandQueue.addFirst(command);
-        setEvent(SelectionKey.OP_WRITE);
+    public boolean hasCommands() {
+        return !commandQueue.isEmpty();
     }
 
     @Override
-    public Command getCommand() {
+    public Command poll() {
         return commandQueue.poll();
     }
 

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/7bcb1968/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java
b/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java
index ab28092..bad70f7 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java
@@ -298,18 +298,18 @@ final class InternalDataChannel extends InternalChannel implements ProtocolIOSes
     }
 
     @Override
-    public void addLast(final Command command) {
-        getSessionImpl().addLast(command);
+    public void enqueue(final Command command, final Command.Priority priority) {
+        getSessionImpl().enqueue(command, priority);
     }
 
     @Override
-    public void addFirst(final Command command) {
-        getSessionImpl().addFirst(command);
+    public boolean hasCommands() {
+        return getSessionImpl().hasCommands();
     }
 
     @Override
-    public Command getCommand() {
-        return getSessionImpl().getCommand();
+    public Command poll() {
+        return getSessionImpl().poll();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/7bcb1968/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java
index 47f0bec..afaa81a 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java
@@ -696,10 +696,10 @@ public class SSLIOSession implements IOSession {
     }
 
     @Override
-    public void addLast(final Command command) {
+    public void enqueue(final Command command, final Command.Priority priority) {
         this.session.lock().lock();
         try {
-            this.session.addLast(command);
+            this.session.enqueue(command, priority);
             setEvent(SelectionKey.OP_WRITE);
         } finally {
             this.session.lock().unlock();
@@ -707,19 +707,13 @@ public class SSLIOSession implements IOSession {
     }
 
     @Override
-    public void addFirst(final Command command) {
-        this.session.lock().lock();
-        try {
-            this.session.addFirst(command);
-            setEvent(SelectionKey.OP_WRITE);
-        } finally {
-            this.session.lock().unlock();
-        }
+    public boolean hasCommands() {
+        return this.session.hasCommands();
     }
 
     @Override
-    public Command getCommand() {
-        return this.session.getCommand();
+    public Command poll() {
+        return this.session.poll();
     }
 
     @Override


Mime
View raw message