cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dk...@apache.org
Subject svn commit: r1383023 - in /cxf/sandbox/dkulp_async_clients/http-hc: ./ src/main/java/org/apache/cxf/transport/http/asyncclient/ src/main/java/org/apache/cxf/transport/http/asyncclient/impl/
Date Mon, 10 Sep 2012 18:28:25 GMT
Author: dkulp
Date: Mon Sep 10 18:28:25 2012
New Revision: 1383023

URL: http://svn.apache.org/viewvc?rev=1383023&view=rev
Log:
Flip to using AsyncClient. Allows removal of a lot of code.

Removed:
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFAsyncRequestExecutor.java
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFConnection.java
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFConnectionManager.java
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpHost.java
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/LoggingIOSession.java
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/Wire.java
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/impl/
Modified:
    cxf/sandbox/dkulp_async_clients/http-hc/pom.xml
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFAsyncRequester.java

Modified: cxf/sandbox/dkulp_async_clients/http-hc/pom.xml
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/pom.xml?rev=1383023&r1=1383022&r2=1383023&view=diff
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/pom.xml (original)
+++ cxf/sandbox/dkulp_async_clients/http-hc/pom.xml Mon Sep 10 18:28:25 2012
@@ -73,7 +73,7 @@
         </dependency>
         <dependency>
             <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
+            <artifactId>slf4j-jdk14</artifactId>
             <version>1.6.2</version>
             <scope>test</scope>
         </dependency>
@@ -82,6 +82,11 @@
             <artifactId>httpcore-nio</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpasyncclient</artifactId>
+            <version>4.0-beta3-SNAPSHOT</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.cxf</groupId>
             <artifactId>cxf-rt-transports-http-jetty</artifactId>
             <version>${project.version}</version>

Modified: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java?rev=1383023&r1=1383022&r2=1383023&view=diff
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java (original)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java Mon Sep 10 18:28:25 2012
@@ -24,10 +24,13 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.PushbackInputStream;
 import java.net.HttpURLConnection;
+import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
+import java.net.Proxy;
 import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.security.GeneralSecurityException;
 import java.security.Principal;
 import java.security.cert.Certificate;
 import java.util.ArrayList;
@@ -37,11 +40,16 @@ import java.util.concurrent.Future;
 
 import javax.net.ssl.HostnameVerifier;
 import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLSession;
+import javax.net.ssl.X509KeyManager;
 
 import org.apache.cxf.Bus;
 import org.apache.cxf.common.util.StringUtils;
 import org.apache.cxf.common.util.SystemPropertyAction;
+import org.apache.cxf.configuration.jsse.SSLUtils;
 import org.apache.cxf.configuration.jsse.TLSClientParameters;
 import org.apache.cxf.helpers.HttpHeaderHelper;
 import org.apache.cxf.io.CacheAndWriteOutputStream;
@@ -50,14 +58,22 @@ import org.apache.cxf.message.MessageUti
 import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.transport.http.Headers;
 import org.apache.cxf.transport.http.URLConnectionHTTPConduit;
+import org.apache.cxf.transport.https.AliasedX509ExtendedKeyManager;
 import org.apache.cxf.transport.https.CertificateHostnameVerifier;
 import org.apache.cxf.transport.https.HttpsURLConnectionInfo;
 import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
 import org.apache.cxf.version.Version;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.apache.http.Header;
+import org.apache.http.HttpHost;
 import org.apache.http.HttpResponse;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.Credentials;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.protocol.ClientContext;
 import org.apache.http.concurrent.FutureCallback;
+import org.apache.http.conn.params.ConnRouteParams;
 import org.apache.http.entity.BasicHttpEntity;
 import org.apache.http.nio.util.HeapByteBufferAllocator;
 import org.apache.http.params.CoreConnectionPNames;
@@ -80,7 +96,10 @@ public class AsyncHTTPConduit extends UR
     }
     
 
-    AsyncHTTPConduitFactory factory;
+    final AsyncHTTPConduitFactory factory;
+    volatile int lastTlsHash = -1;
+    volatile Object sslState; 
+    volatile SSLContext sslContext;
     
     public AsyncHTTPConduit(Bus b, EndpointInfo ei, EndpointReferenceType t,
                             AsyncHTTPConduitFactory factory) throws IOException {
@@ -143,6 +162,12 @@ public class AsyncHTTPConduit extends UR
         e.getParams().setParameter(CoreConnectionPNames.SO_TIMEOUT, 
                 Integer.valueOf((int) csPolicy.getReceiveTimeout()));
         
+        Proxy p = proxyFactory.createProxy(csPolicy , uri);
+        if (p != null) {
+            InetSocketAddress isa = (InetSocketAddress)p.address();
+            HttpHost proxy = new HttpHost(isa.getHostName(), isa.getPort());
+            ConnRouteParams.setDefaultProxy(e.getParams(), proxy);
+        }
         message.put(CXFHttpRequest.class, e);
     }
     
@@ -302,15 +327,32 @@ public class AsyncHTTPConduit extends UR
             if (url.getScheme().equals("https") && tlsClientParameters == null) {
                 tlsClientParameters = new TLSClientParameters();
             }
-            CXFHttpHost host = new CXFHttpHost(url.getHost(), url.getPort(), url.getScheme(),
-                                               tlsClientParameters, proxyFactory.createProxy(csPolicy , url));
             
+            BasicHttpContext ctx = new BasicHttpContext();
+            if (AsyncHTTPConduit.this.proxyAuthorizationPolicy != null
+                && AsyncHTTPConduit.this.proxyAuthorizationPolicy.getUserName() != null) {
+                ctx.setAttribute(ClientContext.CREDS_PROVIDER, new CredentialsProvider() {
+                    public void setCredentials(AuthScope authscope, Credentials credentials) {
+                    }
+                    public Credentials getCredentials(AuthScope authscope) {
+                        return new UsernamePasswordCredentials(AsyncHTTPConduit.this
+                                                               .proxyAuthorizationPolicy.getUserName(),
+                                               AsyncHTTPConduit.this.proxyAuthorizationPolicy.getPassword());
+                    }
+                    public void clear() {
+                    }
+                });
+            }
+            if (tlsClientParameters != null && tlsClientParameters.hashCode() == lastTlsHash && sslState != null) {
+                ctx.setAttribute(ClientContext.USER_TOKEN , sslState);
+            }
             connectionFuture = factory.getRequester().execute(
-                        host,
+                        AsyncHTTPConduit.this,
+                        url,
                         csPolicy.getConnectionTimeout(),
                         new CXFHttpAsyncRequestProducer(entity, outbuf),
                         new CXFHttpAsyncResponseConsumer(inbuf, responseCallback),
-                        new BasicHttpContext(),
+                        ctx,
                         callback);
         }
 
@@ -351,8 +393,8 @@ public class AsyncHTTPConduit extends UR
                 if (httpResponse == null) {
                     outbuf.shutdown();
                     inbuf.shutdown();
-                    outbuf = null;
-                    inbuf = null;
+                    //outbuf = null;
+                    //inbuf = null;
                     
                     if (exception != null) {
                         if (exception instanceof IOException) {
@@ -562,11 +604,70 @@ public class AsyncHTTPConduit extends UR
         public void setSSLSession(SSLSession sslsession) {            
             session = sslsession;
             synchronized (sessionLock) {
+                sslState = sslsession.getLocalPrincipal();
                 sessionLock.notifyAll();
             }
         }
     }
 
 
+    public synchronized SSLContext getSSLContext() throws GeneralSecurityException {
+        TLSClientParameters tlsClientParameters = getTlsClientParameters();
+        if (tlsClientParameters == null) {
+            tlsClientParameters = new TLSClientParameters();
+        }
+        int hash = tlsClientParameters.hashCode();
+        if (hash == lastTlsHash) {
+            return sslContext;
+        }
+        
+        String provider = tlsClientParameters.getJsseProvider();
+
+        String protocol = tlsClientParameters.getSecureSocketProtocol() != null ? tlsClientParameters
+            .getSecureSocketProtocol() : "TLS";
+
+        SSLContext ctx = provider == null ? SSLContext.getInstance(protocol) : SSLContext
+            .getInstance(protocol, provider);
+        ctx.getClientSessionContext().setSessionTimeout(tlsClientParameters.getSslCacheTimeout());
+        KeyManager[] keyManagers = tlsClientParameters.getKeyManagers();
+        if (tlsClientParameters.getCertAlias() != null) {
+            getKeyManagersWithCertAlias(tlsClientParameters, keyManagers);
+        }
+        ctx.init(keyManagers, tlsClientParameters.getTrustManagers(),
+                 tlsClientParameters.getSecureRandom());
+
+        sslContext = ctx;
+        lastTlsHash = hash;
+        sslState = null;
+        return ctx;
+    }
+
+    public void initializeSSLEngine(SSLContext sslcontext, SSLEngine sslengine) {
+        TLSClientParameters tlsClientParameters = getTlsClientParameters();
+        if (tlsClientParameters == null) {
+            tlsClientParameters = new TLSClientParameters();
+        }
+        String[] cipherSuites = SSLUtils.getCiphersuites(tlsClientParameters.getCipherSuites(),
+                                                         SSLUtils.getSupportedCipherSuites(sslcontext),
+                                                         tlsClientParameters.getCipherSuitesFilter(), LOG, false);
+        sslengine.setEnabledCipherSuites(cipherSuites);
+    }
+
+    protected static void getKeyManagersWithCertAlias(TLSClientParameters tlsClientParameters,
+                                                      KeyManager[] keyManagers) throws GeneralSecurityException {
+        if (tlsClientParameters.getCertAlias() != null) {
+            for (int idx = 0; idx < keyManagers.length; idx++) {
+                if (keyManagers[idx] instanceof X509KeyManager) {
+                    try {
+                        keyManagers[idx] = new AliasedX509ExtendedKeyManager(tlsClientParameters.getCertAlias(),
+                                                                             (X509KeyManager)keyManagers[idx]);
+                    } catch (Exception e) {
+                        throw new GeneralSecurityException(e);
+                    }
+                }
+            }
+        }
+    }
+
 
 }

Modified: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java?rev=1383023&r1=1383022&r2=1383023&view=diff
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java (original)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java Mon Sep 10 18:28:25 2012
@@ -32,23 +32,31 @@ import org.apache.cxf.service.model.Endp
 import org.apache.cxf.transport.http.HTTPConduit;
 import org.apache.cxf.transport.http.HTTPTransportFactory;
 import org.apache.cxf.transport.http.HTTPTransportFactory.HTTPConduitFactory;
-import org.apache.cxf.transport.http.asyncclient.impl.CXFConnectionFactory;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
-import org.apache.http.impl.DefaultConnectionReuseStrategy;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponseFactory;
+import org.apache.http.impl.DefaultHttpResponseFactory;
+import org.apache.http.impl.client.EntityEnclosingRequestWrapper;
 import org.apache.http.impl.nio.DefaultHttpClientIODispatch;
+import org.apache.http.impl.nio.conn.DefaultClientAsyncConnection;
+import org.apache.http.impl.nio.conn.PoolingClientAsyncConnectionManager;
 import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
 import org.apache.http.impl.nio.reactor.IOReactorConfig;
+import org.apache.http.nio.conn.ClientAsyncConnection;
+import org.apache.http.nio.conn.ClientAsyncConnectionFactory;
+import org.apache.http.nio.conn.scheme.AsyncScheme;
+import org.apache.http.nio.conn.scheme.AsyncSchemeRegistry;
+import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
 import org.apache.http.nio.reactor.ConnectingIOReactor;
 import org.apache.http.nio.reactor.IOEventDispatch;
 import org.apache.http.nio.reactor.IOReactorException;
+import org.apache.http.nio.reactor.IOSession;
+import org.apache.http.nio.reactor.ssl.SSLIOSession;
+import org.apache.http.nio.util.ByteBufferAllocator;
+import org.apache.http.nio.util.HeapByteBufferAllocator;
 import org.apache.http.params.BasicHttpParams;
 import org.apache.http.params.CoreConnectionPNames;
 import org.apache.http.params.HttpParams;
-import org.apache.http.protocol.BasicHttpProcessor;
-import org.apache.http.protocol.RequestConnControl;
-import org.apache.http.protocol.RequestContent;
-import org.apache.http.protocol.RequestExpectContinue;
-import org.apache.http.protocol.RequestTargetHost;
 
 /**
  * 
@@ -56,7 +64,8 @@ import org.apache.http.protocol.RequestT
 @NoJSR250Annotations(unlessNull = "bus")
 public class AsyncHTTPConduitFactory implements BusLifeCycleListener, HTTPConduitFactory {
     CXFAsyncRequester requester;
-    CXFConnectionManager connManager;
+    ConnectingIOReactor ioReactor;
+    PoolingClientAsyncConnectionManager connectionManager;
     boolean isShutdown;
     
     public AsyncHTTPConduitFactory() {
@@ -86,14 +95,21 @@ public class AsyncHTTPConduitFactory imp
     public void initComplete() {
     }
     public synchronized void preShutdown() {
-        if (connManager != null) {
+        if (ioReactor != null) {
             try {
-                connManager.shutdown(1000);
+                connectionManager.shutdown();
+            } catch (IOException e1) {
+                e1.printStackTrace();
+            }
+            try {
+                ioReactor.shutdown();
             } catch (IOException e) {
                 e.printStackTrace();
             }
+            connectionManager = null;
+            ioReactor = null;
+            requester = null;
         }
-        connManager = null;
         isShutdown = true;
     }
     public void postShutdown() {
@@ -111,28 +127,14 @@ public class AsyncHTTPConduitFactory imp
      // HTTP parameters for the client
         HttpParams params = new BasicHttpParams();
         params.setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 16 * 1024);
-        // Create HTTP protocol processing chain
-        BasicHttpProcessor httpproc = new BasicHttpProcessor();
-        httpproc.addInterceptor(new RequestContent());
-        httpproc.addInterceptor(new RequestTargetHost());
-        httpproc.addInterceptor(new RequestConnControl());
-        httpproc.addInterceptor(new RequestExpectContinue());
-
-        // Create client-side HTTP protocol handler
-        CXFAsyncRequestExecutor protocolHandler = new CXFAsyncRequestExecutor();
-        // Create client-side I/O event dispatch
-        CXFConnectionFactory connFactory = new CXFConnectionFactory(params);
-        final IOEventDispatch ioEventDispatch = new DefaultHttpClientIODispatch(protocolHandler,
-                connFactory);
+       
         // Create client-side I/O reactor
         IOReactorConfig config = new IOReactorConfig();
         config.setTcpNoDelay(true);
         
-        final ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(config);
-        // Create HTTP connection pool
-        connManager = new CXFConnectionManager(ioReactor, connFactory, params);
-        connManager.setDefaultMaxPerRoute(1000);
-        connManager.setMaxTotal(5000);
+        final IOEventDispatch ioEventDispatch = new DefaultHttpClientIODispatch(new HttpAsyncRequestExecutor(),
+                                                                                params);
+        ioReactor = new DefaultConnectingIOReactor(config);
 
         // Run the I/O reactor in a separate thread
         Thread t = new Thread(new Runnable() {
@@ -152,9 +154,46 @@ public class AsyncHTTPConduitFactory imp
         // Start the client thread
         t.start();
         
-        requester = new CXFAsyncRequester(connManager, httpproc, 
-                new DefaultConnectionReuseStrategy(), params,
-                connFactory);
+        AsyncSchemeRegistry registry = new AsyncSchemeRegistry();
+        registry.register(new AsyncScheme("http", 80, null));
+        registry.register(new AsyncScheme("https", 443, null));
+
+        connectionManager = new PoolingClientAsyncConnectionManager(ioReactor, registry) {
+            @Override
+            protected ClientAsyncConnectionFactory createClientAsyncConnectionFactory() {
+                final HttpResponseFactory responseFactory = new DefaultHttpResponseFactory();
+                final ByteBufferAllocator allocator = new HeapByteBufferAllocator();
+
+                return new ClientAsyncConnectionFactory() {
+                    @Override
+                    public ClientAsyncConnection create(String id, IOSession iosession, HttpParams params) {
+                        return new DefaultClientAsyncConnection(id, iosession, 
+                                                                responseFactory, 
+                                                                allocator, params) {
+                            @Override
+                            protected void onRequestSubmitted(HttpRequest request) {
+                                super.onRequestSubmitted(request);
+                                if (request instanceof EntityEnclosingRequestWrapper) {
+                                    request = ((EntityEnclosingRequestWrapper)request).getOriginal();
+                                }
+                                if (getIOSession() instanceof SSLIOSession) {
+                                    SSLIOSession sslio = (SSLIOSession)getIOSession();
+                                    getIOSession().setAttribute(CXFHttpRequest.class.getName(), request);
+                                    if (getIOSession().getAttribute("cxf.handshake.done") != null) {
+                                        ((CXFHttpRequest)request).getOutputStream()
+                                            .setSSLSession(sslio.getSSLSession());
+                                    }
+                                }
+                            }
+                        };
+                    }
+                };
+            }
+            
+        };
+        connectionManager.setDefaultMaxPerRoute(1000);
+        connectionManager.setMaxTotal(5000);
+        requester = new CXFAsyncRequester(connectionManager);
     }
     
     public CXFAsyncRequester getRequester() throws IOException {

Modified: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFAsyncRequester.java
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFAsyncRequester.java?rev=1383023&r1=1383022&r2=1383023&view=diff
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFAsyncRequester.java (original)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFAsyncRequester.java Mon Sep 10 18:28:25 2012
@@ -19,52 +19,41 @@
 
 package org.apache.cxf.transport.http.asyncclient;
 
-import java.io.IOException;
+import java.net.URI;
+import java.security.GeneralSecurityException;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.transport.http.asyncclient.impl.CXFConnectionFactory;
-import org.apache.cxf.transport.http.asyncclient.impl.CXFNIOPoolEntry;
-import org.apache.http.ConnectionClosedException;
-import org.apache.http.ConnectionReuseStrategy;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLSession;
+
+import org.apache.http.client.protocol.ClientContext;
 import org.apache.http.concurrent.BasicFuture;
 import org.apache.http.concurrent.FutureCallback;
-import org.apache.http.nio.NHttpClientConnection;
-import org.apache.http.nio.protocol.BasicAsyncRequestExecutionHandler;
-import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
+import org.apache.http.impl.nio.client.DefaultHttpAsyncClient;
+import org.apache.http.nio.conn.ClientAsyncConnectionManager;
+import org.apache.http.nio.conn.scheme.AsyncScheme;
+import org.apache.http.nio.conn.scheme.AsyncSchemeRegistry;
+import org.apache.http.nio.conn.ssl.SSLLayeringStrategy;
 import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
 import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
-import org.apache.http.params.HttpParams;
+import org.apache.http.nio.reactor.IOSession;
 import org.apache.http.protocol.HttpContext;
-import org.apache.http.protocol.HttpProcessor;
 
 public class CXFAsyncRequester {
 
-    private static final Logger LOG = LogUtils.getL7dLogger(CXFAsyncRequester.class);
+    private final ClientAsyncConnectionManager caConMan;
     
-    private final CXFConnectionManager connManager;
-    private final HttpProcessor httppocessor;
-    private final ConnectionReuseStrategy reuseStrategy;
-    private final HttpParams params;
-
     public CXFAsyncRequester(
-            final CXFConnectionManager connManager,
-            final HttpProcessor httppocessor,
-            final ConnectionReuseStrategy reuseStrategy,
-            final HttpParams params,
-            CXFConnectionFactory sslConnFactory) {
+            ClientAsyncConnectionManager caConMan) {
         super();
-        this.connManager = connManager;
-        this.httppocessor = httppocessor;
-        this.reuseStrategy = reuseStrategy;
-        this.params = params;
+        this.caConMan = caConMan;
     }
 
     public <T> Future<T> execute(
-            final CXFHttpHost target,
+            final AsyncHTTPConduit conduit,
+            final URI uri,
             final long connectionTimeout,
             final HttpAsyncRequestProducer requestProducer,
             final HttpAsyncResponseConsumer<T> responseConsumer,
@@ -80,134 +69,41 @@ public class CXFAsyncRequester {
             throw new IllegalArgumentException("HTTP context may not be null");
         }
         BasicFuture<T> future = new BasicFuture<T>(callback);
-        this.connManager.leaseConnection(
-                target, null, 
-                connectionTimeout, TimeUnit.MILLISECONDS,
-                new ConnRequestCallback<T>(
-                future, requestProducer, responseConsumer, context));
-        return future;
-    }
-
-    class ConnRequestCallback<T> implements FutureCallback<CXFNIOPoolEntry> {
-
-        private final BasicFuture<T> requestFuture;
-        private final HttpAsyncRequestProducer requestProducer;
-        private final HttpAsyncResponseConsumer<T> responseConsumer;
-        private final HttpContext context;
-
-        ConnRequestCallback(
-                final BasicFuture<T> requestFuture,
-                final HttpAsyncRequestProducer requestProducer,
-                final HttpAsyncResponseConsumer<T> responseConsumer,
-                final HttpContext context) {
-            super();
-            this.requestFuture = requestFuture;
-            this.requestProducer = requestProducer;
-            this.responseConsumer = responseConsumer;
-            this.context = context;
-        }
-
-        public void completed(final CXFNIOPoolEntry result) {
-            if (this.requestFuture.isDone()) {
-                connManager.releaseConnection(result, 0, null);
-                return;
-            }
-            NHttpClientConnection conn = result.getConnection();
-            BasicAsyncRequestExecutionHandler<T> handler = new BasicAsyncRequestExecutionHandler<T>(
-                    this.requestProducer, this.responseConsumer,
-                    new RequestExecutionCallback<T>(this.requestFuture, result),
-                    this.context, httppocessor, reuseStrategy, params);
-            conn.getContext().setAttribute(HttpAsyncRequestExecutor.HTTP_HANDLER, handler);
-            conn.requestOutput();
-            if (!conn.isOpen()) {
-                handler.failed(new ConnectionClosedException("Connection closed"));
-                try {
-                    handler.close();
-                } catch (IOException ex) {
-                    LOG.log(Level.SEVERE, ex.getMessage(), ex);
-                }
-            }
-        }
-
-        public void failed(final Exception ex) {
-            try {
-                try {
-                    this.responseConsumer.failed(ex);
-                } finally {
-                    releaseResources();
-                }
-            } finally {
-                this.requestFuture.failed(ex);
-            }
-        }
-
-        public void cancelled() {
-            try {
-                try {
-                    this.responseConsumer.cancel();
-                } finally {
-                    releaseResources();
-                }
-            } finally {
-                this.requestFuture.cancel(true);
-            }
-        }
-
-        public void releaseResources() {
-            try {
-                this.requestProducer.close();
-            } catch (IOException ioex) {
-                LOG.log(Level.SEVERE, ioex.getMessage(), ioex);
-            }
-            try {
-                this.responseConsumer.close();
-            } catch (IOException ioex) {
-                LOG.log(Level.SEVERE, ioex.getMessage(), ioex);
-            }
-        }
-
-    }
-
-    class RequestExecutionCallback<T> implements FutureCallback<T> {
-
-        private final BasicFuture<T> future;
-        private final CXFNIOPoolEntry poolEntry;
-
-        RequestExecutionCallback(
-                final BasicFuture<T> future,
-                final CXFNIOPoolEntry poolEntry) {
-            super();
-            this.future = future;
-            this.poolEntry = poolEntry;
-        }
-
-        public void completed(final T result) {
-            try {
-                // Keep alive indefinitely
-                connManager.releaseConnection(this.poolEntry, 0, TimeUnit.MILLISECONDS);
-            } finally {
-                this.future.completed(result);
-            }
-        }
-
-        public void failed(final Exception ex) {
-            try {
-                this.poolEntry.close();
-                connManager.releaseConnection(this.poolEntry, 0, TimeUnit.MILLISECONDS);
-            } finally {
-                this.future.failed(ex);
-            }
-        }
-
-        public void cancelled() {
-            try {
-                this.poolEntry.close();
-                connManager.releaseConnection(this.poolEntry, 0, TimeUnit.MILLISECONDS);
-            } finally {
-                this.future.cancel(true);
-            }
-        }
+        final AsyncSchemeRegistry reg = new AsyncSchemeRegistry();
+        reg.register(new AsyncScheme("http", 80, null));
+               
+        if ("https".equals(uri.getScheme())) {
+            try {
+                final SSLContext sslcontext = conduit.getSSLContext();
+                reg.register(new AsyncScheme("https", 443, new SSLLayeringStrategy(sslcontext) {
+                    @Override
+                    protected void initializeEngine(SSLEngine engine) {
+                        conduit.initializeSSLEngine(sslcontext, engine);
+                    }
+                    @Override
+                    protected void verifySession(final IOSession iosession,
+                                          final SSLSession sslsession) throws SSLException {
+                        super.verifySession(iosession, sslsession);
+                        iosession.setAttribute("cxf.handshake.done", Boolean.TRUE);
+                        CXFHttpRequest req = (CXFHttpRequest)iosession
+                            .removeAttribute(CXFHttpRequest.class.getName());
+                        if (req != null) {
+                            req.getOutputStream().setSSLSession(sslsession);
+                        }
+                    }
+                }));
+            } catch (GeneralSecurityException e) {
+                // TODO Auto-generated catch block
+                e.printStackTrace();
+            }
+        }
+        DefaultHttpAsyncClient dhac = new DefaultHttpAsyncClient(caConMan);
+        context.setAttribute(ClientContext.SCHEME_REGISTRY, reg);
+        dhac.execute(requestProducer, responseConsumer, context, callback);
 
+        return future;
     }
-
+    
+    
+    
 }



Mime
View raw message