cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dk...@apache.org
Subject svn commit: r1366492 - in /cxf/sandbox/dkulp_async_clients/http-hc: ./ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/cxf/ src/main/java/org/apache/cxf/transport/ src/main/java/org/apache/cxf/transpo...
Date Fri, 27 Jul 2012 18:46:26 GMT
Author: dkulp
Date: Fri Jul 27 18:46:25 2012
New Revision: 1366492

URL: http://svn.apache.org/viewvc?rev=1366492&view=rev
Log:
Add Apache Http Components NIO based client - works fairly well at this point, but having
very sporadic errors with chunk headers being sent wrong (less than 1% of the time).
No idea what's happening there.

Added:
    cxf/sandbox/dkulp_async_clients/http-hc/
    cxf/sandbox/dkulp_async_clients/http-hc/pom.xml
    cxf/sandbox/dkulp_async_clients/http-hc/src/
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPTransportFactory.java
    cxf/sandbox/dkulp_async_clients/http-hc/src/test/
    cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/
    cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/
    cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/
    cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/
    cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/transport/
    cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/transport/http/
    cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/
    cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java

Added: cxf/sandbox/dkulp_async_clients/http-hc/pom.xml
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/pom.xml?rev=1366492&view=auto
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/pom.xml (added)
+++ cxf/sandbox/dkulp_async_clients/http-hc/pom.xml Fri Jul 27 18:46:25 2012
@@ -0,0 +1,110 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.apache.cxf</groupId>
+    <artifactId>cxf-rt-transports-http-hc</artifactId>
+    <packaging>jar</packaging>
+    <version>2.7.0-SNAPSHOT</version>
+    <name>Apache CXF Runtime HTTP Transport</name>
+    <description>Apache CXF Runtime HTTP Async Transport</description>
+    <url>http://cxf.apache.org</url>
+
+    <parent>
+        <groupId>org.apache.cxf</groupId>
+        <artifactId>cxf-parent</artifactId>
+        <version>2.7.0-SNAPSHOT</version>
+        <relativePath>../../../parent/pom.xml</relativePath>
+    </parent>
+    <properties>
+        <cxf.osgi.import>
+            javax.servlet*;version="${cxf.osgi.javax.servlet.version}",
+        </cxf.osgi.import>
+        <cxf.osgi.export>
+            org.apache.cxf.*,
+        </cxf.osgi.export>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.easymock</groupId>
+            <artifactId>easymock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-transports-http</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.2.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpcore-nio</artifactId>
+            <version>4.2.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpasyncclient</artifactId>
+            <version>4.0-beta1</version>
+        </dependency>
+
+
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-transports-http-jetty</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-frontend-jaxws</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-testutils</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+
+</project>

Added: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java?rev=1366492&view=auto
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
(added)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
Fri Jul 27 18:46:25 2012
@@ -0,0 +1,447 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http.asyncclient;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.helpers.HttpHeaderHelper;
+import org.apache.cxf.io.CacheAndWriteOutputStream;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageUtils;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.http.HTTPConduit;
+import org.apache.cxf.transport.http.Headers;
+import org.apache.cxf.transport.https.HttpsURLConnectionInfo;
+import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
+import org.apache.cxf.version.Version;
+import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpException;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.concurrent.FutureCallback;
+import org.apache.http.entity.BasicHttpEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.nio.ContentDecoder;
+import org.apache.http.nio.ContentEncoder;
+import org.apache.http.nio.IOControl;
+import org.apache.http.nio.protocol.AbstractAsyncResponseConsumer;
+import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
+import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
+import org.apache.http.protocol.BasicHttpContext;
+import org.apache.http.protocol.HttpContext;
+
+/**
+ * 
+ */
+public class AsyncHTTPConduit extends HTTPConduit {
+
+    AsyncHTTPTransportFactory factory;
+    
+    /**
+     * Creates a conduit that keeps a reference to the factory that built it.
+     * The factory is later asked for the requester and connection pool when a
+     * request is executed.
+     *
+     * @param b bus handed to the {@link HTTPConduit} super constructor
+     * @param ei endpoint info handed to the super constructor
+     * @param t target endpoint reference handed to the super constructor
+     * @param factory transport factory stored in {@link #factory}
+     * @throws IOException propagated from the super constructor
+     */
+    public AsyncHTTPConduit(Bus b, EndpointInfo ei, EndpointReferenceType t,
+                            AsyncHTTPTransportFactory factory) throws IOException {
+        super(b, ei, t);
+        this.factory = factory;
+    }
+
+    /**
+     * Builds the request object for an outgoing message and stores it on the
+     * message under {@code CXFEntity.class}.
+     * <p>
+     * The HTTP method is taken from {@link Message#HTTP_REQUEST_METHOD}; when it is
+     * absent, "POST" is used and written back to the message. The request carries a
+     * streaming, chunked {@link BasicHttpEntity} whose content type is the message's
+     * {@link Message#CONTENT_TYPE}.
+     *
+     * @param message the outgoing message
+     * @param url the target address of the request
+     * @param csPolicy client policy (not consulted by this implementation)
+     * @throws IOException if {@code url} cannot be converted to a URI
+     */
+    protected void setupConnection(Message message, URL url, HTTPClientPolicy csPolicy)
+        throws IOException {
+        String httpRequestMethod = (String)message.get(Message.HTTP_REQUEST_METHOD);
+        if (httpRequestMethod == null) {
+            httpRequestMethod = "POST";
+            message.put(Message.HTTP_REQUEST_METHOD, httpRequestMethod);
+        }
+        CXFEntity e = new CXFEntity(httpRequestMethod);
+        try {
+            e.setURI(url.toURI());
+        } catch (URISyntaxException ex) {
+            // Previously the exception was constructed but never thrown, which left
+            // the request without a URI and failed later with a less useful error.
+            throw new IOException(ex);
+        }
+
+        BasicHttpEntity entity = new BasicHttpEntity() {
+            public boolean isStreaming() {
+                return true;
+            }
+        };
+        entity.setChunked(true);
+        entity.setContentType((String)message.get(Message.CONTENT_TYPE));
+        e.setEntity(entity);
+        message.put(CXFEntity.class, e);
+    }
+    
+    
+    /**
+     * Creates the output stream the request body is written to.
+     *
+     * @param message the outgoing message; must already carry the {@code CXFEntity}
+     *                stored by {@code setupConnection}
+     * @param needToCacheRequest passed through to the wrapped stream
+     * @param isChunking whether chunked transfer is requested
+     * @param chunkThreshold chunking threshold; values of zero or less are normalised
+     *                       to 0 when chunking is on
+     * @return a new {@link AsyncWrappedOutputStream} for the message
+     */
+    protected OutputStream createOutputStream(Message message, 
+                                              boolean needToCacheRequest, 
+                                              boolean isChunking,
+                                              int chunkThreshold) {
+        if (isChunking && chunkThreshold <= 0) {
+            chunkThreshold = 0;
+            // setupConnection() in this class never stores an HttpURLConnection on the
+            // message, so the lookup normally yields null; only touch it when present
+            // instead of failing with a NullPointerException.
+            Object connection = message.get(KEY_HTTP_CONNECTION);
+            if (connection instanceof HttpURLConnection) {
+                ((HttpURLConnection)connection).setChunkedStreamingMode(-1);
+            }
+        }
+        CXFEntity entity = message.get(CXFEntity.class);
+        return new AsyncWrappedOutputStream(message,
+                                            needToCacheRequest, 
+                                            isChunking,
+                                            chunkThreshold,
+                                            getConduitName(),
+                                            entity.getURI().toString());
+    }
+    
+    
+    class AsyncWrappedOutputStream extends WrappedOutputStream {
+        CXFEntity entity;
+        BasicHttpEntity basicEntity; 
+        boolean isAsync;
+        
+        // Objects for the response
+        HttpResponse httpResponse;
+        ContentDecoder decoder;
+        IOControl ioctrl;
+
+        // Objects for the request
+        ContentEncoder encoder;
+        IOControl requestioctrl;
+        
+        /**
+         * Creates the stream and looks up the request objects for the message.
+         * All arguments are forwarded to the super constructor; the
+         * {@code CXFEntity} is then read back from the message and its enclosed
+         * entity is kept as {@link #basicEntity}.
+         */
+        public AsyncWrappedOutputStream(Message message,
+                                        boolean needToCacheRequest, 
+                                        boolean isChunking,
+                                        int chunkThreshold, 
+                                        String conduitName,
+                                        String url) {
+            super(message, needToCacheRequest, isChunking,
+                  chunkThreshold, conduitName,
+                  url);
+            entity = message.get(CXFEntity.class);
+            // the cast relies on the entity having been created as a BasicHttpEntity
+            basicEntity = (BasicHttpEntity)entity.getEntity();
+        }
+        /**
+         * Copies the protocol headers of the outgoing message onto the request.
+         * <p>
+         * The content type is set on the entity rather than as a request header.
+         * Cookie headers, and every header when {@link Headers#ADD_HEADERS_PROPERTY}
+         * is true, are added one value at a time under their own name. All other
+         * headers are set once with their values joined by commas. A default
+         * "User-Agent" is supplied if the message did not provide one.
+         *
+         * @throws IOException declared for subclasses; not thrown by this code directly
+         */
+        protected void setProtocolHeaders() throws IOException {
+            Headers h = new Headers(outMessage);
+            basicEntity.setContentType(h.determineContentType());
+            boolean addHeaders = MessageUtils.isTrue(
+                outMessage.getContextualProperty(Headers.ADD_HEADERS_PROPERTY));
+
+            for (Map.Entry<String, List<String>> header : h.headerMap().entrySet()) {
+                String name = header.getKey();
+                List<String> values = header.getValue();
+                if (HttpHeaderHelper.CONTENT_TYPE.equalsIgnoreCase(name)) {
+                    continue;
+                }
+                if (addHeaders || HttpHeaderHelper.COOKIE.equalsIgnoreCase(name)) {
+                    // Use the header's own name: the previous code added every header
+                    // as "Cookie" whenever ADD_HEADERS was enabled.
+                    for (String s : values) {
+                        entity.addHeader(name, s);
+                    }
+                } else {
+                    StringBuilder b = new StringBuilder();
+                    for (int i = 0; i < values.size(); i++) {
+                        b.append(values.get(i));
+                        if (i + 1 < values.size()) {
+                            b.append(',');
+                        }
+                    }
+                    entity.setHeader(name, b.toString());
+                }
+            }
+            // Checked after the loop so the default is applied even when the message
+            // has no headers at all, and never competes with a user supplied value.
+            if (!entity.containsHeader("User-Agent")) {
+                entity.setHeader("User-Agent", Version.getCompleteVersionString());
+            }
+        }
+        
+        /**
+         * Switches the request entity from chunked to a fixed content length.
+         *
+         * @param i the content length to declare on the entity
+         */
+        protected void setFixedLengthStreamingMode(int i) {
+            basicEntity.setChunked(false);
+            basicEntity.setContentLength(i);
+        }
+        /** Marks the request entity as chunked. */
+        public void thresholdReached() throws IOException {
+            basicEntity.setChunked(true);
+        }
+
+        /**
+         * Blocks the calling thread until {@link #setEncoder} has supplied the
+         * request {@link ContentEncoder}.
+         *
+         * @throws IOException if the thread is interrupted while waiting; the
+         *                     interrupt flag is restored before throwing
+         */
+        synchronized void waitForEncoder() throws IOException {
+            while (encoder == null) {
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                    // keep the interrupt visible to callers further up the stack
+                    Thread.currentThread().interrupt();
+                    throw new IOException(e);
+                }
+            }
+        }
+        /**
+         * Stores the request encoder and its I/O control and wakes every thread
+         * waiting on this stream's monitor (for example in {@link #waitForEncoder}).
+         */
+        synchronized void setEncoder(ContentEncoder enc, IOControl ioc) {
+            encoder = enc;
+            requestioctrl = ioc;
+            notifyAll();
+        }
+        
+        /**
+         * Starts the asynchronous exchange and installs the stream that request
+         * body bytes are written to.
+         * <p>
+         * The request is handed to the factory's requester together with a
+         * {@link CXFHttpAsyncRequestProducer} and a {@link CXFHttpAsyncResponseConsumer}.
+         * The innermost stream waits for the producer to publish a
+         * {@link ContentEncoder} and then writes directly into it. That stream is
+         * wrapped either in a {@link CacheAndWriteOutputStream} (when caching for
+         * retransmission) or in an 8 KiB {@link BufferedOutputStream}.
+         */
+        protected void setupWrappedStream() throws IOException {
+                       
+            HttpAsyncResponseConsumer<Object> consumer = new CXFHttpAsyncResponseConsumer();
+            // NOTE(review): every callback is a no-op, so a failed or cancelled exchange
+            // is not signalled to threads blocked in waitForEncoder(), getHttpResponse()
+            // or waitForDecoder().
+            FutureCallback<Object> callback = new FutureCallback<Object>() {
+                public void completed(Object result) {
+                }
+                public void failed(Exception ex) {
+                }
+                public void cancelled() {
+                }
+                
+            };
+
+            factory.getRequester()
+                .execute(new CXFHttpAsyncRequestProducer(entity),
+                         consumer,
+                         factory.getPool(),
+                         new BasicHttpContext(),
+                         callback);
+            wrappedStream = new OutputStream() {
+                public void write(byte b[], int off, int len) throws IOException {
+                    // block until produceContent() has published the encoder
+                    waitForEncoder();
+                    if (len == 0) {
+                        return;
+                    }
+                    ByteBuffer bb = ByteBuffer.wrap(b, off, len);
+                    // loops without waiting when the encoder accepts zero bytes
+                    while (bb.hasRemaining()) {
+                        int i = encoder.write(bb);
+                        if (i == -1) {
+                            // NOTE(review): remaining bytes are dropped silently here
+                            return;
+                        }
+                    }
+                }
+                public void write(int b) throws IOException {
+                    write(new byte[] {(byte)b});
+                }
+                public void close() throws IOException {
+                    waitForEncoder();
+                    // ask for input and output events again, then mark the body complete
+                    requestioctrl.requestInput();
+                    requestioctrl.requestOutput();
+                    encoder.complete();
+                }
+            };
+                        
+            // If we need to cache for retransmission, store data in a
+            // CacheAndWriteOutputStream. Otherwise write directly to the output stream.
+            if (cachingForRetransmission) {
+                cachedStream =
+                    new CacheAndWriteOutputStream(wrappedStream);
+                wrappedStream = cachedStream;
+            } else {
+                wrappedStream = new BufferedOutputStream(wrappedStream, 8192);
+            }
+        }
+        /**
+         * Stores the received response head and wakes every thread waiting on this
+         * stream's monitor (for example in {@link #getHttpResponse}).
+         */
+        protected synchronized void setHttpResponse(HttpResponse r) {
+            httpResponse = r;
+            notifyAll();
+        }
+        /**
+         * Returns the response head, blocking until {@link #setHttpResponse} has
+         * delivered it. There is currently no timeout on the wait.
+         *
+         * @return the response received for this exchange
+         * @throws IOException if the thread is interrupted while waiting; the
+         *                     interrupt flag is restored before throwing
+         */
+        protected synchronized HttpResponse getHttpResponse() throws IOException {
+            while (httpResponse == null) {
+                //FIXME get the read timeout
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                    // keep the interrupt visible to callers further up the stack
+                    Thread.currentThread().interrupt();
+                    throw new IOException(e);
+                }
+            }
+            return httpResponse;
+        }
+        
+        /**
+         * Records that the response is to be handled asynchronously; the actual
+         * processing is started from {@link #setDecoder} once content arrives.
+         */
+        protected void handleResponseAsync() throws IOException {
+            isAsync = true;
+        }
+        /** Does nothing in this implementation. */
+        protected void closeInputStream() throws IOException {
+        }
+        
+        /**
+         * Stores the response decoder and its I/O control, starts asynchronous
+         * response handling if it was requested, and wakes every thread waiting on
+         * this stream's monitor (for example in {@link #waitForDecoder}).
+         *
+         * @param r decoder for the response content
+         * @param i I/O control belonging to the response
+         */
+        protected synchronized void setDecoder(ContentDecoder r, IOControl i) {
+            decoder = r;
+            ioctrl = i;
+            if (isAsync) {
+                //got a response, need to start the response processing now
+                try {
+                    handleResponseOnWorkqueue(false);
+                    isAsync = false; // don't trigger another start on next block. :-)
+                } catch (Exception ex) {
+                    // NOTE(review): the failure is only printed; isAsync stays true, so the
+                    // next content callback attempts to start response handling again
+                    ex.printStackTrace();
+                }
+            }
+            notifyAll();
+        }
+        /**
+         * Blocks the calling thread until {@link #setDecoder} has supplied the
+         * response {@link ContentDecoder}.
+         *
+         * @throws IOException if the thread is interrupted while waiting; the
+         *                     interruption is attached as the cause and the
+         *                     interrupt flag is restored before throwing
+         */
+        synchronized void waitForDecoder() throws IOException {
+            while (decoder == null) {
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                    // keep the interrupt visible to callers further up the stack
+                    Thread.currentThread().interrupt();
+                    // carry the cause, as waitForEncoder() and getHttpResponse() do
+                    throw new IOException(e);
+                }
+            }
+        }
+        /**
+         * Returns a stream over the response content, backed by the response
+         * {@link ContentDecoder}.
+         * <p>
+         * Every channel operation first waits for the decoder to be supplied by
+         * {@link #setDecoder}, so the stream can be obtained before any content has
+         * arrived.
+         *
+         * @return a blocking input stream view of the response body
+         */
+        protected synchronized InputStream getInputStream() throws IOException {
+            return Channels.newInputStream(new ReadableByteChannel() {
+                public boolean isOpen() {
+                    try {
+                        waitForDecoder();
+                    } catch (IOException e) {
+                        // interrupted while waiting: report the channel as closed
+                        return false;
+                    }
+                    return !decoder.isCompleted();
+                }
+                public void close() throws IOException {
+                    waitForDecoder();
+                    // request input events again; onContentReceived() suspends them
+                    // while the content is not completed
+                    ioctrl.requestInput();
+                }
+                public int read(ByteBuffer dst) throws IOException {
+                    waitForDecoder();
+                    int i = 0;
+                    // spins (no wait) for as long as the decoder returns 0 bytes
+                    while (i == 0) {
+                        //really should wait for an async event
+                        i = decoder.read(dst);
+                    }
+                    return i;
+                }
+            });
+        }
+        /** Always reports {@code false}. */
+        protected boolean usingProxy() {
+            return false;
+        }
+        /** Always returns {@code null}; no HTTPS connection details are provided here. */
+        protected HttpsURLConnectionInfo getHttpsURLConnectionInfo() throws IOException {
+            return null;
+        }
+        /**
+         * Returns the status code of the response, blocking in
+         * {@link #getHttpResponse} until the response head is available.
+         */
+        protected synchronized int getResponseCode() throws IOException {
+            return getHttpResponse().getStatusLine().getStatusCode();
+        }
+        /**
+         * Returns the reason phrase of the response status line, blocking in
+         * {@link #getHttpResponse} until the response head is available.
+         */
+        protected String getResponseMessage() throws IOException {
+            return getHttpResponse().getStatusLine().getReasonPhrase();
+        }
+        
+        /**
+         * Replaces the contents of {@code h} with the headers of the current
+         * response. Headers that occur more than once (for example several
+         * "Set-Cookie" lines) keep all of their values, in the order received.
+         * <p>
+         * Reads the {@link #httpResponse} field directly, so the response head must
+         * already have been received when this is called.
+         *
+         * @param h the header holder to fill
+         */
+        private void readHeaders(Headers h) {
+            Map<String, List<String>> target = h.headerMap();
+            target.clear();
+            for (Header header : httpResponse.getAllHeaders()) {
+                List<String> values = target.get(header.getName());
+                if (values == null) {
+                    values = new ArrayList<String>(1);
+                    target.put(header.getName(), values);
+                }
+                // append rather than overwrite so repeated headers are not lost
+                values.add(header.getValue());
+            }
+        }
+        /**
+         * Copies the headers of the current response into the headers of the
+         * given incoming message.
+         */
+        protected void updateResponseHeaders(Message inMessage) {
+            Headers h = new Headers(inMessage);
+            readHeaders(h);
+        }
+        /** Always returns {@code null}; partial responses are not produced here. */
+        protected InputStream getPartialResponse() throws IOException {
+            return null;
+        }
+        /**
+         * Reads the headers of the current response into a fresh {@link Headers}
+         * object and passes it to {@code cookies.readFromHeaders}.
+         */
+        protected void updateCookiesBeforeRetransmit() {
+            Headers h = new Headers();
+            readHeaders(h);
+            cookies.readFromHeaders(h);
+        }
+        /** Does nothing in this implementation; nothing is re-sent. */
+        protected void retransmitStream() throws IOException {
+        }
+        /**
+         * Discards the stored response so that {@link #getHttpResponse} waits for a
+         * new one. The {@code newURL} argument is not used by this implementation.
+         */
+        protected void setupNewConnection(String newURL) throws IOException {
+            httpResponse = null;
+            
+        }
+        
+        
+        /**
+         * Response consumer that forwards the response head and the content
+         * decoder to the enclosing stream instead of buffering the body itself.
+         */
+        class CXFHttpAsyncResponseConsumer extends AbstractAsyncResponseConsumer<Object> {
+            /** Publishes the response head to threads waiting in getHttpResponse(). */
+            protected void onResponseReceived(HttpResponse response)
+                throws HttpException, IOException {
+                setHttpResponse(response);
+            }
+            /** The result object is not used; a constant is returned. */
+            protected Object buildResult(HttpContext context) throws Exception {
+                return Boolean.TRUE;
+            }
+            /**
+             * Publishes the decoder to the enclosing stream and, while content is
+             * still outstanding, suspends further input events.
+             */
+            protected void onContentReceived(ContentDecoder dec, IOControl ioc) 
+                throws IOException {
+                setDecoder(dec, ioc);
+                if (!dec.isCompleted()) {
+                    // use the callback's own control rather than re-reading the shared
+                    // field outside the monitor; setDecoder() just stored this same object
+                    ioc.suspendInput();
+                }
+            }
+            protected void onEntityEnclosed(HttpEntity e, ContentType contentType)
+                throws IOException {
+                //nothing
+            }
+            /** The decoder and I/O control held by the enclosing stream are not cleared here. */
+            protected void releaseResources() {
+            }
+        }
+        
+        /**
+         * Request producer that exposes the prepared {@link CXFEntity} as the
+         * request and hands the content encoder to the enclosing stream, which
+         * writes the body from the caller's thread.
+         */
+        class CXFHttpAsyncRequestProducer implements HttpAsyncRequestProducer {
+            CXFEntity entity;
+            public CXFHttpAsyncRequestProducer(CXFEntity e) {
+                entity = e;
+            }
+            public void close() throws IOException {
+            }
+            /**
+             * Derives the target host from the request URI. When the URI carries no
+             * explicit port, the scheme's default is used: 443 for https, otherwise 80.
+             */
+            public HttpHost getTarget() {
+                String scheme = entity.getURI().getScheme();
+                int port = entity.getURI().getPort();
+                if (port == -1) {
+                    // previously always 80, which sent https requests to the wrong port
+                    port = "https".equalsIgnoreCase(scheme) ? 443 : 80;
+                }
+                return new HttpHost(entity.getURI().getHost(), port, scheme);
+            }
+            public HttpRequest generateRequest() throws IOException, HttpException {
+                return entity;
+            }
+            /**
+             * Publishes the encoder to the enclosing stream and suspends output
+             * events while the body is not complete.
+             */
+            public void produceContent(ContentEncoder enc, IOControl ioc) throws IOException {
+                setEncoder(enc, ioc);
+                if (!enc.isCompleted()) { 
+                    ioc.suspendOutput();
+                }
+            }
+            public void requestCompleted(HttpContext context) {
+            }
+            public void failed(Exception ex) {
+                // NOTE(review): only printed; threads waiting on the enclosing stream
+                // are not notified of the failure
+                ex.printStackTrace();
+            }
+            public boolean isRepeatable() {
+                return false;
+            }
+            public void resetRequest() throws IOException {
+            }
+        }
+
+    }
+    
+    /**
+     * Entity-enclosing request whose HTTP method name is supplied at
+     * construction time instead of being fixed by the class.
+     */
+    static class CXFEntity extends HttpEntityEnclosingRequestBase {
+        // HTTP method name reported by getMethod()
+        final String method;
+        public CXFEntity(String m) {
+            super();
+            method = m;
+        }
+        public String getMethod() {
+            return method;
+        }
+    }
+
+    
+    
+}

Added: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPTransportFactory.java
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPTransportFactory.java?rev=1366492&view=auto
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPTransportFactory.java
(added)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPTransportFactory.java
Fri Jul 27 18:46:25 2012
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http.asyncclient;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+
+import javax.annotation.Resource;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.buslifecycle.BusLifeCycleListener;
+import org.apache.cxf.buslifecycle.BusLifeCycleManager;
+import org.apache.cxf.common.injection.NoJSR250Annotations;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.transport.http.DestinationRegistry;
+import org.apache.cxf.transport.http.HTTPConduit;
+import org.apache.cxf.transport.http.HTTPConduitConfigurer;
+import org.apache.cxf.transport.http.HTTPTransportFactory;
+import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.apache.http.impl.DefaultConnectionReuseStrategy;
+import org.apache.http.impl.nio.DefaultHttpClientIODispatch;
+import org.apache.http.impl.nio.pool.BasicNIOConnPool;
+import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
+import org.apache.http.impl.nio.reactor.IOReactorConfig;
+import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
+import org.apache.http.nio.protocol.HttpAsyncRequester;
+import org.apache.http.nio.reactor.ConnectingIOReactor;
+import org.apache.http.nio.reactor.IOEventDispatch;
+import org.apache.http.nio.reactor.IOReactorException;
+import org.apache.http.params.BasicHttpParams;
+import org.apache.http.params.HttpParams;
+import org.apache.http.protocol.BasicHttpProcessor;
+import org.apache.http.protocol.RequestConnControl;
+import org.apache.http.protocol.RequestContent;
+import org.apache.http.protocol.RequestExpectContinue;
+import org.apache.http.protocol.RequestTargetHost;
+
+/**
+ * HTTP transport factory whose conduits send requests through an Apache
+ * HttpComponents NIO client.
+ * <p>
+ * The NIO client (I/O reactor, connection pool and requester) is created on the
+ * first call to {@link #getConduit} and shared by all conduits of this factory.
+ * The factory registers itself as a bus life cycle listener and shuts the
+ * connection pool down in {@link #preShutdown()}.
+ */
+@NoJSR250Annotations(unlessNull = "bus")
+public class AsyncHTTPTransportFactory extends HTTPTransportFactory
+    implements BusLifeCycleListener {
+    HttpAsyncRequester requester;
+    BasicNIOConnPool pool;
+    
+    public AsyncHTTPTransportFactory() {
+        super();
+    }
+    public AsyncHTTPTransportFactory(Bus b) {
+        super(b);
+        addListener(b);
+    }
+    public AsyncHTTPTransportFactory(Bus b, DestinationRegistry registry) {
+        super(b, registry);
+        addListener(b);
+    }
+
+    public AsyncHTTPTransportFactory(DestinationRegistry registry) {
+        super(registry);
+    }
+    
+    /**
+     * Sets the bus on the super class and registers this factory for the bus's
+     * life cycle events.
+     */
+    @Resource 
+    public void setBus(Bus b) {
+        super.setBus(b);
+        addListener(b);
+    }
+    public void initComplete() {
+    }
+    /**
+     * Shuts the connection pool down, waiting up to 1000 ms. Does nothing when
+     * the NIO client was never set up (no conduit was requested).
+     */
+    public synchronized void preShutdown() {
+        if (pool == null) {
+            return;
+        }
+        try {
+            pool.shutdown(1000);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+    public void postShutdown() {
+    }    
+    
+    /**
+     * Registers this factory with the bus's {@link BusLifeCycleManager}. A null
+     * bus, or a bus without a life cycle manager, is ignored instead of failing
+     * with a NullPointerException.
+     */
+    private void addListener(Bus b) {
+        if (b == null) {
+            return;
+        }
+        BusLifeCycleManager manager = b.getExtension(BusLifeCycleManager.class);
+        if (manager != null) {
+            manager.registerLifeCycleListener(this);
+        }
+    }
+    
+    
+    /**
+     * Creates the shared NIO client on first use; later calls return immediately.
+     * <p>
+     * The I/O reactor runs on its own daemon thread so that an application which
+     * never shuts the bus down can still exit. The fields are assigned only after
+     * the reactor thread has been started.
+     *
+     * @throws IOReactorException if the I/O reactor cannot be created
+     */
+    public synchronized void setupNIOClient() throws IOReactorException {
+        if (requester != null) {
+            return;
+        }
+        // HTTP parameters for the client
+        HttpParams params = new BasicHttpParams();
+        // Create HTTP protocol processing chain
+        BasicHttpProcessor httpproc = new BasicHttpProcessor();
+        httpproc.addInterceptor(new RequestContent());
+        httpproc.addInterceptor(new RequestTargetHost());
+        httpproc.addInterceptor(new RequestConnControl());
+        httpproc.addInterceptor(new RequestExpectContinue());
+
+        // Create client-side HTTP protocol handler
+        HttpAsyncRequestExecutor protocolHandler = new HttpAsyncRequestExecutor();
+        // Create client-side I/O event dispatch
+        final IOEventDispatch ioEventDispatch =
+            new DefaultHttpClientIODispatch(protocolHandler, params);
+        // Create client-side I/O reactor
+        IOReactorConfig config = new IOReactorConfig();
+        config.setTcpNoDelay(true);
+        
+        final ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(config);
+        // Create HTTP connection pool
+        BasicNIOConnPool connPool = new BasicNIOConnPool(ioReactor, params);
+        connPool.setDefaultMaxPerRoute(1000);
+        connPool.setMaxTotal(5000);
+
+        // Run the I/O reactor in a separate thread
+        Thread t = new Thread(new Runnable() {
+
+            public void run() {
+                try {
+                    // Ready to go!
+                    ioReactor.execute(ioEventDispatch);
+                } catch (InterruptedIOException ex) {
+                    System.err.println("Interrupted");
+                } catch (IOException e) {
+                    System.err.println("I/O error: " + e.getMessage());
+                }
+            }
+
+        }, "cxf-async-http-ioreactor");
+        // a daemon thread does not keep the JVM alive on its own
+        t.setDaemon(true);
+        // Start the client thread
+        t.start();
+        
+        pool = connPool;
+        requester = new HttpAsyncRequester(httpproc, new DefaultConnectionReuseStrategy(), params);
+    }
+    
+    /** Returns the shared requester, or null before the NIO client is set up. */
+    public HttpAsyncRequester getRequester() {
+        return requester;
+    }
+    /** Returns the shared connection pool, or null before the NIO client is set up. */
+    public BasicNIOConnPool getPool() {
+        return pool;
+    }
+
+    /**
+     * This call creates a new HTTP Conduit based on the EndpointInfo and
+     * EndpointReferenceType, making sure the NIO client exists first.
+     * The conduit is configured under its bean name and its address with any
+     * query string removed.
+     * TODO: What are the formal constraints on EndpointInfo and 
+     * EndpointReferenceType values?
+     */
+    public Conduit getConduit(
+            EndpointInfo endpointInfo,
+            EndpointReferenceType target
+    ) throws IOException {
+        setupNIOClient();
+        
+        HTTPConduit conduit = new AsyncHTTPConduit(bus, endpointInfo, target, this);
+        // Spring configure the conduit.  
+        String address = conduit.getAddress();
+        if (address != null) {
+            int query = address.indexOf('?');
+            if (query != -1) {
+                address = address.substring(0, query);
+            }
+        }
+        HTTPConduitConfigurer c1 = bus.getExtension(HTTPConduitConfigurer.class);
+        if (c1 != null) {
+            c1.configure(conduit.getBeanName(), address, conduit);
+        }
+        configure(conduit, conduit.getBeanName(), address);
+        conduit.finalizeConfig();
+        return conduit;
+    }
+
+}

Added: cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java?rev=1366492&view=auto
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
(added)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
Fri Jul 27 18:46:25 2012
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http.asyncclient;
+
+import java.net.URL;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import javax.xml.ws.AsyncHandler;
+import javax.xml.ws.Endpoint;
+import javax.xml.ws.Response;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.continuations.Continuation;
+import org.apache.cxf.continuations.ContinuationProvider;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.apache.hello_world_soap_http.Greeter;
+import org.apache.hello_world_soap_http.SOAPService;
+import org.apache.hello_world_soap_http.types.GreetMeLaterResponse;
+import org.apache.hello_world_soap_http.types.GreetMeResponse;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+
+public class AsyncHTTPConduitTest extends AbstractBusClientServerTestBase {
+    /** Port obtained from {@code allocatePort} for this test class; used in the published endpoint address. */
+    public static final String PORT = allocatePort(AsyncHTTPConduitTest.class);
+    
+    /** Endpoint published in {@code start()} and stopped in {@code stop()}. */
+    static Endpoint ep;
+    /** Request payload, built once in {@code start()} and sent by the greetMe tests. */
+    static String request;
+    /** Client port obtained from {@code SOAPService} in {@code start()}; shared by all tests. */
+    static Greeter g;
+    
+    /**
+     * One-time fixture setup.
+     * <p>
+     * Creates the static bus and instantiates an {@code AsyncHTTPTransportFactory}
+     * on it (presumably so the factory registers itself with the bus - confirm),
+     * publishes a {@code GreeterImpl} on {@code PORT}, builds the shared request
+     * payload and obtains the {@code Greeter} client port.
+     */
+    @BeforeClass
+    public static void start() throws Exception {
+        Bus testBus = createStaticBus();
+        new AsyncHTTPTransportFactory(testBus);
+
+        String endpointAddress = "http://localhost:" + PORT + "/SoapContext/SoapPort";
+        ep = Endpoint.publish(endpointAddress, new org.apache.hello_world_soap_http.GreeterImpl() {
+            public String greetMeLater(long cnt) {
+                // Continuations let the async client hold a large number of
+                // open connections while the server uses few threads.
+                //
+                // A slow server is mimicked by suspending for between 1 and 2
+                // seconds; earlier requests tend to be held longer, which
+                // creates a backlog that contends with the later requests.
+                ContinuationProvider provider = (ContinuationProvider)
+                    getContext().getMessageContext().get(ContinuationProvider.class.getName());
+                Continuation continuation = provider.getContinuation();
+                if (!continuation.isNew()) {
+                    // Resumed after the suspension: produce the real answer.
+                    return "Hello, finally! " + cnt;
+                }
+                continuation.suspend(2000 - (cnt % 1000));
+                return null;
+            }
+            public String greetMe(String me) {
+                return "Hello " + me;
+            }
+        });
+
+        // Payload: "NaNaNa" followed by 100 repetitions of " NaNaNa ".
+        StringBuilder payload = new StringBuilder("NaNaNa");
+        for (int remaining = 100; remaining > 0; remaining--) {
+            payload.append(" NaNaNa ");
+        }
+        request = payload.toString();
+
+        URL wsdl = AsyncHTTPConduitTest.class.getResource("/wsdl/hello_world_services.wsdl");
+        assertNotNull("WSDL is null", wsdl);
+
+        SOAPService service = new SOAPService();
+        assertNotNull("Service is null", service);
+
+        g = service.getSoapPort();
+        assertNotNull("Port is null", g);
+    }
+    
+    /**
+     * One-time fixture teardown: closes the client port and stops the endpoint.
+     * <p>
+     * The endpoint is stopped in a {@code finally} block so that a failure while
+     * closing the client cannot leave it published. Both references are
+     * null-checked because this method also runs when {@code start()} failed
+     * before assigning them; an NPE here would hide the original failure.
+     */
+    @AfterClass
+    public static void stop() throws Exception {
+        try {
+            if (g != null) {
+                ((java.io.Closeable)g).close();
+            }
+        } finally {
+            if (ep != null) {
+                ep.stop();
+                ep = null;
+            }
+        }
+    }
+    
+    /**
+     * Sends one synchronous {@code greetMe} call and checks that the reply is
+     * the request prefixed with "Hello ".
+     */
+    @Test
+    public void testCall() throws Exception {
+        updateAddressPort(g, PORT);
+        final String expected = "Hello " + request;
+        final String actual = g.greetMe(request);
+        assertEquals(expected, actual);
+    }
+    /**
+     * Exercises the callback-style async API.
+     * <p>
+     * First issues {@code greetMeAsync}, waits on the returned future and checks
+     * the reply. Then issues {@code greetMeLaterAsync} and waits for it; that
+     * second call is only checked for completing without an exception.
+     */
+    @Test
+    public void testCallAsync() throws Exception {
+        updateAddressPort(g, PORT);
+
+        AsyncHandler<GreetMeResponse> greetHandler = new AsyncHandler<GreetMeResponse>() {
+            public void handleResponse(Response<GreetMeResponse> res) {
+                try {
+                    System.out.println(res.get().getResponseType());
+                } catch (InterruptedException e) {
+                    // Restore the interrupt flag so the calling thread's owner
+                    // can still observe the interruption.
+                    Thread.currentThread().interrupt();
+                    e.printStackTrace();
+                } catch (ExecutionException e) {
+                    // Reported only; the assertion below runs on the test thread.
+                    e.printStackTrace();
+                }
+            }
+        };
+        GreetMeResponse resp = (GreetMeResponse)g.greetMeAsync(request, greetHandler).get();
+        assertEquals("Hello " + request, resp.getResponseType());
+
+        // No-op handler: only the future's completion matters here.
+        g.greetMeLaterAsync(1000, new AsyncHandler<GreetMeLaterResponse>() {
+            public void handleResponse(Response<GreetMeLaterResponse> res) {
+            }
+        }).get();
+    }
+        
+    /**
+     * Rough throughput measurement for synchronous calls.
+     * <p>
+     * Runs 10000 verified warmup calls, then times a further 10000 unverified
+     * calls and prints the elapsed milliseconds.
+     */
+    @Test
+    public void testCalls() throws Exception {
+        updateAddressPort(g, PORT);
+
+        final int iterations = 10000;
+        final String expected = "Hello " + request;
+
+        // warmup: every reply is checked
+        for (int remaining = iterations; remaining > 0; remaining--) {
+            String value = g.greetMe(request);
+            assertEquals(expected, value);
+        }
+
+        // timed run: replies are not inspected
+        final long begin = System.currentTimeMillis();
+        for (int remaining = iterations; remaining > 0; remaining--) {
+            g.greetMe(request);
+        }
+        final long elapsed = System.currentTimeMillis() - begin;
+        System.out.println("Total: " + elapsed);
+    }
+    
+    /**
+     * Rough throughput measurement for callback-style async calls.
+     * <p>
+     * Warmup phase: issues {@code warmupIter} {@code greetMeLaterAsync} calls,
+     * each with its own handler that checks the number at the end of the reply
+     * against the request index. It waits up to 30 seconds, prints the indices
+     * that never completed and fails if any are outstanding.
+     * <p>
+     * Run phase: issues {@code runIter} calls sharing one counting handler,
+     * waits up to 30 seconds and prints the elapsed time with the number of
+     * calls still outstanding.
+     */
+    @Test
+    public void testCallsAsync() throws Exception {
+        updateAddressPort(g, PORT);
+
+        final int warmupIter = 5000;
+        final int runIter = 5000;
+        final CountDownLatch wlatch = new CountDownLatch(warmupIter);
+        final boolean wdone[] = new boolean[warmupIter];
+
+        @SuppressWarnings("unchecked")
+        AsyncHandler<GreetMeLaterResponse> whandler[] = new AsyncHandler[warmupIter];
+        for (int x = 0; x < warmupIter; x++) {
+            final int c = x;
+            whandler[x] = new AsyncHandler<GreetMeLaterResponse>() {
+                public void handleResponse(Response<GreetMeLaterResponse> res) {
+                    try {
+                        // The reply ends with the request's counter; a mismatch
+                        // means a response was delivered to the wrong handler.
+                        String s = res.get().getResponseType();
+                        s = s.substring(s.lastIndexOf(' ') + 1);
+                        if (c != Integer.parseInt(s)) {
+                            System.out.println("Problem " + c + " != " + s);
+                        }
+                    } catch (InterruptedException e) {
+                        // Restore the interrupt flag instead of swallowing it.
+                        Thread.currentThread().interrupt();
+                        e.printStackTrace();
+                    } catch (ExecutionException e) {
+                        e.printStackTrace();
+                    }
+                    wdone[c] = true;
+                    wlatch.countDown();
+                }
+            };
+        }
+
+        //warmup
+        long start = System.currentTimeMillis();
+        for (int x = 0; x < warmupIter; x++) {
+            g.greetMeLaterAsync(x, whandler[x]);
+        }
+        wlatch.await(30, TimeUnit.SECONDS);
+
+        long end = System.currentTimeMillis();
+        System.out.println("Warmup Total: " + (end - start) + " " + wlatch.getCount());
+        // List the request indices whose handler never ran.
+        for (int x = 0; x < warmupIter; x++) {
+            if (!wdone[x]) {
+                System.out.println("  " + x);
+            }
+        }
+        // Fail fast on lost responses. This replaces a Thread.sleep(1000000)
+        // that stalled the run for ~16 minutes and then let the test pass.
+        assertEquals("Warmup calls that never completed", 0L, wlatch.getCount());
+
+        final CountDownLatch rlatch = new CountDownLatch(runIter);
+        AsyncHandler<GreetMeLaterResponse> rhandler = new AsyncHandler<GreetMeLaterResponse>() {
+            public void handleResponse(Response<GreetMeLaterResponse> res) {
+                rlatch.countDown();
+            }
+        };
+
+        start = System.currentTimeMillis();
+        for (int x = 0; x < runIter; x++) {
+            g.greetMeLaterAsync(x, rhandler);
+        }
+        rlatch.await(30, TimeUnit.SECONDS);
+        end = System.currentTimeMillis();
+
+        System.out.println("Total: " + (end - start) + " " + rlatch.getCount());
+    }
+
+}



Mime
View raw message