cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject [2/2] git commit: [CXF-5715] Add a conduit support in WebSocket transport
Date Mon, 28 Apr 2014 09:36:19 GMT
[CXF-5715] Add a conduit support in WebSocket transport


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/0b4fe87b
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/0b4fe87b
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/0b4fe87b

Branch: refs/heads/master
Commit: 0b4fe87bb5918f3aa3a99834d1de0a5146bcbb36
Parents: b798331
Author: Akitoshi Yoshida <ay@apache.org>
Authored: Mon Apr 28 11:34:00 2014 +0200
Committer: Akitoshi Yoshida <ay@apache.org>
Committed: Mon Apr 28 11:35:46 2014 +0200

----------------------------------------------------------------------
 rt/transports/websocket/pom.xml                 |  18 +
 .../websocket/WebSocketTransportFactory.java    |  18 +-
 .../cxf/transport/websocket/WebSocketUtils.java |  54 ++-
 .../websocket/ahc/AhcWebSocketConduit.java      | 426 +++++++++++++++++++
 .../websocket/ahc/AhcWebSocketRequest.java      |  52 +++
 .../JAXRSClientConduitWebSocketTest.java        |  83 ++++
 6 files changed, 646 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/0b4fe87b/rt/transports/websocket/pom.xml
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/pom.xml b/rt/transports/websocket/pom.xml
index e2d4be0..7e10d33 100644
--- a/rt/transports/websocket/pom.xml
+++ b/rt/transports/websocket/pom.xml
@@ -133,6 +133,24 @@
             <artifactId>${cxf.servlet-api.artifact}</artifactId>
         </dependency>
         <dependency>
+            <groupId>com.ning</groupId>
+            <artifactId>async-http-client</artifactId>
+            <version>${cxf.ahc.version}</version>
+            <optional>true</optional>
+            <exclusions>
+                <exclusion>
+                    <groupId>io.netty</groupId>
+                    <artifactId>netty</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty</artifactId>
+            <version>${cxf.netty3.version}</version>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/cxf/blob/0b4fe87b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketTransportFactory.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketTransportFactory.java
b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketTransportFactory.java
index c0605ff..96a5587 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketTransportFactory.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketTransportFactory.java
@@ -37,7 +37,10 @@ import org.apache.cxf.transport.DestinationFactory;
 import org.apache.cxf.transport.http.AbstractHTTPDestination;
 import org.apache.cxf.transport.http.DestinationRegistry;
 import org.apache.cxf.transport.http.DestinationRegistryImpl;
+import org.apache.cxf.transport.http.HTTPConduit;
+import org.apache.cxf.transport.http.HTTPConduitConfigurer;
 import org.apache.cxf.transport.http.HttpDestinationFactory;
+import org.apache.cxf.transport.websocket.ahc.AhcWebSocketConduit;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 
 @NoJSR250Annotations
@@ -105,8 +108,19 @@ public class WebSocketTransportFactory extends AbstractTransportFactory
implemen
      */
     public Conduit getConduit(EndpointInfo endpointInfo, EndpointReferenceType target, Bus
bus)
         throws IOException {
-        //TODO add on implementation using ning
-        throw new RuntimeException("not implemented yet");
+        HTTPConduit conduit = new AhcWebSocketConduit(bus, endpointInfo, target);
+
+        String address = conduit.getAddress();
+        if (address != null && address.indexOf('?') != -1) {
+            address = address.substring(0, address.indexOf('?'));
+        }
+        HTTPConduitConfigurer c1 = bus.getExtension(HTTPConduitConfigurer.class);
+        if (c1 != null) {
+            c1.configure(conduit.getBeanName(), address, conduit);
+        }
+        configure(bus, conduit, conduit.getBeanName(), address);
+        conduit.finalizeConfig();
+        return conduit;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cxf/blob/0b4fe87b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
index 188caf5..0e2c5ee 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
@@ -113,6 +113,15 @@ public final class WebSocketUtils {
         return buffer.toString();
     }
 
+    /**
+     * Build response bytes with the status and type information specified in the headers.
+     *
+     * @param headers
+     * @param data
+     * @param offset
+     * @param length
+     * @return
+     */
     public static byte[] buildResponse(Map<String, String> headers, byte[] data, int
offset, int length) {
         StringBuilder sb = new StringBuilder();
         String v = headers.get(SC_KEY);
@@ -123,15 +132,24 @@ public final class WebSocketUtils {
         }
         sb.append(CRLF);
         
-        byte[] hb = sb.toString().getBytes();
-        byte[] longdata = new byte[hb.length + length];
-        System.arraycopy(hb, 0, longdata, 0, hb.length);
+        byte[] longdata = sb.toString().getBytes();
         if (data != null && length > 0) {
+            final byte[] hb = longdata;
+            longdata = new byte[hb.length + length];
+            System.arraycopy(hb, 0, longdata, 0, hb.length);
             System.arraycopy(data, offset, longdata, hb.length, length);
         }
         return longdata;
     }
 
+    /**
+     * Build response bytes without status and type information.
+     *
+     * @param data
+     * @param offset
+     * @param length
+     * @return
+     */
     public static byte[] buildResponse(byte[] data, int offset, int length) {
         byte[] longdata = new byte[length + 2];
         longdata[0] = 0x0d;
@@ -140,5 +158,35 @@ public final class WebSocketUtils {
         return longdata;
     }
     
+    /**
+     * Build request bytes with the specified method, url, headers, and content entity.
+     * <p>
+     * The result starts with a request line made of the method, a single space and the url,
+     * followed by a Content-Type header line when the headers contain one, and an empty line;
+     * each of these lines is terminated with CRLF. The selected range of the data, if any, is
+     * appended after the empty line. Any other entries in the headers are not written.
+     *
+     * @param method the request method written at the start of the request line
+     * @param url the url written after the method in the request line
+     * @param headers the headers, must not be null; only the "Content-Type" entry is used
+     * @param data the content entity, may be null when there is no content
+     * @param offset the index of the first byte in data to be included
+     * @param length the number of bytes in data to be included; nothing is appended unless positive
+     * @return the request line and header block, followed by the selected bytes of data
+     */
+    public static byte[] buildRequest(String method, String url, Map<String, String>
headers,
+                                      byte[] data, int offset, int length) {
+        StringBuilder sb = new StringBuilder();
+        sb.append(method).append(' ').append(url).append(CRLF);
+        String v = headers.get("Content-Type");
+        if (v != null) {
+            sb.append("Content-Type: ").append(v).append(CRLF);
+        }
+        sb.append(CRLF); // empty line separating the header block from the entity
+
+        byte[] longdata = sb.toString().getBytes(); // encoded with the platform default charset
+        if (data != null && length > 0) {
+            final byte[] hb = longdata;
+            longdata = new byte[hb.length + length];
+            System.arraycopy(hb, 0, longdata, 0, hb.length);
+            System.arraycopy(data, offset, longdata, hb.length, length);
+        }
+        return longdata;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/0b4fe87b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketConduit.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketConduit.java
b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketConduit.java
new file mode 100644
index 0000000..5c6ebad
--- /dev/null
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketConduit.java
@@ -0,0 +1,426 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.websocket.ahc;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.util.Collections;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.ning.http.client.AsyncHttpClient;
+import com.ning.http.client.websocket.WebSocket;
+import com.ning.http.client.websocket.WebSocketByteListener;
+import com.ning.http.client.websocket.WebSocketTextListener;
+import com.ning.http.client.websocket.WebSocketUpgradeHandler;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.http.Headers;
+import org.apache.cxf.transport.http.URLConnectionHTTPConduit;
+import org.apache.cxf.transport.https.HttpsURLConnectionInfo;
+import org.apache.cxf.transport.websocket.WebSocketUtils;
+import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
+import org.apache.cxf.ws.addressing.EndpointReferenceType;
+
+/**
+ * 
+ */
+public class AhcWebSocketConduit extends URLConnectionHTTPConduit {
+    private static final Logger LOG = LogUtils.getL7dLogger(AhcWebSocketConduit.class);
+
+    private AsyncHttpClient ahcclient;
+    private WebSocket websocket;
+
+    //FIXME use a ref-id based request and response association instead of using this sequential
queue
+    private BlockingQueue<Response> responseQueue = new ArrayBlockingQueue<Response>(4096);
+
+    public AhcWebSocketConduit(Bus b, EndpointInfo ei, EndpointReferenceType t) throws IOException
{
+        super(b, ei, t);
+        ahcclient = new AsyncHttpClient();
+    }
+
+    @Override
+    protected void setupConnection(Message message, URI currentURL, HTTPClientPolicy csPolicy)
+        throws IOException {
+
+        String s = currentURL.getScheme();
+        if (!"ws".equals(s) && !"wss".equals(s)) {
+            throw new MalformedURLException("unknown protocol: " + s);
+        }
+        message.put("http.scheme", currentURL.getScheme());
+        String httpRequestMethod =
+                (String)message.get(Message.HTTP_REQUEST_METHOD);
+        if (httpRequestMethod == null) {
+            httpRequestMethod = "POST";
+            message.put(Message.HTTP_REQUEST_METHOD, httpRequestMethod);
+        }
+
+        final AhcWebSocketRequest request = new AhcWebSocketRequest(currentURL, httpRequestMethod);
+        message.put(AhcWebSocketRequest.class, request);
+    }
+
+    @Override
+    protected OutputStream createOutputStream(Message message, boolean needToCacheRequest,
+                                              boolean isChunking, int chunkThreshold) throws
IOException {
+        AhcWebSocketRequest entity = message.get(AhcWebSocketRequest.class);
+        AhcWebSocketWrappedOutputStream out =
+            new AhcWebSocketWrappedOutputStream(message, needToCacheRequest, isChunking,
chunkThreshold,
+                                                getConduitName(), entity.getUri());
+        return out;
+    }
+
+    public class AhcWebSocketWrappedOutputStream extends WrappedOutputStream {
+        private Request request;
+        private Response response;
+
+        protected AhcWebSocketWrappedOutputStream(Message message, boolean possibleRetransmit,
+                                                  boolean isChunking, int chunkThreshold,
String conduitName, URI url) {
+            super(message, possibleRetransmit, isChunking, chunkThreshold, conduitName, url);
+
+            //REVISIT how we prepare the request
+            this.request = new Request();
+            request.setMethod((String)outMessage.get(Message.HTTP_REQUEST_METHOD));
+            request.setPath(url.getPath() + (String)outMessage.getContextualProperty("org.apache.cxf.request.uri"));
+        }
+
+        @Override
+        protected void setupWrappedStream() throws IOException {
+            connect();
+
+            wrappedStream = new OutputStream() {
+
+                @Override
+                public void write(byte b[], int off, int len) throws IOException {
+                    //REVISIT support multiple writes and flush() to write the entire block
data?
+                    websocket.sendMessage(WebSocketUtils.buildRequest(
+                        request.getMethod(), request.getPath(),
+                        Collections.<String, String>singletonMap("Content-Type", request.getContentType()),
+                        b, off, len));
+                }
+
+                @Override
+                public void write(int b) throws IOException {
+                    //REVISIT support this single byte write and use flush() to write the
block data?
+                }
+
+                @Override
+                public void close() throws IOException {
+                }
+            };
+        }
+
+        @Override
+        protected void handleNoOutput() throws IOException {
+            connect();
+            websocket.sendMessage(WebSocketUtils.buildRequest(
+                request.getMethod(), request.getPath(),
+                Collections.<String, String>emptyMap(),
+                null, 0, 0));
+        }
+
+        @Override
+        protected HttpsURLConnectionInfo getHttpsURLConnectionInfo() throws IOException {
+            return null;
+        }
+
+        @Override
+        protected void setProtocolHeaders() throws IOException {
+            Headers h = new Headers(outMessage);
+            request.setContentType(h.determineContentType());
+
+            //REVISIT may provide an option to add other headers
+//          boolean addHeaders = MessageUtils.isTrue(outMessage.getContextualProperty(Headers.ADD_HEADERS_PROPERTY));
+        }
+
+        @Override
+        protected void setFixedLengthStreamingMode(int i) {
+            // ignore
+        }
+
+        @Override
+        protected int getResponseCode() throws IOException {
+            Response r = getResponse();
+            return r.getStatusCode();
+        }
+
+        @Override
+        protected String getResponseMessage() throws IOException {
+            //TODO return a generic message based on the status code
+            return null;
+        }
+
+        @Override
+        protected void updateResponseHeaders(Message inMessage) throws IOException {
+            Headers h = new Headers(inMessage);
+            String ct = getResponse().getContentType();
+            inMessage.put(Message.CONTENT_TYPE, ct);
+
+            //REVISIT if we are allowing more headers, we need to add them into the cxf's
headers
+            h.headerMap().put(Message.CONTENT_TYPE, Collections.singletonList(ct));
+        }
+
+        @Override
+        protected void handleResponseAsync() throws IOException {
+            // TODO Auto-generated method stub
+        }
+
+        @Override
+        protected void closeInputStream() throws IOException {
+        }
+
+        @Override
+        protected boolean usingProxy() {
+            // TODO add proxy support ...
+            return false;
+        }
+
+        @Override
+        protected InputStream getInputStream() throws IOException {
+            Response r = getResponse();
+            //REVISIT
+            return new java.io.ByteArrayInputStream(r.getTextEntity().getBytes());
+        }
+
+        @Override
+        protected InputStream getPartialResponse() throws IOException {
+            Response r = getResponse();
+            //REVISIT
+            return new java.io.ByteArrayInputStream(r.getTextEntity().getBytes());
+        }
+
+        @Override
+        protected void setupNewConnection(String newURL) throws IOException {
+            // TODO
+            throw new IOException("not supported");
+        }
+
+        @Override
+        protected void retransmitStream() throws IOException {
+            // TODO
+            throw new IOException("not supported");
+        }
+
+        @Override
+        protected void updateCookiesBeforeRetransmit() throws IOException {
+            // ignore for now and may consider a specific websocket binding variant to use
cookies
+        }
+
+        @Override
+        public void thresholdReached() throws IOException {
+            // ignore
+        }
+
+        //
+        // other methods follow
+        //
+
+        protected void connect() {
+            LOG.log(Level.INFO, "connecting");
+            if (websocket == null) {
+                try {
+                    websocket = ahcclient.prepareGet(url.toASCIIString()).execute(
+                            new WebSocketUpgradeHandler.Builder()
+                            .addWebSocketListener(new AhcWebSocketListener()).build()).get();
+                    LOG.log(Level.INFO, "connected");
+                } catch (Exception e) {
+                    LOG.log(Level.SEVERE, "unable to connect", e);
+                }
+            } else {
+                LOG.log(Level.INFO, "already connected");
+            }
+        }
+
+        /**
+         * Returns the response for this request, waiting until one becomes available in the
+         * conduit's response queue. The response is taken from the queue only once and is
+         * returned again on subsequent calls.
+         *
+         * @return the response, never null
+         * @throws IOException if the thread is interrupted while waiting for the response
+         */
+        synchronized Response getResponse() throws IOException {
+            if (response == null) {
+                //TODO add a configurable timeout
+                try {
+                    response = responseQueue.take();
+                } catch (InterruptedException e) {
+                    // restore the interrupt status so that the callers further up can observe it
+                    Thread.currentThread().interrupt();
+                    throw new IOException("interrupted while waiting for the response", e);
+                }
+            }
+            return response;
+        }
+    }
+
+    /**
+     * The listener registered on the websocket connection. Each complete text or binary message
+     * is wrapped in a Response and added to the conduit's response queue; fragments are not
+     * supported yet and are only logged.
+     */
+    protected class AhcWebSocketListener implements WebSocketTextListener, WebSocketByteListener {
+
+        /** Logs the opening of the websocket. */
+        public void onOpen(WebSocket ws) {
+            if (LOG.isLoggable(Level.INFO)) {
+                LOG.log(Level.INFO, "onOpen({0})", ws);
+            }
+        }
+
+        /** Logs the closing of the websocket. */
+        public void onClose(WebSocket ws) {
+            if (LOG.isLoggable(Level.INFO)) {
+                LOG.log(Level.INFO, "onClose({0})", ws);
+            }
+        }
+
+        /** Logs the error reported for the websocket. */
+        public void onError(Throwable t) {
+            LOG.log(Level.SEVERE, "[ws] onError", t);
+        }
+
+        /** Queues the received binary message as a response. */
+        public void onMessage(byte[] message) {
+            if (LOG.isLoggable(Level.INFO)) {
+                LOG.log(Level.INFO, "onMessage({0})", message);
+            }
+
+            responseQueue.add(new Response(message));
+        }
+
+        /** Binary fragments are not handled; the call is only logged. */
+        public void onFragment(byte[] fragment, boolean last) {
+            //TODO
+            LOG.log(Level.WARNING, "NOT IMPLEMENTED onFragment({0}, {1})", new Object[]{fragment, last});
+        }
+
+        /** Queues the received text message as a response. */
+        public void onMessage(String message) {
+            if (LOG.isLoggable(Level.INFO)) {
+                LOG.log(Level.INFO, "onMessage({0})", message);
+            }
+
+            responseQueue.add(new Response(message));
+        }
+
+        /** Text fragments are not handled; the call is only logged. */
+        public void onFragment(String fragment, boolean last) {
+            //TODO
+            LOG.log(Level.WARNING, "NOT IMPLEMENTED onFragment({0}, {1})", new Object[]{fragment, last});
+        }
+    }
+
+    // Request and Response are used to represent request and response messages transferred over the websocket
+    //REVISIT move these classes to be used in other places
+    /**
+     * The request part of a message sent over the websocket: the method, the path and
+     * the content type.
+     */
+    static class Request {
+        private String method;
+        private String path;
+        private String contentType;
+
+        /** Returns the request method. */
+        public String getMethod() {
+            return this.method;
+        }
+
+        /** Sets the request method. */
+        public void setMethod(String method) {
+            this.method = method;
+        }
+
+        /** Returns the request path. */
+        public String getPath() {
+            return this.path;
+        }
+
+        /** Sets the request path. */
+        public void setPath(String path) {
+            this.path = path;
+        }
+
+        /** Returns the content type of the request entity. */
+        public String getContentType() {
+            return this.contentType;
+        }
+
+        /** Sets the content type of the request entity. */
+        public void setContentType(String contentType) {
+            this.contentType = contentType;
+        }
+    }
+
+    /**
+     * A response message received over the websocket.
+     * <p>
+     * The raw message is either a String (text message) or a byte[] (binary message). Its first
+     * line is parsed as the status code and the following lines up to the first empty line are
+     * parsed as headers, of which only Content-Type is kept. The rest of the message is the entity.
+     * When the message starts with an empty line, no status code and no headers are read.
+     */
+    static class Response {
+        private Object data;
+        private int pos;
+        private int statusCode;
+        private String contentType;
+        private Object entity;
+
+        /**
+         * Parses the raw message.
+         *
+         * @param data the raw message, either a String or a byte[]
+         * @throws NumberFormatException if the first line is present but is not an integer
+         */
+        public Response(Object data) {
+            this.data = data;
+            String line = readLine();
+            if (line != null) {
+                statusCode = Integer.parseInt(line);
+                // readLine() returns null at the empty line that terminates the header block
+                while ((line = readLine()) != null) {
+                    int del = line.indexOf(':');
+                    if (del < 0) {
+                        // not a "name: value" line; skip it instead of failing in substring()
+                        continue;
+                    }
+                    String h = line.substring(0, del).trim();
+                    String v = line.substring(del + 1).trim();
+                    if ("Content-Type".equalsIgnoreCase(h)) {
+                        contentType = v;
+                    }
+                }
+            }
+            // whatever follows the header block is the entity, kept in the same form as the raw data
+            if (data instanceof String) {
+                entity = ((String)data).substring(pos);
+            } else if (data instanceof byte[]) {
+                entity = new byte[((byte[])data).length - pos];
+                System.arraycopy((byte[])data, pos, (byte[])entity, 0, ((byte[])entity).length);
+            }
+        }
+
+        /** Returns the status code, or 0 if the message did not start with a status line. */
+        public int getStatusCode() {
+            return statusCode;
+        }
+
+        /** Returns the value of the Content-Type header, or null if there was none. */
+        public String getContentType() {
+            return contentType;
+        }
+
+        /** Returns the entity as a String or a byte[], matching the type of the raw message. */
+        public Object getEntity() {
+            return entity;
+        }
+
+        /** Returns the entity as a String; a byte[] entity is decoded with the platform default charset. */
+        public String getTextEntity() {
+            return gettext(entity);
+        }
+
+        /**
+         * Reads the next line starting at the current position and moves the position behind
+         * its line feed. Carriage returns are dropped.
+         *
+         * @return the line, or null if the line is empty or the end of the data is reached
+         */
+        private String readLine() {
+            StringBuilder sb = new StringBuilder();
+            while (pos < length(data)) {
+                int c = getchar(data, pos++);
+                if (c == '\n') {
+                    break;
+                } else if (c == '\r') {
+                    continue;
+                } else {
+                    sb.append((char)c);
+                }
+            }
+            if (sb.length() == 0) {
+                return null;
+            }
+            return sb.toString();
+        }
+
+        /** Returns the number of chars or bytes in the raw data, or 0 for any other type. */
+        private int length(Object o) {
+            if (o instanceof String) {
+                return ((String)o).length();
+            } else if (o instanceof byte[]) {
+                return ((byte[])o).length;
+            }
+            return 0;
+        }
+
+        /** Returns the low eight bits of the char or byte at the given position of the raw data. */
+        private int getchar(Object o, int p) {
+            return 0xff & (o instanceof String ? ((String)o).charAt(p) : (o instanceof byte[] ? ((byte[])o)[p] : -1));
+        }
+
+        /** Returns the object as a String, decoding a byte[] with the platform default charset. */
+        private String gettext(Object o) {
+            return o instanceof String ? (String)o : (o instanceof byte[] ? new String((byte[])o) : null);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/0b4fe87b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketRequest.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketRequest.java
b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketRequest.java
new file mode 100644
index 0000000..520ff04
--- /dev/null
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketRequest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.websocket.ahc;
+
+import java.net.URI;
+
+/**
+ * Holds a URI and a request method. AhcWebSocketConduit.setupConnection creates an instance
+ * from the current URL and the HTTP request method and stores it on the outgoing message.
+ * <p>
+ * TODO consolidate this class with the local Request class of AhcWebSocketConduit
+ */
+class AhcWebSocketRequest {
+    private URI uri; // the URI given at construction or set later
+    private String method; // the request method given at construction or set later
+    
+    public AhcWebSocketRequest(URI uri, String method) {
+        this.uri = uri;
+        this.method = method;
+    }
+
+    public URI getUri() {
+        return uri;
+    }
+
+    public void setUri(URI uri) {
+        this.uri = uri;
+    }
+
+    public String getMethod() {
+        return method;
+    }
+
+    public void setMethod(String method) {
+        this.method = method;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/0b4fe87b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientConduitWebSocketTest.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientConduitWebSocketTest.java
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientConduitWebSocketTest.java
new file mode 100644
index 0000000..2552b70
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientConduitWebSocketTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.websocket;
+
+import javax.ws.rs.core.HttpHeaders;
+
+import org.apache.cxf.jaxrs.client.Client;
+import org.apache.cxf.jaxrs.client.JAXRSClientFactory;
+import org.apache.cxf.jaxrs.client.WebClient;
+import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
+import org.apache.cxf.systest.jaxrs.Book;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Calls the BookStoreWebSocket resource through a JAX-RS client proxy created for a ws://
+ * address, against the server launched from BookServerWebSocket.
+ */
+public class JAXRSClientConduitWebSocketTest extends AbstractBusClientServerTestBase {
+    private static final String PORT = BookServerWebSocket.PORT;
+    
+    @BeforeClass
+    public static void startServers() throws Exception {
+        AbstractResourceInfo.clearAllMaps();
+        assertTrue("server did not launch correctly", launchServer(new BookServerWebSocket()));
+        createStaticBus();
+    }
+    
+    @Test
+    public void testBookWithWebSocket() throws Exception {
+        String address = "ws://localhost:" + getPort() + "/websocket";
+
+        BookStoreWebSocket resource = JAXRSClientFactory.create(address, BookStoreWebSocket.class);
+        Client client = WebClient.client(resource);
+        client.header(HttpHeaders.USER_AGENT, JAXRSClientConduitWebSocketTest.class.getName());
+        
+        // call the GET service
+        assertEquals("CXF in Action", new String(resource.getBookName()));
+
+        // call the GET service in text mode
+        //TODO add some way to control the client to switch between the bytes and text modes
+        assertEquals("CXF in Action", new String(resource.getBookName()));
+
+        // call another GET service
+        Book book = resource.getBook(123);
+        assertEquals("CXF in Action", book.getName());
+
+        // call the POST service
+        assertEquals(Long.valueOf(123), resource.echoBookId(123));
+        
+        // call the same POST service in the text mode
+        //TODO add some way to control the client to switch between the bytes and text modes
+        assertEquals(Long.valueOf(123), resource.echoBookId(123));
+
+        // call the GET service returning a continuous stream output
+        //TODO there is no way to get the continuous stream at the moment
+        //resource.getBookBought();
+
+    }
+    
+    // the port of the server to connect to
+    protected String getPort() {
+        return PORT;
+    }
+
+}


Mime
View raw message