Return-Path: Delivered-To: apmail-cxf-commits-archive@www.apache.org Received: (qmail 56491 invoked from network); 10 Sep 2010 18:22:40 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 10 Sep 2010 18:22:40 -0000 Received: (qmail 25131 invoked by uid 500); 10 Sep 2010 18:22:40 -0000 Delivered-To: apmail-cxf-commits-archive@cxf.apache.org Received: (qmail 25066 invoked by uid 500); 10 Sep 2010 18:22:39 -0000 Mailing-List: contact commits-help@cxf.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cxf.apache.org Delivered-To: mailing list commits@cxf.apache.org Received: (qmail 25059 invoked by uid 99); 10 Sep 2010 18:22:39 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 10 Sep 2010 18:22:39 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED,T_FRT_STOCK2 X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 10 Sep 2010 18:22:36 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 068152388A6B; Fri, 10 Sep 2010 18:22:15 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r995918 - in /cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async: AsyncHTTPConduit.java HttpClientController.java Date: Fri, 10 Sep 2010 18:22:14 -0000 To: commits@cxf.apache.org From: dkulp@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100910182215.068152388A6B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: dkulp Date: Fri Sep 10 18:22:14 2010 New Revision: 995918 URL: http://svn.apache.org/viewvc?rev=995918&view=rev Log: Support keep-alives Modified: cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/AsyncHTTPConduit.java cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/HttpClientController.java Modified: cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/AsyncHTTPConduit.java URL: http://svn.apache.org/viewvc/cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/AsyncHTTPConduit.java?rev=995918&r1=995917&r2=995918&view=diff ============================================================================== --- cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/AsyncHTTPConduit.java (original) +++ cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/AsyncHTTPConduit.java Fri Sep 10 18:22:14 2010 @@ -89,7 +89,6 @@ public class AsyncHTTPConduit extends HT Map> headers = getSetProtocolHeaders(message); URL currentURL = setupURL(message); - MessageUtils.getContextualBoolean(message, "force.http.url.connection", false); if (MessageUtils.getContextualBoolean(message, "force.http.url.connection", false) || "https".equalsIgnoreCase(currentURL.getProtocol())) { //delegate to the parent for any https things for now @@ -836,6 +835,14 @@ public class AsyncHTTPConduit extends HT return i; } public void close() throws IOException { + if (!decoder.isCompleted()) { + ByteBuffer buf = ByteBuffer.allocate(4096); + int i = decoder.read(buf); + while (i != -1) { + buf.clear(); + i = decoder.read(buf); + } + } } }; this.handleResponseInternal(); Modified: cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/HttpClientController.java URL: http://svn.apache.org/viewvc/cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/HttpClientController.java?rev=995918&r1=995917&r2=995918&view=diff ============================================================================== --- cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/HttpClientController.java (original) +++ cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/HttpClientController.java Fri Sep 10 18:22:14 2010 @@ -21,8 +21,12 @@ package org.apache.cxf.transport.http.as import java.io.IOException; import java.net.ConnectException; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.net.SocketTimeoutException; import java.net.URL; +import java.util.Map; +import java.util.Stack; +import java.util.concurrent.ConcurrentHashMap; import org.apache.cxf.Bus; import org.apache.cxf.BusFactory; @@ -64,9 +68,44 @@ public class HttpClientController implem NHttpRequestExecutionHandler { ConnectingIOReactor ioReactor; + Map> sessions + = new ConcurrentHashMap>(); + + HttpClientController() { } + static final class MessageHolder { + private Message msg; + private String key; + private SessionRequest session; + + public MessageHolder(Message m, String key) { + msg = m; + this.key = key; + } + public String getKey() { + return key; + } + public Message get() { + return msg; + } + public Message getAndRemove() { + Message m = msg; + msg = null; + return m; + } + public void set(Message message) { + msg = message; + } + public void setSession(SessionRequest r) { + session = r; + } + public SessionRequest getSession() { + return session; + } + } + public void setUp() { try { HttpParams params = new BasicHttpParams(); @@ -93,8 +132,9 @@ public class HttpClientController implem params) { protected void handleTimeout(final NHttpConnection conn) { super.handleTimeout(conn); - Message m = (Message)conn.getContext().getAttribute("MESSAGE"); - m.get(AsyncHTTPConduit.class).sendException(m, new SocketTimeoutException()); + MessageHolder m = (MessageHolder)conn.getContext().getAttribute("MESSAGE"); + m.get().get(AsyncHTTPConduit.class).sendException(m.get(), + new SocketTimeoutException()); } }; @@ -102,8 +142,8 @@ public class HttpClientController implem final IOEventDispatch ioEventDispatch = new DefaultClientIOEventDispatch(handler, params) { protected NHttpClientIOTarget createConnection(IOSession session) { - Message m = (Message)session.getAttribute(IOSession.ATTACHMENT_KEY); - HTTPClientPolicy client = (HTTPClientPolicy)m + MessageHolder m = (MessageHolder)session.getAttribute(IOSession.ATTACHMENT_KEY); + HTTPClientPolicy client = (HTTPClientPolicy)m.get() .get(HTTPClientPolicy.class.getName() + ".complete"); if (client != null) { session.setSocketTimeout((int)client.getReceiveTimeout()); @@ -128,19 +168,43 @@ public class HttpClientController implem ex.printStackTrace(); } } + public SessionRequest findSession(String key) { + Stack stack = sessions.get(key); + if (stack != null && !stack.isEmpty()) { + return stack.pop(); + } + return null; + } + public void execute(AsyncHTTPConduit conduit, final URL address, HttpUriRequest request, final Message message) throws IOException { + message.put(HttpUriRequest.class, request); int port = address.getPort(); if (port == -1) { port = 80; } - InetSocketAddress add = new InetSocketAddress(address.getHost(), port); - message.put(HttpUriRequest.class, request); + String key = address.getHost() + ":" + port; + SessionRequest srequest = findSession(key); + if (srequest != null && !srequest.getSession().isClosed()) { + message.put(SessionRequest.class, srequest); + Object o = srequest.getSession().getAttribute("http.connection"); + ((MessageHolder)srequest.getAttachment()).set(message); + NHttpConnection ioc = (NHttpConnection)o; + HTTPClientPolicy client = (HTTPClientPolicy)message + .get(HTTPClientPolicy.class.getName() + ".complete"); + if (client != null) { + ioc.setSocketTimeout((int)client.getReceiveTimeout()); + } + + ioc.requestOutput(); + return; + } + SocketAddress add = new InetSocketAddress(address.getHost(), port); synchronized (message) { SessionRequest req - = ioReactor.connect(add, null, message, + = ioReactor.connect(add, null, new MessageHolder(message, key), new SessionRequestCallback() { public void completed(SessionRequest request) { synchronized (message) { @@ -185,6 +249,7 @@ public class HttpClientController implem //ignore } message.put(SessionRequest.class, req); + srequest = req; } } @@ -222,26 +287,43 @@ public class HttpClientController implem } public HttpRequest submitRequest(HttpContext context) { - Message m = (Message)context.getAttribute("MESSAGE"); + Message m = ((MessageHolder)context.getAttribute("MESSAGE")).get(); if (m == null) { return null; } - return m.get(HttpUriRequest.class); + return (HttpRequest)m.remove(HttpUriRequest.class.getName()); } public ConsumingNHttpEntity responseEntity(HttpResponse response, HttpContext context) throws IOException { - Message m = (Message)context.getAttribute("MESSAGE"); + MessageHolder holder = (MessageHolder)context.getAttribute("MESSAGE"); + Message m = holder.get(); WrappedOutputStream out = m.get(WrappedOutputStream.class); out.setResponse(response); return out; } public void handleResponse(HttpResponse response, HttpContext context) throws IOException { + MessageHolder holder = (MessageHolder)context.getAttribute("MESSAGE"); + Message m = holder.getAndRemove(); + SessionRequest r = m.get(SessionRequest.class); + holder.setSession(r); + String key = holder.getKey(); + Stack srs = sessions.get(key); + if (srs == null) { + srs = new Stack(); + sessions.put(key, srs); + } + srs.push(r); } public void finalizeContext(HttpContext context) { - context.removeAttribute("MESSAGE"); + MessageHolder holder = (MessageHolder)context.getAttribute("MESSAGE"); + String key = holder.getKey(); + Stack srs = sessions.get(key); + if (srs != null) { + srs.removeElement(holder.getSession()); + } } } \ No newline at end of file