cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ningji...@apache.org
Subject svn commit: r1514298 - in /cxf/trunk/rt/transports/http-netty/netty-client: ./ src/main/java/org/apache/cxf/transport/http/netty/client/ src/test/java/org/apache/cxf/transport/http/netty/client/integration/
Date Thu, 15 Aug 2013 14:32:02 GMT
Author: ningjiang
Date: Thu Aug 15 14:32:02 2013
New Revision: 1514298

URL: http://svn.apache.org/r1514298
Log:
CXF-5177 Upgraded netty version of netty-client to 4.0.x

Modified:
    cxf/trunk/rt/transports/http-netty/netty-client/pom.xml
    cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/CxfResponseCallBack.java
    cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientHandler.java
    cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientPipelineFactory.java
    cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientRequest.java
    cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduit.java
    cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduitFactory.java
    cxf/trunk/rt/transports/http-netty/netty-client/src/test/java/org/apache/cxf/transport/http/netty/client/integration/NettyClientTest.java
    cxf/trunk/rt/transports/http-netty/netty-client/src/test/java/org/apache/cxf/transport/http/netty/client/integration/SSLNettyClientTest.java

Modified: cxf/trunk/rt/transports/http-netty/netty-client/pom.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-netty/netty-client/pom.xml?rev=1514298&r1=1514297&r2=1514298&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-netty/netty-client/pom.xml (original)
+++ cxf/trunk/rt/transports/http-netty/netty-client/pom.xml Thu Aug 15 14:32:02 2013
@@ -35,8 +35,8 @@
     </parent>
     
     <properties>
-        <cxf.osgi.import>
-	    org.jboss.netty.*;version="${cxf.netty.version.range}",
+	    <cxf.osgi.import>
+	    io.netty.*;version="${cxf.netty.version.range}",
 	    javax.annotation;version="${cxf.osgi.javax.annotation.version}",
         </cxf.osgi.import>
         <cxf.osgi.export>
@@ -53,8 +53,8 @@
 
         <dependency>
             <groupId>io.netty</groupId>
-            <artifactId>netty</artifactId>
-            <version>3.6.5.Final</version>
+            <artifactId>netty-codec-http</artifactId>
+            <version>${cxf.netty.version}</version>
         </dependency>
         
          <dependency>

Modified: cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/CxfResponseCallBack.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/CxfResponseCallBack.java?rev=1514298&r1=1514297&r2=1514298&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/CxfResponseCallBack.java
(original)
+++ cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/CxfResponseCallBack.java
Thu Aug 15 14:32:02 2013
@@ -19,7 +19,7 @@
 
 package org.apache.cxf.transport.http.netty.client;
 
-import org.jboss.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponse;
 
 public interface CxfResponseCallBack {
 

Modified: cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientHandler.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientHandler.java?rev=1514298&r1=1514297&r2=1514298&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientHandler.java
(original)
+++ cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientHandler.java
Thu Aug 15 14:32:02 2013
@@ -19,51 +19,60 @@
 
 package org.apache.cxf.transport.http.netty.client;
 
-
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingDeque;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-
-public class NettyHttpClientHandler extends SimpleChannelHandler {
 
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.http.HttpResponse;
+
+public class NettyHttpClientHandler extends ChannelDuplexHandler {
+    private final BlockingQueue<NettyHttpClientRequest> sendedQueue = 
+        new LinkedBlockingDeque<NettyHttpClientRequest>();
+        
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-        throws Exception {
-        HttpResponse response = (HttpResponse)e.getMessage();
-        Channel ch = ctx.getChannel();
-        @SuppressWarnings("unchecked")
-        BlockingQueue<NettyHttpClientRequest> sendedQueue = (BlockingQueue<NettyHttpClientRequest>)ch.getAttachment();
-        NettyHttpClientRequest request = sendedQueue.poll();
-        request.setResponse(response);
-        // calling the callback here
-        request.getCxfResponseCallback().responseReceived(response);
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        
+        if (msg instanceof HttpResponse) {
+            // just make sure we can combine the request and response together
+            HttpResponse response = (HttpResponse)msg;
+            NettyHttpClientRequest request = sendedQueue.poll();
+            request.setResponse(response);
+            // calling the callback here
+            request.getCxfResponseCallback().responseReceived(response);
+        } else {
+            super.channelRead(ctx, msg);
+        }
     }
 
 
     @Override
-    public void writeRequested(ChannelHandlerContext ctx, MessageEvent e)
-        throws Exception {
-        Object p = e.getMessage();
+    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws
Exception {
+        
         // need to deal with the request
-        if (p instanceof NettyHttpClientRequest) {
-            NettyHttpClientRequest request = (NettyHttpClientRequest)e.getMessage();
-            Channel ch = ctx.getChannel();
-            @SuppressWarnings("unchecked")
-            BlockingQueue<NettyHttpClientRequest> sendedQueue = 
-                (BlockingQueue<NettyHttpClientRequest>)ch.getAttachment();
-            if (sendedQueue == null) {
-                sendedQueue = new LinkedBlockingDeque<NettyHttpClientRequest>();
-                ch.setAttachment(sendedQueue);
-            }
+        if (msg instanceof NettyHttpClientRequest) {
+            NettyHttpClientRequest request = (NettyHttpClientRequest)msg;
             sendedQueue.put(request);
-            ctx.getChannel().write(request.getRequest());
+            ctx.writeAndFlush(request.getRequest());
         } else {
-            super.writeRequested(ctx, e);
+            super.write(ctx, msg, promise);
         }
     }
+    
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+        throws Exception {
+        //TODO need to handle the exception here
+        cause.printStackTrace();
+        ctx.close();
+    }
+    
+    @Override
+    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+        ctx.flush();
+    }
+    
+    
 
 }

Modified: cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientPipelineFactory.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientPipelineFactory.java?rev=1514298&r1=1514297&r2=1514298&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientPipelineFactory.java
(original)
+++ cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientPipelineFactory.java
Thu Aug 15 14:32:02 2013
@@ -25,31 +25,31 @@ import java.util.logging.Logger;
 import javax.net.ssl.SSLEngine;
 
 import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.configuration.jsse.TLSClientParameters;
 import org.apache.cxf.transport.https.SSLUtils;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
-import org.jboss.netty.handler.codec.http.HttpRequestEncoder;
-import org.jboss.netty.handler.codec.http.HttpResponseDecoder;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.stream.ChunkedWriteHandler;
 
-public class NettyHttpClientPipelineFactory implements ChannelPipelineFactory {
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpRequestEncoder;
+import io.netty.handler.codec.http.HttpResponseDecoder;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.stream.ChunkedWriteHandler;
+
+
+public class NettyHttpClientPipelineFactory extends ChannelInitializer<Channel> {
     
     private static final Logger LOG =
         LogUtils.getL7dLogger(NettyHttpClientPipelineFactory.class);
+    private final NettyHttpConduit httpConduit;
     
-    private final TLSClientParameters tlsClientParameters;
-    
-    public NettyHttpClientPipelineFactory(TLSClientParameters tlsClientParameters) {
-        this.tlsClientParameters = tlsClientParameters;
+    public NettyHttpClientPipelineFactory(NettyHttpConduit httpConduit) {
+        this.httpConduit = httpConduit;
     }
     
     @Override
-    public ChannelPipeline getPipeline() throws Exception {
-        ChannelPipeline pipeline = Channels.pipeline();
+    protected void initChannel(Channel ch) throws Exception {
+        ChannelPipeline pipeline = ch.pipeline();
         
         SslHandler sslHandler = configureClientSSLOnDemand();
         if (sslHandler != null) {
@@ -59,18 +59,18 @@ public class NettyHttpClientPipelineFact
             pipeline.addLast("ssl", sslHandler);
         }
 
+        
         pipeline.addLast("decoder", new HttpResponseDecoder());
-        pipeline.addLast("aggregator", new HttpChunkAggregator(1048576));
+        // TODO need to configure the aggregator size
+        pipeline.addLast("aggregator", new HttpObjectAggregator(1048576));
         pipeline.addLast("encoder", new HttpRequestEncoder());
         pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
         pipeline.addLast("client", new NettyHttpClientHandler());
-        return pipeline;
-
     }
     
     private SslHandler configureClientSSLOnDemand() throws Exception {
-        if (tlsClientParameters != null) {
-            SSLEngine sslEngine = SSLUtils.createClientSSLEngine(tlsClientParameters);
+        if (httpConduit.getTlsClientParameters() != null) {
+            SSLEngine sslEngine = SSLUtils.createClientSSLEngine(httpConduit.getTlsClientParameters());
             return new SslHandler(sslEngine);
         } else {
             return null;

Modified: cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientRequest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientRequest.java?rev=1514298&r1=1514297&r2=1514298&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientRequest.java
(original)
+++ cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientRequest.java
Thu Aug 15 14:32:02 2013
@@ -20,39 +20,43 @@
 package org.apache.cxf.transport.http.netty.client;
 
 import java.net.URI;
-import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
-import org.jboss.netty.handler.codec.http.HttpMethod;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpVersion;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.handler.codec.http.DefaultFullHttpRequest;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpVersion;
 
 public class NettyHttpClientRequest {
 
     private HttpRequest request;
     private HttpResponse response;
     private URI uri;
+    private String method;
     private CxfResponseCallBack cxfResponseCallback;
     private int connectionTimeout;
     private int receiveTimeout;
 
     public NettyHttpClientRequest(URI requestUri, String method) {
         this.uri = requestUri;
+        this.method = method;
+    }
+    
+    public void createRequest(ByteBuf content) {
         this.request  = 
-            new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.valueOf(method), uri.getPath().toString());
+            new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
+                                       HttpMethod.valueOf(method),
+                                       uri.getPath().toString(), content);
         // setup the default headers
-        request.setHeader("Connection", "keep-alive");
-        request.setHeader("Host", uri.getHost() + ":" + uri.getPort());
-
+        request.headers().set("Connection", "keep-alive");
+        request.headers().set("Host", uri.getHost() + ":" + uri.getPort());
     }
 
     public HttpRequest getRequest() {
         return request;
     }
-
-    public void setRequest(HttpRequest request) {
-        this.request = request;
-    }
-
+    
     public HttpResponse getResponse() {
         return response;
     }

Modified: cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduit.java?rev=1514298&r1=1514297&r2=1514298&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduit.java
(original)
+++ cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduit.java
Thu Aug 15 14:32:02 2013
@@ -40,6 +40,7 @@ import javax.net.ssl.HttpsURLConnection;
 import javax.net.ssl.SSLSession;
 
 import org.apache.cxf.Bus;
+import org.apache.cxf.buslifecycle.BusLifeCycleListener;
 import org.apache.cxf.common.util.StringUtils;
 import org.apache.cxf.helpers.HttpHeaderHelper;
 import org.apache.cxf.io.CacheAndWriteOutputStream;
@@ -53,43 +54,52 @@ import org.apache.cxf.transport.https.Ht
 import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
 import org.apache.cxf.version.Version;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBufferInputStream;
-import org.jboss.netty.buffer.ChannelBufferOutputStream;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.ssl.SslHandler;
 
-public class NettyHttpConduit extends URLConnectionHTTPConduit {
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http.HttpContent;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.ssl.SslHandler;
+
+public class NettyHttpConduit extends URLConnectionHTTPConduit implements BusLifeCycleListener
{
     public static final String USE_ASYNC = "use.async.http.conduit";
     final NettyHttpConduitFactory factory;
-    private final ClientBootstrap bootstrap;
+    private Bootstrap bootstrap;
+    // TODO do we need to use the EventLoop from NettyHttpConduitFactory
+    private final EventLoopGroup group;
     
     public NettyHttpConduit(Bus b, EndpointInfo ei, EndpointReferenceType t, NettyHttpConduitFactory
conduitFactory)
         throws IOException {
         super(b, ei, t);
         factory = conduitFactory;
-        bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory());
+        group = new NioEventLoopGroup();
+        bootstrap = new Bootstrap();
+        bootstrap.group(group);
+        bootstrap.channel(NioSocketChannel.class);
+        bootstrap.handler(new NettyHttpClientPipelineFactory(this));
     }
-
+    
     public NettyHttpConduitFactory getNettyHttpConduitFactory() {
         return factory;
     }
     
     public void close() {
         super.close();
-        // clean up the resource that ClientChannelFactory used
-        bootstrap.shutdown();
+        // clean up thread of the group
+        group.shutdownGracefully().syncUninterruptibly();
     }
 
     // Using Netty API directly
     protected void setupConnection(Message message, URI uri, HTTPClientPolicy csPolicy) throws
IOException {
-
         // need to do some clean up work on the URI address
         String uriString = uri.toString();
         if (uriString.startsWith("netty://")) {
@@ -122,12 +132,9 @@ public class NettyHttpConduit extends UR
         final int rtimeout = determineReceiveTimeout(message, csPolicy);
         request.setConnectionTimeout(ctimeout);
         request.setReceiveTimeout(rtimeout);
-        request.getRequest().setChunked(true);
-        request.getRequest().setHeader(Message.CONTENT_TYPE, (String)message.get(Message.CONTENT_TYPE));
-        // need to socket connection timeout
-
+        
         message.put(NettyHttpClientRequest.class, request);
-        bootstrap.setPipelineFactory(new NettyHttpClientPipelineFactory(getTlsClientParameters()));
+        
     }
 
     protected OutputStream createOutputStream(Message message,
@@ -142,7 +149,10 @@ public class NettyHttpConduit extends UR
                 chunkThreshold,
                 getConduitName(),
                 entity.getUri());
-        entity.getRequest().setContent(out.getOutBuffer());
+        entity.createRequest(out.getOutBuffer());
+        // TODO need to check how to set the Chunked feature
+        //request.getRequest().setChunked(true);
+        entity.getRequest().headers().set(Message.CONTENT_TYPE, (String)message.get(Message.CONTENT_TYPE));
         return out;
 
 
@@ -156,7 +166,7 @@ public class NettyHttpConduit extends UR
         volatile Channel channel;
         volatile SSLSession session;
         boolean isAsync;
-        ChannelBuffer outBuffer;
+        ByteBuf outBuffer;
         OutputStream outputStream;
 
         protected NettyWrappedOutputStream(Message message, boolean possibleRetransmit,
@@ -165,11 +175,11 @@ public class NettyHttpConduit extends UR
             csPolicy = getClient(message);
             entity  = message.get(NettyHttpClientRequest.class);
             int bufSize = csPolicy.getChunkLength() > 0 ? csPolicy.getChunkLength() :
16320;
-            outBuffer = ChannelBuffers.dynamicBuffer(bufSize);
-            outputStream = new ChannelBufferOutputStream(outBuffer);
+            outBuffer = Unpooled.buffer(bufSize);
+            outputStream = new ByteBufOutputStream(outBuffer);
         }
 
-        protected ChannelBuffer getOutBuffer() {
+        protected ByteBuf getOutBuffer() {
             return outBuffer;
         }
 
@@ -200,6 +210,10 @@ public class NettyHttpConduit extends UR
             }
             return httpResponse;
         }
+        
+        protected HttpContent getHttpResponseContent() throws IOException {
+            return (HttpContent) getHttpResponse();
+        }
 
 
         protected synchronized Channel getChannel() throws IOException {
@@ -248,7 +262,7 @@ public class NettyHttpConduit extends UR
                         @Override
                         public void operationComplete(ChannelFuture future) throws Exception
{
                             if (!future.isSuccess()) {
-                                setException(future.getCause());
+                                setException(future.cause());
                             }
                         }
                     };
@@ -277,13 +291,13 @@ public class NettyHttpConduit extends UR
                 @Override
                 public void operationComplete(ChannelFuture future) throws Exception {
                     if (future.isSuccess()) {
-                        setChannel(future.getChannel());
-                        SslHandler sslHandler = channel.getPipeline().get(SslHandler.class);
+                        setChannel(future.channel());
+                        SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
                         if (sslHandler != null) {
-                            session = sslHandler.getEngine().getSession();
+                            session = sslHandler.engine().getSession();
                         }
                     } else {
-                        setException((Exception) future.getCause());
+                        setException((Exception) future.cause());
                     }
                 }
             };
@@ -291,9 +305,9 @@ public class NettyHttpConduit extends UR
             connFuture.addListener(listener);
 
             if (!output) {
-                entity.getRequest().removeHeader("Transfer-Encoding");
-                entity.getRequest().removeHeader("Content-Type");
-                entity.getRequest().setContent(null);
+                entity.getRequest().headers().remove("Transfer-Encoding");
+                entity.getRequest().headers().remove("Content-Type");
+                entity.getRequest().headers().remove(null);
             }
 
             // setup the CxfResponseCallBack
@@ -347,7 +361,7 @@ public class NettyHttpConduit extends UR
         @Override
         protected void setProtocolHeaders() throws IOException {
             Headers h = new Headers(outMessage);
-            entity.getRequest().setHeader(Message.CONTENT_TYPE, h.determineContentType());
+            entity.getRequest().headers().set(Message.CONTENT_TYPE, h.determineContentType());
             boolean addHeaders = MessageUtils.isTrue(outMessage.getContextualProperty(Headers.ADD_HEADERS_PROPERTY));
 
             for (Map.Entry<String, List<String>> header : h.headerMap().entrySet())
{
@@ -356,7 +370,7 @@ public class NettyHttpConduit extends UR
                 }
                 if (addHeaders || HttpHeaderHelper.COOKIE.equalsIgnoreCase(header.getKey()))
{
                     for (String s : header.getValue()) {
-                        entity.getRequest().addHeader(HttpHeaderHelper.COOKIE, s);
+                        entity.getRequest().headers().add(HttpHeaderHelper.COOKIE, s);
                     }
                 } else if (!"Content-Length".equalsIgnoreCase(header.getKey())) {
                     StringBuilder b = new StringBuilder();
@@ -366,10 +380,10 @@ public class NettyHttpConduit extends UR
                             b.append(',');
                         }
                     }
-                    entity.getRequest().setHeader(header.getKey(), b.toString());
+                    entity.getRequest().headers().set(header.getKey(), b.toString());
                 }
-                if (!entity.getRequest().containsHeader("User-Agent")) {
-                    entity.getRequest().setHeader("User-Agent", Version.getCompleteVersionString());
+                if (!entity.getRequest().headers().contains("User-Agent")) {
+                    entity.getRequest().headers().set("User-Agent", Version.getCompleteVersionString());
                 }
             }
         }
@@ -377,18 +391,19 @@ public class NettyHttpConduit extends UR
         @Override
         protected void setFixedLengthStreamingMode(int i) {
             // Here we can set the Content-Length
-            entity.getRequest().setHeader("Content-Length", i);
-            entity.getRequest().setChunked(false);
+            entity.getRequest().headers().set("Content-Length", i);
+            // TODO we need to deal with the Chunked information ourself
+            //entity.getRequest().setChunked(false);
         }
 
         @Override
         protected int getResponseCode() throws IOException {
-            return getHttpResponse().getStatus().getCode();
+            return getHttpResponse().getStatus().code();
         }
 
         @Override
         protected String getResponseMessage() throws IOException {
-            return getHttpResponse().getStatus().getReasonPhrase();
+            return getHttpResponse().getStatus().reasonPhrase();
         }
 
         @Override
@@ -399,13 +414,13 @@ public class NettyHttpConduit extends UR
         }
 
         private String readHeaders(Headers h) throws IOException {
-            Set<String> headerNames = getHttpResponse().getHeaderNames();
+            Set<String> headerNames = getHttpResponse().headers().names();
             String ct = null;
             for (String name : headerNames) {
-                List<String> s = getHttpResponse().getHeaders(name);
+                List<String> s = getHttpResponse().headers().getAll(name);
                 h.headerMap().put(name, s);
                 if (Message.CONTENT_TYPE.equalsIgnoreCase(name)) {
-                    ct = getHttpResponse().getHeader(name);
+                    ct = getHttpResponse().headers().get(name);
                 }
             }
             return ct;
@@ -419,7 +434,7 @@ public class NettyHttpConduit extends UR
         @Override
         protected void closeInputStream() throws IOException {
             //We just clear the buffer
-            getHttpResponse().getContent().clear();
+            getHttpResponseContent().content().clear();
         }
 
         @Override
@@ -430,7 +445,7 @@ public class NettyHttpConduit extends UR
 
         @Override
         protected InputStream getInputStream() throws IOException {
-            return new ChannelBufferInputStream(getHttpResponse().getContent());
+            return new ByteBufInputStream(getHttpResponseContent().content());
         }
 
         @Override
@@ -440,14 +455,14 @@ public class NettyHttpConduit extends UR
             if (responseCode == HttpURLConnection.HTTP_ACCEPTED
                     || responseCode == HttpURLConnection.HTTP_OK) {
 
-                String head = httpResponse.getHeader(HttpHeaderHelper.CONTENT_LENGTH);
+                String head = httpResponse.headers().get(HttpHeaderHelper.CONTENT_LENGTH);
                 int cli = 0;
                 if (head != null) {
                     cli = Integer.parseInt(head);
                 }
-                head = httpResponse.getHeader(HttpHeaderHelper.TRANSFER_ENCODING);
+                head = httpResponse.headers().get(HttpHeaderHelper.TRANSFER_ENCODING);
                 boolean isChunked = head != null &&  HttpHeaderHelper.CHUNKED.equalsIgnoreCase(head);
-                head = httpResponse.getHeader(HttpHeaderHelper.CONNECTION);
+                head = httpResponse.headers().get(HttpHeaderHelper.CONNECTION);
                 boolean isEofTerminated = head != null &&  HttpHeaderHelper.CLOSE.equalsIgnoreCase(head);
                 if (cli > 0) {
                     in = getInputStream();
@@ -486,7 +501,7 @@ public class NettyHttpConduit extends UR
                 entity = outMessage.get(NettyHttpClientRequest.class);
                 //reset the buffers
                 outBuffer.clear();
-                outputStream = new ChannelBufferOutputStream(outBuffer);
+                outputStream = new ByteBufOutputStream(outBuffer);
 
             } catch (URISyntaxException e) {
                 throw new IOException(e);
@@ -511,7 +526,8 @@ public class NettyHttpConduit extends UR
 
         @Override
         public void thresholdReached() throws IOException {
-            entity.getRequest().setChunked(true);
+            //TODO need to support the chunked version
+            //entity.getRequest().setChunked(true);
         }
 
         protected synchronized void setHttpResponse(HttpResponse r) {
@@ -548,5 +564,24 @@ public class NettyHttpConduit extends UR
         }
     }
 
+    @Override
+    public void initComplete() {
+        // TODO Auto-generated method stub
+        
+    }
+
+    @Override
+    public void postShutdown() {
+        // shutdown the conduit
+        this.close();
+        
+    }
+
+    @Override
+    public void preShutdown() {
+        // TODO Auto-generated method stub
+        
+    }
+
 
 }

Modified: cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduitFactory.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduitFactory.java?rev=1514298&r1=1514297&r2=1514298&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduitFactory.java
(original)
+++ cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduitFactory.java
Thu Aug 15 14:32:02 2013
@@ -66,7 +66,8 @@ public class NettyHttpConduitFactory imp
 
     @Override
     public void postShutdown() {
-        // TODO Do we need to keep the track of the NettyHttpConduit?
+        // shutdown the group
+        
     }
 
     public boolean isShutdown() {

Modified: cxf/trunk/rt/transports/http-netty/netty-client/src/test/java/org/apache/cxf/transport/http/netty/client/integration/NettyClientTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-netty/netty-client/src/test/java/org/apache/cxf/transport/http/netty/client/integration/NettyClientTest.java?rev=1514298&r1=1514297&r2=1514298&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-netty/netty-client/src/test/java/org/apache/cxf/transport/http/netty/client/integration/NettyClientTest.java
(original)
+++ cxf/trunk/rt/transports/http-netty/netty-client/src/test/java/org/apache/cxf/transport/http/netty/client/integration/NettyClientTest.java
Thu Aug 15 14:32:02 2013
@@ -86,7 +86,6 @@ public class NettyClientTest extends Abs
         JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean();
         factory.setServiceClass(Greeter.class);
         factory.setAddress(address);
-        //factory.setTransportId("http://cxf.apache.org/transports/http/netty/client");
         Greeter greeter = factory.create(Greeter.class);
         String response = greeter.greetMe("test");
         assertEquals("Get a wrong response", "Hello test", response);

Modified: cxf/trunk/rt/transports/http-netty/netty-client/src/test/java/org/apache/cxf/transport/http/netty/client/integration/SSLNettyClientTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-netty/netty-client/src/test/java/org/apache/cxf/transport/http/netty/client/integration/SSLNettyClientTest.java?rev=1514298&r1=1514297&r2=1514298&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-netty/netty-client/src/test/java/org/apache/cxf/transport/http/netty/client/integration/SSLNettyClientTest.java
(original)
+++ cxf/trunk/rt/transports/http-netty/netty-client/src/test/java/org/apache/cxf/transport/http/netty/client/integration/SSLNettyClientTest.java
Thu Aug 15 14:32:02 2013
@@ -26,12 +26,15 @@ import java.security.GeneralSecurityExce
 import java.security.KeyStore;
 import java.security.KeyStoreException;
 import java.security.NoSuchAlgorithmException;
+import java.util.concurrent.ExecutionException;
 
 import javax.net.ssl.KeyManager;
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.TrustManager;
 import javax.net.ssl.TrustManagerFactory;
+import javax.xml.ws.AsyncHandler;
 import javax.xml.ws.Endpoint;
+import javax.xml.ws.Response;
 
 import org.apache.cxf.Bus;
 import org.apache.cxf.BusFactory;
@@ -41,6 +44,8 @@ import org.apache.cxf.testutil.common.Ab
 import org.apache.cxf.transport.http.netty.client.NettyHttpConduit;
 import org.apache.hello_world_soap_http.Greeter;
 import org.apache.hello_world_soap_http.SOAPService;
+import org.apache.hello_world_soap_http.types.GreetMeLaterResponse;
+import org.apache.hello_world_soap_http.types.GreetMeResponse;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -95,6 +100,25 @@ public class SSLNettyClientTest extends 
         setAddress(g, address);
         String response = g.greetMe("test");
         assertEquals("Get a wrong response", "Hello test", response);
+        
+        GreetMeResponse resp = (GreetMeResponse)g.greetMeAsync("asyncTest", new AsyncHandler<GreetMeResponse>()
{
+            public void handleResponse(Response<GreetMeResponse> res) {
+                try {
+                    res.get().getResponseType();
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                } catch (ExecutionException e) {
+                    e.printStackTrace();
+                }
+            }
+        }).get();
+        assertEquals("Hello asyncTest", resp.getResponseType());
+
+        MyLaterResponseHandler handler = new MyLaterResponseHandler();
+        g.greetMeLaterAsync(1000, handler).get();
+        // need to check the result here
+        assertEquals("Hello, finally!", handler.getResponse().getResponseType());
+
     }
     
     private static void setupTLS(Greeter port)
@@ -138,5 +162,24 @@ public class SSLNettyClientTest extends 
         return fac.getKeyManagers();
     }
     
+    private class MyLaterResponseHandler implements AsyncHandler<GreetMeLaterResponse>
{
+        GreetMeLaterResponse response;
+        @Override
+        public void handleResponse(Response<GreetMeLaterResponse> res) {
+            try {
+                response = res.get();
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            } catch (ExecutionException e) {
+                e.printStackTrace();
+            }
+        }
+
+        GreetMeLaterResponse getResponse() {
+            return response;
+        }
+
+    }
+    
 
 }



Mime
View raw message