hc-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From: ol...@apache.org
Subject: svn commit: r1584060 [2/2] - in /httpcomponents/httpcore/trunk/httpcore-nio/src: main/java/org/apache/http/impl/nio/ main/java/org/apache/http/nio/protocol/ test/java/org/apache/http/impl/nio/ test/java/org/apache/http/nio/integration/ test/java/org/ap...
Date: Wed, 02 Apr 2014 14:44:37 GMT
Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/protocol/TestHttpAsyncService.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/protocol/TestHttpAsyncService.java?rev=1584060&r1=1584059&r2=1584060&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/protocol/TestHttpAsyncService.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/protocol/TestHttpAsyncService.java Wed Apr  2 14:44:36 2014
@@ -29,14 +29,16 @@ package org.apache.http.nio.protocol;
 
 import java.io.IOException;
 import java.net.SocketTimeoutException;
+import java.util.Queue;
 
 import org.apache.http.ConnectionReuseStrategy;
+import org.apache.http.HttpEntityEnclosingRequest;
 import org.apache.http.HttpException;
+import org.apache.http.HttpRequest;
 import org.apache.http.HttpResponse;
 import org.apache.http.HttpResponseFactory;
 import org.apache.http.HttpStatus;
 import org.apache.http.HttpVersion;
-import org.apache.http.UnsupportedHttpVersionException;
 import org.apache.http.concurrent.Cancellable;
 import org.apache.http.impl.DefaultHttpResponseFactory;
 import org.apache.http.message.BasicHttpEntityEnclosingRequest;
@@ -47,7 +49,11 @@ import org.apache.http.nio.ContentEncode
 import org.apache.http.nio.NHttpClientConnection;
 import org.apache.http.nio.NHttpServerConnection;
 import org.apache.http.nio.entity.NStringEntity;
+import org.apache.http.nio.protocol.HttpAsyncService.Incoming;
+import org.apache.http.nio.protocol.HttpAsyncService.Outgoing;
+import org.apache.http.nio.protocol.HttpAsyncService.PipelineEntry;
 import org.apache.http.nio.protocol.HttpAsyncService.State;
+import org.apache.http.nio.reactor.SessionBufferStatus;
 import org.apache.http.protocol.BasicHttpContext;
 import org.apache.http.protocol.HTTP;
 import org.apache.http.protocol.HttpContext;
@@ -57,6 +63,7 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
 
@@ -116,17 +123,24 @@ public class TestHttpAsyncService {
         Assert.assertNotNull(state);
         Assert.assertEquals(MessageState.READY, state.getRequestState());
         Assert.assertEquals(MessageState.READY, state.getResponseState());
-        Assert.assertEquals("request state: READY; request: ; " +
-                "response state: READY; response: ;", state.toString());
+        Assert.assertEquals("[incoming READY; outgoing READY]", state.toString());
     }
 
     @Test
     public void testClosed() throws Exception {
-        final State state = new HttpAsyncService.State();
+        final State state = new State();
         state.setRequestState(MessageState.COMPLETED);
         state.setResponseState(MessageState.COMPLETED);
-        state.setRequestConsumer(this.requestConsumer);
-        state.setResponseProducer(this.responseProducer);
+        final HttpContext exchangeContext = new BasicHttpContext();
+
+        final HttpRequest request = new BasicHttpRequest("GET", "/", HttpVersion.HTTP_1_1);
+        final Incoming incoming = new Incoming(
+                request, this.requestHandler, this.requestConsumer, exchangeContext);
+        state.setIncoming(incoming);
+        final HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_1, 200, "OK");
+        final Outgoing outgoing = new Outgoing(
+                request, response, this.responseProducer, exchangeContext);
+        state.setOutgoing(outgoing);
         state.setCancellable(this.cancellable);
         this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
 
@@ -141,10 +155,14 @@ public class TestHttpAsyncService {
 
     @Test
     public void testHttpExceptionHandling() throws Exception {
-        final State state = new HttpAsyncService.State();
+        final State state = new State();
         state.setRequestState(MessageState.READY);
         state.setResponseState(MessageState.READY);
-        state.setRequestConsumer(this.requestConsumer);
+        final HttpContext exchangeContext = new BasicHttpContext();
+        final HttpRequest request = new BasicHttpRequest("GET", "/", HttpVersion.HTTP_1_1);
+        final Incoming incoming = new Incoming(
+                request, this.requestHandler, this.requestConsumer, exchangeContext);
+        state.setIncoming(incoming);
         state.setCancellable(this.cancellable);
         this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
 
@@ -153,9 +171,11 @@ public class TestHttpAsyncService {
 
         Assert.assertEquals(MessageState.READY, state.getRequestState());
         Assert.assertEquals(MessageState.BODY_STREAM, state.getResponseState());
-        Assert.assertNotNull(state.getResponseProducer());
-        Assert.assertNotNull(state.getResponse());
-        Assert.assertEquals(500, state.getResponse().getStatusLine().getStatusCode());
+        final Outgoing outgoing = state.getOutgoing();
+        Assert.assertNotNull(outgoing);
+        Assert.assertNotNull(outgoing.getProducer());
+        Assert.assertNotNull(outgoing.getResponse());
+        Assert.assertEquals(500, outgoing.getResponse().getStatusLine().getStatusCode());
 
         Mockito.verify(this.requestConsumer).failed(httpex);
         Mockito.verify(this.requestConsumer).close();
@@ -165,18 +185,36 @@ public class TestHttpAsyncService {
     }
 
     @Test
-    public void testHttpExceptionHandlingRuntimeException() throws Exception {
-        final State state = new HttpAsyncService.State();
-        final HttpContext exchangeContext = state.getContext();
+    public void testExceptionHandlingNoState() throws Exception {
+        this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, null);
+
+        final Exception ex = new Exception("Oopsie");
+        this.protocolHandler.exception(conn, ex);
+
+        Mockito.verify(conn).getContext();
+        Mockito.verify(conn).shutdown();
+        Mockito.verifyNoMoreInteractions(conn);
+    }
+
+    @Test
+    public void testExceptionHandlingRuntimeException() throws Exception {
+        final State state = new State();
         state.setRequestState(MessageState.READY);
         state.setResponseState(MessageState.READY);
-        state.setRequestConsumer(this.requestConsumer);
-        state.setResponseProducer(this.responseProducer);
+        final HttpContext exchangeContext = new BasicHttpContext();
+        final HttpRequest request = new BasicHttpRequest("GET", "/", HttpVersion.HTTP_1_1);
+        final Incoming incoming = new Incoming(
+                request, this.requestHandler, this.requestConsumer, exchangeContext);
+        state.setIncoming(incoming);
+        final HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_1, 200, "OK");
+        final Outgoing outgoing = new Outgoing(
+                request, response, this.responseProducer, exchangeContext);
+        state.setOutgoing(outgoing);
         state.setCancellable(this.cancellable);
         this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
 
         Mockito.doThrow(new RuntimeException()).when(this.httpProcessor).process(
-                Mockito.any(HttpResponse.class), Mockito.eq(exchangeContext));
+                Mockito.any(HttpResponse.class), Mockito.any(HttpContext.class));
         final HttpException httpex = new HttpException();
         try {
             this.protocolHandler.exception(this.conn, httpex);
@@ -193,17 +231,23 @@ public class TestHttpAsyncService {
 
     @Test
     public void testHttpExceptionHandlingIOException() throws Exception {
-        final State state = new HttpAsyncService.State();
-        final HttpContext exchangeContext = state.getContext();
+        final State state = new State();
         state.setRequestState(MessageState.READY);
         state.setResponseState(MessageState.READY);
-        state.setRequestConsumer(this.requestConsumer);
-        state.setResponseProducer(this.responseProducer);
+        final HttpContext exchangeContext = new BasicHttpContext();
+        final HttpRequest request = new BasicHttpRequest("GET", "/", HttpVersion.HTTP_1_1);
+        final Incoming incoming = new Incoming(
+                request, this.requestHandler, this.requestConsumer, exchangeContext);
+        state.setIncoming(incoming);
+        final HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_1, 200, "OK");
+        final Outgoing outgoing = new Outgoing(
+                request, response, this.responseProducer, exchangeContext);
+        state.setOutgoing(outgoing);
         state.setCancellable(this.cancellable);
         this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
 
         Mockito.doThrow(new IOException()).when(this.httpProcessor).process(
-                Mockito.any(HttpResponse.class), Mockito.eq(exchangeContext));
+                Mockito.any(HttpResponse.class), Mockito.any(HttpContext.class));
         final HttpException httpex = new HttpException();
 
         this.protocolHandler.exception(this.conn, httpex);
@@ -218,11 +262,18 @@ public class TestHttpAsyncService {
 
     @Test
     public void testHttpExceptionHandlingResponseSubmitted() throws Exception {
-        final State state = new HttpAsyncService.State();
+        final State state = new State();
         state.setRequestState(MessageState.READY);
         state.setResponseState(MessageState.READY);
-        state.setRequestConsumer(this.requestConsumer);
-        state.setResponseProducer(this.responseProducer);
+        final HttpContext exchangeContext = new BasicHttpContext();
+        final HttpRequest request = new BasicHttpRequest("GET", "/", HttpVersion.HTTP_1_1);
+        final Incoming incoming = new Incoming(
+                request, this.requestHandler, this.requestConsumer, exchangeContext);
+        state.setIncoming(incoming);
+        final HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_1, 200, "OK");
+        final Outgoing outgoing = new Outgoing(
+                request, response, this.responseProducer, exchangeContext);
+        state.setOutgoing(outgoing);
         this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
         Mockito.when(this.conn.isResponseSubmitted()).thenReturn(Boolean.TRUE);
 
@@ -239,36 +290,14 @@ public class TestHttpAsyncService {
     }
 
     @Test
-    public void testIOExceptionHandling() throws Exception {
-        final State state = new HttpAsyncService.State();
-        state.setRequestState(MessageState.READY);
-        state.setResponseState(MessageState.READY);
-        state.setRequestConsumer(this.requestConsumer);
-        state.setResponseProducer(this.responseProducer);
-        this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
-
-        final IOException httpex = new IOException();
-        this.protocolHandler.exception(this.conn, httpex);
-
-        Assert.assertEquals(MessageState.READY, state.getRequestState());
-        Assert.assertEquals(MessageState.READY, state.getResponseState());
-        Mockito.verify(this.conn).shutdown();
-        Mockito.verify(this.requestConsumer).failed(httpex);
-        Mockito.verify(this.requestConsumer).close();
-        Mockito.verify(this.responseProducer).failed(httpex);
-        Mockito.verify(this.responseProducer).close();
-    }
-
-    @Test
     public void testBasicRequest() throws Exception {
-        final State state = new HttpAsyncService.State();
-        final HttpContext exchangeContext = state.getContext();
+        final State state = new State();
         this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
 
-        final BasicHttpRequest request = new BasicHttpRequest("GET", "/", HttpVersion.HTTP_1_1);
+        final HttpRequest request = new BasicHttpRequest("GET", "/", HttpVersion.HTTP_1_1);
         Mockito.when(this.conn.getHttpRequest()).thenReturn(request);
         Mockito.when(this.requestHandler.processRequest(
-                request, exchangeContext)).thenReturn(this.requestConsumer);
+                Mockito.eq(request), Mockito.any(HttpContext.class))).thenReturn(this.requestConsumer);
         Mockito.when(this.requestConsumer.getException()).thenReturn(null);
         final Object data = new Object();
         Mockito.when(this.requestConsumer.getResult()).thenReturn(data);
@@ -278,63 +307,157 @@ public class TestHttpAsyncService {
         Assert.assertEquals(MessageState.COMPLETED, state.getRequestState());
         Assert.assertEquals(MessageState.INIT, state.getResponseState());
 
-        Assert.assertSame(request, state.getRequest());
-        Assert.assertSame(this.requestHandler, state.getRequestHandler());
-        Assert.assertSame(this.requestConsumer, state.getRequestConsumer());
+        final Incoming incoming = state.getIncoming();
+        Assert.assertNull(incoming);
+
+        final ArgumentCaptor<HttpContext> argumentCaptor = ArgumentCaptor.forClass(HttpContext.class);
+        Mockito.verify(this.httpProcessor).process(Mockito.eq(request), argumentCaptor.capture());
+        final HttpContext exchangeContext = argumentCaptor.getValue();
+        Assert.assertNotNull(exchangeContext);
+
         Assert.assertSame(request, exchangeContext.getAttribute(HttpCoreContext.HTTP_REQUEST));
         Assert.assertSame(this.conn, exchangeContext.getAttribute(HttpCoreContext.HTTP_CONNECTION));
 
-        Mockito.verify(this.httpProcessor).process(request, exchangeContext);
         Mockito.verify(this.requestConsumer).requestReceived(request);
         Mockito.verify(this.requestConsumer).requestCompleted(exchangeContext);
         Mockito.verify(this.requestHandler).handle(
                 Mockito.eq(data),
                 Mockito.any(HttpAsyncExchange.class),
                 Mockito.eq(exchangeContext));
+        Assert.assertTrue(state.getPipeline().isEmpty());
+    }
+
+    @Test
+    public void testRequestPipelineIfResponseInitiated() throws Exception {
+        final State state = new State();
+        state.setRequestState(MessageState.READY);
+        state.setResponseState(MessageState.INIT);
+        this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
+
+        final HttpRequest request = new BasicHttpRequest("GET", "/", HttpVersion.HTTP_1_1);
+        Mockito.when(this.conn.getHttpRequest()).thenReturn(request);
+        Mockito.when(this.requestHandler.processRequest(
+                Mockito.eq(request), Mockito.any(HttpContext.class))).thenReturn(this.requestConsumer);
+        Mockito.when(this.requestConsumer.getException()).thenReturn(null);
+        final Object data = new Object();
+        Mockito.when(this.requestConsumer.getResult()).thenReturn(data);
+
+        this.protocolHandler.requestReceived(this.conn);
+
+        Assert.assertEquals(MessageState.COMPLETED, state.getRequestState());
+        Assert.assertEquals(MessageState.INIT, state.getResponseState());
+
+        final Incoming incoming = state.getIncoming();
+        Assert.assertNull(incoming);
+
+        Mockito.verify(this.requestConsumer).requestReceived(request);
+        Mockito.verify(this.requestConsumer).requestCompleted(Mockito.<HttpContext>any());
+        Mockito.verify(this.requestHandler, Mockito.never()).handle(
+                Mockito.any(),
+                Mockito.any(HttpAsyncExchange.class),
+                Mockito.any(HttpContext.class));
+
+        Assert.assertFalse(state.getPipeline().isEmpty());
+        final PipelineEntry entry = state.getPipeline().remove();
+        Assert.assertSame(request, entry.getRequest());
+        Assert.assertSame(data, entry.getResult());
+    }
+
+    @Test
+    public void testRequestPipelineIfPipelineNotEmpty() throws Exception {
+        final State state = new State();
+        state.setRequestState(MessageState.READY);
+        state.setResponseState(MessageState.READY);
+
+        final Queue<PipelineEntry> pipeline = state.getPipeline();
+
+        final HttpContext exchangeContext = new BasicHttpContext();
+        final HttpRequest pipelinedRequest = new BasicHttpRequest("GET", "/", HttpVersion.HTTP_1_1);
+        final PipelineEntry entry = new PipelineEntry(pipelinedRequest, pipelinedRequest,
+                null, requestHandler, exchangeContext);
+        pipeline.add(entry);
+
+        this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
+
+        final HttpRequest request = new BasicHttpRequest("GET", "/", HttpVersion.HTTP_1_1);
+        Mockito.when(this.conn.getHttpRequest()).thenReturn(request);
+        Mockito.when(this.requestHandler.processRequest(
+                Mockito.eq(request), Mockito.any(HttpContext.class))).thenReturn(this.requestConsumer);
+        Mockito.when(this.requestConsumer.getException()).thenReturn(null);
+        final Object data = new Object();
+        Mockito.when(this.requestConsumer.getResult()).thenReturn(data);
+
+        this.protocolHandler.requestReceived(this.conn);
+
+        Assert.assertEquals(MessageState.COMPLETED, state.getRequestState());
+        Assert.assertEquals(MessageState.READY, state.getResponseState());
+
+        final Incoming incoming = state.getIncoming();
+        Assert.assertNull(incoming);
+
+        Mockito.verify(this.requestConsumer).requestReceived(request);
+        Mockito.verify(this.requestConsumer).requestCompleted(Mockito.<HttpContext>any());
+        Mockito.verify(this.requestHandler, Mockito.never()).handle(
+                Mockito.any(),
+                Mockito.any(HttpAsyncExchange.class),
+                Mockito.any(HttpContext.class));
+
+        Assert.assertFalse(state.getPipeline().isEmpty());
+        final PipelineEntry entry1 = state.getPipeline().remove();
+        Assert.assertSame(entry, entry1);
+        final PipelineEntry entry2 = state.getPipeline().remove();
+        Assert.assertSame(request, entry2.getRequest());
+        Assert.assertSame(data, entry2.getResult());
     }
 
     @Test
     public void testRequestNoMatchingHandler() throws Exception {
-        final State state = new HttpAsyncService.State();
-        final HttpContext exchangeContext = state.getContext();
+        final State state = new State();
         this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
 
-        final BasicHttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST",
+        final HttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST",
                 "/stuff", HttpVersion.HTTP_1_1);
         request.setEntity(new NStringEntity("stuff"));
         Mockito.when(this.conn.getHttpRequest()).thenReturn(request);
         Mockito.when(this.requestHandler.processRequest(
-                request, exchangeContext)).thenReturn(this.requestConsumer);
+                Mockito.eq(request), Mockito.any(HttpContext.class))).thenReturn(this.requestConsumer);
 
         this.protocolHandler.requestReceived(this.conn);
 
         Assert.assertEquals(MessageState.BODY_STREAM, state.getRequestState());
         Assert.assertEquals(MessageState.READY, state.getResponseState());
 
-        Assert.assertSame(request, state.getRequest());
-        Assert.assertTrue(state.getRequestHandler() instanceof NullRequestHandler);
+        final Incoming incoming = state.getIncoming();
+        Assert.assertNotNull(incoming);
+        Assert.assertSame(request, incoming.getRequest());
+        Assert.assertTrue(incoming.getHandler() instanceof NullRequestHandler);
     }
 
     @Test
     public void testEntityEnclosingRequest() throws Exception {
-        final State state = new HttpAsyncService.State();
-        final HttpContext exchangeContext = state.getContext();
+        final State state = new State();
         this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
 
-        final BasicHttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST", "/",
+        final HttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST", "/",
                 HttpVersion.HTTP_1_1);
         Mockito.when(this.conn.getHttpRequest()).thenReturn(request);
         Mockito.when(this.requestHandler.processRequest(
-                request, exchangeContext)).thenReturn(this.requestConsumer);
+                Mockito.eq(request), Mockito.any(HttpContext.class))).thenReturn(this.requestConsumer);
 
         this.protocolHandler.requestReceived(this.conn);
 
         Assert.assertEquals(MessageState.BODY_STREAM, state.getRequestState());
         Assert.assertEquals(MessageState.READY, state.getResponseState());
 
-        Assert.assertSame(request, state.getRequest());
-        Assert.assertSame(this.requestHandler, state.getRequestHandler());
-        Assert.assertSame(this.requestConsumer, state.getRequestConsumer());
+        final Incoming incoming = state.getIncoming();
+        Assert.assertNotNull(incoming);
+        Assert.assertSame(request, incoming.getRequest());
+        Assert.assertSame(this.requestHandler, incoming.getHandler());
+        Assert.assertSame(this.requestConsumer, incoming.getConsumer());
+
+        final HttpContext exchangeContext = incoming.getContext();
+        Assert.assertNotNull(exchangeContext);
+
         Assert.assertSame(request, exchangeContext.getAttribute(HttpCoreContext.HTTP_REQUEST));
         Assert.assertSame(this.conn, exchangeContext.getAttribute(HttpCoreContext.HTTP_CONNECTION));
 
@@ -345,25 +468,30 @@ public class TestHttpAsyncService {
 
     @Test
     public void testEntityEnclosingRequestContinueWithoutVerification() throws Exception {
-        final State state = new HttpAsyncService.State();
-        final HttpContext exchangeContext = state.getContext();
+        final State state = new State();
         this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
 
-        final BasicHttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST", "/",
+        final HttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST", "/",
                 HttpVersion.HTTP_1_1);
         request.addHeader(HTTP.EXPECT_DIRECTIVE, HTTP.EXPECT_CONTINUE);
         Mockito.when(this.conn.getHttpRequest()).thenReturn(request);
         Mockito.when(this.requestHandler.processRequest(
-                request, exchangeContext)).thenReturn(this.requestConsumer);
+                Mockito.eq(request), Mockito.any(HttpContext.class))).thenReturn(this.requestConsumer);
 
         this.protocolHandler.requestReceived(this.conn);
 
         Assert.assertEquals(MessageState.BODY_STREAM, state.getRequestState());
         Assert.assertEquals(MessageState.READY, state.getResponseState());
 
-        Assert.assertSame(request, state.getRequest());
-        Assert.assertSame(this.requestHandler, state.getRequestHandler());
-        Assert.assertSame(this.requestConsumer, state.getRequestConsumer());
+        final Incoming incoming = state.getIncoming();
+        Assert.assertNotNull(incoming);
+        Assert.assertSame(request, incoming.getRequest());
+        Assert.assertSame(this.requestHandler, incoming.getHandler());
+        Assert.assertSame(this.requestConsumer, incoming.getConsumer());
+
+        final HttpContext exchangeContext = incoming.getContext();
+        Assert.assertNotNull(exchangeContext);
+
         Assert.assertSame(request, exchangeContext.getAttribute(HttpCoreContext.HTTP_REQUEST));
         Assert.assertSame(this.conn, exchangeContext.getAttribute(HttpCoreContext.HTTP_CONNECTION));
 
@@ -388,25 +516,30 @@ public class TestHttpAsyncService {
                 this.httpProcessor, this.reuseStrategy, this.responseFactory,
                 this.handlerResolver, expectationVerifier);
 
-        final State state = new HttpAsyncService.State();
-        final HttpContext exchangeContext = state.getContext();
+        final State state = new State();
         this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
 
-        final BasicHttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST", "/",
+        final HttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST", "/",
                 HttpVersion.HTTP_1_1);
         request.addHeader(HTTP.EXPECT_DIRECTIVE, HTTP.EXPECT_CONTINUE);
         Mockito.when(this.conn.getHttpRequest()).thenReturn(request);
         Mockito.when(this.requestHandler.processRequest(
-                request, exchangeContext)).thenReturn(this.requestConsumer);
+                Mockito.eq(request), Mockito.any(HttpContext.class))).thenReturn(this.requestConsumer);
 
         this.protocolHandler.requestReceived(this.conn);
 
         Assert.assertEquals(MessageState.ACK_EXPECTED, state.getRequestState());
         Assert.assertEquals(MessageState.READY, state.getResponseState());
 
-        Assert.assertSame(request, state.getRequest());
-        Assert.assertSame(this.requestHandler, state.getRequestHandler());
-        Assert.assertSame(this.requestConsumer, state.getRequestConsumer());
+        final Incoming incoming = state.getIncoming();
+        Assert.assertNotNull(incoming);
+        Assert.assertSame(request, incoming.getRequest());
+        Assert.assertSame(this.requestHandler, incoming.getHandler());
+        Assert.assertSame(this.requestConsumer, incoming.getConsumer());
+
+        final HttpContext exchangeContext = incoming.getContext();
+        Assert.assertNotNull(exchangeContext);
+
         Assert.assertSame(request, exchangeContext.getAttribute(HttpCoreContext.HTTP_REQUEST));
         Assert.assertSame(this.conn, exchangeContext.getAttribute(HttpCoreContext.HTTP_CONNECTION));
 
@@ -420,21 +553,24 @@ public class TestHttpAsyncService {
 
     @Test
     public void testRequestExpectationFailed() throws Exception {
-        final State state = new HttpAsyncService.State();
+        final State state = new State();
         state.setRequestState(MessageState.ACK_EXPECTED);
         this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
 
-        final HttpAsyncExchange httpexchanage = new HttpAsyncService.Exchange(
+        final HttpContext exchangeContext = new BasicHttpContext();
+        final HttpAsyncExchange httpexchanage = new HttpAsyncService.HttpAsyncExchangeImpl(
                 new BasicHttpRequest("GET", "/", HttpVersion.HTTP_1_1),
                 new BasicHttpResponse(HttpVersion.HTTP_1_1, 200, "OK"),
-                state, this.conn);
+                state, this.conn, exchangeContext);
         Assert.assertFalse(httpexchanage.isCompleted());
         httpexchanage.submitResponse(this.responseProducer);
         Assert.assertTrue(httpexchanage.isCompleted());
 
         Assert.assertEquals(MessageState.ACK_EXPECTED, state.getRequestState());
         Assert.assertEquals(MessageState.READY, state.getResponseState());
-        Assert.assertSame(this.responseProducer, state.getResponseProducer());
+        final Outgoing outgoing = state.getOutgoing();
+        Assert.assertNotNull(outgoing);
+        Assert.assertSame(this.responseProducer, outgoing.getProducer());
 
         Mockito.verify(this.conn).requestOutput();
 
@@ -447,32 +583,120 @@ public class TestHttpAsyncService {
 
     @Test(expected=IllegalArgumentException.class)
     public void testRequestExpectationFailedInvalidResponseProducer() throws Exception {
-        final State state = new HttpAsyncService.State();
+        final State state = new State();
         state.setRequestState(MessageState.ACK_EXPECTED);
         this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
 
-        final HttpAsyncExchange httpexchanage = new HttpAsyncService.Exchange(
+        final HttpContext exchangeContext = new BasicHttpContext();
+        final HttpAsyncExchange httpexchanage = new HttpAsyncService.HttpAsyncExchangeImpl(
                 new BasicHttpRequest("GET", "/", HttpVersion.HTTP_1_1),
                 new BasicHttpResponse(HttpVersion.HTTP_1_1, 200, "OK"),
-                state, this.conn);
+                state, this.conn, exchangeContext);
         httpexchanage.submitResponse(null);
     }
 
     @Test
+    public void testRequestExpectationNoHandshakeIfResponseInitiated() throws Exception {
+        final State state = new State();
+        state.setRequestState(MessageState.READY);
+        state.setResponseState(MessageState.INIT);
+        this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
+
+        final HttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST", "/",
+                HttpVersion.HTTP_1_1);
+        request.addHeader(HTTP.EXPECT_DIRECTIVE, HTTP.EXPECT_CONTINUE);
+
+        Mockito.when(this.conn.getHttpRequest()).thenReturn(request);
+        Mockito.when(this.requestHandler.processRequest(
+                Mockito.eq(request), Mockito.any(HttpContext.class))).thenReturn(this.requestConsumer);
+
+        this.protocolHandler.requestReceived(this.conn);
+
+        Mockito.verify(this.requestConsumer).requestReceived(request);
+
+        Assert.assertEquals(MessageState.BODY_STREAM, state.getRequestState());
+        Assert.assertEquals(MessageState.INIT, state.getResponseState());
+    }
+
+    @Test
+    public void testRequestExpectationNoHandshakeIfPipelineNotEmpty() throws Exception {
+        final State state = new State();
+        state.setRequestState(MessageState.READY);
+        state.setResponseState(MessageState.READY);
+
+        final Queue<PipelineEntry> pipeline = state.getPipeline();
+
+        final HttpContext exchangeContext = new BasicHttpContext();
+        final HttpRequest pipelinedRequest = new BasicHttpRequest("GET", "/", HttpVersion.HTTP_1_1);
+        final PipelineEntry entry = new PipelineEntry(pipelinedRequest, pipelinedRequest,
+                null, requestHandler, exchangeContext);
+        pipeline.add(entry);
+
+        this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
+
+        final HttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST", "/",
+                HttpVersion.HTTP_1_1);
+        request.addHeader(HTTP.EXPECT_DIRECTIVE, HTTP.EXPECT_CONTINUE);
+
+        Mockito.when(this.conn.getHttpRequest()).thenReturn(request);
+        Mockito.when(this.requestHandler.processRequest(
+                Mockito.eq(request), Mockito.any(HttpContext.class))).thenReturn(this.requestConsumer);
+
+        this.protocolHandler.requestReceived(this.conn);
+
+        Mockito.verify(this.requestConsumer).requestReceived(request);
+
+        Assert.assertEquals(MessageState.BODY_STREAM, state.getRequestState());
+        Assert.assertEquals(MessageState.READY, state.getResponseState());
+    }
+
+    @Test
+    public void testRequestExpectationNoHandshakeIfMoreInputAvailable() throws Exception {
+        final State state = new State();
+        state.setRequestState(MessageState.READY);
+        state.setResponseState(MessageState.READY);
+
+        this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
+
+        this.conn = Mockito.mock(NHttpServerConnection.class,
+                Mockito.withSettings().extraInterfaces(SessionBufferStatus.class));
+
+        final HttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST", "/",
+                HttpVersion.HTTP_1_1);
+        request.addHeader(HTTP.EXPECT_DIRECTIVE, HTTP.EXPECT_CONTINUE);
+
+        Mockito.when(this.conn.getContext()).thenReturn(this.connContext);
+        Mockito.when(this.conn.getHttpRequest()).thenReturn(request);
+        Mockito.when(this.requestHandler.processRequest(
+                Mockito.eq(request), Mockito.any(HttpContext.class))).thenReturn(this.requestConsumer);
+        Mockito.when(((SessionBufferStatus) this.conn).hasBufferedInput()).thenReturn(Boolean.TRUE);
+
+        this.protocolHandler.requestReceived(this.conn);
+
+        Mockito.verify(this.requestConsumer).requestReceived(request);
+
+        Assert.assertEquals(MessageState.BODY_STREAM, state.getRequestState());
+        Assert.assertEquals(MessageState.READY, state.getResponseState());
+    }
+
+    @Test
     public void testRequestContinue() throws Exception {
-        final State state = new HttpAsyncService.State();
+        final State state = new State();
         state.setRequestState(MessageState.ACK_EXPECTED);
         this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
 
-        final HttpAsyncExchange httpexchanage = new HttpAsyncService.Exchange(
+        final HttpContext exchangeContext = new BasicHttpContext();
+        final HttpAsyncExchange httpexchanage = new HttpAsyncService.HttpAsyncExchangeImpl(
                 new BasicHttpRequest("GET", "/", HttpVersion.HTTP_1_1),
                 new BasicHttpResponse(HttpVersion.HTTP_1_1, 100, "Continue"),
-                state, this.conn);
+                state, this.conn, exchangeContext);
         Assert.assertFalse(httpexchanage.isCompleted());
         httpexchanage.submitResponse();
         Assert.assertTrue(httpexchanage.isCompleted());
 
-        final HttpAsyncResponseProducer responseProducer = state.getResponseProducer();
+        final Outgoing outgoing = state.getOutgoing();
+        Assert.assertNotNull(outgoing);
+        final HttpAsyncResponseProducer responseProducer = outgoing.getProducer();
         Assert.assertNotNull(responseProducer);
         Assert.assertEquals(MessageState.ACK_EXPECTED, state.getRequestState());
         Assert.assertEquals(MessageState.READY, state.getResponseState());
@@ -490,12 +714,14 @@ public class TestHttpAsyncService {
 
     @Test
     public void testRequestContent() throws Exception {
-        final State state = new HttpAsyncService.State();
-        final BasicHttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST", "/",
+        final State state = new State();
+        final HttpContext exchangeContext = new BasicHttpContext();
+        final HttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST", "/",
                 HttpVersion.HTTP_1_1);
         state.setRequestState(MessageState.BODY_STREAM);
-        state.setRequest(request);
-        state.setRequestConsumer(this.requestConsumer);
+        final Incoming incoming = new Incoming(
+                request, this.requestHandler, this.requestConsumer, exchangeContext);
+        state.setIncoming(incoming);
         this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
         Mockito.when(this.decoder.isCompleted()).thenReturn(Boolean.FALSE);
 
@@ -510,14 +736,14 @@ public class TestHttpAsyncService {
 
     @Test
     public void testRequestContentCompleted() throws Exception {
-        final State state = new HttpAsyncService.State();
-        final HttpContext exchangeContext = state.getContext();
-        final BasicHttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST", "/",
+        final State state = new State();
+        final HttpContext exchangeContext = new BasicHttpContext();
+        final HttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST", "/",
                 HttpVersion.HTTP_1_1);
         state.setRequestState(MessageState.BODY_STREAM);
-        state.setRequest(request);
-        state.setRequestConsumer(this.requestConsumer);
-        state.setRequestHandler(this.requestHandler);
+        final Incoming incoming = new Incoming(
+                request, this.requestHandler, this.requestConsumer, exchangeContext);
+        state.setIncoming(incoming);
         this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
         Mockito.when(this.decoder.isCompleted()).thenReturn(Boolean.TRUE);
         Mockito.when(this.requestConsumer.getException()).thenReturn(null);
@@ -539,14 +765,14 @@ public class TestHttpAsyncService {
 
     @Test
     public void testRequestCompletedWithException() throws Exception {
-        final State state = new HttpAsyncService.State();
-        final HttpContext exchangeContext = state.getContext();
-        final BasicHttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST", "/",
+        final State state = new State();
+        final HttpContext exchangeContext = new BasicHttpContext();
+        final HttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST", "/",
                 HttpVersion.HTTP_1_1);
         state.setRequestState(MessageState.BODY_STREAM);
-        state.setRequest(request);
-        state.setRequestConsumer(this.requestConsumer);
-        state.setRequestHandler(this.requestHandler);
+        final Incoming incoming = new Incoming(
+                request, this.requestHandler, this.requestConsumer, exchangeContext);
+        state.setIncoming(incoming);
         this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
         Mockito.when(this.decoder.isCompleted()).thenReturn(Boolean.TRUE);
         Mockito.when(this.requestConsumer.getException()).thenReturn(new HttpException());
@@ -556,7 +782,9 @@ public class TestHttpAsyncService {
 
         Assert.assertEquals(MessageState.COMPLETED, state.getRequestState());
         Assert.assertEquals(MessageState.INIT, state.getResponseState());
-        Assert.assertNotNull(state.getResponseProducer());
+        final Outgoing outgoing = state.getOutgoing();
+        Assert.assertNotNull(outgoing);
+        Assert.assertNotNull(outgoing.getProducer());
 
         Mockito.verify(this.requestConsumer).consumeContent(this.decoder, this.conn);
         Mockito.verify(this.requestConsumer).requestCompleted(exchangeContext);
@@ -564,84 +792,98 @@ public class TestHttpAsyncService {
         Mockito.verify(this.requestHandler, Mockito.never()).handle(
                 Mockito.any(),
                 Mockito.any(HttpAsyncExchange.class),
-                Mockito.any(HttpContext.class));
+                Mockito.eq(exchangeContext));
     }
 
     @Test
-    public void testRequestHandlingHttpException() throws Exception {
-        final State state = new HttpAsyncService.State();
-        final HttpContext exchangeContext = state.getContext();
-        final BasicHttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST", "/",
-                HttpVersion.HTTP_1_1);
-        state.setRequestState(MessageState.BODY_STREAM);
-        state.setRequest(request);
-        state.setRequestConsumer(this.requestConsumer);
-        state.setRequestHandler(this.requestHandler);
+    public void testBasicResponse() throws Exception {
+        final State state = new State();
+        final HttpContext exchangeContext = new BasicHttpContext();
+        final HttpRequest request = new BasicHttpRequest("GET", "/", HttpVersion.HTTP_1_1);
+        final Incoming incoming = new Incoming(
+                request, this.requestHandler, this.requestConsumer, exchangeContext);
+        state.setIncoming(incoming);
+        state.setRequestState(MessageState.COMPLETED);
+        final HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_1, 200, "OK");
+        final Outgoing outgoing = new Outgoing(
+                request, response, this.responseProducer, exchangeContext);
+        state.setOutgoing(outgoing);
         this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
-        Mockito.when(this.decoder.isCompleted()).thenReturn(Boolean.TRUE);
-        Mockito.when(this.requestConsumer.getException()).thenReturn(null);
-        final Object data = new Object();
-        Mockito.when(this.requestConsumer.getResult()).thenReturn(data);
-        Mockito.doThrow(new UnsupportedHttpVersionException()).when(
-                this.requestHandler).handle(
-                        Mockito.eq(data),
-                        Mockito.any(HttpAsyncExchange.class),
-                        Mockito.eq(exchangeContext));
 
-        this.protocolHandler.inputReady(conn, this.decoder);
+        Mockito.when(this.responseProducer.generateResponse()).thenReturn(response);
+        Mockito.when(this.reuseStrategy.keepAlive(response, exchangeContext)).thenReturn(Boolean.TRUE);
 
-        Assert.assertEquals(MessageState.COMPLETED, state.getRequestState());
-        Assert.assertEquals(MessageState.INIT, state.getResponseState());
-        Assert.assertNotNull(state.getResponseProducer());
+        this.protocolHandler.responseReady(this.conn);
 
-        Mockito.verify(this.requestConsumer).consumeContent(this.decoder, this.conn);
-        Mockito.verify(this.requestConsumer).requestCompleted(exchangeContext);
-        Mockito.verify(this.conn).requestOutput();
+        Assert.assertEquals(MessageState.READY, state.getResponseState());
+
+        Mockito.verify(this.httpProcessor).process(response, exchangeContext);
+        Mockito.verify(this.conn).submitResponse(response);
+        Mockito.verify(this.responseProducer).responseCompleted(exchangeContext);
+        Mockito.verify(this.conn).requestInput();
+        Mockito.verify(this.conn).suspendOutput();
+        Mockito.verify(this.conn, Mockito.never()).close();
     }
 
     @Test
-    public void testBasicResponse() throws Exception {
-        final State state = new HttpAsyncService.State();
-        final HttpContext exchangeContext = state.getContext();
-        final BasicHttpRequest request = new BasicHttpRequest("GET", "/", HttpVersion.HTTP_1_1);
-        state.setRequest(request);
+    public void testBasicResponseWithPipelining() throws Exception {
+        final State state = new State();
+        final HttpContext exchangeContext = new BasicHttpContext();
+        final HttpRequest request = new BasicHttpRequest("GET", "/", HttpVersion.HTTP_1_1);
+        final Incoming incoming = new Incoming(
+                request, this.requestHandler, this.requestConsumer, exchangeContext);
+        state.setIncoming(incoming);
         state.setRequestState(MessageState.COMPLETED);
-        state.setResponseProducer(this.responseProducer);
+        final HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_1, 200, "OK");
+        final Outgoing outgoing = new Outgoing(
+                request, response, this.responseProducer, exchangeContext);
+        state.setOutgoing(outgoing);
+
+        final Queue<PipelineEntry> pipeline = state.getPipeline();
+
+        final HttpContext exchangeContext2 = new BasicHttpContext();
+        final HttpRequest pipelinedRequest = new BasicHttpRequest("GET", "/", HttpVersion.HTTP_1_1);
+        final PipelineEntry entry = new PipelineEntry(pipelinedRequest, pipelinedRequest,
+                null, requestHandler, exchangeContext2);
+        pipeline.add(entry);
+
         this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
 
-        final BasicHttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_1, 200, "OK");
         Mockito.when(this.responseProducer.generateResponse()).thenReturn(response);
         Mockito.when(this.reuseStrategy.keepAlive(response, exchangeContext)).thenReturn(Boolean.TRUE);
 
         this.protocolHandler.responseReady(this.conn);
 
-        Assert.assertEquals(MessageState.READY, state.getRequestState());
         Assert.assertEquals(MessageState.READY, state.getResponseState());
 
         Mockito.verify(this.httpProcessor).process(response, exchangeContext);
         Mockito.verify(this.conn).submitResponse(response);
         Mockito.verify(this.responseProducer).responseCompleted(exchangeContext);
         Mockito.verify(this.conn).requestInput();
+        Mockito.verify(this.conn, Mockito.never()).suspendOutput();
         Mockito.verify(this.conn, Mockito.never()).close();
     }
 
     @Test
     public void testBasicResponseNoKeepAlive() throws Exception {
-        final State state = new HttpAsyncService.State();
-        final HttpContext exchangeContext = state.getContext();
-        final BasicHttpRequest request = new BasicHttpRequest("GET", "/", HttpVersion.HTTP_1_1);
-        state.setRequest(request);
+        final State state = new State();
+        final HttpContext exchangeContext = new BasicHttpContext();
+        final HttpRequest request = new BasicHttpRequest("GET", "/", HttpVersion.HTTP_1_1);
+        final Incoming incoming = new Incoming(
+                request, this.requestHandler, this.requestConsumer, exchangeContext);
+        state.setIncoming(incoming);
         state.setRequestState(MessageState.COMPLETED);
-        state.setResponseProducer(this.responseProducer);
+        final HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_1, 200, "OK");
+        final Outgoing outgoing = new Outgoing(
+                request, response, this.responseProducer, exchangeContext);
+        state.setOutgoing(outgoing);
         this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
 
-        final BasicHttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_1, 200, "OK");
         Mockito.when(this.responseProducer.generateResponse()).thenReturn(response);
         Mockito.when(this.reuseStrategy.keepAlive(response, exchangeContext)).thenReturn(Boolean.FALSE);
 
         this.protocolHandler.responseReady(this.conn);
 
-        Assert.assertEquals(MessageState.READY, state.getRequestState());
         Assert.assertEquals(MessageState.READY, state.getResponseState());
 
         Mockito.verify(this.httpProcessor).process(response, exchangeContext);
@@ -652,15 +894,19 @@ public class TestHttpAsyncService {
 
     @Test
     public void testEntityEnclosingResponse() throws Exception {
-        final State state = new HttpAsyncService.State();
-        final HttpContext exchangeContext = state.getContext();
-        final BasicHttpRequest request = new BasicHttpRequest("GET", "/", HttpVersion.HTTP_1_1);
-        state.setRequest(request);
+        final State state = new State();
+        final HttpContext exchangeContext = new BasicHttpContext();
+        final HttpRequest request = new BasicHttpRequest("GET", "/", HttpVersion.HTTP_1_1);
+        final Incoming incoming = new Incoming(
+                request, this.requestHandler, this.requestConsumer, exchangeContext);
+        state.setIncoming(incoming);
         state.setRequestState(MessageState.COMPLETED);
-        state.setResponseProducer(this.responseProducer);
+        final HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_1, 200, "OK");
+        final Outgoing outgoing = new Outgoing(
+                request, response, this.responseProducer, exchangeContext);
+        state.setOutgoing(outgoing);
         this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
 
-        final BasicHttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_1, 200, "OK");
         response.setEntity(new NStringEntity("stuff"));
         Mockito.when(this.responseProducer.generateResponse()).thenReturn(response);
 
@@ -668,8 +914,8 @@ public class TestHttpAsyncService {
 
         Assert.assertEquals(MessageState.COMPLETED, state.getRequestState());
         Assert.assertEquals(MessageState.BODY_STREAM, state.getResponseState());
-        Assert.assertEquals("request state: COMPLETED; request: GET / HTTP/1.1; " +
-                "response state: BODY_STREAM; response: HTTP/1.1 200 OK;", state.toString());
+        Assert.assertEquals("[incoming COMPLETED GET / HTTP/1.1; outgoing BODY_STREAM HTTP/1.1 200 OK]",
+                state.toString());
 
         Mockito.verify(this.httpProcessor).process(response, exchangeContext);
         Mockito.verify(this.conn).submitResponse(response);
@@ -678,22 +924,25 @@ public class TestHttpAsyncService {
 
     @Test
     public void testResponseToHead() throws Exception {
-        final State state = new HttpAsyncService.State();
-        final HttpContext exchangeContext = state.getContext();
-        final BasicHttpRequest request = new BasicHttpRequest("HEAD", "/", HttpVersion.HTTP_1_1);
-        state.setRequest(request);
+        final State state = new State();
+        final HttpContext exchangeContext = new BasicHttpContext();
+        final HttpRequest request = new BasicHttpRequest("HEAD", "/", HttpVersion.HTTP_1_1);
+        final Incoming incoming = new Incoming(
+                request, this.requestHandler, this.requestConsumer, exchangeContext);
+        state.setIncoming(incoming);
         state.setRequestState(MessageState.COMPLETED);
-        state.setResponseProducer(this.responseProducer);
+        final HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_1, 200, "OK");
+        final Outgoing outgoing = new Outgoing(
+                request, response, this.responseProducer, exchangeContext);
+        state.setOutgoing(outgoing);
         this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
 
-        final BasicHttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_1, 200, "OK");
         response.setEntity(new NStringEntity("stuff"));
         Mockito.when(this.responseProducer.generateResponse()).thenReturn(response);
         Mockito.when(this.reuseStrategy.keepAlive(response, exchangeContext)).thenReturn(Boolean.TRUE);
 
         this.protocolHandler.responseReady(this.conn);
 
-        Assert.assertEquals(MessageState.READY, state.getRequestState());
         Assert.assertEquals(MessageState.READY, state.getResponseState());
 
         Mockito.verify(this.httpProcessor).process(response, exchangeContext);
@@ -705,23 +954,26 @@ public class TestHttpAsyncService {
 
     @Test
     public void testResponseNotModified() throws Exception {
-        final State state = new HttpAsyncService.State();
-        final HttpContext exchangeContext = state.getContext();
-        final BasicHttpRequest request = new BasicHttpRequest("HEAD", "/", HttpVersion.HTTP_1_1);
-        state.setRequest(request);
+        final State state = new State();
+        final HttpContext exchangeContext = new BasicHttpContext();
+        final HttpRequest request = new BasicHttpRequest("HEAD", "/", HttpVersion.HTTP_1_1);
+        final Incoming incoming = new Incoming(
+                request, this.requestHandler, this.requestConsumer, exchangeContext);
+        state.setIncoming(incoming);
         state.setRequestState(MessageState.COMPLETED);
-        state.setResponseProducer(this.responseProducer);
+        final HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_1,
+                HttpStatus.SC_NOT_MODIFIED, "Not modified");
+        final Outgoing outgoing = new Outgoing(
+                request, response, this.responseProducer, exchangeContext);
+        state.setOutgoing(outgoing);
         this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
 
-        final BasicHttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_1,
-                HttpStatus.SC_NOT_MODIFIED, "Not modified");
         response.setEntity(new NStringEntity("stuff"));
         Mockito.when(this.responseProducer.generateResponse()).thenReturn(response);
         Mockito.when(this.reuseStrategy.keepAlive(response, exchangeContext)).thenReturn(Boolean.TRUE);
 
         this.protocolHandler.responseReady(this.conn);
 
-        Assert.assertEquals(MessageState.READY, state.getRequestState());
         Assert.assertEquals(MessageState.READY, state.getResponseState());
 
         Mockito.verify(this.httpProcessor).process(response, exchangeContext);
@@ -733,16 +985,21 @@ public class TestHttpAsyncService {
 
     @Test
     public void testResponseContinue() throws Exception {
-        final State state = new HttpAsyncService.State();
-        final BasicHttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST", "/",
+        final State state = new State();
+        final HttpContext exchangeContext = new BasicHttpContext();
+        final HttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST", "/",
                 HttpVersion.HTTP_1_1);
-        state.setRequest(request);
+        final Incoming incoming = new Incoming(
+                request, this.requestHandler, this.requestConsumer, exchangeContext);
+        state.setIncoming(incoming);
         state.setRequestState(MessageState.ACK_EXPECTED);
-        state.setResponseProducer(this.responseProducer);
+        final HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_1,
+                HttpStatus.SC_CONTINUE, "Continue");
+        final Outgoing outgoing = new Outgoing(
+                request, response, this.responseProducer, exchangeContext);
+        state.setOutgoing(outgoing);
         this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
 
-        final BasicHttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_1,
-                HttpStatus.SC_CONTINUE, "Continue");
         Mockito.when(this.responseProducer.generateResponse()).thenReturn(response);
 
         this.protocolHandler.responseReady(this.conn);
@@ -764,16 +1021,21 @@ public class TestHttpAsyncService {
 
     @Test
     public void testResponseFailedExpectation() throws Exception {
-        final State state = new HttpAsyncService.State();
-        final HttpContext exchangeContext = state.getContext();
-        final BasicHttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST", "/",
+        final State state = new State();
+        final HttpContext exchangeContext = new BasicHttpContext();
+        final HttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST", "/",
                 HttpVersion.HTTP_1_1);
-        state.setRequest(request);
+        final Incoming incoming = new Incoming(
+                request, this.requestHandler, this.requestConsumer, exchangeContext);
+        state.setIncoming(incoming);
         state.setRequestState(MessageState.ACK_EXPECTED);
-        state.setResponseProducer(this.responseProducer);
+        final HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_1,
+                417, "Expectation failed");
+        final Outgoing outgoing = new Outgoing(
+                request, response, this.responseProducer, exchangeContext);
+        state.setOutgoing(outgoing);
         this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
 
-        final BasicHttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_1, 417, "Expectation failed");
         response.setEntity(new NStringEntity("stuff"));
         Mockito.when(this.responseProducer.generateResponse()).thenReturn(response);
 
@@ -788,18 +1050,100 @@ public class TestHttpAsyncService {
         Mockito.verify(this.responseProducer, Mockito.never()).responseCompleted(exchangeContext);
     }
 
+    @Test
+    public void testResponsePipelinedEmpty() throws Exception {
+        final State state = new State();
+
+        state.setRequestState(MessageState.READY);
+        this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
+
+        this.protocolHandler.responseReady(this.conn);
+
+        Assert.assertEquals(MessageState.READY, state.getRequestState());
+        Assert.assertEquals(MessageState.READY, state.getResponseState());
+        Assert.assertNull(state.getOutgoing());
+
+        Mockito.verify(conn).getContext();
+        Mockito.verifyNoMoreInteractions(requestHandler, conn);
+    }
+
+    @Test
+    public void testResponseHandlePipelinedRequest() throws Exception {
+        final State state = new State();
+        final Queue<PipelineEntry> pipeline = state.getPipeline();
+
+        final HttpContext exchangeContext = new BasicHttpContext();
+        final HttpRequest request = new BasicHttpRequest("GET", "/", HttpVersion.HTTP_1_1);
+        final PipelineEntry entry = new PipelineEntry(request, request, null, requestHandler, exchangeContext);
+        pipeline.add(entry);
+
+        state.setRequestState(MessageState.READY);
+        this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
+
+        this.protocolHandler.responseReady(this.conn);
+
+        Assert.assertEquals(MessageState.READY, state.getRequestState());
+        Assert.assertEquals(MessageState.INIT, state.getResponseState());
+        Assert.assertNull(state.getOutgoing());
+
+        final ArgumentCaptor<HttpAsyncExchange> argCaptor = ArgumentCaptor.forClass(HttpAsyncExchange.class);
+        Mockito.verify(this.requestHandler).handle(Mockito.same(request),
+                argCaptor.capture(), Mockito.same(exchangeContext));
+        final HttpAsyncExchange exchange = argCaptor.getValue();
+
+        Assert.assertNotNull(exchange);
+        Assert.assertSame(request, exchange.getRequest());
+        Assert.assertNotNull(exchange.getResponse());
+        Assert.assertEquals(200, exchange.getResponse().getStatusLine().getStatusCode());
+    }
+
+    @Test
+    public void testResponseHandleFailedPipelinedRequest() throws Exception {
+        final State state = new State();
+        final Queue<PipelineEntry> pipeline = state.getPipeline();
+
+        final HttpContext exchangeContext = new BasicHttpContext();
+        final HttpRequest request = new BasicHttpRequest("GET", "/", HttpVersion.HTTP_1_1);
+        final Exception ex = new Exception("Opppsie");
+        final PipelineEntry entry = new PipelineEntry(request, null, ex, requestHandler, exchangeContext);
+        pipeline.add(entry);
+
+        state.setRequestState(MessageState.READY);
+        this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
+
+        this.protocolHandler.responseReady(this.conn);
+
+        Assert.assertEquals(MessageState.READY, state.getRequestState());
+        Assert.assertEquals(MessageState.BODY_STREAM, state.getResponseState());
+
+        final Outgoing outgoing = state.getOutgoing();
+        Assert.assertNotNull(outgoing.getProducer());
+        final HttpResponse response = outgoing.getResponse();
+        Assert.assertNotNull(response);
+        Assert.assertEquals(500, response.getStatusLine().getStatusCode());
+
+        Mockito.verify(this.requestHandler, Mockito.never()).handle(Mockito.<HttpRequest>any(),
+                Mockito.<HttpAsyncExchange>any(), Mockito.<HttpContext>any());
+        Mockito.verify(this.conn).submitResponse(Mockito.same(response));
+    }
+
     @Test(expected=HttpException.class)
     public void testInvalidResponseStatus() throws Exception {
-        final State state = new HttpAsyncService.State();
-        final BasicHttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST", "/",
+        final State state = new State();
+        final HttpContext exchangeContext = new BasicHttpContext();
+        final HttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST", "/",
                 HttpVersion.HTTP_1_1);
-        state.setRequest(request);
+        final Incoming incoming = new Incoming(
+                request, this.requestHandler, this.requestConsumer, exchangeContext);
+        state.setIncoming(incoming);
         state.setRequestState(MessageState.COMPLETED);
         state.setResponseState(MessageState.READY);
-        state.setResponseProducer(this.responseProducer);
+        final HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_1, 112, "Something stupid");
+        final Outgoing outgoing = new Outgoing(
+                request, response, this.responseProducer, exchangeContext);
+        state.setOutgoing(outgoing);
         this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
 
-        final BasicHttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_1, 112, "Something stupid");
         Mockito.when(this.responseProducer.generateResponse()).thenReturn(response);
         Mockito.when(this.conn.isResponseSubmitted()).thenReturn(Boolean.FALSE);
 
@@ -808,17 +1152,22 @@ public class TestHttpAsyncService {
 
     @Test(expected=HttpException.class)
     public void testInvalidResponseStatusToExpection() throws Exception {
-        final State state = new HttpAsyncService.State();
-        final BasicHttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST", "/",
+        final State state = new State();
+        final HttpContext exchangeContext = new BasicHttpContext();
+        final HttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST", "/",
                 HttpVersion.HTTP_1_1);
-        state.setRequest(request);
+        final Incoming incoming = new Incoming(
+                request, this.requestHandler, this.requestConsumer, exchangeContext);
+        state.setIncoming(incoming);
         state.setRequestState(MessageState.ACK_EXPECTED);
         state.setResponseState(MessageState.READY);
-        state.setResponseProducer(this.responseProducer);
+        final HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_1, 200, "OK");
+        response.setEntity(new NStringEntity("stuff"));
+        final Outgoing outgoing = new Outgoing(
+                request, response, this.responseProducer, exchangeContext);
+        state.setOutgoing(outgoing);
         this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
 
-        final BasicHttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_1, 200, "OK");
-        response.setEntity(new NStringEntity("stuff"));
         Mockito.when(this.responseProducer.generateResponse()).thenReturn(response);
         Mockito.when(this.conn.isResponseSubmitted()).thenReturn(Boolean.FALSE);
 
@@ -827,22 +1176,25 @@ public class TestHttpAsyncService {
 
     @Test
     public void testResponseTrigger() throws Exception {
-        final State state = new HttpAsyncService.State();
+        final State state = new State();
         state.setRequestState(MessageState.COMPLETED);
         state.setResponseState(MessageState.READY);
         this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
 
-        final HttpAsyncExchange httpexchanage = new HttpAsyncService.Exchange(
+        final HttpContext exchangeContext = new BasicHttpContext();
+        final HttpAsyncExchange httpexchanage = new HttpAsyncService.HttpAsyncExchangeImpl(
                 new BasicHttpRequest("GET", "/", HttpVersion.HTTP_1_1),
                 new BasicHttpResponse(HttpVersion.HTTP_1_1, 200, "OK"),
-                state, this.conn);
+                state, this.conn, exchangeContext);
         Assert.assertFalse(httpexchanage.isCompleted());
         httpexchanage.submitResponse(this.responseProducer);
         Assert.assertTrue(httpexchanage.isCompleted());
 
         Assert.assertEquals(MessageState.COMPLETED, state.getRequestState());
         Assert.assertEquals(MessageState.READY, state.getResponseState());
-        Assert.assertSame(this.responseProducer, state.getResponseProducer());
+        final Outgoing outgoing = state.getOutgoing();
+        Assert.assertNotNull(outgoing);
+        Assert.assertSame(this.responseProducer, outgoing.getProducer());
 
         Mockito.verify(this.conn).requestOutput();
 
@@ -855,26 +1207,30 @@ public class TestHttpAsyncService {
 
     @Test(expected=IllegalArgumentException.class)
     public void testResponseTriggerInvalidResponseProducer() throws Exception {
-        final State state = new HttpAsyncService.State();
+        final State state = new State();
         state.setRequestState(MessageState.ACK_EXPECTED);
         this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
 
-        final HttpAsyncExchange httpexchanage = new HttpAsyncService.Exchange(
+        final HttpContext exchangeContext = new BasicHttpContext();
+        final HttpAsyncExchange httpexchanage = new HttpAsyncService.HttpAsyncExchangeImpl(
                 new BasicHttpRequest("GET", "/", HttpVersion.HTTP_1_1),
                 new BasicHttpResponse(HttpVersion.HTTP_1_1, 200, "OK"),
-                state, this.conn);
+                state, this.conn, exchangeContext);
         httpexchanage.submitResponse(null);
     }
 
     @Test
     public void testResponseContent() throws Exception {
-        final State state = new HttpAsyncService.State();
-        final BasicHttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_1, 200, "OK");
-        response.setEntity(new NStringEntity("stuff"));
+        final State state = new State();
         state.setRequestState(MessageState.COMPLETED);
         state.setResponseState(MessageState.BODY_STREAM);
-        state.setResponse(response);
-        state.setResponseProducer(this.responseProducer);
+        final HttpContext exchangeContext = new BasicHttpContext();
+        final HttpRequest request = new BasicHttpRequest("GET", "/", HttpVersion.HTTP_1_1);
+        final HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_1, 200, "OK");
+        response.setEntity(new NStringEntity("stuff"));
+        final Outgoing outgoing = new Outgoing(
+                request, response, this.responseProducer, exchangeContext);
+        state.setOutgoing(outgoing);
         this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
         Mockito.when(this.encoder.isCompleted()).thenReturn(Boolean.FALSE);
 
@@ -890,21 +1246,22 @@ public class TestHttpAsyncService {
 
     @Test
     public void testResponseContentCompleted() throws Exception {
-        final State state = new HttpAsyncService.State();
-        final HttpContext exchangeContext = state.getContext();
-        final BasicHttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_1, 200, "OK");
-        response.setEntity(new NStringEntity("stuff"));
+        final State state = new State();
         state.setRequestState(MessageState.COMPLETED);
         state.setResponseState(MessageState.BODY_STREAM);
-        state.setResponse(response);
-        state.setResponseProducer(this.responseProducer);
+        final HttpContext exchangeContext = new BasicHttpContext();
+        final HttpRequest request = new BasicHttpRequest("GET", "/", HttpVersion.HTTP_1_1);
+        final HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_1, 200, "OK");
+        response.setEntity(new NStringEntity("stuff"));
+        final Outgoing outgoing = new Outgoing(
+                request, response, this.responseProducer, exchangeContext);
+        state.setOutgoing(outgoing);
         this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
         Mockito.when(this.encoder.isCompleted()).thenReturn(true);
         Mockito.when(this.reuseStrategy.keepAlive(response, exchangeContext)).thenReturn(Boolean.TRUE);
 
         this.protocolHandler.outputReady(conn, this.encoder);
 
-        Assert.assertEquals(MessageState.READY, state.getRequestState());
         Assert.assertEquals(MessageState.READY, state.getResponseState());
 
         Mockito.verify(this.responseProducer).produceContent(this.encoder, this.conn);
@@ -915,21 +1272,22 @@ public class TestHttpAsyncService {
 
     @Test
     public void testResponseContentCompletedNoKeepAlive() throws Exception {
-        final State state = new HttpAsyncService.State();
-        final HttpContext exchangeContext = state.getContext();
-        final BasicHttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_1, 200, "OK");
-        response.setEntity(new NStringEntity("stuff"));
+        final State state = new State();
         state.setRequestState(MessageState.COMPLETED);
         state.setResponseState(MessageState.BODY_STREAM);
-        state.setResponse(response);
-        state.setResponseProducer(this.responseProducer);
+        final HttpContext exchangeContext = new BasicHttpContext();
+        final HttpRequest request = new BasicHttpRequest("GET", "/", HttpVersion.HTTP_1_1);
+        final HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_1, 200, "OK");
+        response.setEntity(new NStringEntity("stuff"));
+        final Outgoing outgoing = new Outgoing(
+                request, response, this.responseProducer, exchangeContext);
+        state.setOutgoing(outgoing);
         this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
         Mockito.when(this.encoder.isCompleted()).thenReturn(true);
         Mockito.when(this.reuseStrategy.keepAlive(response, exchangeContext)).thenReturn(Boolean.FALSE);
 
         this.protocolHandler.outputReady(conn, this.encoder);
 
-        Assert.assertEquals(MessageState.READY, state.getRequestState());
         Assert.assertEquals(MessageState.READY, state.getResponseState());
 
         Mockito.verify(this.responseProducer).produceContent(this.encoder, this.conn);
@@ -939,8 +1297,30 @@ public class TestHttpAsyncService {
     }
 
     @Test
+    public void testEndOfInput() throws Exception {
+
+        Mockito.when(this.conn.getSocketTimeout()).thenReturn(1000);
+
+        this.protocolHandler.endOfInput(this.conn);
+
+        Mockito.verify(this.conn, Mockito.never()).setSocketTimeout(Mockito.anyInt());
+        Mockito.verify(this.conn).close();
+    }
+
+    @Test
+    public void testEndOfInputNoTimeout() throws Exception {
+
+        Mockito.when(this.conn.getSocketTimeout()).thenReturn(0);
+
+        this.protocolHandler.endOfInput(this.conn);
+
+        Mockito.verify(this.conn).setSocketTimeout(1000);
+        Mockito.verify(this.conn).close();
+    }
+
+    @Test
     public void testTimeoutActiveConnection() throws Exception {
-        final State state = new HttpAsyncService.State();
+        final State state = new State();
         this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
         Mockito.when(this.conn.getStatus()).thenReturn(NHttpClientConnection.ACTIVE, NHttpClientConnection.CLOSED);
 
@@ -952,7 +1332,7 @@ public class TestHttpAsyncService {
 
     @Test
     public void testTimeoutActiveConnectionBufferedData() throws Exception {
-        final State state = new HttpAsyncService.State();
+        final State state = new State();
         this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
         Mockito.when(this.conn.getStatus()).thenReturn(NHttpClientConnection.ACTIVE, NHttpClientConnection.CLOSING);
 
@@ -964,9 +1344,16 @@ public class TestHttpAsyncService {
 
     @Test
     public void testTimeoutClosingConnection() throws Exception {
-        final State state = new HttpAsyncService.State();
-        state.setRequestConsumer(this.requestConsumer);
-        state.setResponseProducer(this.responseProducer);
+        final State state = new State();
+        final HttpContext exchangeContext = new BasicHttpContext();
+        final HttpRequest request = new BasicHttpRequest("GET", "/", HttpVersion.HTTP_1_1);
+        final Incoming incoming = new Incoming(
+                request, this.requestHandler, this.requestConsumer, exchangeContext);
+        state.setIncoming(incoming);
+        final HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_1, 200, "OK");
+        final Outgoing outgoing = new Outgoing(
+                request, response, this.responseProducer, exchangeContext);
+        state.setOutgoing(outgoing);
         this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
         Mockito.when(this.conn.getStatus()).thenReturn(NHttpClientConnection.CLOSING);
 



Mime
View raw message