cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dk...@apache.org
Subject svn commit: r1368636 - in /cxf/sandbox/dkulp_async_clients/http-hc/src: main/java/org/apache/cxf/transport/http/asyncclient/ test/java/org/apache/cxf/transport/http/asyncclient/
Date Thu, 02 Aug 2012 18:25:31 GMT
Author: dkulp
Date: Thu Aug  2 18:25:30 2012
New Revision: 1368636

URL: http://svn.apache.org/viewvc?rev=1368636&view=rev
Log:
Start adding some error handling into async client
add FIXME's for Proxy and TLS and connectTimeout stuff

Modified:
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPTransportFactory.java
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedInputBuffer.java
    cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java

Modified: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java?rev=1368636&r1=1368635&r2=1368636&view=diff
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
(original)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
Thu Aug  2 18:25:30 2012
@@ -22,8 +22,8 @@ package org.apache.cxf.transport.http.as
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.net.HttpURLConnection;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -44,9 +44,6 @@ import org.apache.http.Header;
 import org.apache.http.HttpResponse;
 import org.apache.http.concurrent.FutureCallback;
 import org.apache.http.entity.BasicHttpEntity;
-import org.apache.http.nio.ContentDecoder;
-import org.apache.http.nio.ContentEncoder;
-import org.apache.http.nio.IOControl;
 import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
 import org.apache.http.nio.util.HeapByteBufferAllocator;
 import org.apache.http.protocol.BasicHttpContext;
@@ -91,12 +88,6 @@ public class AsyncHTTPConduit extends HT
                                               boolean needToCacheRequest, 
                                               boolean isChunking,
                                               int chunkThreshold) {
-        HttpURLConnection connection = (HttpURLConnection)message.get(KEY_HTTP_CONNECTION);
-        
-        if (isChunking && chunkThreshold <= 0) {
-            chunkThreshold = 0;
-            connection.setChunkedStreamingMode(-1);                    
-        }
         CXFHttpRequest entity = message.get(CXFHttpRequest.class);
         return new AsyncWrappedOutputStream(message,
                                             needToCacheRequest, 
@@ -110,18 +101,16 @@ public class AsyncHTTPConduit extends HT
     class AsyncWrappedOutputStream extends WrappedOutputStream {
         final CXFHttpRequest entity;
         final BasicHttpEntity basicEntity;
-        final SharedInputBuffer inbuf;
-        final SharedOutputBuffer outbuf;
+        final HTTPClientPolicy csPolicy;
+
         boolean isAsync;
+        SharedInputBuffer inbuf;
+        SharedOutputBuffer outbuf;
         
         // Objects for the response
-        HttpResponse httpResponse;
-        ContentDecoder decoder;
-        IOControl ioctrl;
-
-        // Objects for the request
-        ContentEncoder encoder;
-        IOControl requestioctrl;
+        volatile HttpResponse httpResponse;
+        volatile Exception exception;
+
         
         public AsyncWrappedOutputStream(Message message,
                                         boolean needToCacheRequest, 
@@ -132,11 +121,12 @@ public class AsyncHTTPConduit extends HT
             super(message, needToCacheRequest, isChunking,
                   chunkThreshold, conduitName,
                   url);
+            csPolicy = getClient(message);
             entity = message.get(CXFHttpRequest.class);
             basicEntity = (BasicHttpEntity)entity.getEntity();
             HeapByteBufferAllocator allocator = new HeapByteBufferAllocator();
-            inbuf = new SharedInputBuffer(4096, allocator);
-            outbuf = new SharedOutputBuffer(4096, allocator);
+            inbuf = new SharedInputBuffer(4096 * 4, allocator, csPolicy.getReceiveTimeout());
+            outbuf = new SharedOutputBuffer(4096 * 4, allocator);
         }
         
         protected void setProtocolHeaders() throws IOException {
@@ -196,6 +186,7 @@ public class AsyncHTTPConduit extends HT
                 }
                 
                 public void failed(Exception ex) {
+                    setException(ex);
                     inbuf.shutdown();
                     outbuf.shutdown();
                 }
@@ -205,7 +196,16 @@ public class AsyncHTTPConduit extends HT
                 }
                 
             };
+            
+            //FIXME - what to do with a Proxy?
+            //Proxy proxy = proxyFactory.createProxy(csPolicy , entity.getURI());
+            
+            //FIXME - what to do for SSL/TLS?
+            //tlsClientParameters.*
 
+            //FIXME - what to do with the connection timeout 
+            //long connectTimout = csPolicy.getConnectionTimeout();
+            
             factory.getRequester().execute(new CXFHttpAsyncRequestProducer(entity, outbuf),
                          consumer,
                          factory.getPool(),
@@ -213,7 +213,6 @@ public class AsyncHTTPConduit extends HT
                          callback);
             
             wrappedStream = new OutputStream() {
-                
                 public void write(byte b[], int off, int len) throws IOException {
                     outbuf.write(b, off, len);
                 }
@@ -246,15 +245,34 @@ public class AsyncHTTPConduit extends HT
             }
             notifyAll();
         }
+        protected synchronized void setException(Exception ex) {
+            exception = ex;
+            notifyAll();
+        }
 
         protected synchronized HttpResponse getHttpResponse() throws IOException {
             while (httpResponse == null) {
-                //FIXME get the read timeout
                 try {
-                    wait();
+                    wait(csPolicy.getReceiveTimeout());
                 } catch (InterruptedException e) {
                     throw new IOException(e);
                 }
+                if (httpResponse == null) {
+                    outbuf.shutdown();
+                    inbuf.shutdown();
+                    
+                    if (exception != null) {
+                        if (exception instanceof IOException) {
+                            throw (IOException)exception;
+                        }
+                        if (exception instanceof RuntimeException) {
+                            throw (RuntimeException)exception;
+                        }
+                        throw new IOException(exception);
+                    }
+                    
+                    throw new IOException("Read Timeout");
+                }
             }
             return httpResponse;
         }
@@ -268,32 +286,21 @@ public class AsyncHTTPConduit extends HT
         
         protected synchronized InputStream getInputStream() throws IOException {
             return new InputStream() {
-
-                @Override
                 public int read() throws IOException {
                     return inbuf.read();
                 }
-
-                @Override
                 public int read(byte[] b) throws IOException {
                     return inbuf.read(b);
                 }
-
-                @Override
                 public int read(byte[] b, int off, int len) throws IOException {
                     return inbuf.read(b, off, len);
                 }
-
-                @Override
                 public int available() throws IOException {
                     return inbuf.available();
                 }
-
-                @Override
                 public void close() throws IOException {
                     inbuf.close();
                 }
-                
             };
         }
         
@@ -339,10 +346,24 @@ public class AsyncHTTPConduit extends HT
         }
         
         protected void retransmitStream() throws IOException {
+            cachingForRetransmission = false; //already cached
+            setupWrappedStream();
+            cachedStream.writeCacheTo(wrappedStream);
         }
 
         protected void setupNewConnection(String newURL) throws IOException {
             httpResponse = null;
+            isAsync = false;
+            exception = null;
+            try {
+                entity.setURI(new URI(newURL));
+            } catch (URISyntaxException e) {
+                throw new IOException(e);
+            }
+            //reset the buffers
+            HeapByteBufferAllocator allocator = new HeapByteBufferAllocator();
+            inbuf = new SharedInputBuffer(4096 * 4, allocator, csPolicy.getReceiveTimeout());
+            outbuf = new SharedOutputBuffer(4096 * 4, allocator);
         }
         
     }

Modified: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPTransportFactory.java
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPTransportFactory.java?rev=1368636&r1=1368635&r2=1368636&view=diff
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPTransportFactory.java
(original)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPTransportFactory.java
Thu Aug  2 18:25:30 2012
@@ -46,6 +46,7 @@ import org.apache.http.nio.reactor.Conne
 import org.apache.http.nio.reactor.IOEventDispatch;
 import org.apache.http.nio.reactor.IOReactorException;
 import org.apache.http.params.BasicHttpParams;
+import org.apache.http.params.CoreConnectionPNames;
 import org.apache.http.params.HttpParams;
 import org.apache.http.protocol.BasicHttpProcessor;
 import org.apache.http.protocol.RequestConnControl;
@@ -105,6 +106,7 @@ public class AsyncHTTPTransportFactory e
         }
      // HTTP parameters for the client
         HttpParams params = new BasicHttpParams();
+        params.setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 16 * 1024);
         // Create HTTP protocol processing chain
         BasicHttpProcessor httpproc = new BasicHttpProcessor();
         httpproc.addInterceptor(new RequestContent());

Modified: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedInputBuffer.java
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedInputBuffer.java?rev=1368636&r1=1368635&r2=1368636&view=diff
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedInputBuffer.java
(original)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedInputBuffer.java
Thu Aug  2 18:25:30 2012
@@ -21,6 +21,7 @@ package org.apache.cxf.transport.http.as
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -48,15 +49,19 @@ public class SharedInputBuffer extends E
 
     private final ReentrantLock lock;
     private final Condition condition;
+    private final long timeout;
 
     private volatile IOControl ioctrl;
     private volatile boolean shutdown;
     private volatile boolean endOfStream;
 
-    public SharedInputBuffer(int buffersize, final ByteBufferAllocator allocator) {
+    public SharedInputBuffer(int buffersize, 
+                             final ByteBufferAllocator allocator,
+                             long timeout) {
         super(buffersize, allocator);
         this.lock = new ReentrantLock();
         this.condition = this.lock.newCondition();
+        this.timeout = timeout;
     }
 
     public void reset() {
@@ -74,6 +79,10 @@ public class SharedInputBuffer extends E
 
     public int consumeContent(final ContentDecoder decoder, final IOControl ioc) throws IOException
{
         if (this.shutdown) {
+            //something bad happened, we need to shutdown the connection
+            //as we're not going to read the data at all and we
+            //don't want to keep getting read notices and such
+            ioc.shutdown();
             return -1;
         }
         this.lock.lock();
@@ -158,7 +167,10 @@ public class SharedInputBuffer extends E
                     if (this.ioctrl != null) {
                         this.ioctrl.requestInput();
                     }
-                    this.condition.await();
+                    if (!this.condition.await(timeout, TimeUnit.MILLISECONDS)) {
+                        shutdown();
+                        throw new IOException("Read timeout waiting for data");
+                    }
                 }
             } catch (InterruptedException ex) {
                 throw new IOException("Interrupted while waiting for more data");

Modified: cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java?rev=1368636&r1=1368635&r2=1368636&view=diff
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
(original)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
Thu Aug  2 18:25:30 2012
@@ -31,7 +31,9 @@ import javax.xml.ws.Response;
 import org.apache.cxf.Bus;
 import org.apache.cxf.continuations.Continuation;
 import org.apache.cxf.continuations.ContinuationProvider;
+import org.apache.cxf.frontend.ClientProxy;
 import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.apache.cxf.transport.http.HTTPConduit;
 import org.apache.hello_world_soap_http.Greeter;
 import org.apache.hello_world_soap_http.SOAPService;
 import org.apache.hello_world_soap_http.types.GreetMeLaterResponse;
@@ -45,6 +47,7 @@ import org.junit.Test;
 
 public class AsyncHTTPConduitTest extends AbstractBusClientServerTestBase {
     public static final String PORT = allocatePort(AsyncHTTPConduitTest.class);
+    public static final String PORT_INV = allocatePort(AsyncHTTPConduitTest.class, 2);
     
     static Endpoint ep;
     static String request;
@@ -68,7 +71,11 @@ public class AsyncHTTPConduitTest extend
                         getContext().getMessageContext().get(ContinuationProvider.class.getName());
                     Continuation c = p.getContinuation();
                     if (c.isNew()) {
-                        c.suspend(2000 - (cnt % 1000));
+                        if (cnt < 0) {
+                            c.suspend(-cnt);
+                        } else {
+                            c.suspend(2000 - (cnt % 1000));
+                        }
                         return null;
                     }
                     return "Hello, finally! " + cnt;
@@ -79,7 +86,7 @@ public class AsyncHTTPConduitTest extend
             });
         
         StringBuilder builder = new StringBuilder("NaNaNa");
-        for (int x = 0; x < 1000; x++) {
+        for (int x = 0; x < 10; x++) {
             builder.append(" NaNaNa ");
         }
         request = builder.toString();
@@ -100,7 +107,28 @@ public class AsyncHTTPConduitTest extend
         ep.stop();
         ep = null;
     }
-    
+    @Test
+    public void testTimeout() throws Exception {
+        updateAddressPort(g, PORT);
+        HTTPConduit c = (HTTPConduit)ClientProxy.getClient(g).getConduit();
+        c.getClient().setReceiveTimeout(3000);
+        try {
+            assertEquals("Hello " + request, g.greetMeLater(-5000));
+            fail();
+        } catch (Exception ex) {
+            //expected!!!
+        }
+    }
+    @Test
+    public void testConnectIssue() throws Exception {
+        updateAddressPort(g, PORT_INV);
+        try {
+            g.greetMe(request);
+            fail("should have connect exception");
+        } catch (Exception ex) {
+            //expected
+        }
+    }    
     @Test
     public void testCall() throws Exception {
         updateAddressPort(g, PORT);
@@ -112,7 +140,7 @@ public class AsyncHTTPConduitTest extend
         GreetMeResponse resp = (GreetMeResponse)g.greetMeAsync(request, new AsyncHandler<GreetMeResponse>()
{
             public void handleResponse(Response<GreetMeResponse> res) {
                 try {
-                    System.out.println(res.get().getResponseType());
+                    res.get().getResponseType();
                 } catch (InterruptedException e) {
                     // TODO Auto-generated catch block
                     e.printStackTrace();



Mime
View raw message