hc-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ol...@apache.org
Subject svn commit: r1762692 - in /httpcomponents/httpcore/trunk/httpcore5-h2/src: main/java/org/apache/hc/core5/http2/impl/nio/ main/java/org/apache/hc/core5/http2/nio/ main/java/org/apache/hc/core5/http2/nio/command/ test/java/org/apache/hc/core5/http2/integ...
Date Wed, 28 Sep 2016 17:26:17 GMT
Author: olegk
Date: Wed Sep 28 17:26:17 2016
New Revision: 1762692

URL: http://svn.apache.org/viewvc?rev=1762692&view=rev
Log:
RFC 7230: Improved handling of out of sequence responses

Added:
    httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AbstractAsyncServerExchangeHandler.java   (contents, props changed)
      - copied, changed from r1762528, httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AbstractAsyncExchangeHandler.java
    httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AbstractClassicServerExchangeHandler.java   (contents, props changed)
      - copied, changed from r1762528, httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AbstractClassicExchangeHandler.java
    httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncClientExchangeHandler.java
      - copied, changed from r1762528, httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncResponseConsumer.java
    httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncServerExchangeHandler.java   (contents, props changed)
      - copied, changed from r1762528, httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncExchangeHandler.java
    httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/BasicClientExchangeHandler.java   (with props)
    httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/RequestChannel.java
      - copied, changed from r1762528, httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncRequestConsumer.java
Removed:
    httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AbstractAsyncExchangeHandler.java
    httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AbstractClassicExchangeHandler.java
    httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncExchangeHandler.java
Modified:
    httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
    httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamHandler.java
    httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamHandler.java
    httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamMultiplexer.java
    httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java
    httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiatorFactory.java
    httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncRequestConsumer.java
    httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncResponseConsumer.java
    httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/BasicRequestConsumer.java
    httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/BasicResponseConsumer.java
    httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/BasicResponseProducer.java
    httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/FixedResponseExchangeHandler.java
    httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/command/ClientCommandEndpoint.java
    httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/command/ExecutionCommand.java
    httpcomponents/httpcore/trunk/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/integration/Http2IntegrationTest.java
    httpcomponents/httpcore/trunk/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/integration/Http2TestServer.java
    httpcomponents/httpcore/trunk/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/integration/InternalServerHttp2EventHandlerFactory.java

Modified: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java?rev=1762692&r1=1762691&r2=1762692&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java Wed Sep 28 17:26:17 2016
@@ -524,21 +524,18 @@ abstract class AbstractHttp2StreamMultip
                 if (mode == Mode.SERVER) {
                     throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Illegal attempt to execute a request");
                 }
-                @SuppressWarnings("unchecked")
-                final ExecutionCommand<Object> executionCommand = (ExecutionCommand<Object>) command;
+                final ExecutionCommand executionCommand = (ExecutionCommand) command;
                 final int streamId = generateStreamId();
                 final Http2StreamChannelImpl channel = new Http2StreamChannelImpl(
                         streamId,
                         localConfig.getInitialWindowSize(),
                         remoteConfig.getInitialWindowSize());
-                final Http2StreamHandler streamHandler = new ClientHttp2StreamHandler<>(
+                final Http2StreamHandler streamHandler = new ClientHttp2StreamHandler(
                         channel,
                         httpProcessor,
                         connMetrics,
-                        executionCommand.getRequestProducer(),
-                        executionCommand.getResponseConsumer(),
-                        executionCommand.getContext(),
-                        executionCommand.getCallback());
+                        executionCommand.getExchangeHandler(),
+                        executionCommand.getContext());
                 final Http2Stream stream = new Http2Stream(channel, streamHandler, false);
                 if (stream.isOutputReady()) {
                     stream.produceOutput();

Modified: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamHandler.java?rev=1762692&r1=1762691&r2=1762692&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamHandler.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamHandler.java Wed Sep 28 17:26:17 2016
@@ -31,7 +31,6 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.http.EntityDetails;
 import org.apache.hc.core5.http.Header;
 import org.apache.hc.core5.http.HttpException;
@@ -45,23 +44,24 @@ import org.apache.hc.core5.http.impl.Bas
 import org.apache.hc.core5.http.protocol.HttpContext;
 import org.apache.hc.core5.http.protocol.HttpCoreContext;
 import org.apache.hc.core5.http.protocol.HttpProcessor;
+import org.apache.hc.core5.http2.H2ConnectionException;
+import org.apache.hc.core5.http2.H2Error;
 import org.apache.hc.core5.http2.impl.DefaultH2RequestConverter;
 import org.apache.hc.core5.http2.impl.DefaultH2ResponseConverter;
 import org.apache.hc.core5.http2.impl.IncomingEntityDetails;
-import org.apache.hc.core5.http2.nio.AsyncRequestProducer;
-import org.apache.hc.core5.http2.nio.AsyncResponseConsumer;
+import org.apache.hc.core5.http2.nio.AsyncClientExchangeHandler;
 import org.apache.hc.core5.http2.nio.DataStreamChannel;
+import org.apache.hc.core5.http2.nio.RequestChannel;
 
-class ClientHttp2StreamHandler<T> implements Http2StreamHandler {
+class ClientHttp2StreamHandler implements Http2StreamHandler {
 
     private final Http2StreamChannel outputChannel;
     private final DataStreamChannel dataChannel;
     private final HttpProcessor httpProcessor;
     private final BasicHttpConnectionMetrics connMetrics;
-    private final AsyncRequestProducer requestProducer;
-    private final AsyncResponseConsumer<T> responseConsumer;
+    private final AsyncClientExchangeHandler exchangeHandler;
     private final HttpCoreContext context;
-    private final FutureCallback<T> resultCallback;
+    private final AtomicBoolean requestCommitted;
     private final AtomicBoolean done;
 
     private volatile MessageState requestState;
@@ -71,10 +71,8 @@ class ClientHttp2StreamHandler<T> implem
             final Http2StreamChannel outputChannel,
             final HttpProcessor httpProcessor,
             final BasicHttpConnectionMetrics connMetrics,
-            final AsyncRequestProducer requestProducer,
-            final AsyncResponseConsumer<T> responseConsumer,
-            final HttpContext context,
-            final FutureCallback<T> resultCallback) {
+            final AsyncClientExchangeHandler exchangeHandler,
+            final HttpContext context) {
         this.outputChannel = outputChannel;
         this.dataChannel = new DataStreamChannel() {
 
@@ -103,10 +101,9 @@ class ClientHttp2StreamHandler<T> implem
         };
         this.httpProcessor = httpProcessor;
         this.connMetrics = connMetrics;
-        this.requestProducer = requestProducer;
-        this.responseConsumer = responseConsumer;
-        this.resultCallback = resultCallback;
+        this.exchangeHandler = exchangeHandler;
         this.context = HttpCoreContext.adapt(context);
+        this.requestCommitted = new AtomicBoolean(false);
         this.done = new AtomicBoolean(false);
         this.requestState = MessageState.HEADERS;
         this.responseState = MessageState.HEADERS;
@@ -118,43 +115,51 @@ class ClientHttp2StreamHandler<T> implem
             case HEADERS:
                 return true;
             case BODY:
-                return requestProducer.available() > 0;
+                return exchangeHandler.available() > 0;
             default:
                 return false;
         }
     }
 
+    private void commitRequest(final HttpRequest request, final EntityDetails entityDetails) throws HttpException, IOException {
+        if (requestCommitted.compareAndSet(false, true)) {
+            context.setProtocolVersion(HttpVersion.HTTP_2);
+            context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
+            context.setAttribute(HttpCoreContext.HTTP_CONNECTION, this);
+            httpProcessor.process(request, entityDetails, context);
+            connMetrics.incrementRequestCount();
+
+            final List<Header> headers = DefaultH2RequestConverter.INSTANCE.convert(request);
+            outputChannel.submit(headers, entityDetails == null);
+
+            if (entityDetails == null) {
+                requestState = MessageState.COMPLETE;
+            } else {
+                final Header h = request.getFirstHeader(HttpHeaders.EXPECT);
+                final boolean expectContinue = h != null && "100-continue".equalsIgnoreCase(h.getValue());
+                requestState = expectContinue ? MessageState.ACK : MessageState.BODY;
+            }
+        } else {
+            throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Request already committed");
+        }
+    }
+
     @Override
     public void produceOutput() throws HttpException, IOException {
         switch (requestState) {
             case HEADERS:
-                final HttpRequest request = requestProducer.produceRequest();
-                final EntityDetails entityDetails = requestProducer.getEntityDetails();
+                exchangeHandler.submitRequest(new RequestChannel() {
 
-                context.setProtocolVersion(HttpVersion.HTTP_2);
-                context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
-                context.setAttribute(HttpCoreContext.HTTP_CONNECTION, this);
-                httpProcessor.process(request, entityDetails, context);
-                connMetrics.incrementRequestCount();
-
-                final List<Header> headers = DefaultH2RequestConverter.INSTANCE.convert(request);
-                outputChannel.submit(headers, entityDetails == null);
-
-                if (entityDetails == null) {
-                    requestState = MessageState.COMPLETE;
-                } else {
-                    final Header h = request.getFirstHeader(HttpHeaders.EXPECT);
-                    final boolean expectContinue = h != null && "100-continue".equalsIgnoreCase(h.getValue());
-                    if (expectContinue) {
-                        requestState = MessageState.ACK;
-                    } else {
-                        requestState = MessageState.BODY;
-                        requestProducer.dataStart(dataChannel);
+                    @Override
+                    public void sendRequest(
+                            final HttpRequest request, final EntityDetails entityDetails) throws HttpException, IOException {
+                        commitRequest(request, entityDetails);
                     }
-                }
+
+                });
                 break;
             case BODY:
-                requestProducer.produce(dataChannel);
+                exchangeHandler.produce(dataChannel);
                 break;
         }
     }
@@ -175,7 +180,7 @@ class ClientHttp2StreamHandler<T> implem
         if (response.getCode() < 200) {
             if (response.getCode() == HttpStatus.SC_CONTINUE && requestState == MessageState.ACK) {
                 requestState = MessageState.BODY;
-                requestProducer.dataStart(dataChannel);
+                exchangeHandler.produce(dataChannel);
             }
             return;
         }
@@ -184,41 +189,13 @@ class ClientHttp2StreamHandler<T> implem
         httpProcessor.process(response, entityDetails, context);
         connMetrics.incrementResponseCount();
 
-        responseConsumer.consumeResponse(response, entityDetails, new FutureCallback<T>() {
-
-            @Override
-            public void completed(final T result) {
-                if (resultCallback != null) {
-                    resultCallback.completed(result);
-                }
-            }
-
-            @Override
-            public void failed(final Exception ex) {
-                if (resultCallback != null) {
-                    resultCallback.failed(ex);
-                }
-            }
-
-            @Override
-            public void cancelled() {
-                if (resultCallback != null) {
-                    resultCallback.cancelled();
-                }
-            }
-
-        });
-        if (endStream) {
-            responseState = MessageState.COMPLETE;
-            responseConsumer.streamEnd(null);
-        } else {
-            responseState = MessageState.BODY;
-        }
+        exchangeHandler.consumeResponse(response, entityDetails);
+        responseState = endStream ? MessageState.COMPLETE : MessageState.BODY;
     }
 
     @Override
     public void updateInputCapacity() throws IOException {
-        responseConsumer.updateCapacity(outputChannel);
+        exchangeHandler.updateCapacity(outputChannel);
     }
 
     @Override
@@ -227,21 +204,18 @@ class ClientHttp2StreamHandler<T> implem
             throw new ProtocolException("Unexpected message data");
         }
         if (src != null) {
-            responseConsumer.consume(src);
+            exchangeHandler.consume(src);
         }
         if (endStream) {
             responseState = MessageState.COMPLETE;
-            responseConsumer.streamEnd(null);
+            exchangeHandler.streamEnd(null);
         }
     }
 
     @Override
     public void failed(final Exception cause) {
         try {
-            if (resultCallback != null) {
-                resultCallback.failed(cause);
-            }
-            requestProducer.failed(cause);
+            exchangeHandler.failed(cause);
         } finally {
             releaseResources();
         }
@@ -250,9 +224,7 @@ class ClientHttp2StreamHandler<T> implem
     @Override
     public void cancel() {
         try {
-            if (resultCallback != null) {
-                resultCallback.cancelled();
-            }
+            exchangeHandler.cancel();
         } finally {
             releaseResources();
         }
@@ -262,9 +234,8 @@ class ClientHttp2StreamHandler<T> implem
     public void releaseResources() {
         if (done.compareAndSet(false, true)) {
             responseState = MessageState.COMPLETE;
-            responseConsumer.releaseResources();
             requestState = MessageState.COMPLETE;
-            requestProducer.releaseResources();
+            exchangeHandler.releaseResources();
         }
     }
 

Modified: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamHandler.java?rev=1762692&r1=1762691&r2=1762692&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamHandler.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamHandler.java Wed Sep 28 17:26:17 2016
@@ -49,8 +49,8 @@ import org.apache.hc.core5.http2.H2Error
 import org.apache.hc.core5.http2.impl.DefaultH2RequestConverter;
 import org.apache.hc.core5.http2.impl.DefaultH2ResponseConverter;
 import org.apache.hc.core5.http2.impl.IncomingEntityDetails;
-import org.apache.hc.core5.http2.nio.AsyncExchangeHandler;
 import org.apache.hc.core5.http2.nio.AsyncPushProducer;
+import org.apache.hc.core5.http2.nio.AsyncServerExchangeHandler;
 import org.apache.hc.core5.http2.nio.DataStreamChannel;
 import org.apache.hc.core5.http2.nio.ExpectationChannel;
 import org.apache.hc.core5.http2.nio.HandlerFactory;
@@ -63,14 +63,12 @@ public class ServerHttp2StreamHandler im
     private final DataStreamChannel dataChannel;
     private final HttpProcessor httpProcessor;
     private final BasicHttpConnectionMetrics connMetrics;
-    private final HandlerFactory<AsyncExchangeHandler> exchangeHandlerFactory;
+    private final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory;
     private final HttpCoreContext context;
     private final AtomicBoolean responseCommitted;
     private final AtomicBoolean done;
 
-    private volatile HttpRequest request;
-    private volatile EntityDetails requestEntityDetails;
-    private volatile AsyncExchangeHandler exchangeHandler;
+    private volatile AsyncServerExchangeHandler exchangeHandler;
     private volatile MessageState requestState;
     private volatile MessageState responseState;
 
@@ -78,7 +76,7 @@ public class ServerHttp2StreamHandler im
             final Http2StreamChannel outputChannel,
             final HttpProcessor httpProcessor,
             final BasicHttpConnectionMetrics connMetrics,
-            final HandlerFactory<AsyncExchangeHandler> exchangeHandlerFactory) {
+            final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory) {
         this.outputChannel = outputChannel;
         this.dataChannel = new DataStreamChannel() {
 

Modified: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamMultiplexer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamMultiplexer.java?rev=1762692&r1=1762691&r2=1762692&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamMultiplexer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamMultiplexer.java Wed Sep 28 17:26:17 2016
@@ -35,7 +35,7 @@ import org.apache.hc.core5.http2.config.
 import org.apache.hc.core5.http2.frame.DefaultFrameFactory;
 import org.apache.hc.core5.http2.frame.FrameFactory;
 import org.apache.hc.core5.http2.frame.StreamIdGenerator;
-import org.apache.hc.core5.http2.nio.AsyncExchangeHandler;
+import org.apache.hc.core5.http2.nio.AsyncServerExchangeHandler;
 import org.apache.hc.core5.http2.nio.HandlerFactory;
 import org.apache.hc.core5.reactor.IOSession;
 import org.apache.hc.core5.util.Args;
@@ -47,13 +47,13 @@ import org.apache.hc.core5.util.Args;
  */
 public class ServerHttp2StreamMultiplexer extends AbstractHttp2StreamMultiplexer {
 
-    private final HandlerFactory<AsyncExchangeHandler> exchangeHandlerFactory;
+    private final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory;
 
     public ServerHttp2StreamMultiplexer(
             final IOSession ioSession,
             final FrameFactory frameFactory,
             final HttpProcessor httpProcessor,
-            final HandlerFactory<AsyncExchangeHandler> exchangeHandlerFactory,
+            final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
             final Charset charset,
             final H2Config h2Config,
             final Http2StreamListener callback) {
@@ -64,7 +64,7 @@ public class ServerHttp2StreamMultiplexe
     public ServerHttp2StreamMultiplexer(
             final IOSession ioSession,
             final HttpProcessor httpProcessor,
-            final HandlerFactory<AsyncExchangeHandler> exchangeHandlerFactory,
+            final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
             final Charset charset,
             final H2Config h2Config) {
         this(ioSession, DefaultFrameFactory.INSTANCE, httpProcessor, exchangeHandlerFactory, charset, h2Config, null);

Modified: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java?rev=1762692&r1=1762691&r2=1762692&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java Wed Sep 28 17:26:17 2016
@@ -39,7 +39,7 @@ import org.apache.hc.core5.http2.H2Conne
 import org.apache.hc.core5.http2.H2Error;
 import org.apache.hc.core5.http2.config.H2Config;
 import org.apache.hc.core5.http2.frame.DefaultFrameFactory;
-import org.apache.hc.core5.http2.nio.AsyncExchangeHandler;
+import org.apache.hc.core5.http2.nio.AsyncServerExchangeHandler;
 import org.apache.hc.core5.http2.nio.HandlerFactory;
 import org.apache.hc.core5.reactor.IOEventHandler;
 import org.apache.hc.core5.reactor.IOSession;
@@ -54,7 +54,7 @@ public class ServerHttpProtocolNegotiato
     final static byte[] PREFACE = ClientHttpProtocolNegotiator.PREFACE;
 
     private final HttpProcessor httpProcessor;
-    private final HandlerFactory<AsyncExchangeHandler> exchangeHandlerFactory;
+    private final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory;
     private final Charset charset;
     private final H2Config h2Config;
     private final Http2StreamListener streamListener;
@@ -63,7 +63,7 @@ public class ServerHttpProtocolNegotiato
 
     public ServerHttpProtocolNegotiator(
             final HttpProcessor httpProcessor,
-            final HandlerFactory<AsyncExchangeHandler> exchangeHandlerFactory,
+            final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
             final Charset charset,
             final H2Config h2Config,
             final Http2StreamListener streamListener,

Modified: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiatorFactory.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiatorFactory.java?rev=1762692&r1=1762691&r2=1762692&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiatorFactory.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiatorFactory.java Wed Sep 28 17:26:17 2016
@@ -34,7 +34,7 @@ import org.apache.hc.core5.annotation.Co
 import org.apache.hc.core5.annotation.ThreadingBehavior;
 import org.apache.hc.core5.http.protocol.HttpProcessor;
 import org.apache.hc.core5.http2.config.H2Config;
-import org.apache.hc.core5.http2.nio.AsyncExchangeHandler;
+import org.apache.hc.core5.http2.nio.AsyncServerExchangeHandler;
 import org.apache.hc.core5.http2.nio.HandlerFactory;
 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
 import org.apache.hc.core5.reactor.IOSession;
@@ -47,7 +47,7 @@ import org.apache.hc.core5.util.Args;
 public class ServerHttpProtocolNegotiatorFactory implements IOEventHandlerFactory {
 
     private final HttpProcessor httpProcessor;
-    private final HandlerFactory<AsyncExchangeHandler> exchangeHandlerFactory;
+    private final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory;
     private final Charset charset;
     private final H2Config h2Config;
     private final Http2StreamListener streamListener;
@@ -55,7 +55,7 @@ public class ServerHttpProtocolNegotiato
 
     public ServerHttpProtocolNegotiatorFactory(
             final HttpProcessor httpProcessor,
-            final HandlerFactory<AsyncExchangeHandler> exchangeHandlerFactory,
+            final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
             final Charset charset,
             final H2Config h2Config,
             final Http2StreamListener streamListener,
@@ -70,7 +70,7 @@ public class ServerHttpProtocolNegotiato
 
     public ServerHttpProtocolNegotiatorFactory(
             final HttpProcessor httpProcessor,
-            final HandlerFactory<AsyncExchangeHandler> exchangeHandlerFactory,
+            final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
             final Http2StreamListener streamListener,
             final HttpErrorListener errorListener) {
         this(httpProcessor, exchangeHandlerFactory, null, null, streamListener, errorListener);

Copied: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AbstractAsyncServerExchangeHandler.java (from r1762528, httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AbstractAsyncExchangeHandler.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AbstractAsyncServerExchangeHandler.java?p2=httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AbstractAsyncServerExchangeHandler.java&p1=httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AbstractAsyncExchangeHandler.java&r1=1762528&r2=1762692&rev=1762692&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AbstractAsyncExchangeHandler.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AbstractAsyncServerExchangeHandler.java Wed Sep 28 17:26:17 2016
@@ -39,44 +39,46 @@ import org.apache.hc.core5.http.HttpExce
 import org.apache.hc.core5.http.HttpRequest;
 import org.apache.hc.core5.http.HttpStatus;
 import org.apache.hc.core5.http.Message;
-import org.apache.hc.core5.http.entity.ContentType;
-import org.apache.hc.core5.http2.nio.entity.StringAsyncEntityProducer;
 import org.apache.hc.core5.util.Args;
 import org.apache.hc.core5.util.Asserts;
 
 /**
  * @since 5.0
  */
-public abstract class AbstractAsyncExchangeHandler<T> implements AsyncExchangeHandler {
+public abstract class AbstractAsyncServerExchangeHandler<T> implements AsyncServerExchangeHandler {
 
     private final AsyncRequestConsumer<Message<HttpRequest, T>> requestConsumer;
     private final AtomicReference<AsyncResponseProducer> responseProducer;
     private final AtomicBoolean dataStarted;
-    private final AtomicReference<Exception> exception;
 
-    public AbstractAsyncExchangeHandler(final AsyncRequestConsumer<Message<HttpRequest, T>> requestConsumer) {
+    public AbstractAsyncServerExchangeHandler(final AsyncRequestConsumer<Message<HttpRequest, T>> requestConsumer) {
         this.requestConsumer = Args.notNull(requestConsumer, "Request consumer");
         this.responseProducer = new AtomicReference<>(null);
         this.dataStarted = new AtomicBoolean(false);
-        this.exception = new AtomicReference<>(null);
     }
 
-    public AbstractAsyncExchangeHandler(final AsyncEntityConsumer<T> requestEntityConsumer) {
+    public AbstractAsyncServerExchangeHandler(final AsyncEntityConsumer<T> requestEntityConsumer) {
         this(new BasicRequestConsumer<>(requestEntityConsumer));
     }
 
-    public Exception getException() {
-        return exception.get();
+    protected AsyncResponseProducer verify(final HttpRequest request) throws IOException, HttpException {
+        return null;
     }
 
     protected abstract void handle(Message<HttpRequest, T> request, AsyncResponseTrigger responseTrigger) throws IOException, HttpException;
 
     @Override
-    public void verify(
+    public final void verify(
             final HttpRequest request,
             final EntityDetails entityDetails,
             final ExpectationChannel expectationChannel) throws HttpException, IOException {
-        expectationChannel.sendContinue();
+        final AsyncResponseProducer producer = verify(request);
+        if (producer != null) {
+            responseProducer.set(producer);
+            expectationChannel.sendResponse(producer.produceResponse(), producer.getEntityDetails());
+        } else {
+            expectationChannel.sendContinue();
+        }
     }
 
     @Override
@@ -119,9 +121,8 @@ public abstract class AbstractAsyncExcha
                     handle(message, responseTrigger);
                 } catch (HttpException ex) {
                     try {
-                        responseTrigger.submitResponse(new BasicResponseProducer(
-                                HttpStatus.SC_INTERNAL_SERVER_ERROR,
-                                new StringAsyncEntityProducer(ex.getMessage(), ContentType.TEXT_PLAIN)));
+                        responseTrigger.submitResponse(
+                                new BasicResponseProducer(HttpStatus.SC_INTERNAL_SERVER_ERROR, ex.getMessage()));
                     } catch (HttpException | IOException ex2) {
                         failed(ex2);
                     }
@@ -132,7 +133,7 @@ public abstract class AbstractAsyncExcha
 
             @Override
             public void failed(final Exception ex) {
-                exception.compareAndSet(null, ex);
+                AbstractAsyncServerExchangeHandler.this.failed(ex);
                 releaseResources();
             }
 
@@ -183,11 +184,10 @@ public abstract class AbstractAsyncExcha
 
     @Override
     public final void failed(final Exception cause) {
-        if (exception.compareAndSet(null, cause)) {
-            final AsyncResponseProducer dataProducer = responseProducer.get();
-            if (dataProducer != null) {
-                dataProducer.failed(cause);
-            }
+        requestConsumer.failed(cause);
+        final AsyncResponseProducer dataProducer = responseProducer.get();
+        if (dataProducer != null) {
+            dataProducer.failed(cause);
         }
         releaseResources();
     }

Propchange: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AbstractAsyncServerExchangeHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AbstractAsyncServerExchangeHandler.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AbstractAsyncServerExchangeHandler.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Copied: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AbstractClassicServerExchangeHandler.java (from r1762528, httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AbstractClassicExchangeHandler.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AbstractClassicServerExchangeHandler.java?p2=httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AbstractClassicServerExchangeHandler.java&p1=httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AbstractClassicExchangeHandler.java&r1=1762528&r2=1762692&rev=1762692&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AbstractClassicExchangeHandler.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AbstractClassicServerExchangeHandler.java Wed Sep 28 17:26:17 2016
@@ -57,7 +57,7 @@ import org.apache.hc.core5.util.Asserts;
 /**
  * @since 5.0
  */
-public abstract class AbstractClassicExchangeHandler implements AsyncExchangeHandler {
+public abstract class AbstractClassicServerExchangeHandler implements AsyncServerExchangeHandler {
 
     private enum State { IDLE, ACTIVE, COMPLETED }
 
@@ -69,7 +69,7 @@ public abstract class AbstractClassicExc
     private volatile SharedInputBuffer inputBuffer;
     private volatile SharedOutputBuffer outputBuffer;
 
-    public AbstractClassicExchangeHandler(final int initialBufferSize, final Executor executor) {
+    public AbstractClassicServerExchangeHandler(final int initialBufferSize, final Executor executor) {
         this.initialBufferSize = Args.positive(initialBufferSize, "Initial buffer size");
         this.executor = Args.notNull(executor, "Executor");
         this.exception = new AtomicReference<>(null);

Propchange: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AbstractClassicServerExchangeHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AbstractClassicServerExchangeHandler.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AbstractClassicServerExchangeHandler.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Copied: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncClientExchangeHandler.java (from r1762528, httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncResponseConsumer.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncClientExchangeHandler.java?p2=httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncClientExchangeHandler.java&p1=httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncResponseConsumer.java&r1=1762528&r2=1762692&rev=1762692&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncResponseConsumer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncClientExchangeHandler.java Wed Sep 28 17:26:17 2016
@@ -28,18 +28,24 @@ package org.apache.hc.core5.http2.nio;
 
 import java.io.IOException;
 
-import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.http.EntityDetails;
 import org.apache.hc.core5.http.HttpException;
 import org.apache.hc.core5.http.HttpResponse;
 
 /**
- * Abstract asynchronous response consumer.
+ * Abstract asynchronous client side message exchange handler that acts as a request producer
+ * and a response consumer.
  *
  * @since 5.0
  */
-public interface AsyncResponseConsumer<T> extends AsyncDataConsumer {
+public interface AsyncClientExchangeHandler extends AsyncDataConsumer, AsyncDataProducer {
 
-    void consumeResponse(HttpResponse response, EntityDetails entityDetails, FutureCallback<T> resultCallback) throws HttpException, IOException;
+    void submitRequest(RequestChannel channel) throws HttpException, IOException;
+
+    void consumeResponse(HttpResponse response, EntityDetails entityDetails) throws HttpException, IOException;
+
+    void failed(Exception cause);
+
+    void cancel();
 
 }

Modified: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncRequestConsumer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncRequestConsumer.java?rev=1762692&r1=1762691&r2=1762692&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncRequestConsumer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncRequestConsumer.java Wed Sep 28 17:26:17 2016
@@ -42,4 +42,6 @@ public interface AsyncRequestConsumer<T>
 
     void consumeRequest(HttpRequest request, EntityDetails entityDetails, FutureCallback<T> resultCallback) throws HttpException, IOException;
 
+    void failed(Exception cause);
+
 }

Modified: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncResponseConsumer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncResponseConsumer.java?rev=1762692&r1=1762691&r2=1762692&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncResponseConsumer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncResponseConsumer.java Wed Sep 28 17:26:17 2016
@@ -42,4 +42,6 @@ public interface AsyncResponseConsumer<T
 
     void consumeResponse(HttpResponse response, EntityDetails entityDetails, FutureCallback<T> resultCallback) throws HttpException, IOException;
 
+    void failed(Exception cause);
+
 }

Copied: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncServerExchangeHandler.java (from r1762528, httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncExchangeHandler.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncServerExchangeHandler.java?p2=httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncServerExchangeHandler.java&p1=httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncExchangeHandler.java&r1=1762528&r2=1762692&rev=1762692&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncExchangeHandler.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncServerExchangeHandler.java Wed Sep 28 17:26:17 2016
@@ -33,11 +33,12 @@ import org.apache.hc.core5.http.HttpExce
 import org.apache.hc.core5.http.HttpRequest;
 
 /**
- * Abstract asynchronous message exchange handler that acts as a request consumer and a response producer.
+ * Abstract asynchronous server side message exchange handler that acts as a request consumer
+ * and a response producer.
  *
  * @since 5.0
  */
-public interface AsyncExchangeHandler extends AsyncDataConsumer, AsyncDataProducer {
+public interface AsyncServerExchangeHandler extends AsyncDataConsumer, AsyncDataProducer {
 
     void verify(HttpRequest request, EntityDetails entityDetails, ExpectationChannel expectationChannel) throws HttpException, IOException;
 

Propchange: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncServerExchangeHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncServerExchangeHandler.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncServerExchangeHandler.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/BasicClientExchangeHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/BasicClientExchangeHandler.java?rev=1762692&view=auto
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/BasicClientExchangeHandler.java (added)
+++ httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/BasicClientExchangeHandler.java Wed Sep 28 17:26:17 2016
@@ -0,0 +1,159 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http2.nio;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.util.Args;
+
+/**
+ * @since 5.0
+ */
+public class BasicClientExchangeHandler<T> implements AsyncClientExchangeHandler{
+
+    private final AsyncRequestProducer requestProducer;
+    private final AsyncResponseConsumer<T> responseConsumer;
+    private final FutureCallback<T> resultCallback;
+    private final AtomicBoolean dataStarted;
+    private final AtomicBoolean outputTerminated;
+
+    public BasicClientExchangeHandler(
+            final AsyncRequestProducer requestProducer,
+            final AsyncResponseConsumer<T> responseConsumer,
+            final FutureCallback<T> resultCallback) {
+        this.requestProducer = Args.notNull(requestProducer, "Request producer");
+        this.responseConsumer = Args.notNull(responseConsumer, "Response consumer");
+        this.resultCallback = resultCallback;
+        this.dataStarted = new AtomicBoolean(false);
+        this.outputTerminated = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void submitRequest(final RequestChannel requestChannel) throws HttpException, IOException {
+        final HttpRequest request = requestProducer.produceRequest();
+        final EntityDetails entityDetails = requestProducer.getEntityDetails();
+        requestChannel.sendRequest(request, entityDetails);
+    }
+
+    @Override
+    public int available() {
+        return requestProducer.available();
+    }
+
+    @Override
+    public void produce(final DataStreamChannel channel) throws IOException {
+        if (outputTerminated.get()) {
+            channel.endStream();
+            return;
+        }
+        if (this.dataStarted.compareAndSet(false, true)) {
+            requestProducer.dataStart(channel);
+        }
+        requestProducer.produce(channel);
+    }
+
+    @Override
+    public void consumeResponse(final HttpResponse response, final EntityDetails entityDetails) throws HttpException, IOException {
+        if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
+            outputTerminated.set(true);
+            requestProducer.releaseResources();
+        }
+        responseConsumer.consumeResponse(response, entityDetails, new FutureCallback<T>() {
+
+            @Override
+            public void completed(final T result) {
+                if (resultCallback != null) {
+                    resultCallback.completed(result);
+                }
+            }
+
+            @Override
+            public void failed(final Exception ex) {
+                if (resultCallback != null) {
+                    resultCallback.failed(ex);
+                }
+            }
+
+            @Override
+            public void cancelled() {
+                if (resultCallback != null) {
+                    resultCallback.cancelled();
+                }
+            }
+
+        });
+    }
+
+    @Override
+    public void cancel() {
+        releaseResources();
+    }
+
+    @Override
+    public int capacity() {
+        return responseConsumer.capacity();
+    }
+
+    @Override
+    public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+        responseConsumer.updateCapacity(capacityChannel);
+    }
+
+    @Override
+    public void consume(final ByteBuffer src) throws IOException {
+        responseConsumer.consume(src);
+    }
+
+    @Override
+    public void streamEnd(final List<Header> trailers) throws HttpException, IOException {
+        responseConsumer.streamEnd(trailers);
+    }
+
+    @Override
+    public final void failed(final Exception cause) {
+        requestProducer.failed(cause);
+        responseConsumer.failed(cause);
+        releaseResources();
+    }
+
+    @Override
+    public final void releaseResources() {
+        requestProducer.releaseResources();
+        responseConsumer.releaseResources();
+    }
+
+}

Propchange: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/BasicClientExchangeHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/BasicClientExchangeHandler.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/BasicClientExchangeHandler.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/BasicRequestConsumer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/BasicRequestConsumer.java?rev=1762692&r1=1762691&r2=1762692&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/BasicRequestConsumer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/BasicRequestConsumer.java Wed Sep 28 17:26:17 2016
@@ -101,6 +101,11 @@ public class BasicRequestConsumer<T> imp
     }
 
     @Override
+    public void failed(final Exception cause) {
+        releaseResources();
+    }
+
+    @Override
     public void releaseResources() {
         dataConsumer.releaseResources();
     }

Modified: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/BasicResponseConsumer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/BasicResponseConsumer.java?rev=1762692&r1=1762691&r2=1762692&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/BasicResponseConsumer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/BasicResponseConsumer.java Wed Sep 28 17:26:17 2016
@@ -97,6 +97,11 @@ public class BasicResponseConsumer<T> im
     }
 
     @Override
+    public void failed(final Exception cause) {
+        releaseResources();
+    }
+
+    @Override
     public void releaseResources() {
         dataConsumer.releaseResources();
     }

Modified: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/BasicResponseProducer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/BasicResponseProducer.java?rev=1762692&r1=1762691&r2=1762692&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/BasicResponseProducer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/BasicResponseProducer.java Wed Sep 28 17:26:17 2016
@@ -31,7 +31,9 @@ import java.io.IOException;
 import org.apache.hc.core5.http.EntityDetails;
 import org.apache.hc.core5.http.HttpResponse;
 import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.entity.ContentType;
 import org.apache.hc.core5.http.message.BasicHttpResponse;
+import org.apache.hc.core5.http2.nio.entity.StringAsyncEntityProducer;
 import org.apache.hc.core5.util.Args;
 
 /**
@@ -51,6 +53,10 @@ public class BasicResponseProducer imple
         this(new BasicHttpResponse(code), dataProducer);
     }
 
+    public BasicResponseProducer(final int code, final String message) {
+        this(new BasicHttpResponse(code), new StringAsyncEntityProducer(message, ContentType.TEXT_PLAIN));
+    }
+
     public BasicResponseProducer(final AsyncEntityProducer dataProducer) {
         this(HttpStatus.SC_OK, dataProducer);
     }

Modified: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/FixedResponseExchangeHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/FixedResponseExchangeHandler.java?rev=1762692&r1=1762691&r2=1762692&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/FixedResponseExchangeHandler.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/FixedResponseExchangeHandler.java Wed Sep 28 17:26:17 2016
@@ -41,7 +41,7 @@ import org.apache.hc.core5.util.Args;
 /**
  * @since 5.0
  */
-public final class FixedResponseExchangeHandler extends AbstractAsyncExchangeHandler<Void>{
+public final class FixedResponseExchangeHandler extends AbstractAsyncServerExchangeHandler<Void> {
 
     private final AsyncResponseProducer responseProducer;
 

Copied: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/RequestChannel.java (from r1762528, httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncRequestConsumer.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/RequestChannel.java?p2=httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/RequestChannel.java&p1=httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncRequestConsumer.java&r1=1762528&r2=1762692&rev=1762692&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncRequestConsumer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/RequestChannel.java Wed Sep 28 17:26:17 2016
@@ -24,22 +24,27 @@
  * <http://www.apache.org/>.
  *
  */
+
 package org.apache.hc.core5.http2.nio;
 
 import java.io.IOException;
 
-import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
 import org.apache.hc.core5.http.EntityDetails;
 import org.apache.hc.core5.http.HttpException;
 import org.apache.hc.core5.http.HttpRequest;
 
 /**
- * Abstract asynchronous request consumer.
+ * Abstract request channel.
+ * <p>
+ * Implementations are expected to be thread-safe.
  *
  * @since 5.0
  */
-public interface AsyncRequestConsumer<T> extends AsyncDataConsumer {
+@Contract(threading = ThreadingBehavior.SAFE)
+public interface RequestChannel {
 
-    void consumeRequest(HttpRequest request, EntityDetails entityDetails, FutureCallback<T> resultCallback) throws HttpException, IOException;
+    void sendRequest(HttpRequest request, EntityDetails entityDetails) throws HttpException, IOException;
 
 }

Modified: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/command/ClientCommandEndpoint.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/command/ClientCommandEndpoint.java?rev=1762692&r1=1762691&r2=1762692&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/command/ClientCommandEndpoint.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/command/ClientCommandEndpoint.java Wed Sep 28 17:26:17 2016
@@ -36,8 +36,10 @@ import org.apache.hc.core5.concurrent.Ba
 import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.http.protocol.HttpContext;
 import org.apache.hc.core5.http.protocol.HttpCoreContext;
+import org.apache.hc.core5.http2.nio.AsyncClientExchangeHandler;
 import org.apache.hc.core5.http2.nio.AsyncRequestProducer;
 import org.apache.hc.core5.http2.nio.AsyncResponseConsumer;
+import org.apache.hc.core5.http2.nio.BasicClientExchangeHandler;
 import org.apache.hc.core5.reactor.Command;
 import org.apache.hc.core5.reactor.IOSession;
 
@@ -56,36 +58,42 @@ public final class ClientCommandEndpoint
         this.ioSession = ioSession;
     }
 
+    public void execute(
+            final AsyncClientExchangeHandler exchangeHandler,
+            final HttpContext context) {
+        final Command executionCommand = new ExecutionCommand(
+                exchangeHandler,
+                context != null ? context : HttpCoreContext.create());
+        ioSession.getCommandQueue().add(executionCommand);
+        ioSession.setEvent(SelectionKey.OP_WRITE);
+    }
+
     public <T> Future<T> execute(
             final AsyncRequestProducer requestProducer,
             final AsyncResponseConsumer<T> responseConsumer,
             final HttpContext context,
             final FutureCallback<T> callback) {
         final BasicFuture<T> future = new BasicFuture<>(callback);
-        final Command executionCommand = new ExecutionCommand<>(
-                requestProducer,
-                responseConsumer,
-                context != null ? context : HttpCoreContext.create(),
+        execute(new BasicClientExchangeHandler<>(requestProducer, responseConsumer,
                 new FutureCallback<T>() {
 
-            @Override
-            public void completed(final T result) {
-                future.completed(result);
-            }
-
-            @Override
-            public void failed(final Exception ex) {
-                future.failed(ex);
-            }
-
-            @Override
-            public void cancelled() {
-                future.cancel();
-            }
+                    @Override
+                    public void completed(final T result) {
+                        future.completed(result);
+                    }
+
+                    @Override
+                    public void failed(final Exception ex) {
+                        future.failed(ex);
+                    }
+
+                    @Override
+                    public void cancelled() {
+                        future.cancel();
+                    }
 
-        });
-        ioSession.getCommandQueue().add(executionCommand);
-        ioSession.setEvent(SelectionKey.OP_WRITE);
+                }),
+                context != null ? context : HttpCoreContext.create());
         return future;
     }
 

Modified: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/command/ExecutionCommand.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/command/ExecutionCommand.java?rev=1762692&r1=1762691&r2=1762692&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/command/ExecutionCommand.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/command/ExecutionCommand.java Wed Sep 28 17:26:17 2016
@@ -27,71 +27,38 @@
 
 package org.apache.hc.core5.http2.nio.command;
 
-import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.http.protocol.HttpContext;
-import org.apache.hc.core5.http2.nio.AsyncRequestProducer;
-import org.apache.hc.core5.http2.nio.AsyncResponseConsumer;
+import org.apache.hc.core5.http2.nio.AsyncClientExchangeHandler;
 import org.apache.hc.core5.reactor.Command;
 import org.apache.hc.core5.util.Args;
 
 /**
  * Request execution command.
  *
- * @param <T> message processing result type.
- *
  * @since 5.0
  */
-public final class ExecutionCommand<T> implements Command {
+public final class ExecutionCommand implements Command {
 
-    private final AsyncRequestProducer requestProducer;
-    private final AsyncResponseConsumer<T> responseConsumer;
+    private final AsyncClientExchangeHandler exchangeHandler;
     private final HttpContext context;
-    private final FutureCallback<T> callback;
 
-    public ExecutionCommand(
-            final AsyncRequestProducer requestProducer,
-            final AsyncResponseConsumer<T> responseConsumer,
-            final HttpContext context,
-            final FutureCallback<T> callback) {
-        this.requestProducer = Args.notNull(requestProducer, "Request producer");
-        this.responseConsumer = Args.notNull(responseConsumer, "Response consumer");
+    public ExecutionCommand(final AsyncClientExchangeHandler exchangeHandler, final HttpContext context) {
+        this.exchangeHandler = Args.notNull(exchangeHandler, "Handler");
         this.context = Args.notNull(context, "Context");
-        this.callback = callback;
-    }
-
-    public AsyncRequestProducer getRequestProducer() {
-        return requestProducer;
-    }
-
-    public AsyncResponseConsumer<T> getResponseConsumer() {
-        return responseConsumer;
     }
 
     public HttpContext getContext() {
         return context;
     }
 
-    public FutureCallback<T> getCallback() {
-        return callback;
+    public AsyncClientExchangeHandler getExchangeHandler() {
+        return exchangeHandler;
     }
 
     @Override
     public boolean cancel() {
-        try {
-            requestProducer.releaseResources();
-            responseConsumer.releaseResources();
-        } finally {
-            if (callback != null) {
-                callback.cancelled();
-            }
-        }
+        exchangeHandler.cancel();
         return true;
     }
 
-    @Override
-    public String toString() {
-        return "Request: " + requestProducer;
-    }
-
-
 }

Modified: httpcomponents/httpcore/trunk/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/integration/Http2IntegrationTest.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/integration/Http2IntegrationTest.java?rev=1762692&r1=1762691&r2=1762692&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/integration/Http2IntegrationTest.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/integration/Http2IntegrationTest.java Wed Sep 28 17:26:17 2016
@@ -51,6 +51,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hc.core5.http.EntityDetails;
 import org.apache.hc.core5.http.Header;
@@ -66,12 +68,13 @@ import org.apache.hc.core5.http.message.
 import org.apache.hc.core5.http2.H2Error;
 import org.apache.hc.core5.http2.H2StreamResetException;
 import org.apache.hc.core5.http2.config.H2Config;
-import org.apache.hc.core5.http2.nio.AbstractAsyncExchangeHandler;
 import org.apache.hc.core5.http2.nio.AbstractAsyncPushHandler;
-import org.apache.hc.core5.http2.nio.AbstractClassicExchangeHandler;
-import org.apache.hc.core5.http2.nio.AsyncExchangeHandler;
+import org.apache.hc.core5.http2.nio.AbstractAsyncServerExchangeHandler;
+import org.apache.hc.core5.http2.nio.AbstractClassicServerExchangeHandler;
 import org.apache.hc.core5.http2.nio.AsyncPushConsumer;
+import org.apache.hc.core5.http2.nio.AsyncResponseProducer;
 import org.apache.hc.core5.http2.nio.AsyncResponseTrigger;
+import org.apache.hc.core5.http2.nio.AsyncServerExchangeHandler;
 import org.apache.hc.core5.http2.nio.BasicPushProducer;
 import org.apache.hc.core5.http2.nio.BasicRequestProducer;
 import org.apache.hc.core5.http2.nio.BasicResponseConsumer;
@@ -126,7 +129,7 @@ public class Http2IntegrationTest extend
 
     }
 
-    static class SingleLineResponseHandler extends AbstractAsyncExchangeHandler<String> {
+    static class SingleLineResponseHandler extends AbstractAsyncServerExchangeHandler<String> {
 
         private final String message;
 
@@ -147,10 +150,10 @@ public class Http2IntegrationTest extend
 
     @Test
     public void testSimpleGet() throws Exception {
-        server.registerHandler("/hello", new Supplier<AsyncExchangeHandler>() {
+        server.registerHandler("/hello", new Supplier<AsyncServerExchangeHandler>() {
 
             @Override
-            public AsyncExchangeHandler get() {
+            public AsyncServerExchangeHandler get() {
                 return new SingleLineResponseHandler("Hi there");
             }
 
@@ -227,7 +230,7 @@ public class Http2IntegrationTest extend
 
     }
 
-    static class MultiLineResponseHandler extends AbstractAsyncExchangeHandler<String> {
+    static class MultiLineResponseHandler extends AbstractAsyncServerExchangeHandler<String> {
 
         private final String message;
         private final int count;
@@ -252,10 +255,10 @@ public class Http2IntegrationTest extend
 
     @Test
     public void testLargeGet() throws Exception {
-        server.registerHandler("/", new Supplier<AsyncExchangeHandler>() {
+        server.registerHandler("/", new Supplier<AsyncServerExchangeHandler>() {
 
             @Override
-            public AsyncExchangeHandler get() {
+            public AsyncServerExchangeHandler get() {
                 return new MultiLineResponseHandler("0123456789abcdef", 5000);
             }
 
@@ -303,10 +306,10 @@ public class Http2IntegrationTest extend
 
     @Test
     public void testBasicPost() throws Exception {
-        server.registerHandler("/hello", new Supplier<AsyncExchangeHandler>() {
+        server.registerHandler("/hello", new Supplier<AsyncServerExchangeHandler>() {
 
             @Override
-            public AsyncExchangeHandler get() {
+            public AsyncServerExchangeHandler get() {
                 return new SingleLineResponseHandler("Hi back");
             }
 
@@ -330,7 +333,7 @@ public class Http2IntegrationTest extend
         Assert.assertEquals("Hi back", entity1);
     }
 
-    static class EchoHandler implements AsyncExchangeHandler {
+    static class EchoHandler implements AsyncServerExchangeHandler {
 
         private volatile ByteBuffer buffer;
         private volatile CapacityChannel inputCapacityChannel;
@@ -446,10 +449,10 @@ public class Http2IntegrationTest extend
 
     @Test
     public void testLargePost() throws Exception {
-        server.registerHandler("*", new Supplier<AsyncExchangeHandler>() {
+        server.registerHandler("*", new Supplier<AsyncServerExchangeHandler>() {
 
             @Override
-            public AsyncExchangeHandler get() {
+            public AsyncServerExchangeHandler get() {
                 return new EchoHandler(2048);
             }
 
@@ -481,10 +484,10 @@ public class Http2IntegrationTest extend
 
     @Test
     public void testSlowResponseConsumer() throws Exception {
-        server.registerHandler("/", new Supplier<AsyncExchangeHandler>() {
+        server.registerHandler("/", new Supplier<AsyncServerExchangeHandler>() {
 
             @Override
-            public AsyncExchangeHandler get() {
+            public AsyncServerExchangeHandler get() {
                 return new MultiLineResponseHandler("0123456789abcd", 3);
             }
 
@@ -540,10 +543,10 @@ public class Http2IntegrationTest extend
 
     @Test
     public void testSlowRequestProducer() throws Exception {
-        server.registerHandler("*", new Supplier<AsyncExchangeHandler>() {
+        server.registerHandler("*", new Supplier<AsyncServerExchangeHandler>() {
 
             @Override
-            public AsyncExchangeHandler get() {
+            public AsyncServerExchangeHandler get() {
                 return new EchoHandler(2048);
             }
 
@@ -595,11 +598,11 @@ public class Http2IntegrationTest extend
 
     @Test
     public void testSlowResponseProducer() throws Exception {
-        server.registerHandler("*", new Supplier<AsyncExchangeHandler>() {
+        server.registerHandler("*", new Supplier<AsyncServerExchangeHandler>() {
 
             @Override
-            public AsyncExchangeHandler get() {
-                return new AbstractClassicExchangeHandler(2048, Executors.newSingleThreadExecutor()) {
+            public AsyncServerExchangeHandler get() {
+                return new AbstractClassicServerExchangeHandler(2048, Executors.newSingleThreadExecutor()) {
 
                     @Override
                     protected void handle(
@@ -680,11 +683,11 @@ public class Http2IntegrationTest extend
     @Test
     public void testPush() throws Exception {
         final InetSocketAddress serverEndpoint = server.start();
-        server.registerHandler("/hello", new Supplier<AsyncExchangeHandler>() {
+        server.registerHandler("/hello", new Supplier<AsyncServerExchangeHandler>() {
 
             @Override
-            public AsyncExchangeHandler get() {
-                return new AbstractAsyncExchangeHandler<Void>(new NoopEntityConsumer()) {
+            public AsyncServerExchangeHandler get() {
+                return new AbstractAsyncServerExchangeHandler<Void>(new NoopEntityConsumer()) {
 
                     @Override
                     protected void handle(
@@ -759,11 +762,11 @@ public class Http2IntegrationTest extend
     public void testPushRefused() throws Exception {
         final BlockingQueue<Exception> pushResultQueue = new LinkedBlockingDeque<>();
         final InetSocketAddress serverEndpoint = server.start();
-        server.registerHandler("/hello", new Supplier<AsyncExchangeHandler>() {
+        server.registerHandler("/hello", new Supplier<AsyncServerExchangeHandler>() {
 
             @Override
-            public AsyncExchangeHandler get() {
-                return new AbstractAsyncExchangeHandler<Void>(new NoopEntityConsumer()) {
+            public AsyncServerExchangeHandler get() {
+                return new AbstractAsyncServerExchangeHandler<Void>(new NoopEntityConsumer()) {
 
                     @Override
                     protected void handle(
@@ -830,10 +833,10 @@ public class Http2IntegrationTest extend
 
     @Test
     public void testExcessOfConcurrentStreams() throws Exception {
-        server.registerHandler("/", new Supplier<AsyncExchangeHandler>() {
+        server.registerHandler("/", new Supplier<AsyncServerExchangeHandler>() {
 
             @Override
-            public AsyncExchangeHandler get() {
+            public AsyncServerExchangeHandler get() {
                 return new MultiLineResponseHandler("0123456789abcdef", 2000);
             }
 
@@ -863,4 +866,165 @@ public class Http2IntegrationTest extend
         }
     }
 
+    @Test
+    public void testExpecationFailed() throws Exception {
+        server.registerHandler("*", new Supplier<AsyncServerExchangeHandler>() {
+
+            @Override
+            public AsyncServerExchangeHandler get() {
+                return new AbstractAsyncServerExchangeHandler<String>(new StringAsyncEntityConsumer()) {
+
+                    @Override
+                    protected AsyncResponseProducer verify(final HttpRequest request) throws IOException, HttpException {
+                        final Header h = request.getFirstHeader("password");
+                        if (h != null && "secret".equals(h.getValue())) {
+                            return null;
+                        } else {
+                            return new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
+                        }
+                    }
+
+                    @Override
+                    protected void handle(
+                            final Message<HttpRequest, String> request,
+                            final AsyncResponseTrigger responseTrigger) throws IOException, HttpException {
+                        responseTrigger.submitResponse(
+                                new BasicResponseProducer(HttpStatus.SC_OK, "All is well"));
+
+                    }
+                };
+            }
+
+        });
+        final InetSocketAddress serverEndpoint = server.start();
+
+        client.start();
+        final Future<ClientCommandEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), 5000);
+        final ClientCommandEndpoint streamEndpoint = connectFuture.get();
+
+        client.start();
+
+        final HttpRequest request1 = new BasicHttpRequest("POST", createRequestURI(serverEndpoint, "/echo"));
+        request1.addHeader("password", "secret");
+        final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
+                new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 5000)),
+                new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+        final Message<HttpResponse, String> result1 = future1.get(5, TimeUnit.SECONDS);
+        Assert.assertNotNull(result1);
+        final HttpResponse response1 = result1.getHead();
+        Assert.assertNotNull(response1);
+        Assert.assertEquals(200, response1.getCode());
+        Assert.assertNotNull("All is well", result1.getBody());
+
+        final HttpRequest request2 = new BasicHttpRequest("POST", createRequestURI(serverEndpoint, "/echo"));
+        final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
+                new BasicRequestProducer(request2, new MultiLineEntityProducer("0123456789abcdef", 5000)),
+                new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+        final Message<HttpResponse, String> result2 = future2.get(5, TimeUnit.SECONDS);
+        Assert.assertNotNull(result2);
+        final HttpResponse response2 = result2.getHead();
+        Assert.assertNotNull(response2);
+        Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response2.getCode());
+        Assert.assertNotNull("You shall not pass", result2.getBody());
+    }
+
+    @Test
+    public void testPrematureResponse() throws Exception {
+        server.registerHandler("*", new Supplier<AsyncServerExchangeHandler>() {
+
+            @Override
+            public AsyncServerExchangeHandler get() {
+                return new AsyncServerExchangeHandler() {
+
+                    private final AtomicReference<AsyncResponseProducer> responseProducer = new AtomicReference<>(null);
+                    private final AtomicBoolean dataStarted = new AtomicBoolean(false);
+
+                    @Override
+                    public void verify(
+                            final HttpRequest request,
+                            final EntityDetails entityDetails,
+                            final ExpectationChannel expectationChannel) throws HttpException, IOException {
+                        expectationChannel.sendContinue();
+                    }
+
+                    @Override
+                    public int capacity() {
+                        return Integer.MAX_VALUE;
+                    }
+
+                    @Override
+                    public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+                        capacityChannel.update(Integer.MAX_VALUE);
+                    }
+
+                    @Override
+                    public void consume(final ByteBuffer src) throws IOException {
+                    }
+
+                    @Override
+                    public void streamEnd(final List<Header> trailers) throws HttpException, IOException {
+                    }
+
+                    @Override
+                    public void handleRequest(
+                            final HttpRequest request,
+                            final EntityDetails entityDetails,
+                            final ResponseChannel responseChannel) throws HttpException, IOException {
+                        final AsyncResponseProducer producer;
+                        final Header h = request.getFirstHeader("password");
+                        if (h != null && "secret".equals(h.getValue())) {
+                            producer = new BasicResponseProducer(HttpStatus.SC_OK, "All is well");
+                        } else {
+                            producer = new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
+                        }
+                        responseProducer.set(producer);
+                        responseChannel.sendResponse(producer.produceResponse(), producer.getEntityDetails());
+                    }
+
+                    @Override
+                    public int available() {
+                        final AsyncResponseProducer producer = this.responseProducer.get();
+                        return producer.available();
+                    }
+
+                    @Override
+                    public void produce(final DataStreamChannel channel) throws IOException {
+                        final AsyncResponseProducer producer = this.responseProducer.get();
+                        if (dataStarted.compareAndSet(false, true)) {
+                            producer.dataStart(channel);
+                        }
+                        producer.produce(channel);
+                    }
+
+                    @Override
+                    public void failed(final Exception cause) {
+                    }
+
+                    @Override
+                    public void releaseResources() {
+                    }
+                };
+            }
+
+        });
+        final InetSocketAddress serverEndpoint = server.start();
+
+        client.start();
+        final Future<ClientCommandEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), 5000);
+        final ClientCommandEndpoint streamEndpoint = connectFuture.get();
+
+        client.start();
+
+        final HttpRequest request1 = new BasicHttpRequest("POST", createRequestURI(serverEndpoint, "/echo"));
+        final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
+                new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 5000)),
+                new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+        final Message<HttpResponse, String> result1 = future1.get(5, TimeUnit.SECONDS);
+        Assert.assertNotNull(result1);
+        final HttpResponse response1 = result1.getHead();
+        Assert.assertNotNull(response1);
+        Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
+        Assert.assertNotNull("You shall not pass", result1.getBody());
+    }
+
 }

Modified: httpcomponents/httpcore/trunk/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/integration/Http2TestServer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/integration/Http2TestServer.java?rev=1762692&r1=1762691&r2=1762692&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/integration/Http2TestServer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/integration/Http2TestServer.java Wed Sep 28 17:26:17 2016
@@ -49,7 +49,7 @@ import org.apache.hc.core5.http.protocol
 import org.apache.hc.core5.http.protocol.ResponseServer;
 import org.apache.hc.core5.http.protocol.UriPatternMatcher;
 import org.apache.hc.core5.http2.config.H2Config;
-import org.apache.hc.core5.http2.nio.AsyncExchangeHandler;
+import org.apache.hc.core5.http2.nio.AsyncServerExchangeHandler;
 import org.apache.hc.core5.http2.nio.FixedResponseExchangeHandler;
 import org.apache.hc.core5.http2.nio.HandlerFactory;
 import org.apache.hc.core5.http2.nio.Supplier;
@@ -70,7 +70,7 @@ import org.apache.hc.core5.util.Args;
 public class Http2TestServer {
 
     private final ExecutorService executorService;
-    private final UriPatternMatcher<Supplier<AsyncExchangeHandler>> responseHandlerMatcher;
+    private final UriPatternMatcher<Supplier<AsyncServerExchangeHandler>> responseHandlerMatcher;
 
     private volatile DefaultListeningIOReactor ioReactor;
     private volatile Exception exception;
@@ -85,7 +85,7 @@ public class Http2TestServer {
         return start(H2Config.DEFAULT);
     }
 
-    private AsyncExchangeHandler createHandler(final HttpRequest request) throws HttpException {
+    private AsyncServerExchangeHandler createHandler(final HttpRequest request) throws HttpException {
 
         final HttpHost authority;
         try {
@@ -101,14 +101,14 @@ public class Http2TestServer {
         if (i != -1) {
             path = path.substring(0, i - 1);
         }
-        final Supplier<AsyncExchangeHandler> supplier = responseHandlerMatcher.lookup(path);
+        final Supplier<AsyncServerExchangeHandler> supplier = responseHandlerMatcher.lookup(path);
         if (supplier != null) {
             return supplier.get();
         }
         return new FixedResponseExchangeHandler(HttpStatus.SC_NOT_FOUND, "Resource not found");
     }
 
-    public void registerHandler(final String uriPattern, final Supplier<AsyncExchangeHandler> supplier) {
+    public void registerHandler(final String uriPattern, final Supplier<AsyncServerExchangeHandler> supplier) {
         Args.notNull(uriPattern, "URI pattern");
         Args.notNull(supplier, "Supplier");
         responseHandlerMatcher.register(uriPattern, supplier);
@@ -127,10 +127,10 @@ public class Http2TestServer {
                 });
         ioReactor = new DefaultListeningIOReactor(new InternalServerHttp2EventHandlerFactory(
                 httpProcessor,
-                new HandlerFactory<AsyncExchangeHandler>() {
+                new HandlerFactory<AsyncServerExchangeHandler>() {
 
                     @Override
-                    public AsyncExchangeHandler create(
+                    public AsyncServerExchangeHandler create(
                             final HttpRequest request,
                             final HttpContext context) throws HttpException {
                         return createHandler(request);

Modified: httpcomponents/httpcore/trunk/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/integration/InternalServerHttp2EventHandlerFactory.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/integration/InternalServerHttp2EventHandlerFactory.java?rev=1762692&r1=1762691&r2=1762692&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/integration/InternalServerHttp2EventHandlerFactory.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/integration/InternalServerHttp2EventHandlerFactory.java Wed Sep 28 17:26:17 2016
@@ -37,7 +37,7 @@ import org.apache.hc.core5.http2.config.
 import org.apache.hc.core5.http2.impl.nio.HttpErrorListener;
 import org.apache.hc.core5.http2.impl.nio.ServerHttp2StreamMultiplexer;
 import org.apache.hc.core5.http2.impl.nio.ServerHttpProtocolNegotiator;
-import org.apache.hc.core5.http2.nio.AsyncExchangeHandler;
+import org.apache.hc.core5.http2.nio.AsyncServerExchangeHandler;
 import org.apache.hc.core5.http2.nio.HandlerFactory;
 import org.apache.hc.core5.reactor.IOEventHandler;
 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
@@ -49,13 +49,13 @@ public class InternalServerHttp2EventHan
     private static final AtomicLong COUNT = new AtomicLong();
 
     private final HttpProcessor httpProcessor;
-    private final HandlerFactory<AsyncExchangeHandler> exchangeHandlerFactory;
+    private final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory;
     private final Charset charset;
     private final H2Config h2Config;
 
     public InternalServerHttp2EventHandlerFactory(
             final HttpProcessor httpProcessor,
-            final HandlerFactory<AsyncExchangeHandler> exchangeHandlerFactory,
+            final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
             final Charset charset,
             final H2Config h2Config) {
         this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");



Mime
View raw message