cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ol...@apache.org
Subject svn commit: r1369610 - in /cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient: AsyncHTTPConduit.java AsyncHTTPTransportFactory.java CXFAsyncRequester.java CXFConnectionManager.java
Date Sun, 05 Aug 2012 15:08:53 GMT
Author: olegk
Date: Sun Aug  5 15:08:53 2012
New Revision: 1369610

URL: http://svn.apache.org/viewvc?rev=1369610&view=rev
Log:
Custom async requester

Added:
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFAsyncRequester.java   (with props)
Modified:
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPTransportFactory.java
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFConnectionManager.java

Modified: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java?rev=1369610&r1=1369609&r2=1369610&view=diff
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java	(original)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java	Sun Aug  5 15:08:53 2012
@@ -208,7 +208,6 @@ public class AsyncHTTPConduit extends HT
             
             factory.getRequester().execute(new CXFHttpAsyncRequestProducer(entity, outbuf),
                          consumer,
-                         factory.getPool(),
                          new BasicHttpContext(),
                          callback);
             

Modified: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPTransportFactory.java
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPTransportFactory.java?rev=1369610&r1=1369609&r2=1369610&view=diff
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPTransportFactory.java	(original)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPTransportFactory.java	Sun Aug  5 15:08:53 2012
@@ -38,11 +38,9 @@ import org.apache.cxf.ws.addressing.Endp
 import org.apache.http.impl.DefaultConnectionReuseStrategy;
 import org.apache.http.impl.nio.DefaultHttpClientIODispatch;
 import org.apache.http.impl.nio.pool.BasicNIOConnFactory;
-import org.apache.http.impl.nio.pool.BasicNIOConnPool;
 import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
 import org.apache.http.impl.nio.reactor.IOReactorConfig;
 import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
-import org.apache.http.nio.protocol.HttpAsyncRequester;
 import org.apache.http.nio.reactor.ConnectingIOReactor;
 import org.apache.http.nio.reactor.IOEventDispatch;
 import org.apache.http.nio.reactor.IOReactorException;
@@ -60,8 +58,8 @@ import org.apache.http.protocol.RequestT
  */
 @NoJSR250Annotations(unlessNull = "bus")
 public class AsyncHTTPTransportFactory extends HTTPTransportFactory implements BusLifeCycleListener {
-    HttpAsyncRequester requester;
-    BasicNIOConnPool pool;
+    CXFAsyncRequester requester;
+    CXFConnectionManager connManager;
     
     public AsyncHTTPTransportFactory() {
         super();
@@ -88,7 +86,7 @@ public class AsyncHTTPTransportFactory e
     }
     public synchronized void preShutdown() {
         try {
-            pool.shutdown(1000);
+            connManager.shutdown(1000);
         } catch (IOException e) {
             e.printStackTrace();
         }
@@ -130,9 +128,9 @@ public class AsyncHTTPTransportFactory e
         // Create HTTP connection pool
         BasicNIOConnFactory poolConnFactory = new BasicNIOConnFactory(
                 plainConnFactory, sslConnFactory);
-        pool = new BasicNIOConnPool(ioReactor, poolConnFactory, params);
-        pool.setDefaultMaxPerRoute(1000);
-        pool.setMaxTotal(5000);
+        connManager = new CXFConnectionManager(ioReactor, poolConnFactory, params);
+        connManager.setDefaultMaxPerRoute(1000);
+        connManager.setMaxTotal(5000);
 
         // Run the I/O reactor in a separate thread
         Thread t = new Thread(new Runnable() {
@@ -152,15 +150,13 @@ public class AsyncHTTPTransportFactory e
         // Start the client thread
         t.start();
         
-        requester = new HttpAsyncRequester(httpproc, new DefaultConnectionReuseStrategy(), params);
+        requester = new CXFAsyncRequester(connManager, httpproc, 
+                new DefaultConnectionReuseStrategy(), params);
     }
     
-    public HttpAsyncRequester getRequester() {
+    public CXFAsyncRequester getRequester() {
         return requester;
     }
-    public BasicNIOConnPool getPool() {
-        return pool;
-    }
 
     /**
      * This call creates a new HTTP Conduit based on the EndpointInfo and

Added: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFAsyncRequester.java
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFAsyncRequester.java?rev=1369610&view=auto
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFAsyncRequester.java	(added)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFAsyncRequester.java	Sun Aug  5 15:08:53 2012
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http.asyncclient;
+
+import java.io.IOException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.ConnectionClosedException;
+import org.apache.http.ConnectionReuseStrategy;
+import org.apache.http.HttpHost;
+import org.apache.http.concurrent.BasicFuture;
+import org.apache.http.concurrent.FutureCallback;
+import org.apache.http.impl.nio.pool.BasicNIOPoolEntry;
+import org.apache.http.nio.NHttpClientConnection;
+import org.apache.http.nio.protocol.BasicAsyncRequestExecutionHandler;
+import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
+import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
+import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
+import org.apache.http.params.HttpParams;
+import org.apache.http.protocol.HttpContext;
+import org.apache.http.protocol.HttpProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CXFAsyncRequester {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CXFAsyncRequester.class);
+    
+    private final CXFConnectionManager connManager;
+    private final HttpProcessor httppocessor;
+    private final ConnectionReuseStrategy reuseStrategy;
+    private final HttpParams params;
+
+    public CXFAsyncRequester(
+            final CXFConnectionManager connManager,
+            final HttpProcessor httppocessor,
+            final ConnectionReuseStrategy reuseStrategy,
+            final HttpParams params) {
+        super();
+        this.connManager = connManager;
+        this.httppocessor = httppocessor;
+        this.reuseStrategy = reuseStrategy;
+        this.params = params;
+    }
+
+    public <T> Future<T> execute(
+            final HttpAsyncRequestProducer requestProducer,
+            final HttpAsyncResponseConsumer<T> responseConsumer,
+            final HttpContext context,
+            final FutureCallback<T> callback) {
+        if (requestProducer == null) {
+            throw new IllegalArgumentException("HTTP request producer may not be null");
+        }
+        if (responseConsumer == null) {
+            throw new IllegalArgumentException("HTTP response consumer may not be null");
+        }
+        if (context == null) {
+            throw new IllegalArgumentException("HTTP context may not be null");
+        }
+        BasicFuture<T> future = new BasicFuture<T>(callback);
+        HttpHost target = requestProducer.getTarget();
+        this.connManager.leaseConnection(
+                target, null, 
+                -1, TimeUnit.MILLISECONDS,
+                new ConnRequestCallback<T>(
+                future, requestProducer, responseConsumer, context));
+        return future;
+    }
+
+    class ConnRequestCallback<T> implements FutureCallback<BasicNIOPoolEntry> {
+
+        private final BasicFuture<T> requestFuture;
+        private final HttpAsyncRequestProducer requestProducer;
+        private final HttpAsyncResponseConsumer<T> responseConsumer;
+        private final HttpContext context;
+
+        ConnRequestCallback(
+                final BasicFuture<T> requestFuture,
+                final HttpAsyncRequestProducer requestProducer,
+                final HttpAsyncResponseConsumer<T> responseConsumer,
+                final HttpContext context) {
+            super();
+            this.requestFuture = requestFuture;
+            this.requestProducer = requestProducer;
+            this.responseConsumer = responseConsumer;
+            this.context = context;
+        }
+
+        public void completed(final BasicNIOPoolEntry result) {
+            if (this.requestFuture.isDone()) {
+                connManager.releaseConnection(result, 0, null);
+                return;
+            }
+            NHttpClientConnection conn = result.getConnection();
+            BasicAsyncRequestExecutionHandler<T> handler = new BasicAsyncRequestExecutionHandler<T>(
+                    this.requestProducer, this.responseConsumer,
+                    new RequestExecutionCallback<T>(this.requestFuture, result),
+                    this.context, httppocessor, reuseStrategy, params);
+            conn.getContext().setAttribute(HttpAsyncRequestExecutor.HTTP_HANDLER, handler);
+            conn.requestOutput();
+            if (!conn.isOpen()) {
+                handler.failed(new ConnectionClosedException("Connection closed"));
+                try {
+                    handler.close();
+                } catch (IOException ex) {
+                    LOG.error(ex.getMessage(), ex);
+                }
+            }
+        }
+
+        public void failed(final Exception ex) {
+            try {
+                try {
+                    this.responseConsumer.failed(ex);
+                } finally {
+                    releaseResources();
+                }
+            } finally {
+                this.requestFuture.failed(ex);
+            }
+        }
+
+        public void cancelled() {
+            try {
+                try {
+                    this.responseConsumer.cancel();
+                } finally {
+                    releaseResources();
+                }
+            } finally {
+                this.requestFuture.cancel(true);
+            }
+        }
+
+        public void releaseResources() {
+            try {
+                this.requestProducer.close();
+            } catch (IOException ioex) {
+                LOG.error(ioex.getMessage(), ioex);
+            }
+            try {
+                this.responseConsumer.close();
+            } catch (IOException ioex) {
+                LOG.error(ioex.getMessage(), ioex);
+            }
+        }
+
+    }
+
+    class RequestExecutionCallback<T> implements FutureCallback<T> {
+
+        private final BasicFuture<T> future;
+        private final BasicNIOPoolEntry poolEntry;
+
+        RequestExecutionCallback(
+                final BasicFuture<T> future,
+                final BasicNIOPoolEntry poolEntry) {
+            super();
+            this.future = future;
+            this.poolEntry = poolEntry;
+        }
+
+        public void completed(final T result) {
+            try {
+                // Keep alive indefinitely
+                connManager.releaseConnection(this.poolEntry, 0, TimeUnit.MILLISECONDS);
+            } finally {
+                this.future.completed(result);
+            }
+        }
+
+        public void failed(final Exception ex) {
+            try {
+                this.poolEntry.close();
+                connManager.releaseConnection(this.poolEntry, 0, TimeUnit.MILLISECONDS);
+            } finally {
+                this.future.failed(ex);
+            }
+        }
+
+        public void cancelled() {
+            try {
+                this.poolEntry.close();
+                connManager.releaseConnection(this.poolEntry, 0, TimeUnit.MILLISECONDS);
+            } finally {
+                this.future.cancel(true);
+            }
+        }
+
+    }
+
+}

Propchange: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFAsyncRequester.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFAsyncRequester.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFAsyncRequester.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFConnectionManager.java
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFConnectionManager.java?rev=1369610&r1=1369609&r2=1369610&view=diff
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFConnectionManager.java	(original)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFConnectionManager.java	Sun Aug  5 15:08:53 2012
@@ -31,14 +31,13 @@ import org.apache.http.impl.nio.pool.Bas
 import org.apache.http.nio.NHttpClientConnection;
 import org.apache.http.nio.pool.NIOConnFactory;
 import org.apache.http.nio.reactor.ConnectingIOReactor;
-import org.apache.http.nio.reactor.IOEventDispatch;
 import org.apache.http.nio.reactor.IOReactorStatus;
 import org.apache.http.params.HttpParams;
 import org.apache.http.pool.PoolStats;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class CXFConnectionManager {
+class CXFConnectionManager {
 
     private static final Logger LOG = LoggerFactory.getLogger(BasicNIOConnPool.class);
     
@@ -48,8 +47,6 @@ public class CXFConnectionManager {
     public CXFConnectionManager(
             final ConnectingIOReactor ioreactor,
             final NIOConnFactory<HttpHost, NHttpClientConnection> connFactory,
-            final long timeToLive, 
-            final TimeUnit tunit,
             final HttpParams params) {
         super();
         this.ioreactor = ioreactor;
@@ -65,10 +62,6 @@ public class CXFConnectionManager {
         }
     }
 
-    public void execute(final IOEventDispatch eventDispatch) throws IOException {
-        this.ioreactor.execute(eventDispatch);
-    }
-
     public IOReactorStatus getStatus() {
         return this.ioreactor.getStatus();
     }
@@ -150,6 +143,20 @@ public class CXFConnectionManager {
         if (this.pool.isShutdown()) {
             return;
         }
+        if (!entry.isClosed()) {
+            entry.updateExpiry(keepalive, tunit != null ? tunit : TimeUnit.MILLISECONDS);
+            if (LOG.isDebugEnabled()) {
+                String s;
+                if (keepalive > 0) {
+                    s = "for " + keepalive + " " + tunit;
+                } else {
+                    s = "indefinitely";
+                }
+                LOG.debug("Connection " + format(entry) + " can be kept alive " + s);
+            }
+            // Do not time out pooled connection
+            entry.getConnection().setSocketTimeout(0);
+        }
         this.pool.release(entry, !entry.isClosed());
         if (LOG.isDebugEnabled()) {
             LOG.debug("Connection released: " + format(entry) + formatStats(entry.getRoute()));



Mime
View raw message