Return-Path: X-Original-To: apmail-hc-commits-archive@www.apache.org Delivered-To: apmail-hc-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6636A7FB7 for ; Tue, 29 Nov 2011 14:12:16 +0000 (UTC) Received: (qmail 97577 invoked by uid 500); 29 Nov 2011 14:12:16 -0000 Delivered-To: apmail-hc-commits-archive@hc.apache.org Received: (qmail 97516 invoked by uid 500); 29 Nov 2011 14:12:15 -0000 Mailing-List: contact commits-help@hc.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: "HttpComponents Project" Delivered-To: mailing list commits@hc.apache.org Received: (qmail 97502 invoked by uid 99); 29 Nov 2011 14:12:15 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 29 Nov 2011 14:12:15 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_FILL_THIS_FORM_SHORT,T_FRT_STOCK2 X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 29 Nov 2011 14:12:13 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id EB4F723889DA for ; Tue, 29 Nov 2011 14:11:52 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1207880 [1/3] - in /httpcomponents/httpcore/trunk/httpcore-nio/src: main/java/org/apache/http/nio/protocol/ test/java/org/apache/http/nio/protocol/ Date: Tue, 29 Nov 2011 14:11:51 -0000 To: commits@hc.apache.org From: olegk@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20111129141152.EB4F723889DA@eris.apache.org> Author: olegk Date: Tue Nov 29 14:11:49 2011 New Revision: 1207880 URL: http://svn.apache.org/viewvc?rev=1207880&view=rev Log: Refactored expectation handling code in HttpAsyncServiceHandler; refactored exception handling code in HttpAsyncServiceHandler and HttpAsyncClientProtocolHandler Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/AbstractAsyncRequestConsumer.java httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/BasicAsyncRequestProducer.java httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/BasicAsyncResponseProducer.java httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/ErrorResponseProducer.java httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncClientProtocolHandler.java httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncRequestConsumer.java httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncRequestProducer.java httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncResponseProducer.java httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/protocol/TestHttpAsyncClientProtocolHandler.java httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/protocol/TestHttpAsyncServiceHandler.java Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/AbstractAsyncRequestConsumer.java URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/AbstractAsyncRequestConsumer.java?rev=1207880&r1=1207879&r2=1207880&view=diff ============================================================================== --- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/AbstractAsyncRequestConsumer.java (original) +++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/AbstractAsyncRequestConsumer.java Tue Nov 29 14:11:49 2011 @@ -130,6 +130,15 @@ public abstract class AbstractAsyncReque } } + public final synchronized void failed(final Exception ex) { + if (this.completed) { + return; + } + this.completed = true; + this.ex = ex; + releaseResources(); + } + public final synchronized void close() throws IOException { if (this.completed) { return; Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/BasicAsyncRequestProducer.java URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/BasicAsyncRequestProducer.java?rev=1207880&r1=1207879&r2=1207880&view=diff ============================================================================== --- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/BasicAsyncRequestProducer.java (original) +++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/BasicAsyncRequestProducer.java Tue Nov 29 14:11:49 2011 @@ -114,12 +114,17 @@ public class BasicAsyncRequestProducer i public void requestCompleted(final HttpContext context) { } + public void failed(final Exception ex) { + } + public synchronized boolean isRepeatable() { return this.producer == null || this.producer.isRepeatable(); } public synchronized void resetRequest() throws IOException { - this.producer.close(); + if (this.producer != null) { + this.producer.close(); + } } public synchronized void close() throws IOException { Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/BasicAsyncResponseProducer.java URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/BasicAsyncResponseProducer.java?rev=1207880&r1=1207879&r2=1207880&view=diff ============================================================================== --- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/BasicAsyncResponseProducer.java (original) +++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/BasicAsyncResponseProducer.java Tue Nov 29 14:11:49 2011 @@ -96,6 +96,9 @@ public class BasicAsyncResponseProducer public void responseCompleted(final HttpContext context) { } + public void failed(final Exception ex) { + } + public synchronized void close() throws IOException { if (this.producer != null) { this.producer.close(); Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/ErrorResponseProducer.java URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/ErrorResponseProducer.java?rev=1207880&r1=1207879&r2=1207880&view=diff ============================================================================== --- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/ErrorResponseProducer.java (original) +++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/ErrorResponseProducer.java Tue Nov 29 14:11:49 2011 @@ -87,6 +87,9 @@ class ErrorResponseProducer implements H public void responseCompleted(final HttpContext context) { } + public void failed(final Exception ex) { + } + public void close() throws IOException { this.contentProducer.close(); } Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncClientProtocolHandler.java URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncClientProtocolHandler.java?rev=1207880&r1=1207879&r2=1207880&view=diff ============================================================================== --- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncClientProtocolHandler.java (original) +++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncClientProtocolHandler.java Tue Nov 29 14:11:49 2011 @@ -59,51 +59,53 @@ public class HttpAsyncClientProtocolHand } public void connected(final NHttpClientConnection conn, final Object attachment) { - HttpExchange httpexchange = new HttpExchange(); + HttpExchangeState state = new HttpExchangeState(); HttpContext context = conn.getContext(); - context.setAttribute(HTTP_EXCHANGE, httpexchange); + context.setAttribute(HTTP_EXCHANGE, state); requestReady(conn); } public void closed(final NHttpClientConnection conn) { - HttpExchange httpexchange = getHttpExchange(conn); - if (httpexchange != null) { - httpexchange.clear(); + HttpExchangeState state = getHttpExchange(conn); + if (state != null) { + state.clear(); } } public void exception(final NHttpClientConnection conn, final HttpException ex) { - HttpExchange httpexchange = ensureNotNull(getHttpExchange(conn)); - HttpAsyncClientExchangeHandler handler = httpexchange.getHandler(); - if (handler != null) { - handler.failed(ex); + HttpExchangeState state = getHttpExchange(conn); + if (state != null) { + handleProtocolFailure(conn, state, ex); + } else { + shutdownConnection(conn); + onException(ex); } - closeConnection(conn); } public void exception(final NHttpClientConnection conn, final IOException ex) { - HttpExchange httpexchange = ensureNotNull(getHttpExchange(conn)); - HttpAsyncClientExchangeHandler handler = httpexchange.getHandler(); - if (handler != null) { - handler.failed(ex); + HttpExchangeState state = getHttpExchange(conn); + if (state != null) { + handleFailure(conn, state, ex); + } else { + shutdownConnection(conn); + onException(ex); } - shutdownConnection(conn); } public void requestReady(final NHttpClientConnection conn) { - HttpExchange httpexchange = ensureNotNull(getHttpExchange(conn)); - if (httpexchange.getRequestState() != MessageState.READY) { + HttpExchangeState state = ensureNotNull(getHttpExchange(conn)); + if (state.getRequestState() != MessageState.READY) { return; } - HttpAsyncClientExchangeHandler handler = httpexchange.getHandler(); + HttpAsyncClientExchangeHandler handler = state.getHandler(); if (handler != null && handler.isDone()) { - httpexchange.clear(); + state.clear(); handler = null; } if (handler == null) { handler = (HttpAsyncClientExchangeHandler) conn.getContext().removeAttribute( HTTP_HANDLER); - httpexchange.setHandler(handler); + state.setHandler(handler); } if (handler == null) { return; @@ -111,68 +113,64 @@ public class HttpAsyncClientProtocolHand try { HttpContext context = handler.getContext(); HttpRequest request = handler.generateRequest(); - httpexchange.setRequest(request); + state.setRequest(request); conn.submitRequest(request); if (request instanceof HttpEntityEnclosingRequest) { if (((HttpEntityEnclosingRequest) request).expectContinue()) { int timeout = conn.getSocketTimeout(); - httpexchange.setTimeout(timeout); + state.setTimeout(timeout); timeout = request.getParams().getIntParameter( CoreProtocolPNames.WAIT_FOR_CONTINUE, 3000); conn.setSocketTimeout(timeout); - httpexchange.setRequestState(MessageState.ACK_EXPECTED); + state.setRequestState(MessageState.ACK_EXPECTED); } else { - httpexchange.setRequestState(MessageState.BODY_STREAM); + state.setRequestState(MessageState.BODY_STREAM); } } else { handler.requestCompleted(context); - httpexchange.setRequestState(MessageState.COMPLETED); + state.setRequestState(MessageState.COMPLETED); } + } catch (HttpException ex) { + handleProtocolFailure(conn, state, ex); + } catch (IOException ex) { + handleFailure(conn, state, ex); } catch (RuntimeException ex) { - shutdownConnection(conn); - handler.failed(ex); + handleFailure(conn, state, ex); throw ex; - } catch (Exception ex) { - shutdownConnection(conn); - handler.failed(ex); - onException(ex); } } public void outputReady(final NHttpClientConnection conn, final ContentEncoder encoder) { - HttpExchange httpexchange = ensureNotNull(getHttpExchange(conn)); - HttpAsyncClientExchangeHandler handler = ensureNotNull(httpexchange.getHandler()); + HttpExchangeState state = ensureNotNull(getHttpExchange(conn)); + HttpAsyncClientExchangeHandler handler = ensureNotNull(state.getHandler()); try { - if (httpexchange.getRequestState() == MessageState.ACK_EXPECTED) { + if (state.getRequestState() == MessageState.ACK_EXPECTED) { conn.suspendOutput(); return; } HttpContext context = handler.getContext(); handler.produceContent(encoder, conn); - httpexchange.setRequestState(MessageState.BODY_STREAM); + state.setRequestState(MessageState.BODY_STREAM); if (encoder.isCompleted()) { handler.requestCompleted(context); - httpexchange.setRequestState(MessageState.COMPLETED); + state.setRequestState(MessageState.COMPLETED); } + } catch (IOException ex) { + handleFailure(conn, state, ex); } catch (RuntimeException ex) { - shutdownConnection(conn); - handler.failed(ex); + handleFailure(conn, state, ex); throw ex; - } catch (Exception ex) { - shutdownConnection(conn); - handler.failed(ex); - onException(ex); } } public void responseReceived(final NHttpClientConnection conn) { - HttpExchange httpexchange = ensureNotNull(getHttpExchange(conn)); - HttpAsyncClientExchangeHandler handler = ensureNotNull(httpexchange.getHandler()); + HttpExchangeState state = ensureNotNull(getHttpExchange(conn)); + HttpAsyncClientExchangeHandler handler = ensureNotNull(state.getHandler()); try { HttpResponse response = conn.getHttpResponse(); - HttpRequest request = httpexchange.getRequest(); + HttpRequest request = state.getRequest(); int statusCode = response.getStatusLine().getStatusCode(); if (statusCode < HttpStatus.SC_OK) { @@ -181,94 +179,86 @@ public class HttpAsyncClientProtocolHand throw new ProtocolException( "Unexpected response: " + response.getStatusLine()); } - if (httpexchange.getRequestState() == MessageState.ACK_EXPECTED) { - int timeout = httpexchange.getTimeout(); + if (state.getRequestState() == MessageState.ACK_EXPECTED) { + int timeout = state.getTimeout(); conn.setSocketTimeout(timeout); conn.requestOutput(); - httpexchange.setRequestState(MessageState.ACK); + state.setRequestState(MessageState.ACK); } return; } - httpexchange.setResponse(response); - if (httpexchange.getRequestState() == MessageState.ACK_EXPECTED) { - int timeout = httpexchange.getTimeout(); + state.setResponse(response); + if (state.getRequestState() == MessageState.ACK_EXPECTED) { + int timeout = state.getTimeout(); conn.setSocketTimeout(timeout); conn.resetOutput(); - httpexchange.setRequestState(MessageState.COMPLETED); - } else if (httpexchange.getRequestState() == MessageState.BODY_STREAM) { + state.setRequestState(MessageState.COMPLETED); + } else if (state.getRequestState() == MessageState.BODY_STREAM) { // Early response conn.resetOutput(); conn.suspendOutput(); - httpexchange.setRequestState(MessageState.COMPLETED); - httpexchange.invalidate(); + state.setRequestState(MessageState.COMPLETED); + state.invalidate(); } handler.responseReceived(response); - httpexchange.setResponseState(MessageState.BODY_STREAM); + state.setResponseState(MessageState.BODY_STREAM); if (!canResponseHaveBody(request, response)) { conn.resetInput(); - processResponse(conn, httpexchange, handler); + processResponse(conn, state, handler); } + } catch (HttpException ex) { + handleProtocolFailure(conn, state, ex); + } catch (IOException ex) { + handleFailure(conn, state, ex); } catch (RuntimeException ex) { - shutdownConnection(conn); - handler.failed(ex); + handleFailure(conn, state, ex); throw ex; - } catch (Exception ex) { - shutdownConnection(conn); - handler.failed(ex); - onException(ex); } } public void inputReady(final NHttpClientConnection conn, final ContentDecoder decoder) { - HttpExchange httpexchange = ensureNotNull(getHttpExchange(conn)); - HttpAsyncClientExchangeHandler handler = ensureNotNull(httpexchange.getHandler()); + HttpExchangeState state = ensureNotNull(getHttpExchange(conn)); + HttpAsyncClientExchangeHandler handler = ensureNotNull(state.getHandler()); try { handler.consumeContent(decoder, conn); - httpexchange.setResponseState(MessageState.BODY_STREAM); + state.setResponseState(MessageState.BODY_STREAM); if (decoder.isCompleted()) { - processResponse(conn, httpexchange, handler); + processResponse(conn, state, handler); } + } catch (IOException ex) { + handleFailure(conn, state, ex); } catch (RuntimeException ex) { - shutdownConnection(conn); - handler.failed(ex); + handleFailure(conn, state, ex); throw ex; - } catch (Exception ex) { - shutdownConnection(conn); - handler.failed(ex); - onException(ex); } } public void timeout(final NHttpClientConnection conn) { - HttpExchange httpexchange = ensureNotNull(getHttpExchange(conn)); - HttpAsyncClientExchangeHandler handler = httpexchange.getHandler(); - if (handler == null) { - shutdownConnection(conn); - return; - } - try { - if (httpexchange.getRequestState() == MessageState.ACK_EXPECTED) { - int timeout = httpexchange.getTimeout(); + HttpExchangeState state = getHttpExchange(conn); + if (state != null) { + if (state.getRequestState() == MessageState.ACK_EXPECTED) { + int timeout = state.getTimeout(); conn.setSocketTimeout(timeout); conn.requestOutput(); - httpexchange.setRequestState(MessageState.BODY_STREAM); + state.setRequestState(MessageState.BODY_STREAM); + return; } else { - handler.failed(new SocketTimeoutException()); - if (conn.getStatus() == NHttpConnection.ACTIVE) { - closeConnection(conn); - if (conn.getStatus() == NHttpConnection.CLOSING) { - // Give the connection some grace time to - // close itself nicely - conn.setSocketTimeout(250); - } - } else { - shutdownConnection(conn); + shutdownHttpExchange(state, new SocketTimeoutException()); + } + } + try { + if (conn.getStatus() == NHttpConnection.ACTIVE) { + conn.close(); + if (conn.getStatus() == NHttpConnection.CLOSING) { + // Give the connection some grace time to + // close itself nicely + conn.setSocketTimeout(250); } + } else { + conn.shutdown(); } - } catch (RuntimeException ex) { - shutdownConnection(conn); - handler.failed(ex); - throw ex; + } catch (IOException ex) { + onException(ex); } } @@ -291,11 +281,11 @@ public class HttpAsyncClientProtocolHand } } - private HttpExchange getHttpExchange(final NHttpConnection conn) { - return (HttpExchange) conn.getContext().getAttribute(HTTP_EXCHANGE); + private HttpExchangeState getHttpExchange(final NHttpConnection conn) { + return (HttpExchangeState) conn.getContext().getAttribute(HTTP_EXCHANGE); } - private HttpExchange ensureNotNull(final HttpExchange httpExchange) { + private HttpExchangeState ensureNotNull(final HttpExchangeState httpExchange) { if (httpExchange == null) { throw new IllegalStateException("HTTP exchange is null"); } @@ -309,14 +299,48 @@ public class HttpAsyncClientProtocolHand return handler; } + private void shutdownHttpExchange( + final HttpExchangeState state, + final Exception ex) { + HttpAsyncClientExchangeHandler handler = state.getHandler(); + if (handler != null) { + state.setHandler(null); + try { + handler.failed(ex); + } finally { + try { + handler.close(); + } catch (IOException ioex) { + onException(ioex); + } + } + } + } + + private void handleFailure( + final NHttpClientConnection conn, + final HttpExchangeState state, + final Exception ex) { + shutdownConnection(conn); + shutdownHttpExchange(state, ex); + } + + private void handleProtocolFailure( + final NHttpClientConnection conn, + final HttpExchangeState state, + final HttpException ex) { + closeConnection(conn); + shutdownHttpExchange(state, ex); + } + private void processResponse( final NHttpClientConnection conn, - final HttpExchange httpexchange, + final HttpExchangeState state, final HttpAsyncClientExchangeHandler handler) throws IOException { HttpContext context = handler.getContext(); - if (httpexchange.isValid()) { - HttpRequest request = httpexchange.getRequest(); - HttpResponse response = httpexchange.getResponse(); + if (state.isValid()) { + HttpRequest request = state.getRequest(); + HttpResponse response = state.getResponse(); String method = request.getRequestLine().getMethod(); int status = response.getStatusLine().getStatusCode(); if (!(method.equalsIgnoreCase("CONNECT") && status < 300)) { @@ -329,7 +353,7 @@ public class HttpAsyncClientProtocolHand conn.close(); } handler.responseCompleted(context); - httpexchange.reset(); + state.reset(); } private boolean canResponseHaveBody(final HttpRequest request, final HttpResponse response) { @@ -351,7 +375,7 @@ public class HttpAsyncClientProtocolHand static final String HTTP_EXCHANGE = "http.nio.exchange"; - class HttpExchange { + class HttpExchangeState { private volatile HttpAsyncClientExchangeHandler handler; private volatile MessageState requestState; @@ -361,7 +385,7 @@ public class HttpAsyncClientProtocolHand private volatile boolean valid; private volatile int timeout; - HttpExchange() { + HttpExchangeState() { super(); this.valid = true; this.requestState = MessageState.READY; @@ -373,9 +397,6 @@ public class HttpAsyncClientProtocolHand } public void setHandler(final HttpAsyncClientExchangeHandler handler) { - if (this.handler != null) { - throw new IllegalStateException("Handler already set"); - } this.handler = handler; } @@ -400,9 +421,6 @@ public class HttpAsyncClientProtocolHand } public void setRequest(final HttpRequest request) { - if (this.request != null) { - throw new IllegalStateException("Request already set"); - } this.request = request; } @@ -411,9 +429,6 @@ public class HttpAsyncClientProtocolHand } public void setResponse(final HttpResponse response) { - if (this.response != null) { - throw new IllegalStateException("Response already set"); - } this.response = response; } Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncRequestConsumer.java URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncRequestConsumer.java?rev=1207880&r1=1207879&r2=1207880&view=diff ============================================================================== --- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncRequestConsumer.java (original) +++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncRequestConsumer.java Tue Nov 29 14:11:49 2011 @@ -80,6 +80,13 @@ public interface HttpAsyncRequestConsume void requestCompleted(HttpContext context); /** + * Invoked to signal that the response processing terminated abnormally. + * + * @param ex exception + */ + void failed(Exception ex); + + /** * Returns an exception in case of an abnormal termination. This method * returns null if the request execution is still ongoing * or if it completed successfully. Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncRequestProducer.java URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncRequestProducer.java?rev=1207880&r1=1207879&r2=1207880&view=diff ============================================================================== --- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncRequestProducer.java (original) +++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncRequestProducer.java Tue Nov 29 14:11:49 2011 @@ -92,6 +92,13 @@ public interface HttpAsyncRequestProduce void requestCompleted(HttpContext context); /** + * Invoked to signal that the response processing terminated abnormally. + * + * @param ex exception + */ + void failed(Exception ex); + + /** * Determines whether or not this producer is capable of producing * HTTP request messages more than once. */ Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncResponseProducer.java URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncResponseProducer.java?rev=1207880&r1=1207879&r2=1207880&view=diff ============================================================================== --- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncResponseProducer.java (original) +++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncResponseProducer.java Tue Nov 29 14:11:49 2011 @@ -75,4 +75,11 @@ public interface HttpAsyncResponseProduc */ void responseCompleted(HttpContext context); + /** + * Invoked to signal that the response processing terminated abnormally. + * + * @param ex exception + */ + void failed(Exception ex); + } Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java?rev=1207880&r1=1207879&r2=1207880&view=diff ============================================================================== --- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java (original) +++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java Tue Nov 29 14:11:49 2011 @@ -28,6 +28,7 @@ package org.apache.http.nio.protocol; import java.io.IOException; +import java.net.SocketTimeoutException; import org.apache.http.ConnectionReuseStrategy; import org.apache.http.HttpEntity; @@ -107,150 +108,171 @@ public class HttpAsyncServiceHandler imp } public void connected(final NHttpServerConnection conn) { - HttpExchange httpExchange = new HttpExchange(); - conn.getContext().setAttribute(HTTP_EXCHANGE, httpExchange); + HttpExchangeState state = new HttpExchangeState(); + conn.getContext().setAttribute(HTTP_EXCHANGE, state); } public void closed(final NHttpServerConnection conn) { - HttpExchange httpExchange = ensureNotNull(getHttpExchange(conn)); - Cancellable asyncProcess = httpExchange.getAsyncProcess(); - httpExchange.clear(); + HttpExchangeState state = ensureNotNull(getHttpExchange(conn)); + Cancellable asyncProcess = state.getAsyncProcess(); + state.clear(); if (asyncProcess != null) { asyncProcess.cancel(); } } public void exception(final NHttpServerConnection conn, final HttpException httpex) { - if (conn.isResponseSubmitted()) { - // There is not much that we can do if a response head - // has already been submitted - closeConnection(conn); + HttpExchangeState state = ensureNotNull(getHttpExchange(conn)); + if (state != null) { + handleProtocolFailure(conn, state, httpex); + } else { + shutdownConnection(conn); onException(httpex); - return; } + } - HttpExchange httpExchange = ensureNotNull(getHttpExchange(conn)); - try { - HttpAsyncResponseProducer responseProducer = handleException(httpex); - httpExchange.setResponseProducer(responseProducer); - commitResponse(conn, httpExchange); - } catch (RuntimeException ex) { - shutdownConnection(conn); - throw ex; - } catch (Exception ex) { + public void exception(final NHttpServerConnection conn, final IOException ex) { + HttpExchangeState state = getHttpExchange(conn); + if (state != null) { + handleFailure(conn, state, ex); + } else { shutdownConnection(conn); onException(ex); } } - public void exception(final NHttpServerConnection conn, final IOException ex) { - shutdownConnection(conn); - onException(ex); - } - public void requestReceived(final NHttpServerConnection conn) { - HttpExchange httpExchange = ensureNotNull(getHttpExchange(conn)); + HttpExchangeState state = ensureNotNull(getHttpExchange(conn)); try { HttpRequest request = conn.getHttpRequest(); - HttpContext context = httpExchange.getContext(); + HttpContext context = state.getContext(); request.setParams(new DefaultedHttpParams(request.getParams(), this.params)); context.setAttribute(ExecutionContext.HTTP_REQUEST, request); context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn); this.httpProcessor.process(request, context); - httpExchange.setRequest(request); + state.setRequest(request); HttpAsyncRequestHandler requestHandler = getRequestHandler(request); - httpExchange.setRequestHandler(requestHandler); + state.setRequestHandler(requestHandler); HttpAsyncRequestConsumer consumer = requestHandler.processRequest(request, context); - httpExchange.setRequestConsumer(consumer); + state.setRequestConsumer(consumer); consumer.requestReceived(request); if (request instanceof HttpEntityEnclosingRequest) { if (((HttpEntityEnclosingRequest) request).expectContinue()) { - httpExchange.setRequestState(MessageState.ACK_EXPECTED); + state.setRequestState(MessageState.ACK_EXPECTED); if (this.expectationVerifier != null) { conn.suspendInput(); - HttpAsyncContinueTrigger trigger = new ContinueTriggerImpl(httpExchange, conn); + HttpAsyncContinueTrigger trigger = new ContinueTriggerImpl(state, conn); Cancellable asyncProcess = this.expectationVerifier.verify(request, trigger, context); - httpExchange.setAsyncProcess(asyncProcess); + state.setAsyncProcess(asyncProcess); } else { HttpResponse response = create100Continue(request); conn.submitResponse(response); - httpExchange.setRequestState(MessageState.BODY_STREAM); + state.setRequestState(MessageState.BODY_STREAM); } } else { - httpExchange.setRequestState(MessageState.BODY_STREAM); + state.setRequestState(MessageState.BODY_STREAM); } } else { // No request content is expected. // Process request right away conn.suspendInput(); - processRequest(conn, httpExchange); + processRequest(conn, state); } + } catch (HttpException ex) { + handleProtocolFailure(conn, state, ex); + } catch (IOException ex) { + handleFailure(conn, state, ex); } catch (RuntimeException ex) { - shutdownConnection(conn); + handleFailure(conn, state, ex); throw ex; - } catch (Exception ex) { - shutdownConnection(conn); - onException(ex); } } public void inputReady(final NHttpServerConnection conn, final ContentDecoder decoder) { - HttpExchange httpExchange = ensureNotNull(getHttpExchange(conn)); + HttpExchangeState state = ensureNotNull(getHttpExchange(conn)); try { - HttpAsyncRequestConsumer consumer = ensureNotNull(httpExchange.getRequestConsumer()); + HttpAsyncRequestConsumer consumer = ensureNotNull(state.getRequestConsumer()); consumer.consumeContent(decoder, conn); - httpExchange.setRequestState(MessageState.BODY_STREAM); + state.setRequestState(MessageState.BODY_STREAM); if (decoder.isCompleted()) { conn.suspendInput(); - processRequest(conn, httpExchange); + processRequest(conn, state); } + } catch (HttpException ex) { + handleProtocolFailure(conn, state, ex); + } catch (IOException ex) { + handleFailure(conn, state, ex); } catch (RuntimeException ex) { - shutdownConnection(conn); + handleFailure(conn, state, ex); throw ex; - } catch (Exception ex) { - shutdownConnection(conn); - onException(ex); } } public void responseReady(final NHttpServerConnection conn) { - HttpExchange httpExchange = ensureNotNull(getHttpExchange(conn)); + HttpExchangeState state = ensureNotNull(getHttpExchange(conn)); try { - if (httpExchange.getRequestState() == MessageState.ACK) { - conn.requestInput(); - httpExchange.setRequestState(MessageState.BODY_STREAM); - HttpRequest request = httpExchange.getRequest(); - HttpResponse response = create100Continue(request); - conn.submitResponse(response); - } else if (httpExchange.getResponse() == null && httpExchange.getResponseProducer() != null) { - if (httpExchange.getRequestState() == MessageState.ACK_EXPECTED) { + if (state.getResponse() != null) { + return; + } + HttpAsyncResponseProducer responseProducer = state.getResponseProducer(); + if (responseProducer == null) { + return; + } + HttpContext context = state.getContext(); + HttpResponse response = responseProducer.generateResponse(); + int status = response.getStatusLine().getStatusCode(); + if (state.getRequestState() == MessageState.ACK_EXPECTED) { + if (status == 100) { + state.setResponseProducer(null); + try { + // Make sure 100 response has no entity + response.setEntity(null); + conn.requestInput(); + state.setRequestState(MessageState.BODY_STREAM); + conn.submitResponse(response); + responseProducer.responseCompleted(context); + } finally { + responseProducer.close(); + } + } else if (status >= 400) { conn.resetInput(); - httpExchange.setRequestState(MessageState.COMPLETED); + state.setRequestState(MessageState.COMPLETED); + state.setResponse(response); + commitFinalResponse(conn, state); + } else { + throw new HttpException("Invalid response: " + response.getStatusLine()); + } + } else { + if (status >= 200) { + state.setResponse(response); + commitFinalResponse(conn, state); + } else { + throw new HttpException("Invalid response: " + response.getStatusLine()); } - commitResponse(conn, httpExchange); } + } catch (HttpException ex) { + handleProtocolFailure(conn, state, ex); + } catch (IOException ex) { + handleFailure(conn, state, ex); } catch (RuntimeException ex) { - shutdownConnection(conn); + handleFailure(conn, state, ex); throw ex; - } catch (Exception ex) { - shutdownConnection(conn); - onException(ex); } } public void outputReady(final NHttpServerConnection conn, final ContentEncoder encoder) { - HttpExchange httpExchange = ensureNotNull(getHttpExchange(conn)); + HttpExchangeState state = ensureNotNull(getHttpExchange(conn)); try { - HttpAsyncResponseProducer responseProducer = httpExchange.getResponseProducer(); - HttpContext context = httpExchange.getContext(); - HttpResponse response = httpExchange.getResponse(); + HttpAsyncResponseProducer responseProducer = state.getResponseProducer(); + HttpContext context = state.getContext(); + HttpResponse response = state.getResponse(); responseProducer.produceContent(encoder, conn); - httpExchange.setResponseState(MessageState.BODY_STREAM); + state.setResponseState(MessageState.BODY_STREAM); if (encoder.isCompleted()) { responseProducer.responseCompleted(context); if (!this.connStrategy.keepAlive(response, context)) { @@ -258,18 +280,21 @@ public class HttpAsyncServiceHandler imp } else { conn.requestInput(); } - httpExchange.clear(); + state.clear(); } + } catch (IOException ex) { + handleFailure(conn, state, ex); } catch (RuntimeException ex) { - shutdownConnection(conn); + handleFailure(conn, state, ex); throw ex; - } catch (Exception ex) { - shutdownConnection(conn); - onException(ex); } } public void timeout(final NHttpServerConnection conn) { + HttpExchangeState state = getHttpExchange(conn); + if (state != null) { + shutdownHttpExchange(state, new SocketTimeoutException()); + } try { if (conn.getStatus() == NHttpConnection.ACTIVE) { conn.close(); @@ -286,15 +311,15 @@ public class HttpAsyncServiceHandler imp } } - private HttpExchange getHttpExchange(final NHttpConnection conn) { - return (HttpExchange) conn.getContext().getAttribute(HTTP_EXCHANGE); + private HttpExchangeState getHttpExchange(final NHttpConnection conn) { + return (HttpExchangeState) conn.getContext().getAttribute(HTTP_EXCHANGE); } - private HttpExchange ensureNotNull(final HttpExchange httpExchange) { - if (httpExchange == null) { + private HttpExchangeState ensureNotNull(final HttpExchangeState state) { + if (state == null) { throw new IllegalStateException("HTTP exchange is null"); } - return httpExchange; + return state; } private HttpAsyncRequestConsumer ensureNotNull(final HttpAsyncRequestConsumer requestConsumer) { @@ -323,6 +348,36 @@ public class HttpAsyncServiceHandler imp } } + private void shutdownHttpExchange(final HttpExchangeState state, final Exception ex) { + HttpAsyncRequestConsumer consumer = state.getRequestConsumer(); + if (consumer != null) { + state.setRequestConsumer(null); + try { + consumer.failed(ex); + } finally { + try { + consumer.close(); + } catch (IOException ioex) { + onException(ioex); + } + } + } + HttpAsyncResponseProducer producer = state.getResponseProducer(); + if (producer != null) { + state.setResponseProducer(null); + try { + producer.failed(ex); + } finally { + try { + producer.close(); + } catch (IOException ioex) { + onException(ioex); + } + } + } + state.setRequestHandler(null); + } + protected HttpAsyncResponseProducer handleException(final Exception ex) { int code = HttpStatus.SC_INTERNAL_SERVER_ERROR; if (ex instanceof MethodNotSupportedException) { @@ -360,44 +415,74 @@ public class HttpAsyncServiceHandler imp && status != HttpStatus.SC_RESET_CONTENT; } + private void handleFailure( + final NHttpServerConnection conn, + final HttpExchangeState state, + final Exception ex) { + shutdownConnection(conn); + shutdownHttpExchange(state, ex); + } + + private void handleProtocolFailure( + final NHttpServerConnection conn, + final HttpExchangeState state, + final HttpException httpex) { + shutdownHttpExchange(state, httpex); + if (conn.isResponseSubmitted() || state.getResponseState() != MessageState.READY) { + // There is not much that we can do if a response + // has already been submitted + closeConnection(conn); + } else { + HttpAsyncResponseProducer responseProducer = handleException(httpex); + state.setResponseProducer(responseProducer); + try { + HttpResponse response = responseProducer.generateResponse(); + state.setResponse(response); + commitFinalResponse(conn, state); + } catch (RuntimeException ex) { + handleFailure(conn, state, ex); + throw ex; + } catch (Exception ex) { + handleFailure(conn, state, ex); + } + } + } + private void processRequest( final NHttpServerConnection conn, - final HttpExchange httpExchange) throws HttpException, IOException { - HttpAsyncRequestHandler handler = httpExchange.getRequestHandler(); - HttpContext context = httpExchange.getContext(); - HttpAsyncRequestConsumer consumer = httpExchange.getRequestConsumer(); + final HttpExchangeState state) throws HttpException, IOException { + HttpAsyncRequestHandler handler = state.getRequestHandler(); + HttpContext context = state.getContext(); + HttpAsyncRequestConsumer consumer = state.getRequestConsumer(); consumer.requestCompleted(context); - httpExchange.setRequestState(MessageState.COMPLETED); + state.setRequestState(MessageState.COMPLETED); Exception exception = consumer.getException(); if (exception != null) { HttpAsyncResponseProducer responseProducer = handleException(exception); - httpExchange.setResponseProducer(responseProducer); + state.setResponseProducer(responseProducer); conn.requestOutput(); } else { Object result = consumer.getResult(); - HttpAsyncResponseTrigger trigger = new ResponseTriggerImpl(httpExchange, conn); + HttpAsyncResponseTrigger trigger = new ResponseTriggerImpl(state, conn); try { Cancellable asyncProcess = handler.handle(result, trigger, context); - httpExchange.setAsyncProcess(asyncProcess); + state.setAsyncProcess(asyncProcess); } catch (HttpException ex) { HttpAsyncResponseProducer responseProducer = handleException(ex); - httpExchange.setResponseProducer(responseProducer); + state.setResponseProducer(responseProducer); conn.requestOutput(); } } } - private void commitResponse( + private void commitFinalResponse( final NHttpServerConnection conn, - final HttpExchange httpExchange) throws IOException, HttpException { - HttpContext context = httpExchange.getContext(); - HttpRequest request = httpExchange.getRequest(); - HttpAsyncResponseProducer responseProducer = httpExchange.getResponseProducer(); - HttpResponse response = responseProducer.generateResponse(); - response.setParams(new DefaultedHttpParams(response.getParams(), this.params)); - - httpExchange.setResponse(response); + final HttpExchangeState state) throws IOException, HttpException { + HttpContext context = state.getContext(); + HttpRequest request = state.getRequest(); + HttpResponse response = state.getResponse(); + response.setParams(new DefaultedHttpParams(response.getParams(), this.params)); context.setAttribute(ExecutionContext.HTTP_RESPONSE, response); this.httpProcessor.process(response, context); @@ -410,6 +495,7 @@ public class HttpAsyncServiceHandler imp conn.submitResponse(response); if (entity == null) { + HttpAsyncResponseProducer responseProducer = state.getResponseProducer(); responseProducer.responseCompleted(context); if (!this.connStrategy.keepAlive(response, context)) { conn.close(); @@ -417,9 +503,9 @@ public class HttpAsyncServiceHandler imp // Ready to process new request conn.requestInput(); } - httpExchange.clear(); + state.clear(); } else { - httpExchange.setResponseState(MessageState.BODY_STREAM); + state.setResponseState(MessageState.BODY_STREAM); } } @@ -436,7 +522,7 @@ public class HttpAsyncServiceHandler imp return handler; } - class HttpExchange { + class HttpExchangeState { private final BasicHttpContext context; private volatile HttpAsyncRequestHandler requestHandler; @@ -448,7 +534,7 @@ public class HttpAsyncServiceHandler imp private volatile HttpResponse response; private volatile Cancellable asyncProcess; - HttpExchange() { + HttpExchangeState() { super(); this.context = new BasicHttpContext(); this.requestState = MessageState.READY; @@ -464,9 +550,6 @@ public class HttpAsyncServiceHandler imp } public void setRequestHandler(final HttpAsyncRequestHandler requestHandler) { - if (this.requestHandler != null) { - throw new IllegalStateException("Request handler already set"); - } this.requestHandler = requestHandler; } @@ -486,15 +569,11 @@ public class HttpAsyncServiceHandler imp this.responseState = state; } - public HttpAsyncRequestConsumer getRequestConsumer() { return this.requestConsumer; } public void setRequestConsumer(final HttpAsyncRequestConsumer requestConsumer) { - if (this.requestConsumer != null) { - throw new IllegalStateException("Request consumer already set"); - } this.requestConsumer = requestConsumer; } @@ -503,9 +582,6 @@ public class HttpAsyncServiceHandler imp } public void setResponseProducer(final HttpAsyncResponseProducer responseProducer) { - if (this.responseProducer != null) { - throw new IllegalStateException("Response producer already set"); - } this.responseProducer = responseProducer; } @@ -514,9 +590,6 @@ public class HttpAsyncServiceHandler imp } public void setRequest(final HttpRequest request) { - if (this.request != null) { - throw new IllegalStateException("Request already set"); - } this.request = request; } @@ -525,9 +598,6 @@ public class HttpAsyncServiceHandler imp } public void setResponse(final HttpResponse response) { - if (this.response != null) { - throw new IllegalStateException("Response already set"); - } this.response = response; } @@ -588,14 +658,14 @@ public class HttpAsyncServiceHandler imp class ResponseTriggerImpl implements HttpAsyncResponseTrigger { - private final HttpExchange httpExchange; + private final HttpExchangeState state; private final IOControl iocontrol; private volatile boolean triggered; - public ResponseTriggerImpl(final HttpExchange httpExchange, final IOControl iocontrol) { + public ResponseTriggerImpl(final HttpExchangeState state, final IOControl iocontrol) { super(); - this.httpExchange = httpExchange; + this.state = state; this.iocontrol = iocontrol; } @@ -607,7 +677,7 @@ public class HttpAsyncServiceHandler imp throw new IllegalStateException("Response already triggered"); } this.triggered = true; - this.httpExchange.setResponseProducer(responseProducer); + this.state.setResponseProducer(responseProducer); this.iocontrol.requestOutput(); } @@ -619,14 +689,14 @@ public class HttpAsyncServiceHandler imp class ContinueTriggerImpl implements HttpAsyncContinueTrigger { - private final HttpExchange httpExchange; + private final HttpExchangeState state; private final IOControl iocontrol; private volatile boolean triggered; - public ContinueTriggerImpl(final HttpExchange httpExchange, final IOControl iocontrol) { + public ContinueTriggerImpl(final HttpExchangeState state, final IOControl iocontrol) { super(); - this.httpExchange = httpExchange; + this.state = state; this.iocontrol = iocontrol; } @@ -635,7 +705,8 @@ public class HttpAsyncServiceHandler imp throw new IllegalStateException("Response already triggered"); } this.triggered = true; - this.httpExchange.setRequestState(MessageState.ACK); + HttpResponse ack = new BasicHttpResponse(HttpVersion.HTTP_1_1, HttpStatus.SC_CONTINUE, "Continue"); + this.state.setResponseProducer(new BasicAsyncResponseProducer(ack)); this.iocontrol.requestOutput(); } @@ -647,7 +718,7 @@ public class HttpAsyncServiceHandler imp throw new IllegalStateException("Response already triggered"); } this.triggered = true; - this.httpExchange.setResponseProducer(responseProducer); + this.state.setResponseProducer(responseProducer); this.iocontrol.requestOutput(); }