cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dk...@apache.org
Subject svn commit: r1530340 - in /cxf/trunk: parent/ rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/
Date Tue, 08 Oct 2013 16:27:00 GMT
Author: dkulp
Date: Tue Oct  8 16:27:00 2013
New Revision: 1530340

URL: http://svn.apache.org/r1530340
Log:
Update to the latest http async stuff (SNAPSHOT for now, release should be very shortly) to
help test it.

Modified:
    cxf/trunk/parent/pom.xml
    cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
    cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java
    cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpRequest.java
    cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedInputBuffer.java

Modified: cxf/trunk/parent/pom.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/parent/pom.xml?rev=1530340&r1=1530339&r2=1530340&view=diff
==============================================================================
--- cxf/trunk/parent/pom.xml (original)
+++ cxf/trunk/parent/pom.xml Tue Oct  8 16:27:00 2013
@@ -90,11 +90,11 @@
         <cxf.ehcache.version>2.7.4</cxf.ehcache.version>
         <cxf.fastinfoset.bundle.version>1.2.7_4</cxf.fastinfoset.bundle.version>
         <cxf.hazelcast.version>1.9.4</cxf.hazelcast.version>
-        <cxf.httpcomponents.asyncclient.version>4.0-beta3</cxf.httpcomponents.asyncclient.version>
-        <cxf.httpcomponents.asyncclient.version.range>[4.0-beta3,4.1)</cxf.httpcomponents.asyncclient.version.range>
-        <cxf.httpcomponents.client.version>4.2.5</cxf.httpcomponents.client.version>
-        <cxf.httpcomponents.core.version>4.2.4</cxf.httpcomponents.core.version>
-        <cxf.httpcomponents.core.version.range>[4.2.1,4.3.0)</cxf.httpcomponents.core.version.range>
+        <cxf.httpcomponents.asyncclient.version>4.0-beta5-SNAPSHOT</cxf.httpcomponents.asyncclient.version>
+        <cxf.httpcomponents.asyncclient.version.range>[4.0-beta4,4.1)</cxf.httpcomponents.asyncclient.version.range>
+        <cxf.httpcomponents.client.version>4.3.1</cxf.httpcomponents.client.version>
+        <cxf.httpcomponents.core.version>4.3</cxf.httpcomponents.core.version>
+        <cxf.httpcomponents.core.version.range>[4.3,4.4.0)</cxf.httpcomponents.core.version.range>
         <cxf.james.mim4j.version>0.7.2</cxf.james.mim4j.version>
         <cxf.logback.classic.version>1.0.13</cxf.logback.classic.version>
         <cxf.log4j.version>1.2.17</cxf.log4j.version>

Modified: cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java?rev=1530340&r1=1530339&r2=1530340&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
(original)
+++ cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
Tue Oct  8 16:27:00 2013
@@ -74,20 +74,20 @@ import org.apache.http.HttpResponse;
 import org.apache.http.auth.AuthScope;
 import org.apache.http.auth.Credentials;
 import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.client.protocol.ClientContext;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.protocol.HttpClientContext;
 import org.apache.http.concurrent.BasicFuture;
 import org.apache.http.concurrent.FutureCallback;
-import org.apache.http.conn.params.ConnRouteParams;
+import org.apache.http.config.RegistryBuilder;
 import org.apache.http.entity.BasicHttpEntity;
-import org.apache.http.impl.nio.client.DefaultHttpAsyncClient;
-import org.apache.http.nio.conn.scheme.AsyncScheme;
-import org.apache.http.nio.conn.scheme.AsyncSchemeRegistry;
-import org.apache.http.nio.conn.ssl.SSLLayeringStrategy;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.nio.client.HttpAsyncClient;
+import org.apache.http.nio.conn.NoopIOSessionStrategy;
+import org.apache.http.nio.conn.SchemeIOSessionStrategy;
+import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
 import org.apache.http.nio.reactor.IOSession;
 import org.apache.http.nio.util.HeapByteBufferAllocator;
-import org.apache.http.params.CoreConnectionPNames;
-import org.apache.http.protocol.BasicHttpContext;
 
 /**
  * 
@@ -98,10 +98,12 @@ public class AsyncHTTPConduit extends UR
     final AsyncHTTPConduitFactory factory;
     volatile int lastTlsHash = -1;
     volatile Object sslState; 
+    volatile URI sslURL;
     volatile SSLContext sslContext;
-    volatile DefaultHttpAsyncClient client;
-    
-    public AsyncHTTPConduit(Bus b, 
+    volatile SSLSession session;
+    volatile CloseableHttpAsyncClient client;
+
+    public AsyncHTTPConduit(Bus b,
                             EndpointInfo ei, 
                             EndpointReferenceType t,
                             AsyncHTTPConduitFactory factory) throws IOException {
@@ -109,7 +111,7 @@ public class AsyncHTTPConduit extends UR
         this.factory = factory;
     }
 
-    public synchronized DefaultHttpAsyncClient getHttpAsyncClient() throws IOException {
+    public synchronized CloseableHttpAsyncClient getHttpAsyncClient() throws IOException
{
         if (client == null) {
             client = factory.createClient(this);
         }
@@ -195,17 +197,18 @@ public class AsyncHTTPConduit extends UR
         e.setURI(uri);
         
         e.setEntity(entity);
-        
-        // Set socket timeout
-        e.getParams().setParameter(CoreConnectionPNames.SO_TIMEOUT, 
-                Integer.valueOf((int) csPolicy.getReceiveTimeout()));
-        
+
+        RequestConfig.Builder b = RequestConfig.custom()
+            .setSocketTimeout((int) csPolicy.getReceiveTimeout())
+            .setConnectTimeout((int) csPolicy.getConnectionTimeout());
         Proxy p = proxyFactory.createProxy(csPolicy , uri);
         if (p != null) {
             InetSocketAddress isa = (InetSocketAddress)p.address();
             HttpHost proxy = new HttpHost(isa.getHostName(), isa.getPort());
-            ConnRouteParams.setDefaultProxy(e.getParams(), proxy);
+            b.setProxy(proxy);
         }
+        e.setConfig(b.build());
+
         message.put(CXFHttpRequest.class, e);
     }
     
@@ -244,7 +247,6 @@ public class AsyncHTTPConduit extends UR
         // Objects for the response
         volatile HttpResponse httpResponse;
         volatile Exception exception;
-        volatile SSLSession session;
 
         private Future<Boolean> connectionFuture;
 
@@ -417,6 +419,7 @@ public class AsyncHTTPConduit extends UR
                 wrappedStream = cachedStream;
             }
         }
+                
         protected void connect(boolean output) throws IOException {
             if (connectionFuture != null) {
                 return;
@@ -456,65 +459,81 @@ public class AsyncHTTPConduit extends UR
                 tlsClientParameters = new TLSClientParameters();
             }
             
-            BasicHttpContext ctx = new BasicHttpContext();
-            if (AsyncHTTPConduit.this.proxyAuthorizationPolicy != null
-                && AsyncHTTPConduit.this.proxyAuthorizationPolicy.getUserName() !=
null) {
-                ctx.setAttribute(ClientContext.CREDS_PROVIDER, new CredentialsProvider()
{
-                    public void setCredentials(AuthScope authscope, Credentials credentials)
{
+            HttpClientContext ctx = HttpClientContext.create();
+
+            BasicCredentialsProvider credsProvider = new BasicCredentialsProvider() {
+
+                @Override
+                public Credentials getCredentials(final AuthScope authscope) {
+                    Credentials creds = super.getCredentials(authscope);
+                    if (creds != null) {
+                        return creds;
                     }
-                    public Credentials getCredentials(AuthScope authscope) {
+                    if (AsyncHTTPConduit.this.proxyAuthorizationPolicy != null
+                            && AsyncHTTPConduit.this.proxyAuthorizationPolicy.getUserName()
!= null) {
                         return new UsernamePasswordCredentials(AsyncHTTPConduit.this
-                                                               .proxyAuthorizationPolicy.getUserName(),
-                                               AsyncHTTPConduit.this.proxyAuthorizationPolicy.getPassword());
-                    }
-                    public void clear() {
+                                .proxyAuthorizationPolicy.getUserName(),
+                                AsyncHTTPConduit.this.proxyAuthorizationPolicy.getPassword());
                     }
-                });
-            }
-            if (tlsClientParameters != null && tlsClientParameters.hashCode() ==
lastTlsHash && sslState != null) {
-                ctx.setAttribute(ClientContext.USER_TOKEN , sslState);
-            }
+                    return null;
+                }
+
+            };
+
+            ctx.setCredentialsProvider(credsProvider);
             
-            final AsyncSchemeRegistry reg = new AsyncSchemeRegistry();
-            reg.register(new AsyncScheme("http", 80, null));
             if ("https".equals(url.getScheme())) {
                 try {
-                    final SSLContext sslcontext = getSSLContext();
-                    reg.register(new AsyncScheme("https", 443, new SSLLayeringStrategy(sslcontext)
{
-                        @Override
-                        protected void initializeEngine(SSLEngine engine) {
-                            initializeSSLEngine(sslcontext, engine);
-                        }
-                        @Override
-                        protected void verifySession(final IOSession iosession,
-                                              final SSLSession sslsession) throws SSLException
{
-                            super.verifySession(iosession, sslsession);
-                            iosession.setAttribute("cxf.handshake.done", Boolean.TRUE);
-                            CXFHttpRequest req = (CXFHttpRequest)iosession
-                                .removeAttribute(CXFHttpRequest.class.getName());
-                            if (req != null) {
-                                req.getOutputStream().setSSLSession(sslsession);
-                            }
-                        }
-                    }));
+                    RegistryBuilder<SchemeIOSessionStrategy> regBuilder 
+                        = RegistryBuilder.<SchemeIOSessionStrategy>create()
+                            .register("http", NoopIOSessionStrategy.INSTANCE);
+                    
+                   
+                    TLSClientParameters tlsClientParameters = getTlsClientParameters();
+                    if (tlsClientParameters == null) {
+                        tlsClientParameters = new TLSClientParameters();
+                    }
+                    final SSLContext sslcontext = getSSLContext(tlsClientParameters);
+                    regBuilder
+                        .register("https",
+                                  new SSLIOSessionStrategy(sslcontext) {
+                                @Override
+                                protected void initializeEngine(SSLEngine engine) {
+                                    initializeSSLEngine(sslcontext, engine);
+                                }
+                                @Override
+                                protected void verifySession(final HttpHost host,
+                                                             final IOSession iosession,
+                                                             final SSLSession sslsession)
throws SSLException {
+                                    iosession.setAttribute("cxf.handshake.done", Boolean.TRUE);
+                                    setSSLSession(sslsession);
+                                }
+                            });
+                    ctx.setAttribute("http.iosession-factory-registry", regBuilder.build());
                 } catch (GeneralSecurityException e) {
                     // TODO Auto-generated catch block
                     e.printStackTrace();
                 }
+            } 
+            
+
+            if (sslURL != null && !sslURL.equals(url)) {
+                sslURL = null;
+                sslState = null;
+                session = null;
             }
-            ctx.setAttribute(ClientContext.SCHEME_REGISTRY, reg);
+            if (tlsClientParameters != null && tlsClientParameters.hashCode() ==
lastTlsHash) {
+                ctx.setUserToken(sslState);
+            }
+
             connectionFuture = new BasicFuture<Boolean>(callback);
-            DefaultHttpAsyncClient c = getHttpAsyncClient();
-            CredentialsProvider credProvider = c.getCredentialsProvider();
+            HttpAsyncClient c = getHttpAsyncClient();
             Credentials creds = (Credentials)outMessage.getContextualProperty(Credentials.class.getName());
-            if (creds != null && credProvider != null) {
-                credProvider.setCredentials(AuthScope.ANY, creds);
+            if (creds != null) {
+                credsProvider.setCredentials(AuthScope.ANY, creds);
+                ctx.setUserToken(creds.getUserPrincipal());
             }
-            if (credProvider != null && credProvider.getCredentials(AuthScope.ANY)
!= null) {
-                ctx.setAttribute(ClientContext.USER_TOKEN,
-                                 credProvider.getCredentials(AuthScope.ANY).getUserPrincipal());
-            }
-            
+
             c.execute(new CXFHttpAsyncRequestProducer(entity, outbuf),
                       new CXFHttpAsyncResponseConsumer(this, inbuf, responseCallback),
                       ctx,
@@ -768,6 +787,8 @@ public class AsyncHTTPConduit extends UR
             exception = null;
             connectionFuture = null;
             session = null;
+            sslState = null;
+            sslURL = null;
             
             //reset the buffers
             HeapByteBufferAllocator allocator = new HeapByteBufferAllocator();
@@ -790,6 +811,7 @@ public class AsyncHTTPConduit extends UR
             session = sslsession;
             synchronized (sessionLock) {
                 sslState = sslsession.getLocalPrincipal();
+                sslURL = url;
                 sessionLock.notifyAll();
             }
         }
@@ -797,13 +819,11 @@ public class AsyncHTTPConduit extends UR
     }
 
 
-    public synchronized SSLContext getSSLContext() throws GeneralSecurityException {
-        TLSClientParameters tlsClientParameters = getTlsClientParameters();
-        if (tlsClientParameters == null) {
-            tlsClientParameters = new TLSClientParameters();
-        }
+    public synchronized SSLContext getSSLContext(TLSClientParameters tlsClientParameters)
+        throws GeneralSecurityException {
+        
         int hash = tlsClientParameters.hashCode();
-        if (hash == lastTlsHash) {
+        if (hash == lastTlsHash && sslContext != null) {
             return sslContext;
         }
         
@@ -825,6 +845,8 @@ public class AsyncHTTPConduit extends UR
         sslContext = ctx;
         lastTlsHash = hash;
         sslState = null;
+        sslURL = null;
+        session = null;
         return ctx;
     }
 

Modified: cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java?rev=1530340&r1=1530339&r2=1530340&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java
(original)
+++ cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java
Tue Oct  8 16:27:00 2013
@@ -20,7 +20,6 @@
 package org.apache.cxf.transport.http.asyncclient;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -35,52 +34,33 @@ import org.apache.cxf.service.model.Endp
 import org.apache.cxf.transport.http.HTTPConduit;
 import org.apache.cxf.transport.http.HTTPConduitFactory;
 import org.apache.cxf.transport.http.HTTPTransportFactory;
+import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.apache.http.HttpRequest;
 import org.apache.http.HttpResponse;
-import org.apache.http.HttpResponseFactory;
-import org.apache.http.HttpVersion;
 import org.apache.http.ProtocolException;
 import org.apache.http.client.RedirectStrategy;
 import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.http.client.protocol.RequestAuthCache;
-import org.apache.http.client.protocol.RequestClientConnControl;
-import org.apache.http.client.protocol.RequestDefaultHeaders;
-import org.apache.http.client.protocol.RequestProxyAuthentication;
-import org.apache.http.client.protocol.RequestTargetAuthentication;
-import org.apache.http.impl.DefaultHttpResponseFactory;
-import org.apache.http.impl.client.EntityEnclosingRequestWrapper;
-import org.apache.http.impl.client.ProxyAuthenticationStrategy;
-import org.apache.http.impl.client.TargetAuthenticationStrategy;
-import org.apache.http.impl.nio.DefaultHttpClientIODispatch;
-import org.apache.http.impl.nio.client.DefaultHttpAsyncClient;
-import org.apache.http.impl.nio.conn.DefaultClientAsyncConnection;
-import org.apache.http.impl.nio.conn.PoolingClientAsyncConnectionManager;
+import org.apache.http.config.ConnectionConfig;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.cookie.Cookie;
+import org.apache.http.impl.client.BasicCookieStore;
+import org.apache.http.impl.conn.DefaultSchemePortResolver;
+import org.apache.http.impl.conn.SystemDefaultDnsResolver;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.impl.nio.client.HttpAsyncClients;
+import org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory;
+import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
 import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
 import org.apache.http.impl.nio.reactor.IOReactorConfig;
-import org.apache.http.nio.conn.ClientAsyncConnection;
-import org.apache.http.nio.conn.ClientAsyncConnectionFactory;
-import org.apache.http.nio.conn.scheme.AsyncScheme;
-import org.apache.http.nio.conn.scheme.AsyncSchemeRegistry;
-import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
-import org.apache.http.nio.reactor.ConnectingIOReactor;
-import org.apache.http.nio.reactor.IOEventDispatch;
+import org.apache.http.nio.conn.ManagedNHttpClientConnection;
+import org.apache.http.nio.conn.NoopIOSessionStrategy;
+import org.apache.http.nio.conn.SchemeIOSessionStrategy;
+import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
 import org.apache.http.nio.reactor.IOReactorException;
 import org.apache.http.nio.reactor.IOSession;
-import org.apache.http.nio.reactor.ssl.SSLIOSession;
-import org.apache.http.nio.util.ByteBufferAllocator;
-import org.apache.http.nio.util.HeapByteBufferAllocator;
-import org.apache.http.params.BasicHttpParams;
-import org.apache.http.params.HttpConnectionParams;
-import org.apache.http.params.HttpParams;
-import org.apache.http.params.HttpProtocolParams;
-import org.apache.http.params.SyncBasicHttpParams;
-import org.apache.http.protocol.BasicHttpProcessor;
 import org.apache.http.protocol.HttpContext;
-import org.apache.http.protocol.RequestContent;
-import org.apache.http.protocol.RequestExpectContinue;
-import org.apache.http.protocol.RequestTargetHost;
-import org.apache.http.protocol.RequestUserAgent;
 
 /**
  * 
@@ -114,76 +94,55 @@ public class AsyncHTTPConduitFactory imp
         ALWAYS, ASYNC_ONLY, NEVER
     };
         
-    
-    final IOReactorConfig config = new IOReactorConfig();
-    volatile ConnectingIOReactor ioReactor;
-    volatile PoolingClientAsyncConnectionManager connectionManager;
-    
+    volatile PoolingNHttpClientConnectionManager connectionManager;
+    volatile CloseableHttpAsyncClient client;
+
     boolean isShutdown;
     UseAsyncPolicy policy;
     int maxConnections = 5000;
     int maxPerRoute = 1000;
     int connectionTTL = 60000;
 
-    
-    // these have per-instance Logger instances that have sync methods to setup.
-    private final TargetAuthenticationStrategy targetAuthenticationStrategy = new TargetAuthenticationStrategy();
-    private final ProxyAuthenticationStrategy proxyAuthenticationStrategy = new ProxyAuthenticationStrategy();
-    private final BasicHttpProcessor httpproc;
-    
+    int ioThreadCount = IOReactorConfig.DEFAULT.getIoThreadCount();
+    long selectInterval = IOReactorConfig.DEFAULT.getSelectInterval();
+    boolean interestOpQueued = IOReactorConfig.DEFAULT.isInterestOpQueued();
+    int soLinger = IOReactorConfig.DEFAULT.getSoLinger();
+    int soTimeout = IOReactorConfig.DEFAULT.getSoTimeout();
+    boolean soKeepalive = IOReactorConfig.DEFAULT.isSoKeepalive();
+    boolean tcpNoDelay = true;
+
     AsyncHTTPConduitFactory() {
         super();
-        httpproc = new BasicHttpProcessor();
-        httpproc.addInterceptor(new RequestDefaultHeaders());
-        // Required protocol interceptors
-        httpproc.addInterceptor(new RequestContent());
-        httpproc.addInterceptor(new RequestTargetHost());
-        // Recommended protocol interceptors
-        httpproc.addInterceptor(new RequestClientConnControl());
-        httpproc.addInterceptor(new RequestUserAgent());
-        httpproc.addInterceptor(new RequestExpectContinue());
-        // HTTP authentication interceptors
-        httpproc.addInterceptor(new RequestAuthCache());
-        httpproc.addInterceptor(new RequestTargetAuthentication());
-        httpproc.addInterceptor(new RequestProxyAuthentication());        
-
     }
+
     public AsyncHTTPConduitFactory(Map<String, Object> conf) {
         this();
-        config.setTcpNoDelay(true);
         setProperties(conf);
     }
     
-    
     public AsyncHTTPConduitFactory(Bus b) {
         this();
         addListener(b);
-        config.setTcpNoDelay(true);
         setProperties(b.getProperties());
     }
     
-    
-    public BasicHttpProcessor getDefaultHttpProcessor() {
-        return httpproc;
-    }
-    
     public UseAsyncPolicy getUseAsyncPolicy() {
         return policy;
     }
     
     public void update(Map<String, Object> props) {
-        if (setProperties(props) && ioReactor != null) {
+        if (setProperties(props) && client != null) {
             restartReactor(); 
         }
     }
+
     private void restartReactor() {
-        ConnectingIOReactor ioReactor2 = ioReactor;
-        PoolingClientAsyncConnectionManager connectionManager2 = connectionManager;
+        CloseableHttpAsyncClient client2 = client;
         resetVars();
-        shutdown(ioReactor2, connectionManager2);
+        shutdown(client2);
     }
     private synchronized void resetVars() {
-        ioReactor = null;
+        client = null;
         connectionManager = null;
     }
     
@@ -206,6 +165,7 @@ public class AsyncHTTPConduitFactory imp
         maxConnections = getInt(s.get(MAX_CONNECTIONS), maxConnections);
         connectionTTL = getInt(s.get(CONNECTION_TTL), connectionTTL);
         maxPerRoute = getInt(s.get(MAX_PER_HOST_CONNECTIONS), maxPerRoute);
+
         if (connectionManager != null) {
             connectionManager.setMaxTotal(maxConnections);
             connectionManager.setDefaultMaxPerRoute(maxPerRoute);
@@ -214,33 +174,33 @@ public class AsyncHTTPConduitFactory imp
         //properties that need a restart of the reactor
         boolean changed = false;
         
-        int i = config.getIoThreadCount();
-        config.setIoThreadCount(getInt(s.get(THREAD_COUNT), Runtime.getRuntime().availableProcessors()));
-        changed |= i != config.getIoThreadCount();
+        int i = ioThreadCount;
+        ioThreadCount = getInt(s.get(THREAD_COUNT), Runtime.getRuntime().availableProcessors());
+        changed |= i != ioThreadCount;
         
-        long l = config.getSelectInterval();
-        config.setSelectInterval(getInt(s.get(SELECT_INTERVAL), 1000));
-        changed |= l != config.getSelectInterval();
-
-        i = config.getSoLinger();
-        config.setSoLinger(getInt(s.get(SO_LINGER), -1));
-        changed |= i != config.getSoLinger();
-
-        i = config.getSoTimeout();
-        config.setSoTimeout(getInt(s.get(SO_TIMEOUT), 0));
-        changed |= i != config.getSoTimeout();
-
-        boolean b = config.isInterestOpQueued();
-        config.setInterestOpQueued(getBoolean(s.get(INTEREST_OP_QUEUED), false));
-        changed |= b != config.isInterestOpQueued();
+        long l = selectInterval;
+        selectInterval = getInt(s.get(SELECT_INTERVAL), 1000);
+        changed |= l != selectInterval;
+
+        i = soLinger;
+        soLinger = getInt(s.get(SO_LINGER), -1);
+        changed |= i != soLinger;
+
+        i = soTimeout;
+        soTimeout = getInt(s.get(SO_TIMEOUT), 0);
+        changed |= i != soTimeout;
+
+        boolean b = interestOpQueued;
+        interestOpQueued = getBoolean(s.get(INTEREST_OP_QUEUED), false);
+        changed |= b != interestOpQueued;
         
-        b = config.isTcpNoDelay();
-        config.setTcpNoDelay(getBoolean(s.get(TCP_NODELAY), true));
-        changed |= b != config.isTcpNoDelay();
-
-        b = config.isSoKeepalive();
-        config.setSoKeepalive(getBoolean(s.get(SO_KEEPALIVE), false));
-        changed |= b != config.isSoKeepalive();
+        b = tcpNoDelay;
+        tcpNoDelay = getBoolean(s.get(TCP_NODELAY), true);
+        changed |= b != tcpNoDelay;
+
+        b = soKeepalive;
+        soKeepalive = getBoolean(s.get(SO_KEEPALIVE), false);
+        changed |= b != soKeepalive;
                 
         return changed;
     }
@@ -292,136 +252,89 @@ public class AsyncHTTPConduitFactory imp
     public void setBus(Bus b) {
         addListener(b);
     }
+
     public void initComplete() {
     }
+
     public synchronized void preShutdown() {
         shutdown();
     }
+
     public void postShutdown() {
     }    
     
     public void shutdown() {
-        if (ioReactor != null) {
-            shutdown(ioReactor, connectionManager);
+        if (client != null) {
+            shutdown(client);
             connectionManager = null;
-            ioReactor = null;
+            client = null;
         }
         isShutdown = true;
     }
-    private static void shutdown(ConnectingIOReactor ioReactor2,
-                          PoolingClientAsyncConnectionManager connectionManager2) {
-        
+
+    private static void shutdown(CloseableHttpAsyncClient client) {
         try {
-            connectionManager2.shutdown();
+            client.close();
         } catch (IOException e1) {
             e1.printStackTrace();
         }
-        try {
-            ioReactor2.shutdown();
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
     }
 
 
     private void addListener(Bus b) {
         b.getExtension(BusLifeCycleManager.class).registerLifeCycleListener(this);
     }
-    
-    
-    public synchronized void setupNIOClient() throws IOReactorException {
-        if (connectionManager != null) {
+
+    public synchronized void setupNIOClient(HTTPClientPolicy clientPolicy) throws IOReactorException
{
+        if (client != null) {
             return;
         }
-        // Create client-side I/O reactor
-        final IOEventDispatch ioEventDispatch = new DefaultHttpClientIODispatch(new HttpAsyncRequestExecutor(),
-                                                                                new BasicHttpParams());
-        ioReactor = new DefaultConnectingIOReactor(config);
-        
 
-        // Run the I/O reactor in a separate thread
-        Thread t = new Thread(new Runnable() {
+        IOReactorConfig config = IOReactorConfig.custom()
+                .setIoThreadCount(ioThreadCount)
+                .setSelectInterval(selectInterval)
+                .setInterestOpQueued(interestOpQueued)
+                .setSoLinger(soLinger)
+                .setSoTimeout(soTimeout)
+                .setSoKeepAlive(soKeepalive)
+                .setTcpNoDelay(tcpNoDelay)
+                .build();
+
+        Registry<SchemeIOSessionStrategy> ioSessionFactoryRegistry = RegistryBuilder.<SchemeIOSessionStrategy>create()
+                    .register("http", NoopIOSessionStrategy.INSTANCE)
+                    .register("https", SSLIOSessionStrategy.getSystemDefaultStrategy())
+                    .build();
 
-            public void run() {
-                try {
-                    // Ready to go!
-                    ioReactor.execute(ioEventDispatch);
-                } catch (InterruptedIOException ex) {
-                    System.err.println("Interrupted");
-                } catch (IOException e) {
-                    System.err.println("I/O error: " + e.getMessage());
-                }
-            }
 
-        });
-        // Start the client thread
-        t.start();
-        
-        AsyncSchemeRegistry registry = new AsyncSchemeRegistry();
-        registry.register(new AsyncScheme("http", 80, null));
-        registry.register(new AsyncScheme("https", 443, null));
+        ManagedNHttpClientConnectionFactory connectionFactory = new ManagedNHttpClientConnectionFactory()
{
 
-        connectionManager = new PoolingClientAsyncConnectionManager(ioReactor, registry,

-                                                                    connectionTTL, TimeUnit.MILLISECONDS)
{
             @Override
-            protected ClientAsyncConnectionFactory createClientAsyncConnectionFactory() {
-                final HttpResponseFactory responseFactory = new DefaultHttpResponseFactory();
-                final ByteBufferAllocator allocator = new HeapByteBufferAllocator();
-
-                return new ClientAsyncConnectionFactory() {
-                    @Override
-                    public ClientAsyncConnection create(String id, IOSession iosession, HttpParams
params) {
-                        return new DefaultClientAsyncConnection(id, iosession, 
-                                                                responseFactory, 
-                                                                allocator, params) {
-                            @Override
-                            protected void onRequestSubmitted(HttpRequest request) {
-                                super.onRequestSubmitted(request);
-                                if (request instanceof EntityEnclosingRequestWrapper) {
-                                    request = ((EntityEnclosingRequestWrapper)request).getOriginal();
-                                }
-                                if (getIOSession() instanceof SSLIOSession) {
-                                    SSLIOSession sslio = (SSLIOSession)getIOSession();
-                                    getIOSession().setAttribute(CXFHttpRequest.class.getName(),
request);
-                                    if (getIOSession().getAttribute("cxf.handshake.done")
!= null) {
-                                        ((CXFHttpRequest)request).getOutputStream()
-                                            .setSSLSession(sslio.getSSLSession());
-                                    }
-                                }
-                            }
-                        };
-                    }
-                };
+            public ManagedNHttpClientConnection create(final IOSession iosession, final ConnectionConfig
config) {
+                ManagedNHttpClientConnection conn =  super.create(iosession, config);
+                return conn;
             }
-            
         };
+
+        DefaultConnectingIOReactor ioreactor = new DefaultConnectingIOReactor(config);
+        connectionManager = new PoolingNHttpClientConnectionManager(
+                ioreactor,
+                connectionFactory,
+                ioSessionFactoryRegistry,
+                DefaultSchemePortResolver.INSTANCE,
+                SystemDefaultDnsResolver.INSTANCE,
+                connectionTTL, TimeUnit.MILLISECONDS);
+
         connectionManager.setDefaultMaxPerRoute(maxPerRoute);
         connectionManager.setMaxTotal(maxConnections);
-    }
-    
-    public DefaultHttpAsyncClient createClient(final AsyncHTTPConduit c) throws IOException
{
-        if (connectionManager == null) {
-            setupNIOClient();
-        }
-        
-        DefaultHttpAsyncClient dhac = new DefaultHttpAsyncClient(connectionManager) {
-            @Override
-            protected HttpParams createHttpParams() {
-                HttpParams params = new SyncBasicHttpParams();
-                HttpProtocolParams.setVersion(params, HttpVersion.HTTP_1_1);
-                HttpConnectionParams.setTcpNoDelay(params, true);
-                int bufSize = c.getClient().getChunkLength() > 0 ? c.getClient().getChunkLength()
: 16332;
-                HttpConnectionParams.setSocketBufferSize(params, bufSize);
-                HttpConnectionParams.setConnectionTimeout(params, (int)c.getClient().getConnectionTimeout());
-                return params;
-            }
-            @Override
-            protected BasicHttpProcessor createHttpProcessor() {
-                return httpproc;
-            }            
-        };
-        //CXF handles redirects ourselves
-        dhac.setRedirectStrategy(new RedirectStrategy() {
+
+        ConnectionConfig connectionConfig = ConnectionConfig.custom()
+                .setBufferSize(clientPolicy.getChunkLength() > 0 ? clientPolicy.getChunkLength()
: 16332)
+                .build();
+
+        connectionManager.setDefaultConnectionConfig(connectionConfig);
+
+        RedirectStrategy redirectStrategy = new RedirectStrategy() {
+
             public boolean isRedirected(HttpRequest request, HttpResponse response, HttpContext
context)
                 throws ProtocolException {
                 return false;
@@ -430,12 +343,26 @@ public class AsyncHTTPConduitFactory imp
                 throws ProtocolException {
                 return null;
             }
-        });
-        dhac.setTargetAuthenticationStrategy(targetAuthenticationStrategy);
-        dhac.setProxyAuthenticationStrategy(proxyAuthenticationStrategy);
-        return dhac;
-    }
+        };
 
+        client = HttpAsyncClients.custom()
+            .setConnectionManager(connectionManager)
+            .setRedirectStrategy(redirectStrategy)
+            .setDefaultCookieStore(new BasicCookieStore() {
+                private static final long serialVersionUID = 1L;
+                public void addCookie(Cookie cookie) {
+                }
+            })
+            .build();
+        // Start the client thread
+        client.start();
+    }
 
+    public CloseableHttpAsyncClient createClient(final AsyncHTTPConduit c) throws IOException
{
+        if (connectionManager == null) {
+            setupNIOClient(c.getClient());
+        }
+        return client;
+    }
 
 }

Modified: cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpRequest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpRequest.java?rev=1530340&r1=1530339&r2=1530340&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpRequest.java
(original)
+++ cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpRequest.java
Tue Oct  8 16:27:00 2013
@@ -28,24 +28,26 @@ import org.apache.http.HttpEntityEnclosi
 import org.apache.http.HttpVersion;
 import org.apache.http.ProtocolVersion;
 import org.apache.http.RequestLine;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.Configurable;
 import org.apache.http.message.AbstractHttpMessage;
 import org.apache.http.message.BasicRequestLine;
 import org.apache.http.protocol.HTTP;
 
-public class CXFHttpRequest extends AbstractHttpMessage implements HttpEntityEnclosingRequest
{
+public class CXFHttpRequest extends AbstractHttpMessage implements HttpEntityEnclosingRequest,
Configurable {
 
     private final String method;
     
     private URI uri;
     private HttpEntity entity;
     private AsyncWrappedOutputStream out;
+    private RequestConfig config;
 
     public CXFHttpRequest(final String method) {
         super();
         this.method = method;
     }
     
-
     public void setOutputStream(AsyncWrappedOutputStream o) {
         out = o;
     }
@@ -90,4 +92,13 @@ public class CXFHttpRequest extends Abst
         return expect != null && HTTP.EXPECT_CONTINUE.equalsIgnoreCase(expect.getValue());
     }
 
+    @Override
+    public RequestConfig getConfig() {
+        return config;
+    }
+
+    public void setConfig(RequestConfig config) {
+        this.config = config;
+    }
+
 }

Modified: cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedInputBuffer.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedInputBuffer.java?rev=1530340&r1=1530339&r2=1530340&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedInputBuffer.java
(original)
+++ cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedInputBuffer.java
Tue Oct  8 16:27:00 2013
@@ -102,10 +102,10 @@ public class SharedInputBuffer extends E
                 while ((bytesRead = decoder.read(this.waitingBuffer)) > 0) {
                     totalRead += bytesRead;
                 }
-            } else {
-                while ((bytesRead = decoder.read(this.buffer)) > 0) {
-                    totalRead += bytesRead;
-                }
+            }
+            //read more
+            while ((bytesRead = decoder.read(this.buffer)) > 0) {
+                totalRead += bytesRead;
             }
             if (bytesRead == -1 || decoder.isCompleted()) {
                 this.endOfStream = true;



Mime
View raw message