hc-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ol...@apache.org
Subject svn commit: r1166291 - in /httpcomponents/httpcore/trunk/httpcore-nio/src: examples/org/apache/http/examples/nio/ main/java/org/apache/http/nio/protocol/
Date Wed, 07 Sep 2011 18:12:13 GMT
Author: olegk
Date: Wed Sep  7 18:12:13 2011
New Revision: 1166291

URL: http://svn.apache.org/viewvc?rev=1166291&view=rev
Log:
Updated the reverse proxy example; fixed a minor bug in HttpAsyncServiceHandler

Modified:
    httpcomponents/httpcore/trunk/httpcore-nio/src/examples/org/apache/http/examples/nio/NHttpReverseProxy.java
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncResponseTrigger.java
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/examples/org/apache/http/examples/nio/NHttpReverseProxy.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/examples/org/apache/http/examples/nio/NHttpReverseProxy.java?rev=1166291&r1=1166290&r2=1166291&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/examples/org/apache/http/examples/nio/NHttpReverseProxy.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/examples/org/apache/http/examples/nio/NHttpReverseProxy.java Wed Sep  7 18:12:13 2011
@@ -29,47 +29,65 @@ package org.apache.http.examples.nio;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.net.InetSocketAddress;
+import java.net.URI;
 import java.nio.ByteBuffer;
+import java.util.Locale;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.http.ConnectionReuseStrategy;
-import org.apache.http.HttpConnection;
 import org.apache.http.HttpEntityEnclosingRequest;
 import org.apache.http.HttpException;
 import org.apache.http.HttpHost;
 import org.apache.http.HttpRequest;
 import org.apache.http.HttpRequestInterceptor;
 import org.apache.http.HttpResponse;
-import org.apache.http.HttpResponseFactory;
 import org.apache.http.HttpResponseInterceptor;
 import org.apache.http.HttpStatus;
 import org.apache.http.HttpVersion;
-import org.apache.http.ProtocolVersion;
+import org.apache.http.entity.ContentType;
 import org.apache.http.impl.DefaultConnectionReuseStrategy;
-import org.apache.http.impl.DefaultHttpResponseFactory;
+import org.apache.http.impl.EnglishReasonPhraseCatalog;
 import org.apache.http.impl.nio.DefaultClientIODispatch;
 import org.apache.http.impl.nio.DefaultServerIODispatch;
+import org.apache.http.impl.nio.pool.BasicNIOConnFactory;
+import org.apache.http.impl.nio.pool.BasicNIOConnPool;
+import org.apache.http.impl.nio.pool.BasicNIOPoolEntry;
 import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
 import org.apache.http.impl.nio.reactor.DefaultListeningIOReactor;
 import org.apache.http.impl.nio.reactor.IOReactorConfig;
+import org.apache.http.message.BasicHttpEntityEnclosingRequest;
+import org.apache.http.message.BasicHttpRequest;
+import org.apache.http.message.BasicHttpResponse;
 import org.apache.http.nio.ContentDecoder;
 import org.apache.http.nio.ContentEncoder;
 import org.apache.http.nio.IOControl;
 import org.apache.http.nio.NHttpClientConnection;
-import org.apache.http.nio.NHttpClientHandler;
 import org.apache.http.nio.NHttpConnection;
 import org.apache.http.nio.NHttpServerConnection;
-import org.apache.http.nio.NHttpServiceHandler;
+import org.apache.http.nio.entity.NStringEntity;
+import org.apache.http.nio.pool.NIOConnFactory;
+import org.apache.http.nio.protocol.BasicAsyncResponseProducer;
+import org.apache.http.nio.protocol.HttpAsyncClientProtocolHandler;
+import org.apache.http.nio.protocol.HttpAsyncRequestConsumer;
+import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
+import org.apache.http.nio.protocol.HttpAsyncRequestHandler;
+import org.apache.http.nio.protocol.HttpAsyncRequestHandlerRegistry;
+import org.apache.http.nio.protocol.HttpAsyncRequestHandlerResolver;
+import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
+import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
+import org.apache.http.nio.protocol.HttpAsyncResponseProducer;
+import org.apache.http.nio.protocol.HttpAsyncResponseTrigger;
+import org.apache.http.nio.protocol.HttpAsyncServiceHandler;
 import org.apache.http.nio.reactor.ConnectingIOReactor;
 import org.apache.http.nio.reactor.IOEventDispatch;
 import org.apache.http.nio.reactor.ListeningIOReactor;
 import org.apache.http.params.CoreConnectionPNames;
-import org.apache.http.params.DefaultedHttpParams;
-import org.apache.http.params.HttpParams;
 import org.apache.http.params.CoreProtocolPNames;
+import org.apache.http.params.HttpParams;
 import org.apache.http.params.SyncBasicHttpParams;
-import org.apache.http.protocol.HTTP;
-import org.apache.http.protocol.HttpContext;
+import org.apache.http.pool.PoolStats;
 import org.apache.http.protocol.ExecutionContext;
+import org.apache.http.protocol.HttpContext;
 import org.apache.http.protocol.HttpProcessor;
 import org.apache.http.protocol.ImmutableHttpProcessor;
 import org.apache.http.protocol.RequestConnControl;
@@ -83,35 +101,33 @@ import org.apache.http.protocol.Response
 import org.apache.http.protocol.ResponseServer;
 
 /**
- * Rudimentary HTTP/1.1 reverse proxy based on the non-blocking I/O model.
- * <p>
- * Please note the purpose of this application is demonstrate the usage of HttpCore APIs.
- * It is NOT intended to demonstrate the most efficient way of building an HTTP reverse proxy.
- *
- *
+ * Elemental HTTP/1.1 reverse proxy based on the non-blocking I/O model.
  */
 public class NHttpReverseProxy {
 
     public static void main(String[] args) throws Exception {
-
         if (args.length < 1) {
             System.out.println("Usage: NHttpReverseProxy <hostname> [port]");
             System.exit(1);
         }
-        String hostname = args[0];
-        int port = 80;
+        URI uri = new URI(args[0]);
+        int port = 8080;
         if (args.length > 1) {
             port = Integer.parseInt(args[1]);
         }
 
         // Target host
-        HttpHost targetHost = new HttpHost(hostname, port);
+        HttpHost targetHost = new HttpHost(
+                uri.getHost(),
+                uri.getPort() > 0 ? uri.getPort() : 80,
+                uri.getScheme() != null ? uri.getScheme() : "http");
+
+        System.out.println("Reverse proxy to " + targetHost);
 
         HttpParams params = new SyncBasicHttpParams();
         params
             .setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 30000)
             .setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024)
-            .setBooleanParameter(CoreConnectionPNames.STALE_CONNECTION_CHECK, false)
             .setBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true)
             .setParameter(CoreProtocolPNames.ORIGIN_SERVER, "Test/1.1")
             .setParameter(CoreProtocolPNames.USER_AGENT, "Test/1.1");
@@ -123,41 +139,43 @@ public class NHttpReverseProxy {
 
         // Set up HTTP protocol processor for incoming connections
         HttpProcessor inhttpproc = new ImmutableHttpProcessor(
+                new HttpResponseInterceptor[] {
+                        new ResponseDate(),
+                        new ResponseServer(),
+                        new ResponseContent(),
+                        new ResponseConnControl()
+         });
+
+        // Set up HTTP protocol processor for outgoing connections
+        HttpProcessor outhttpproc = new ImmutableHttpProcessor(
                 new HttpRequestInterceptor[] {
                         new RequestContent(),
                         new RequestTargetHost(),
                         new RequestConnControl(),
                         new RequestUserAgent(),
                         new RequestExpectContinue()
-         });
-
-        // Set up HTTP protocol processor for outgoing connections
-        HttpProcessor outhttpproc = new ImmutableHttpProcessor(
-                new HttpResponseInterceptor[] {
-                        new ResponseDate(),
-                        new ResponseServer(),
-                        new ResponseContent(),
-                        new ResponseConnControl()
         });
 
-        NHttpClientHandler connectingHandler = new ConnectingHandler(
-                inhttpproc,
-                new DefaultConnectionReuseStrategy(),
-                params);
-
-        NHttpServiceHandler listeningHandler = new ListeningHandler(
-                targetHost,
-                connectingIOReactor,
-                outhttpproc,
-                new DefaultHttpResponseFactory(),
-                new DefaultConnectionReuseStrategy(),
-                params);
+        ProxyClientProtocolHandler clientHandler = new ProxyClientProtocolHandler();
+        HttpAsyncRequestExecutor executor = new HttpAsyncRequestExecutor(
+                outhttpproc, new ProxyOutgoingConnectionReuseStrategy(), params);
+
+        ProxyConnPool connPool = new ProxyConnPool(connectingIOReactor,
+                new BasicNIOConnFactory(new LoggingClientConnectionFactory(params)), params);
+        connPool.setMaxTotal(100);
+        connPool.setDefaultMaxPerRoute(20);
+
+        HttpAsyncRequestHandlerRegistry handlerRegistry = new HttpAsyncRequestHandlerRegistry();
+        handlerRegistry.register("*", new ProxyRequestHandler(targetHost, executor, connPool));
+
+        ProxyServiceHandler serviceHandler = new ProxyServiceHandler(
+                handlerRegistry, inhttpproc, new ProxyIncomingConnectionReuseStrategy(), params);
 
         final IOEventDispatch connectingEventDispatch = new DefaultClientIODispatch(
-                connectingHandler, params);
+                clientHandler, new LoggingClientConnectionFactory(params));
 
         final IOEventDispatch listeningEventDispatch = new DefaultServerIODispatch(
-                listeningHandler, params);
+                serviceHandler, new LoggingServerConnectionFactory(params));
 
         Thread t = new Thread(new Runnable() {
 
@@ -166,841 +184,654 @@ public class NHttpReverseProxy {
                     connectingIOReactor.execute(connectingEventDispatch);
                 } catch (InterruptedIOException ex) {
                     System.err.println("Interrupted");
-                } catch (IOException e) {
-                    System.err.println("I/O error: " + e.getMessage());
+                } catch (IOException ex) {
+                    ex.printStackTrace();
+                } finally {
+                    try {
+                        listeningIOReactor.shutdown();
+                    } catch (IOException ex2) {
+                        ex2.printStackTrace();
+                    }
                 }
             }
 
         });
         t.start();
-
         try {
-            listeningIOReactor.listen(new InetSocketAddress(8888));
+            listeningIOReactor.listen(new InetSocketAddress(port));
             listeningIOReactor.execute(listeningEventDispatch);
         } catch (InterruptedIOException ex) {
             System.err.println("Interrupted");
-        } catch (IOException e) {
-            System.err.println("I/O error: " + e.getMessage());
+        } catch (IOException ex) {
+            ex.printStackTrace();
+        } finally {
+            try {
+                connectingIOReactor.shutdown();
+            } catch (IOException ex2) {
+                ex2.printStackTrace();
+            }
         }
     }
 
-    static class ListeningHandler implements NHttpServiceHandler {
+    static class ProxyHttpExchange {
 
-        private final HttpHost targetHost;
-        private final ConnectingIOReactor connectingIOReactor;
-        private final HttpProcessor httpProcessor;
-        private final HttpResponseFactory responseFactory;
-        private final ConnectionReuseStrategy connStrategy;
-        private final HttpParams params;
-
-        public ListeningHandler(
-                final HttpHost targetHost,
-                final ConnectingIOReactor connectingIOReactor,
-                final HttpProcessor httpProcessor,
-                final HttpResponseFactory responseFactory,
-                final ConnectionReuseStrategy connStrategy,
-                final HttpParams params) {
+        private final ByteBuffer inBuffer;
+        private final ByteBuffer outBuffer;
+
+        private volatile String id;
+        private volatile HttpHost target;
+        private volatile HttpAsyncResponseTrigger responseTrigger;
+        private volatile IOControl originIOControl;
+        private volatile IOControl clientIOControl;
+        private volatile HttpRequest request;
+        private volatile boolean requestReceived;
+        private volatile HttpResponse response;
+        private volatile boolean responseReceived;
+        private volatile Exception ex;
+
+        public ProxyHttpExchange() {
             super();
-            this.targetHost = targetHost;
-            this.connectingIOReactor = connectingIOReactor;
-            this.httpProcessor = httpProcessor;
-            this.connStrategy = connStrategy;
-            this.responseFactory = responseFactory;
-            this.params = params;
+            this.inBuffer = ByteBuffer.allocateDirect(10240);
+            this.outBuffer = ByteBuffer.allocateDirect(10240);
         }
 
-        public void connected(final NHttpServerConnection conn) {
-            System.out.println(conn + " [client->proxy] conn open");
-
-            ProxyTask proxyTask = new ProxyTask();
+        public ByteBuffer getInBuffer() {
+            return this.inBuffer;
+        }
 
-            synchronized (proxyTask) {
+        public ByteBuffer getOutBuffer() {
+            return this.outBuffer;
+        }
 
-                // Initialize connection state
-                proxyTask.setTarget(this.targetHost);
-                proxyTask.setClientIOControl(conn);
-                proxyTask.setClientState(ConnState.CONNECTED);
+        public String getId() {
+            return this.id;
+        }
 
-                HttpContext context = conn.getContext();
-                context.setAttribute(ProxyTask.ATTRIB, proxyTask);
+        public void setId(final String id) {
+            this.id = id;
+        }
 
-                InetSocketAddress address = new InetSocketAddress(
-                        this.targetHost.getHostName(),
-                        this.targetHost.getPort());
+        public HttpHost getTarget() {
+            return this.target;
+        }
 
-                this.connectingIOReactor.connect(
-                        address,
-                        null,
-                        proxyTask,
-                        null);
-            }
+        public void setTarget(final HttpHost target) {
+            this.target = target;
         }
 
-        public void requestReceived(final NHttpServerConnection conn) {
-            System.out.println(conn + " [client->proxy] request received");
+        public HttpRequest getRequest() {
+            return this.request;
+        }
 
-            HttpContext context = conn.getContext();
-            ProxyTask proxyTask = (ProxyTask) context.getAttribute(ProxyTask.ATTRIB);
+        public void setRequest(final HttpRequest request) {
+            this.request = request;
+        }
 
-            synchronized (proxyTask) {
-                ConnState connState = proxyTask.getClientState();
-                if (connState != ConnState.IDLE
-                        && connState != ConnState.CONNECTED) {
-                    throw new IllegalStateException("Illegal client connection state: " + connState);
-                }
+        public HttpResponse getResponse() {
+            return this.response;
+        }
 
-                try {
+        public void setResponse(final HttpResponse response) {
+            this.response = response;
+        }
 
-                    HttpRequest request = conn.getHttpRequest();
+        public HttpAsyncResponseTrigger getResponseTrigger() {
+            return this.responseTrigger;
+        }
 
-                    System.out.println(conn + " [client->proxy] >> " + request.getRequestLine());
+        public void setResponseTrigger(final HttpAsyncResponseTrigger responseTrigger) {
+            this.responseTrigger = responseTrigger;
+        }
 
-                    ProtocolVersion ver = request.getRequestLine().getProtocolVersion();
-                    if (!ver.lessEquals(HttpVersion.HTTP_1_1)) {
-                        // Downgrade protocol version if greater than HTTP/1.1
-                        ver = HttpVersion.HTTP_1_1;
-                    }
+        public IOControl getClientIOControl() {
+            return this.clientIOControl;
+        }
 
-                    // Update connection state
-                    proxyTask.setRequest(request);
-                    proxyTask.setClientState(ConnState.REQUEST_RECEIVED);
-
-                    // See if the client expects a 100-Continue
-                    if (request instanceof HttpEntityEnclosingRequest) {
-                        if (((HttpEntityEnclosingRequest) request).expectContinue()) {
-                            HttpResponse ack = this.responseFactory.newHttpResponse(
-                                    ver,
-                                    HttpStatus.SC_CONTINUE,
-                                    context);
-                            conn.submitResponse(ack);
-                        }
-                    } else {
-                        // No request content expected. Suspend client input
-                        conn.suspendInput();
-                    }
+        public void setClientIOControl(final IOControl clientIOControl) {
+            this.clientIOControl = clientIOControl;
+        }
 
-                    // If there is already a connection to the origin server
-                    // make sure origin output is active
-                    if (proxyTask.getOriginIOControl() != null) {
-                        proxyTask.getOriginIOControl().requestOutput();
-                    }
+        public IOControl getOriginIOControl() {
+            return this.originIOControl;
+        }
 
-                } catch (IOException ex) {
-                    shutdownConnection(conn);
-                } catch (HttpException ex) {
-                    shutdownConnection(conn);
-                }
-            }
+        public void setOriginIOControl(final IOControl originIOControl) {
+            this.originIOControl = originIOControl;
         }
 
-        public void inputReady(final NHttpServerConnection conn, final ContentDecoder decoder) {
-            System.out.println(conn + " [client->proxy] input ready");
+        public boolean isRequestReceived() {
+            return this.requestReceived;
+        }
 
-            HttpContext context = conn.getContext();
-            ProxyTask proxyTask = (ProxyTask) context.getAttribute(ProxyTask.ATTRIB);
+        public void setRequestReceived() {
+            this.requestReceived = true;
+        }
 
-            synchronized (proxyTask) {
-                ConnState connState = proxyTask.getClientState();
-                if (connState != ConnState.REQUEST_RECEIVED
-                        && connState != ConnState.REQUEST_BODY_STREAM) {
-                    throw new IllegalStateException("Illegal client connection state: " + connState);
-                }
+        public boolean isResponseReceived() {
+            return this.responseReceived;
+        }
 
-                try {
+        public void setResponseReceived() {
+            this.responseReceived = true;
+        }
 
-                    ByteBuffer dst = proxyTask.getInBuffer();
-                    int bytesRead = decoder.read(dst);
-                    System.out.println(conn + " [client->proxy] " + bytesRead + " bytes read");
-                    System.out.println(conn + " [client->proxy] " + decoder);
-                    if (!dst.hasRemaining()) {
-                        // Input buffer is full. Suspend client input
-                        // until the origin handler frees up some space in the buffer
-                        conn.suspendInput();
-                    }
-                    // If there is some content in the input buffer make sure origin
-                    // output is active
-                    if (dst.position() > 0) {
-                        if (proxyTask.getOriginIOControl() != null) {
-                            proxyTask.getOriginIOControl().requestOutput();
-                        }
-                    }
+        public Exception getException() {
+            return this.ex;
+        }
 
-                    if (decoder.isCompleted()) {
-                        System.out.println(conn + " [client->proxy] request body received");
-                        // Update connection state
-                        proxyTask.setClientState(ConnState.REQUEST_BODY_DONE);
-                        // Suspend client input
-                        conn.suspendInput();
-                    } else {
-                        proxyTask.setClientState(ConnState.REQUEST_BODY_STREAM);
-                    }
+        public void setException(final Exception ex) {
+            this.ex = ex;
+        }
 
-                } catch (IOException ex) {
-                    shutdownConnection(conn);
-                }
-            }
+        public void reset() {
+            this.inBuffer.clear();
+            this.outBuffer.clear();
+            this.target = null;
+            this.id = null;
+            this.responseTrigger = null;
+            this.clientIOControl = null;
+            this.originIOControl = null;
+            this.request = null;
+            this.requestReceived = false;
+            this.response = null;
+            this.responseReceived = false;
+            this.ex = null;
         }
 
-        public void responseReady(final NHttpServerConnection conn) {
-            System.out.println(conn + " [client<-proxy] response ready");
+    }
 
-            HttpContext context = conn.getContext();
-            ProxyTask proxyTask = (ProxyTask) context.getAttribute(ProxyTask.ATTRIB);
+    static class ProxyRequestHandler implements HttpAsyncRequestHandler<ProxyHttpExchange> {
+
+        private final HttpHost target;
+        private final HttpAsyncRequestExecutor executor;
+        private final BasicNIOConnPool connPool;
+        private final AtomicLong counter;
+
+        public ProxyRequestHandler(
+                final HttpHost target,
+                final HttpAsyncRequestExecutor executor,
+                final BasicNIOConnPool connPool) {
+            super();
+            this.target = target;
+            this.executor = executor;
+            this.connPool = connPool;
+            this.counter = new AtomicLong(1);
+        }
+
+        public HttpAsyncRequestConsumer<ProxyHttpExchange> processRequest(
+                final HttpRequest request,
+                final HttpContext context) {
+            ProxyHttpExchange httpExchange = (ProxyHttpExchange) context.getAttribute("http-exchange");
+            if (httpExchange == null) {
+                httpExchange = new ProxyHttpExchange();
+                context.setAttribute("http-exchange", httpExchange);
+            }
+            synchronized (httpExchange) {
+                httpExchange.reset();
+                String id = String.format("%08X", this.counter.getAndIncrement());
+                httpExchange.setId(id);
+                httpExchange.setTarget(this.target);
+                return new ProxyRequestConsumer(httpExchange, this.executor, this.connPool);
+            }
+        }
 
-            synchronized (proxyTask) {
-                ConnState connState = proxyTask.getClientState();
-                if (connState == ConnState.IDLE) {
-                    // Response not available
+        public void handle(
+                final ProxyHttpExchange httpExchange,
+                final HttpAsyncResponseTrigger responseTrigger,
+                final HttpContext context) throws HttpException, IOException {
+            synchronized (httpExchange) {
+                Exception ex = httpExchange.getException();
+                if (ex != null) {
+                    System.out.println("[client<-proxy] " + httpExchange.getId() + " " + ex);
+                    int status = HttpStatus.SC_INTERNAL_SERVER_ERROR;
+                    HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_0, status,
+                            EnglishReasonPhraseCatalog.INSTANCE.getReason(status, Locale.US));
+                    String message = ex.getMessage();
+                    if (message == null) {
+                        message = "Unexpected error";
+                    }
+                    response.setEntity(NStringEntity.create(message, ContentType.DEFAULT_TEXT));
+                    responseTrigger.submitResponse(new BasicAsyncResponseProducer(response));
+                    System.out.println("[client<-proxy] " + httpExchange.getId() + " error response triggered");
                     return;
                 }
-                if (connState != ConnState.REQUEST_RECEIVED
-                        && connState != ConnState.REQUEST_BODY_DONE) {
-                    throw new IllegalStateException("Illegal client connection state: " + connState);
-                }
-
-                try {
-
-                    HttpRequest request = proxyTask.getRequest();
-                    HttpResponse response = proxyTask.getResponse();
-                    if (response == null) {
-                        throw new IllegalStateException("HTTP request is null");
-                    }
-                    // Remove hop-by-hop headers
-                    response.removeHeaders(HTTP.CONTENT_LEN);
-                    response.removeHeaders(HTTP.TRANSFER_ENCODING);
-                    response.removeHeaders(HTTP.CONN_DIRECTIVE);
-                    response.removeHeaders("Keep-Alive");
-                    response.removeHeaders("Proxy-Authenticate");
-                    response.removeHeaders("Proxy-Authorization");
-                    response.removeHeaders("TE");
-                    response.removeHeaders("Trailers");
-                    response.removeHeaders("Upgrade");
-
-                    response.setParams(
-                            new DefaultedHttpParams(response.getParams(), this.params));
-
-                    // Close client connection if the connection to the target
-                    // is no longer active / open
-                    if (proxyTask.getOriginState().compareTo(ConnState.CLOSING) >= 0) {
-                        response.addHeader(HTTP.CONN_DIRECTIVE, "Close");
-                    }
-
-                    // Pre-process HTTP request
-                    context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
-                    context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
-                    this.httpProcessor.process(response, context);
-
-                    conn.submitResponse(response);
-
-                    proxyTask.setClientState(ConnState.RESPONSE_SENT);
-
-                    System.out.println(conn + " [client<-proxy] << " + response.getStatusLine());
-
-                    if (!canResponseHaveBody(request, response)) {
-                        conn.resetInput();
-                        if (!this.connStrategy.keepAlive(response, context)) {
-                            System.out.println(conn + " [client<-proxy] close connection");
-                            proxyTask.setClientState(ConnState.CLOSING);
-                            conn.close();
-                        } else {
-                            // Reset connection state
-                            proxyTask.reset();
-                            conn.requestInput();
-                            // Ready to deal with a new request
-                        }
-                    }
-
-                } catch (IOException ex) {
-                    shutdownConnection(conn);
-                } catch (HttpException ex) {
-                    shutdownConnection(conn);
+                HttpResponse response = httpExchange.getResponse();
+                if (response != null) {
+                    responseTrigger.submitResponse(new ProxyResponseProducer(httpExchange));
+                    System.out.println("[client<-proxy] " + httpExchange.getId() + " response triggered");
+                    return;
                 }
+                // No response yet.
+                httpExchange.setResponseTrigger(responseTrigger);
             }
         }
 
-        private boolean canResponseHaveBody(
-                final HttpRequest request, final HttpResponse response) {
+    }
 
-            if (request != null && "HEAD".equalsIgnoreCase(request.getRequestLine().getMethod())) {
-                return false;
-            }
+    static class ProxyRequestConsumer implements HttpAsyncRequestConsumer<ProxyHttpExchange> {
 
-            int status = response.getStatusLine().getStatusCode();
-            return status >= HttpStatus.SC_OK
-                && status != HttpStatus.SC_NO_CONTENT
-                && status != HttpStatus.SC_NOT_MODIFIED
-                && status != HttpStatus.SC_RESET_CONTENT;
+        private final ProxyHttpExchange httpExchange;
+        private final HttpAsyncRequestExecutor executor;
+        private final BasicNIOConnPool connPool;
+
+        private volatile boolean completed;
+
+        public ProxyRequestConsumer(
+                final ProxyHttpExchange httpExchange,
+                final HttpAsyncRequestExecutor executor,
+                final BasicNIOConnPool connPool) {
+            super();
+            this.httpExchange = httpExchange;
+            this.executor = executor;
+            this.connPool = connPool;
         }
 
-        public void outputReady(final NHttpServerConnection conn, final ContentEncoder encoder) {
-            System.out.println(conn + " [client<-proxy] output ready");
+        public void close() throws IOException {
+        }
 
-            HttpContext context = conn.getContext();
-            ProxyTask proxyTask = (ProxyTask) context.getAttribute(ProxyTask.ATTRIB);
+        public void requestReceived(final HttpRequest request) {
+            synchronized (this.httpExchange) {
+                System.out.println("[client->proxy] " + this.httpExchange.getId() + " " + request.getRequestLine());
+                this.httpExchange.setRequest(request);
+                this.executor.execute(
+                        new ProxyRequestProducer(this.httpExchange),
+                        new ProxyResponseConsumer(this.httpExchange),
+                        this.connPool);
+            }
+        }
 
-            synchronized (proxyTask) {
-                ConnState connState = proxyTask.getClientState();
-                if (connState != ConnState.RESPONSE_SENT
-                        && connState != ConnState.RESPONSE_BODY_STREAM) {
-                    throw new IllegalStateException("Illegal client connection state: " + connState);
+        public void consumeContent(
+                final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
+            synchronized (this.httpExchange) {
+                this.httpExchange.setClientIOControl(ioctrl);
+                // Receive data from the client
+                ByteBuffer buf = this.httpExchange.getInBuffer();
+                int n = decoder.read(buf);
+                System.out.println("[client->proxy] " + this.httpExchange.getId() + " " + n + " bytes read");
+                if (decoder.isCompleted()) {
+                    System.out.println("[client->proxy] " + this.httpExchange.getId() + " content fully read");
                 }
-
-                HttpResponse response = proxyTask.getResponse();
-                if (response == null) {
-                    throw new IllegalStateException("HTTP request is null");
+                // If the buffer is full, suspend client input until there is free
+                // space in the buffer
+                if (!buf.hasRemaining()) {
+                    ioctrl.suspendInput();
                 }
-
-                try {
-
-                    ByteBuffer src = proxyTask.getOutBuffer();
-                    src.flip();
-                    int bytesWritten = encoder.write(src);
-                    System.out.println(conn + " [client<-proxy] " + bytesWritten + " bytes written");
-                    System.out.println(conn + " [client<-proxy] " + encoder);
-                    src.compact();
-
-                    if (src.position() == 0) {
-
-                        if (proxyTask.getOriginState() == ConnState.RESPONSE_BODY_DONE) {
-                            encoder.complete();
-                        } else {
-                            // Input output is empty. Wait until the origin handler
-                            // fills up the buffer
-                            conn.suspendOutput();
-                        }
-                    }
-
-                    // Update connection state
-                    if (encoder.isCompleted()) {
-                        System.out.println(conn + " [proxy] response body sent");
-                        proxyTask.setClientState(ConnState.RESPONSE_BODY_DONE);
-                        if (!this.connStrategy.keepAlive(response, context)) {
-                            System.out.println(conn + " [client<-proxy] close connection");
-                            proxyTask.setClientState(ConnState.CLOSING);
-                            conn.close();
-                        } else {
-                            // Reset connection state
-                            proxyTask.reset();
-                            conn.requestInput();
-                            // Ready to deal with a new request
-                        }
-                    } else {
-                        proxyTask.setClientState(ConnState.RESPONSE_BODY_STREAM);
-                        // Make sure origin input is active
-                        proxyTask.getOriginIOControl().requestInput();
+                // If there is some content in the input buffer make sure origin
+                // output is active
+                if (buf.position() > 0) {
+                    if (this.httpExchange.getOriginIOControl() != null) {
+                        this.httpExchange.getOriginIOControl().requestOutput();
                     }
-
-                } catch (IOException ex) {
-                    shutdownConnection(conn);
                 }
             }
         }
 
-        public void closed(final NHttpServerConnection conn) {
-            System.out.println(conn + " [client->proxy] conn closed");
-            HttpContext context = conn.getContext();
-            ProxyTask proxyTask = (ProxyTask) context.getAttribute(ProxyTask.ATTRIB);
-
-            if (proxyTask != null) {
-                synchronized (proxyTask) {
-                    proxyTask.setClientState(ConnState.CLOSED);
-                }
+        public void requestCompleted(final HttpContext context) {
+            synchronized (this.httpExchange) {
+                this.completed = true;;
+                System.out.println("[client->proxy] " + this.httpExchange.getId() + " request completed");
+                this.httpExchange.setRequestReceived();
             }
         }
 
-        public void exception(final NHttpServerConnection conn, final HttpException httpex) {
-            System.out.println(conn + " [client->proxy] HTTP error: " + httpex.getMessage());
-
-            if (conn.isResponseSubmitted()) {
-                shutdownConnection(conn);
-                return;
-            }
-
-            HttpContext context = conn.getContext();
-
-            try {
-                HttpResponse response = this.responseFactory.newHttpResponse(
-                        HttpVersion.HTTP_1_0, HttpStatus.SC_BAD_REQUEST, context);
-                response.setParams(
-                        new DefaultedHttpParams(this.params, response.getParams()));
-                response.addHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_CLOSE);
-                // Pre-process HTTP request
-                context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
-                context.setAttribute(ExecutionContext.HTTP_REQUEST, null);
-                this.httpProcessor.process(response, context);
-
-                conn.submitResponse(response);
-
-                conn.close();
-
-            } catch (IOException ex) {
-                shutdownConnection(conn);
-            } catch (HttpException ex) {
-                shutdownConnection(conn);
-            }
-        }
-
-        public void exception(final NHttpServerConnection conn, final IOException ex) {
-            shutdownConnection(conn);
-            System.out.println(conn + " [client->proxy] I/O error: " + ex.getMessage());
-        }
-
-        public void timeout(final NHttpServerConnection conn) {
-            System.out.println(conn + " [client->proxy] timeout");
-            closeConnection(conn);
+        public Exception getException() {
+            return null;
         }
 
-        private void shutdownConnection(final NHttpConnection conn) {
-            try {
-                conn.shutdown();
-            } catch (IOException ignore) {
-            }
+        public ProxyHttpExchange getResult() {
+            return this.httpExchange;
         }
 
-        private void closeConnection(final NHttpConnection conn) {
-            try {
-                conn.close();
-            } catch (IOException ignore) {
-            }
+        public boolean isDone() {
+            return this.completed;
         }
 
     }
 
-    static class ConnectingHandler implements NHttpClientHandler {
+    static class ProxyRequestProducer implements HttpAsyncRequestProducer {
 
-        private final HttpProcessor httpProcessor;
-        private final ConnectionReuseStrategy connStrategy;
-        private final HttpParams params;
+        private final ProxyHttpExchange httpExchange;
 
-        public ConnectingHandler(
-                final HttpProcessor httpProcessor,
-                final ConnectionReuseStrategy connStrategy,
-                final HttpParams params) {
+        public ProxyRequestProducer(final ProxyHttpExchange httpExchange) {
             super();
-            this.httpProcessor = httpProcessor;
-            this.connStrategy = connStrategy;
-            this.params = params;
+            this.httpExchange = httpExchange;
         }
 
-        public void connected(final NHttpClientConnection conn, final Object attachment) {
-            System.out.println(conn + " [proxy->origin] conn open");
+        public void close() throws IOException {
+        }
 
-            // The shared state object is expected to be passed as an attachment
-            ProxyTask proxyTask = (ProxyTask) attachment;
+        public HttpHost getTarget() {
+            synchronized (this.httpExchange) {
+                return this.httpExchange.getTarget();
+            }
+        }
 
-            synchronized (proxyTask) {
-                ConnState connState = proxyTask.getOriginState();
-                if (connState != ConnState.IDLE) {
-                    throw new IllegalStateException("Illegal target connection state: " + connState);
+        public HttpRequest generateRequest() {
+            synchronized (this.httpExchange) {
+                HttpRequest request = this.httpExchange.getRequest();
+                System.out.println("[proxy->origin] " + this.httpExchange.getId() + " " + request.getRequestLine());
+                // Rewrite request!!!!
+                if (request instanceof HttpEntityEnclosingRequest) {
+                    BasicHttpEntityEnclosingRequest r = new BasicHttpEntityEnclosingRequest(
+                            request.getRequestLine());
+                    r.setEntity(((HttpEntityEnclosingRequest) request).getEntity());
+                    return r;
+                } else {
+                    return new BasicHttpRequest(request.getRequestLine());
                 }
+            }
+        }
 
-                // Set origin IO control handle
-                proxyTask.setOriginIOControl(conn);
-                // Store the state object in the context
-                HttpContext context = conn.getContext();
-                context.setAttribute(ProxyTask.ATTRIB, proxyTask);
-                // Update connection state
-                proxyTask.setOriginState(ConnState.CONNECTED);
-
-                if (proxyTask.getRequest() != null) {
-                    conn.requestOutput();
+        public void produceContent(
+                final ContentEncoder encoder, final IOControl ioctrl) throws IOException {
+            synchronized (this.httpExchange) {
+                this.httpExchange.setOriginIOControl(ioctrl);
+                // Send data to the origin server
+                ByteBuffer buf = this.httpExchange.getInBuffer();
+                buf.flip();
+                int n = encoder.write(buf);
+                buf.compact();
+                System.out.println("[proxy->origin] " + this.httpExchange.getId() + " " + n + " bytes written");
+                if (encoder.isCompleted()) {
+                    System.out.println("[proxy->origin] " + this.httpExchange.getId() + " content fully written");
+                }
+                // If there is space in the buffer and the message has not been
+                // transferred, make sure the client is sending more data
+                if (buf.hasRemaining() && !this.httpExchange.isRequestReceived()) {
+                    if (this.httpExchange.getClientIOControl() != null) {
+                        this.httpExchange.getClientIOControl().requestInput();
+                    }
+                }
+                if (buf.position() == 0) {
+                    if (this.httpExchange.isRequestReceived()) {
+                        encoder.complete();
+                    } else {
+                        // Input buffer is empty. Wait until the client fills up
+                        // the buffer
+                        ioctrl.suspendOutput();
+                    }
                 }
             }
         }
 
-        public void requestReady(final NHttpClientConnection conn) {
-            System.out.println(conn + " [proxy->origin] request ready");
-
-            HttpContext context = conn.getContext();
-            ProxyTask proxyTask = (ProxyTask) context.getAttribute(ProxyTask.ATTRIB);
+        public boolean isRepeatable() {
+            return false;
+        }
 
-            synchronized (proxyTask) {
-                ConnState connState = proxyTask.getOriginState();
-                if (connState == ConnState.REQUEST_SENT
-                        || connState == ConnState.REQUEST_BODY_DONE) {
-                    // Request sent but no response available yet
-                    return;
-                }
+        public void resetRequest() {
+        }
 
-                if (connState != ConnState.IDLE
-                        && connState != ConnState.CONNECTED) {
-                    throw new IllegalStateException("Illegal target connection state: " + connState);
-                }
-
-                HttpRequest request = proxyTask.getRequest();
-                if (request == null) {
-                    throw new IllegalStateException("HTTP request is null");
-                }
-
-                // Remove hop-by-hop headers
-                request.removeHeaders(HTTP.CONTENT_LEN);
-                request.removeHeaders(HTTP.TRANSFER_ENCODING);
-                request.removeHeaders(HTTP.CONN_DIRECTIVE);
-                request.removeHeaders("Keep-Alive");
-                request.removeHeaders("Proxy-Authenticate");
-                request.removeHeaders("Proxy-Authorization");
-                request.removeHeaders("TE");
-                request.removeHeaders("Trailers");
-                request.removeHeaders("Upgrade");
-                // Remove host header
-                request.removeHeaders(HTTP.TARGET_HOST);
+    }
 
-                HttpHost targetHost = proxyTask.getTarget();
+    static class ProxyResponseConsumer implements HttpAsyncResponseConsumer<ProxyHttpExchange> {
 
-                try {
+        private final ProxyHttpExchange httpExchange;
 
-                    request.setParams(
-                            new DefaultedHttpParams(request.getParams(), this.params));
+        private volatile boolean completed;
 
-                    // Pre-process HTTP request
-                    context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
-                    context.setAttribute(ExecutionContext.HTTP_TARGET_HOST, targetHost);
-
-                    this.httpProcessor.process(request, context);
-                    // and send it to the origin server
-                    conn.submitRequest(request);
-                    // Update connection state
-                    proxyTask.setOriginState(ConnState.REQUEST_SENT);
+        public ProxyResponseConsumer(final ProxyHttpExchange httpExchange) {
+            super();
+            this.httpExchange = httpExchange;
+        }
 
-                    System.out.println(conn + " [proxy->origin] >> " + request.getRequestLine().toString());
+        public void close() throws IOException {
+        }
 
-                } catch (IOException ex) {
-                    shutdownConnection(conn);
-                } catch (HttpException ex) {
-                    shutdownConnection(conn);
+        public void responseReceived(final HttpResponse response) {
+            synchronized (this.httpExchange) {
+                System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " " + response.getStatusLine());
+                this.httpExchange.setResponse(response);
+                HttpAsyncResponseTrigger responseTrigger = this.httpExchange.getResponseTrigger();
+                if (responseTrigger != null && !responseTrigger.isTriggered()) {
+                    System.out.println("[client<-proxy] " + this.httpExchange.getId() + " response triggered");
+                    responseTrigger.submitResponse(new ProxyResponseProducer(this.httpExchange));
                 }
-
             }
         }
 
-        public void outputReady(final NHttpClientConnection conn, final ContentEncoder encoder) {
-            System.out.println(conn + " [proxy->origin] output ready");
-
-            HttpContext context = conn.getContext();
-            ProxyTask proxyTask = (ProxyTask) context.getAttribute(ProxyTask.ATTRIB);
-
-            synchronized (proxyTask) {
-                ConnState connState = proxyTask.getOriginState();
-                if (connState != ConnState.REQUEST_SENT
-                        && connState != ConnState.REQUEST_BODY_STREAM) {
-                    throw new IllegalStateException("Illegal target connection state: " + connState);
+        public void consumeContent(
+                final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
+            synchronized (this.httpExchange) {
+                this.httpExchange.setOriginIOControl(ioctrl);
+                // Receive data from the origin
+                ByteBuffer buf = this.httpExchange.getOutBuffer();
+                int n = decoder.read(buf);
+                System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " " + n + " bytes read");
+                if (decoder.isCompleted()) {
+                    System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " content fully read");
                 }
-
-                try {
-
-                    ByteBuffer src = proxyTask.getInBuffer();
-                    src.flip();
-                    int bytesWritten = encoder.write(src);
-                    System.out.println(conn + " [proxy->origin] " + bytesWritten + " bytes written");
-                    System.out.println(conn + " [proxy->origin] " + encoder);
-                    src.compact();
-
-                    if (src.position() == 0) {
-                        if (proxyTask.getClientState() == ConnState.REQUEST_BODY_DONE) {
-                            encoder.complete();
-                        } else {
-                            // Input buffer is empty. Wait until the client fills up
-                            // the buffer
-                            conn.suspendOutput();
-                        }
-                    }
-                    // Update connection state
-                    if (encoder.isCompleted()) {
-                        System.out.println(conn + " [proxy->origin] request body sent");
-                        proxyTask.setOriginState(ConnState.REQUEST_BODY_DONE);
-                    } else {
-                        proxyTask.setOriginState(ConnState.REQUEST_BODY_STREAM);
-                        // Make sure client input is active
-                        proxyTask.getClientIOControl().requestInput();
+                // If the buffer is full, suspend origin input until there is free
+                // space in the buffer
+                if (!buf.hasRemaining()) {
+                    ioctrl.suspendInput();
+                }
+                // If there is some content in the output buffer make sure client
+                // output is active
+                if (buf.position() > 0) {
+                    if (this.httpExchange.getClientIOControl() != null) {
+                        this.httpExchange.getClientIOControl().requestOutput();
                     }
-
-                } catch (IOException ex) {
-                    shutdownConnection(conn);
                 }
             }
         }
 
-        public void responseReceived(final NHttpClientConnection conn) {
-            System.out.println(conn + " [proxy<-origin] response received");
-
-            HttpContext context = conn.getContext();
-            ProxyTask proxyTask = (ProxyTask) context.getAttribute(ProxyTask.ATTRIB);
-
-            synchronized (proxyTask) {
-                ConnState connState = proxyTask.getOriginState();
-                if (connState != ConnState.REQUEST_SENT
-                        && connState != ConnState.REQUEST_BODY_DONE) {
-                    throw new IllegalStateException("Illegal target connection state: " + connState);
+        public void responseCompleted(final HttpContext context) {
+            synchronized (this.httpExchange) {
+                if (this.completed) {
+                    return;
                 }
+                this.completed = true;
+                System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " response completed");
+                this.httpExchange.setResponseReceived();
+            }
+        }
 
-                HttpResponse response = conn.getHttpResponse();
-                HttpRequest request = proxyTask.getRequest();
-
-                System.out.println(conn + " [proxy<-origin] << " + response.getStatusLine());
-
-                int statusCode = response.getStatusLine().getStatusCode();
-                if (statusCode < HttpStatus.SC_OK) {
-                    // Ignore 1xx response
+        public void failed(Exception ex) {
+            synchronized (this.httpExchange) {
+                if (this.completed) {
                     return;
                 }
-                try {
-
-                    // Update connection state
-                    proxyTask.setResponse(response);
-                    proxyTask.setOriginState(ConnState.RESPONSE_RECEIVED);
-
-                    if (!canResponseHaveBody(request, response)) {
-                        conn.resetInput();
-                        if (!this.connStrategy.keepAlive(response, context)) {
-                            System.out.println(conn + " [proxy<-origin] close connection");
-                            proxyTask.setOriginState(ConnState.CLOSING);
-                            conn.close();
-                        }
+                this.completed = true;
+                this.httpExchange.setException(ex);
+                HttpAsyncResponseTrigger responseTrigger = this.httpExchange.getResponseTrigger();
+                if (responseTrigger != null && !responseTrigger.isTriggered()) {
+                    System.out.println("[client<-proxy] " + this.httpExchange.getId() + " " + ex);
+                    int status = HttpStatus.SC_INTERNAL_SERVER_ERROR;
+                    HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_0, status,
+                            EnglishReasonPhraseCatalog.INSTANCE.getReason(status, Locale.US));
+                    String message = ex.getMessage();
+                    if (message == null) {
+                        message = "Unexpected error";
                     }
-                    // Make sure client output is active
-                    proxyTask.getClientIOControl().requestOutput();
-
-                } catch (IOException ex) {
-                    shutdownConnection(conn);
+                    response.setEntity(NStringEntity.create(message, ContentType.DEFAULT_TEXT));
+                    responseTrigger.submitResponse(new BasicAsyncResponseProducer(response));
                 }
             }
-
         }
 
-        private boolean canResponseHaveBody(
-                final HttpRequest request, final HttpResponse response) {
-
-            if (request != null && "HEAD".equalsIgnoreCase(request.getRequestLine().getMethod())) {
-                return false;
-            }
-
-            int status = response.getStatusLine().getStatusCode();
-            return status >= HttpStatus.SC_OK
-                && status != HttpStatus.SC_NO_CONTENT
-                && status != HttpStatus.SC_NOT_MODIFIED
-                && status != HttpStatus.SC_RESET_CONTENT;
+        public void cancel() {
+            failed(new InterruptedIOException("Cancelled"));
         }
 
-        public void inputReady(final NHttpClientConnection conn, final ContentDecoder decoder) {
-            System.out.println(conn + " [proxy<-origin] input ready");
-
-            HttpContext context = conn.getContext();
-            ProxyTask proxyTask = (ProxyTask) context.getAttribute(ProxyTask.ATTRIB);
-
-            synchronized (proxyTask) {
-                ConnState connState = proxyTask.getOriginState();
-                if (connState != ConnState.RESPONSE_RECEIVED
-                        && connState != ConnState.RESPONSE_BODY_STREAM) {
-                    throw new IllegalStateException("Illegal target connection state: " + connState);
-                }
-
-                HttpResponse response = proxyTask.getResponse();
-                try {
-
-                    ByteBuffer dst = proxyTask.getOutBuffer();
-                    int bytesRead = decoder.read(dst);
-                    System.out.println(conn + " [proxy<-origin] " + bytesRead + " bytes read");
-                    System.out.println(conn + " [proxy<-origin] " + decoder);
-                    if (!dst.hasRemaining()) {
-                        // Output buffer is full. Suspend origin input until
-                        // the client handler frees up some space in the buffer
-                        conn.suspendInput();
-                    }
-                    // If there is some content in the buffer make sure client output
-                    // is active
-                    if (dst.position() > 0) {
-                        proxyTask.getClientIOControl().requestOutput();
-                    }
+        public ProxyHttpExchange getResult() {
+            return this.httpExchange;
+        }
 
-                    if (decoder.isCompleted()) {
-                        System.out.println(conn + " [proxy<-origin] response body received");
-                        proxyTask.setOriginState(ConnState.RESPONSE_BODY_DONE);
-
-                        if (!this.connStrategy.keepAlive(response, context)) {
-                            System.out.println(conn + " [proxy<-origin] close connection");
-                            proxyTask.setOriginState(ConnState.CLOSING);
-                            conn.close();
-                        }
-                    } else {
-                        proxyTask.setOriginState(ConnState.RESPONSE_BODY_STREAM);
-                    }
+        public Exception getException() {
+            return null;
+        }
 
-                } catch (IOException ex) {
-                    shutdownConnection(conn);
-                }
-            }
+        public boolean isDone() {
+            return this.completed;
         }
 
-        public void closed(final NHttpClientConnection conn) {
-            System.out.println(conn + " [proxy->origin] conn closed");
-            HttpContext context = conn.getContext();
-            ProxyTask proxyTask = (ProxyTask) context.getAttribute(ProxyTask.ATTRIB);
+    }
 
-            if (proxyTask != null) {
-                synchronized (proxyTask) {
-                    proxyTask.setOriginState(ConnState.CLOSED);
-                }
-            }
-        }
+    static class ProxyResponseProducer implements HttpAsyncResponseProducer {
 
-        public void exception(final NHttpClientConnection conn, final HttpException ex) {
-            shutdownConnection(conn);
-            System.out.println(conn + " [proxy->origin] HTTP error: " + ex.getMessage());
-        }
+        private final ProxyHttpExchange httpExchange;
 
-        public void exception(final NHttpClientConnection conn, final IOException ex) {
-            shutdownConnection(conn);
-            System.out.println(conn + " [proxy->origin] I/O error: " + ex.getMessage());
+        public ProxyResponseProducer(final ProxyHttpExchange httpExchange) {
+            super();
+            this.httpExchange = httpExchange;
         }
 
-        public void timeout(final NHttpClientConnection conn) {
-            System.out.println(conn + " [proxy->origin] timeout");
-            closeConnection(conn);
+        public void close() throws IOException {
+            this.httpExchange.reset();
         }
 
-        private void shutdownConnection(final HttpConnection conn) {
-            try {
-                conn.shutdown();
-            } catch (IOException ignore) {
+        public HttpResponse generateResponse() {
+            synchronized (this.httpExchange) {
+                HttpResponse response = this.httpExchange.getResponse();
+                System.out.println("[client<-proxy] " + this.httpExchange.getId() + " " + response.getStatusLine());
+                // Rewrite response!!!!
+                BasicHttpResponse r = new BasicHttpResponse(response.getStatusLine());
+                r.setEntity(response.getEntity());
+                return r;
             }
         }
 
-        private void closeConnection(final HttpConnection conn) {
-            try {
-                conn.shutdown();
-            } catch (IOException ignore) {
+        public void produceContent(
+                final ContentEncoder encoder, final IOControl ioctrl) throws IOException {
+            synchronized (this.httpExchange) {
+                this.httpExchange.setClientIOControl(ioctrl);
+                // Send data to the client
+                ByteBuffer buf = this.httpExchange.getOutBuffer();
+                buf.flip();
+                int n = encoder.write(buf);
+                buf.compact();
+                System.out.println("[client<-proxy] " + this.httpExchange.getId() + " " + n + " bytes written");
+                if (encoder.isCompleted()) {
+                    System.out.println("[client<-proxy] " + this.httpExchange.getId() + " content fully written");
+                }
+                // If there is space in the buffer and the message has not been
+                // transferred, make sure the origin is sending more data
+                if (buf.hasRemaining() && !this.httpExchange.isResponseReceived()) {
+                    if (this.httpExchange.getOriginIOControl() != null) {
+                        this.httpExchange.getOriginIOControl().requestInput();
+                    }
+                }
+                if (buf.position() == 0) {
+                    if (this.httpExchange.isResponseReceived()) {
+                        encoder.complete();
+                    } else {
+                        // Output buffer is empty. Wait until the origin fills up
+                        // the buffer
+                        ioctrl.suspendOutput();
+                    }
+                }
             }
         }
 
     }
 
-    enum ConnState {
-        IDLE,
-        CONNECTED,
-        REQUEST_RECEIVED,
-        REQUEST_SENT,
-        REQUEST_BODY_STREAM,
-        REQUEST_BODY_DONE,
-        RESPONSE_RECEIVED,
-        RESPONSE_SENT,
-        RESPONSE_BODY_STREAM,
-        RESPONSE_BODY_DONE,
-        CLOSING,
-        CLOSED
-    }
+    static class ProxyIncomingConnectionReuseStrategy extends DefaultConnectionReuseStrategy {
 
-    static class ProxyTask {
-
-        public static final String ATTRIB = "nhttp.proxy-task";
-
-        private final ByteBuffer inBuffer;
-        private final ByteBuffer outBuffer;
-
-        private HttpHost target;
-
-        private IOControl originIOControl;
-        private IOControl clientIOControl;
-
-        private ConnState originState;
-        private ConnState clientState;
+        @Override
+        public boolean keepAlive(final HttpResponse response, final HttpContext context) {
+            NHttpConnection conn = (NHttpConnection) context.getAttribute(
+                    ExecutionContext.HTTP_CONNECTION);
+            boolean keepAlive = super.keepAlive(response, context);
+            if (keepAlive) {
+                System.out.println("[client->proxy] connection kept alive " + conn);
+            }
+            return keepAlive;
+        }
 
-        private HttpRequest request;
-        private HttpResponse response;
+    };
 
-        public ProxyTask() {
-            super();
-            this.originState = ConnState.IDLE;
-            this.clientState = ConnState.IDLE;
-            this.inBuffer = ByteBuffer.allocateDirect(10240);
-            this.outBuffer = ByteBuffer.allocateDirect(10240);
-        }
+    static class ProxyOutgoingConnectionReuseStrategy extends DefaultConnectionReuseStrategy {
 
-        public ByteBuffer getInBuffer() {
-            return this.inBuffer;
+        @Override
+        public boolean keepAlive(final HttpResponse response, final HttpContext context) {
+            NHttpConnection conn = (NHttpConnection) context.getAttribute(
+                    ExecutionContext.HTTP_CONNECTION);
+            boolean keepAlive = super.keepAlive(response, context);
+            if (keepAlive) {
+                System.out.println("[proxy->origin] connection kept alive " + conn);
+            }
+            return keepAlive;
         }
 
-        public ByteBuffer getOutBuffer() {
-            return this.outBuffer;
-        }
+    };
 
-        public HttpHost getTarget() {
-            return this.target;
-        }
+    static class ProxyServiceHandler extends HttpAsyncServiceHandler {
 
-        public void setTarget(final HttpHost target) {
-            this.target = target;
+        public ProxyServiceHandler(
+                final HttpAsyncRequestHandlerResolver handlerResolver,
+                final HttpProcessor httpProcessor,
+                final ConnectionReuseStrategy reuseStrategy,
+                final HttpParams params) {
+            super(handlerResolver, httpProcessor, reuseStrategy, params);
         }
 
-        public HttpRequest getRequest() {
-            return this.request;
+        @Override
+        protected void onException(final Exception ex) {
+            ex.printStackTrace();
         }
 
-        public void setRequest(final HttpRequest request) {
-            this.request = request;
+        @Override
+        public void connected(final NHttpServerConnection conn) {
+            System.out.println("[client->proxy] connection open " + conn);
+            super.connected(conn);
         }
 
-        public HttpResponse getResponse() {
-            return this.response;
+        @Override
+        public void closed(final NHttpServerConnection conn) {
+            System.out.println("[client->proxy] connection closed " + conn);
+            super.closed(conn);
         }
 
-        public void setResponse(final HttpResponse response) {
-            this.response = response;
-        }
+    }
 
-        public IOControl getClientIOControl() {
-            return this.clientIOControl;
-        }
+    static class ProxyClientProtocolHandler extends HttpAsyncClientProtocolHandler {
 
-        public void setClientIOControl(final IOControl clientIOControl) {
-            this.clientIOControl = clientIOControl;
+        public ProxyClientProtocolHandler() {
+            super();
         }
 
-        public IOControl getOriginIOControl() {
-            return this.originIOControl;
+        @Override
+        protected void onException(final Exception ex) {
+            ex.printStackTrace();
         }
 
-        public void setOriginIOControl(final IOControl originIOControl) {
-            this.originIOControl = originIOControl;
+        @Override
+        public void connected(final NHttpClientConnection conn, final Object attachment) {
+            System.out.println("[proxy->origin] connection open " + conn);
+            super.connected(conn, attachment);
         }
 
-        public ConnState getOriginState() {
-            return this.originState;
+        @Override
+        public void closed(final NHttpClientConnection conn) {
+            System.out.println("[proxy->origin] connection closed " + conn);
+            super.closed(conn);
         }
 
-        public void setOriginState(final ConnState state) {
-            this.originState = state;
-        }
+    }
 
-        public ConnState getClientState() {
-            return this.clientState;
-        }
+    static class ProxyConnPool extends BasicNIOConnPool {
 
-        public void setClientState(final ConnState state) {
-            this.clientState = state;
+        public ProxyConnPool(final ConnectingIOReactor ioreactor, final HttpParams params) {
+            super(ioreactor, params);
         }
 
-        public void reset() {
-            this.inBuffer.clear();
-            this.outBuffer.clear();
-            this.originState = ConnState.IDLE;
-            this.clientState = ConnState.IDLE;
-            this.request = null;
-            this.response = null;
+        public ProxyConnPool(
+                final ConnectingIOReactor ioreactor,
+                final NIOConnFactory<HttpHost, NHttpClientConnection> connFactory,
+                final HttpParams params) {
+            super(ioreactor, connFactory, params);
         }
 
-        public void shutdown() {
-            if (this.clientIOControl != null) {
-                try {
-                    this.clientIOControl.shutdown();
-                } catch (IOException ignore) {
-                }
-            }
-            if (this.originIOControl != null) {
-                try {
-                    this.originIOControl.shutdown();
-                } catch (IOException ignore) {
-                }
-            }
+        @Override
+        public void release(final BasicNIOPoolEntry entry, boolean reusable) {
+            System.out.println("[proxy->origin] connection released " + entry.getConnection());
+            super.release(entry, reusable);
+            StringBuilder buf = new StringBuilder();
+            PoolStats totals = getTotalStats();
+            buf.append("[total kept alive: ").append(totals.getAvailable()).append("; ");
+            buf.append("total allocated: ").append(totals.getLeased() + totals.getAvailable());
+            buf.append(" of ").append(totals.getMax()).append("]");
+            System.out.println("[proxy->origin] " + buf.toString());
         }
 
     }

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncResponseTrigger.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncResponseTrigger.java?rev=1166291&r1=1166290&r2=1166291&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncResponseTrigger.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncResponseTrigger.java Wed Sep  7 18:12:13 2011
@@ -34,4 +34,6 @@ public interface HttpAsyncResponseTrigge
 
     void submitResponse(HttpAsyncResponseProducer responseProducer);
 
+    boolean isTriggered();
+
 }

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java?rev=1166291&r1=1166290&r2=1166291&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java Wed Sep  7 18:12:13 2011
@@ -241,12 +241,12 @@ public class HttpAsyncServiceHandler imp
 
             producer.produceContent(encoder, conn);
             if (encoder.isCompleted()) {
-                httpExchange.reset();
                 if (!this.connStrategy.keepAlive(response, context)) {
                     conn.close();
                 } else {
                     conn.requestInput();
                 }
+                httpExchange.reset();
             }
         } catch (RuntimeException ex) {
             shutdownConnection(conn);
@@ -280,7 +280,7 @@ public class HttpAsyncServiceHandler imp
         }
     }
 
-    private ErrorResponseProducer handleException(final Exception ex) {
+    protected HttpAsyncResponseProducer handleException(final Exception ex) {
         int code = HttpStatus.SC_INTERNAL_SERVER_ERROR;
         if (ex instanceof MethodNotSupportedException) {
             code = HttpStatus.SC_NOT_IMPLEMENTED;
@@ -317,7 +317,7 @@ public class HttpAsyncServiceHandler imp
         consumer.requestCompleted(context);
         Exception ex = consumer.getException();
         if (ex != null) {
-            ErrorResponseProducer responseProducer = handleException(ex);
+            HttpAsyncResponseProducer responseProducer = handleException(ex);
             httpExchange.setResponseProducer(responseProducer);
             conn.requestOutput();
         } else {
@@ -352,13 +352,13 @@ public class HttpAsyncServiceHandler imp
         conn.submitResponse(response);
 
         if (entity == null) {
-            httpExchange.reset();
             if (!this.connStrategy.keepAlive(response, context)) {
                 conn.close();
             } else {
                 // Ready to process new request
                 conn.requestInput();
             }
+            httpExchange.reset();
         }
     }
 
@@ -524,6 +524,10 @@ public class HttpAsyncServiceHandler imp
             this.iocontrol.requestOutput();
         }
 
+        public boolean isTriggered() {
+            return this.triggered;
+        }
+
     }
 
 }



Mime
View raw message