cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject [1/2] git commit: [CXF-5979] Allow some headers to be returned in WebSocket's streaming responses
Date Fri, 29 Aug 2014 13:43:25 GMT
Repository: cxf
Updated Branches:
  refs/heads/master c56913cd0 -> fa8a61427


[CXF-5979] Allow some headers to be returned in WebSocket's streaming responses


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/fa8a6142
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/fa8a6142
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/fa8a6142

Branch: refs/heads/master
Commit: fa8a614278cb23f33f226bc31735609e03d7c748
Parents: 7e318c4
Author: Akitoshi Yoshida <ay@apache.org>
Authored: Fri Aug 29 15:41:53 2014 +0200
Committer: Akitoshi Yoshida <ay@apache.org>
Committed: Fri Aug 29 15:42:46 2014 +0200

----------------------------------------------------------------------
 .../release/samples/jax_rs/websocket/README.txt |  14 +-
 .../demo/jaxrs/client/WebSocketTestClient.java  |  34 +++--
 .../java/demo/jaxrs/server/CustomerService.java |  19 ++-
 .../cxf/transport/websocket/WebSocketUtils.java |  93 +++++++++---
 .../WebSocketVirtualServletResponse.java        |   5 +-
 .../websocket/ahc/AhcWebSocketConduit.java      |  35 +++--
 .../transport/websocket/WebSocketUtilsTest.java |  82 +++++++++++
 .../jaxrs/websocket/BookStoreWebSocket.java     |  44 +++++-
 .../JAXRSClientServerWebSocketTest.java         | 144 +++++++++++++++++++
 .../jaxrs/websocket/WebSocketTestClient.java    |  37 +++--
 10 files changed, 439 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/fa8a6142/distribution/src/main/release/samples/jax_rs/websocket/README.txt
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/websocket/README.txt b/distribution/src/main/release/samples/jax_rs/websocket/README.txt
index 19d8ecb..09c43e8 100644
--- a/distribution/src/main/release/samples/jax_rs/websocket/README.txt
+++ b/distribution/src/main/release/samples/jax_rs/websocket/README.txt
@@ -75,14 +75,24 @@ with the data:
 updates the customer instance whose id is 123
 
 
-A GET request to path /monitor
+A GET request to path /monitor with id monitor-12345
 
 ------------------------------------------------------------------------
 GET /customerservice/monitor
+requestId: monitor-12345
 ------------------------------------------------------------------------
 
 returns a continuous event stream on the customer
-activities.
+activities. Try invoking some customer related operations.
+
+A GET request to path /unmonitor with id monitor-12345
+
+------------------------------------------------------------------------
+GET /customerservice/unmonitor/monitor-12345
+------------------------------------------------------------------------
+
+unregisters the event stream and returns its status.
+
 
 The client code demonstrates how to send GET/POST/PUT/DELETE requests over
 a websocket.

http://git-wip-us.apache.org/repos/asf/cxf/blob/fa8a6142/distribution/src/main/release/samples/jax_rs/websocket/src/main/java/demo/jaxrs/client/WebSocketTestClient.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/websocket/src/main/java/demo/jaxrs/client/WebSocketTestClient.java
b/distribution/src/main/release/samples/jax_rs/websocket/src/main/java/demo/jaxrs/client/WebSocketTestClient.java
index 5db967c..ba5a7e8 100644
--- a/distribution/src/main/release/samples/jax_rs/websocket/src/main/java/demo/jaxrs/client/WebSocketTestClient.java
+++ b/distribution/src/main/release/samples/jax_rs/websocket/src/main/java/demo/jaxrs/client/WebSocketTestClient.java
@@ -216,18 +216,21 @@ class WebSocketTestClient {
         
         public Response(Object data) {
             this.data = data;
-            String line = readLine();
-            if (line != null) {
-                statusCode = Integer.parseInt(line);
-                while ((line = readLine()) != null) {
-                    if (line.length() > 0) {
-                        int del = line.indexOf(':');
-                        String h = line.substring(0, del).trim();
-                        String v = line.substring(del + 1).trim();
-                        if ("Content-Type".equalsIgnoreCase(h)) {
-                            contentType = v;
-                        }
-                    }
+            String line;
+            boolean first = true;
+            while ((line = readLine()) != null) {
+                if (first && isStatusCode(line)) {
+                    statusCode = Integer.parseInt(line);
+                    continue;
+                } else {
+                    first = false;
+                }
+
+                int del = line.indexOf(':');
+                String h = line.substring(0, del).trim();
+                String v = line.substring(del + 1).trim();
+                if ("Content-Type".equalsIgnoreCase(h)) {
+                    contentType = v;
                 }
             }
             if (data instanceof String) {
@@ -237,8 +240,11 @@ class WebSocketTestClient {
                 System.arraycopy((byte[])data, pos, (byte[])entity, 0, ((byte[])entity).length);
             }
         }
-                
-            
+        
+        private static boolean isStatusCode(String line) {
+            char c = line.charAt(0);
+            return '0' <= c && c <= '9';
+        }
         
         public int getStatusCode() {
             return statusCode;

http://git-wip-us.apache.org/repos/asf/cxf/blob/fa8a6142/distribution/src/main/release/samples/jax_rs/websocket/src/main/java/demo/jaxrs/server/CustomerService.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/websocket/src/main/java/demo/jaxrs/server/CustomerService.java
b/distribution/src/main/release/samples/jax_rs/websocket/src/main/java/demo/jaxrs/server/CustomerService.java
index 9ba3e05..2927d24 100644
--- a/distribution/src/main/release/samples/jax_rs/websocket/src/main/java/demo/jaxrs/server/CustomerService.java
+++ b/distribution/src/main/release/samples/jax_rs/websocket/src/main/java/demo/jaxrs/server/CustomerService.java
@@ -30,6 +30,7 @@ import java.util.concurrent.Executors;
 
 import javax.ws.rs.DELETE;
 import javax.ws.rs.GET;
+import javax.ws.rs.HeaderParam;
 import javax.ws.rs.POST;
 import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
@@ -39,6 +40,8 @@ import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.StreamingOutput;
 
+import org.apache.cxf.transport.websocket.WebSocketConstants;
+
 @Path("/customerservice/")
 @Produces("text/xml")
 public class CustomerService {
@@ -47,7 +50,7 @@ public class CustomerService {
     long currentId = 123;
     Map<Long, Customer> customers = new HashMap<Long, Customer>();
     Map<Long, Order> orders = new HashMap<Long, Order>();
-    Set<OutputStream> monitors = new HashSet<OutputStream>();
+    Map<String, OutputStream> monitors = new HashMap<String, OutputStream>();
     
     public CustomerService() {
         init();
@@ -123,21 +126,29 @@ public class CustomerService {
     @GET
     @Path("/monitor")
     @Produces("text/*")
-    public StreamingOutput getBookBought() {
+    public StreamingOutput monitorCustomers(@HeaderParam(WebSocketConstants.DEFAULT_REQUEST_ID_KEY) String reqid) {
+        final String key = reqid == null ? "*" : reqid; 
         return new StreamingOutput() {
             public void write(final OutputStream out) throws IOException, WebApplicationException
{
-                monitors.add(out);
+                monitors.put(key, out);
                 out.write(("Subscribed at " + new java.util.Date()).getBytes());
             }
         };
     }
 
+    @GET
+    @Path("/unmonitor/{key}")
+    @Produces("text/*")
+    public String unmonitorCustomers(@PathParam("key") String key) {
+        return (monitors.remove(key) != null ? "Removed: " : "Already removed: ") + key;

+    }
+
     private void sendCustomerEvent(final String msg, final Customer customer) {
         executor.execute(new Runnable() {
             public void run() {
                 try {
                     String t = msg + ": " + customer.getId() + "/" + customer.getName();
-                    for (Iterator<OutputStream> it = monitors.iterator(); it.hasNext();)
{
+                    for (Iterator<OutputStream> it = monitors.values().iterator();
it.hasNext();) {
                         OutputStream out = it.next();
                         try {
                             out.write(t.getBytes());

http://git-wip-us.apache.org/repos/asf/cxf/blob/fa8a6142/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
index 7bb27d2..a55639c 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
@@ -19,6 +19,7 @@
 
 package org.apache.cxf.transport.websocket;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Map;
@@ -34,8 +35,8 @@ public final class WebSocketUtils {
     static final String SC_KEY = "$sc";
     static final String SM_KEY = "$sm";
     static final String FLUSHED_KEY = "$flushed";
-    private static final String CRLF = "\r\n";
-    private static final String COLSP = ": ";
+    private static final byte[] CRLF = "\r\n".getBytes();
+    private static final byte[] COLSP = ": ".getBytes();
     private static final String DEFAULT_SC = "200";
 
     private WebSocketUtils() {
@@ -125,38 +126,62 @@ public final class WebSocketUtils {
      * @return
      */
     public static byte[] buildResponse(Map<String, String> headers, byte[] data, int
offset, int length) {
-        StringBuilder sb = new StringBuilder();
+        ByteArrayBuilder sb = new ByteArrayBuilder();
         String v = headers.get(SC_KEY);
         sb.append(v == null ? DEFAULT_SC : v).append(CRLF);
         appendHeaders(headers, sb);
-        sb.append(CRLF);
         
-        byte[] longdata = sb.toString().getBytes();
+        byte[] longdata = sb.toByteArray();
         if (data != null && length > 0) {
-            final byte[] hb = longdata;
-            longdata = new byte[hb.length + length];
-            System.arraycopy(hb, 0, longdata, 0, hb.length);
-            System.arraycopy(data, offset, longdata, hb.length, length);
+            longdata = buildResponse(longdata, data, offset, length);
+        }
+        return longdata;
+    }
+
+    /**
+     * Build response bytes with some generated headers.
+     *
+     * @param headers
+     * @param data
+     * @param offset
+     * @param length
+     * @return
+     */
+    public static byte[] buildResponse(byte[] headers, byte[] data, int offset, int length)
{
+        final int hlen = headers != null ? headers.length : 0;
+        byte[] longdata = new byte[length + 2 + hlen];
+
+        if (hlen > 0) {
+            System.arraycopy(headers, 0, longdata, 0, hlen);
         }
+        longdata[hlen] = 0x0d;
+        longdata[hlen + 1] = 0x0a;
+        System.arraycopy(data, offset, longdata, hlen + 2, length);
         return longdata;
     }
 
     /**
      * Build response bytes without status and type information.
      *
+     * @param headers
      * @param data
      * @param offset
      * @param length
      * @return
      */
     public static byte[] buildResponse(byte[] data, int offset, int length) {
-        byte[] longdata = new byte[length + 2];
-        longdata[0] = 0x0d;
-        longdata[1] = 0x0a;
-        System.arraycopy(data, offset, longdata, 2, length);
-        return longdata;
+        return buildResponse((byte[])null, data, offset, length);
     }
     
+    static byte[] buildHeaderLine(String name, String value) {
+        byte[] hl = new byte[name.length() + COLSP.length + value.length() + CRLF.length];
+        System.arraycopy(name.getBytes(), 0, hl, 0, name.length());
+        System.arraycopy(COLSP, 0, hl, name.length(), COLSP.length);
+        System.arraycopy(value.getBytes(), 0, hl, name.length() + COLSP.length, value.length());
+        System.arraycopy(CRLF, 0, hl, name.length() + COLSP.length + value.length(), CRLF.length);
+        return hl;
+    }
+        
     /**
      * Build request bytes with the specified method, url, headers, and content entity.
      * 
@@ -170,12 +195,12 @@ public final class WebSocketUtils {
      */
     public static byte[] buildRequest(String method, String url, Map<String, String>
headers,
                                       byte[] data, int offset, int length) {
-        StringBuilder sb = new StringBuilder();
+        ByteArrayBuilder sb = new ByteArrayBuilder();
         sb.append(method).append(' ').append(url).append(CRLF);
         appendHeaders(headers, sb);
         sb.append(CRLF);
 
-        byte[] longdata = sb.toString().getBytes();
+        byte[] longdata = sb.toByteArray();
         if (data != null && length > 0) {
             final byte[] hb = longdata;
             longdata = new byte[hb.length + length];
@@ -185,11 +210,45 @@ public final class WebSocketUtils {
         return longdata;
     }
 
-    private static void appendHeaders(Map<String, String> headers, StringBuilder sb)
{
+    private static void appendHeaders(Map<String, String> headers, ByteArrayBuilder
sb) {
         for (Entry<String, String> header : headers.entrySet()) {
             if (!header.getKey().startsWith("$")) {
                 sb.append(header.getKey()).append(COLSP).append(header.getValue()).append(CRLF);
             }
         }
     }
+    
+    private static class ByteArrayBuilder {
+        private ByteArrayOutputStream baos;
+        public ByteArrayBuilder() {
+            baos = new ByteArrayOutputStream();
+        }
+        
+        public ByteArrayBuilder append(byte[] b) {
+            try {
+                baos.write(b);
+            } catch (IOException e) {
+                // ignore;
+            }
+            return this;
+        }
+        
+        public ByteArrayBuilder append(String s) {
+            try {
+                baos.write(s.getBytes());
+            } catch (IOException e) {
+                // ignore
+            }
+            return this;
+        }
+        
+        public ByteArrayBuilder append(char c) {
+            baos.write(c);
+            return this;
+        }
+        
+        public byte[] toByteArray() {
+            return baos.toByteArray();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/fa8a6142/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletResponse.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletResponse.java
b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletResponse.java
index e9fccbf..a783712 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletResponse.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletResponse.java
@@ -343,7 +343,10 @@ public class WebSocketVirtualServletResponse implements HttpServletResponse
{
                     buffer.write(data, offset, length);
                 } else {
                     // unbuffered write to the socket
-                    data = WebSocketUtils.buildResponse(data, offset, length);
+                    String respid = responseHeaders.get(WebSocketConstants.DEFAULT_RESPONSE_ID_KEY);
+                    byte[] headers = respid != null 
+                        ? WebSocketUtils.buildHeaderLine(WebSocketConstants.DEFAULT_RESPONSE_ID_KEY,
respid) : null;
+                    data = WebSocketUtils.buildResponse(headers, data, offset, length);
                     webSocketHolder.write(data, 0, data.length);
                 }
             }

http://git-wip-us.apache.org/repos/asf/cxf/blob/fa8a6142/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketConduit.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketConduit.java
b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketConduit.java
index 3bfb4ed..c243d9d 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketConduit.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketConduit.java
@@ -375,20 +375,22 @@ public class AhcWebSocketConduit extends URLConnectionHTTPConduit {
 
         public Response(String idKey, Object data) {
             this.data = data;
-            String line = readLine();
-            if (line != null) {
-                statusCode = Integer.parseInt(line);
-                while ((line = readLine()) != null) {
-                    if (line.length() > 0) {
-                        int del = line.indexOf(':');
-                        String h = line.substring(0, del).trim();
-                        String v = line.substring(del + 1).trim();
-                        if ("Content-Type".equalsIgnoreCase(h)) {
-                            contentType = v;
-                        } else if (idKey.equals(h)) {
-                            id = v;
-                        }
-                    }
+            String line;
+            boolean first = true;
+            while ((line = readLine()) != null) {
+                if (first && isStatusCode(line)) {
+                    statusCode = Integer.parseInt(line);
+                    continue;
+                } else {
+                    first = false;
+                }
+                int del = line.indexOf(':');
+                String h = line.substring(0, del).trim();
+                String v = line.substring(del + 1).trim();
+                if ("Content-Type".equalsIgnoreCase(h)) {
+                    contentType = v;
+                } else if (WebSocketConstants.DEFAULT_RESPONSE_ID_KEY.equals(h)) {
+                    id = v;
                 }
             }
             if (data instanceof String) {
@@ -399,6 +401,11 @@ public class AhcWebSocketConduit extends URLConnectionHTTPConduit {
             }
         }
 
+        private static boolean isStatusCode(String line) {
+            char c = line.charAt(0);
+            return '0' <= c && c <= '9';
+        }
+
         public int getStatusCode() {
             return statusCode;
         }

http://git-wip-us.apache.org/repos/asf/cxf/blob/fa8a6142/rt/transports/websocket/src/test/java/org/apache/cxf/transport/websocket/WebSocketUtilsTest.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/test/java/org/apache/cxf/transport/websocket/WebSocketUtilsTest.java
b/rt/transports/websocket/src/test/java/org/apache/cxf/transport/websocket/WebSocketUtilsTest.java
new file mode 100644
index 0000000..257be24
--- /dev/null
+++ b/rt/transports/websocket/src/test/java/org/apache/cxf/transport/websocket/WebSocketUtilsTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.websocket;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * 
+ */
+public class WebSocketUtilsTest extends Assert {
+    private static final byte[] TEST_BODY_BYTES = "buenos dias".getBytes();
+    private static final byte[] TEST_HEADERS_BYTES = "200\r\nContent-Type: text/xml;charset=utf-8\r\n".getBytes();
+    private static final byte[] TEST_ID_BYTES = 
+        (WebSocketConstants.DEFAULT_RESPONSE_ID_KEY + ": 31415926-5358-9793-2384-626433832795\r\n").getBytes();
+    private static final Map<String, String> TEST_HEADERS_MAP;
+    private static final byte[] CRLF = "\r\n".getBytes();
+    
+    
+    static {
+        TEST_HEADERS_MAP = new TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER);
+        TEST_HEADERS_MAP.put(WebSocketUtils.SC_KEY, "200");
+        TEST_HEADERS_MAP.put("Content-Type", "text/xml;charset=utf-8");
+        TEST_HEADERS_MAP.put(WebSocketConstants.DEFAULT_RESPONSE_ID_KEY, "31415926-5358-9793-2384-626433832795");
+    }
+    @Test
+    public void testBuildResponse() {
+        byte[] r = WebSocketUtils.buildResponse(TEST_BODY_BYTES, 0, TEST_BODY_BYTES.length);
+        verifyBytes(CRLF, 0, r, 0, 2);
+        verifyBytes(TEST_BODY_BYTES, 0, r, 2, TEST_BODY_BYTES.length);
+        assertEquals(2 + TEST_BODY_BYTES.length, r.length);
+        
+        r = WebSocketUtils.buildResponse(TEST_HEADERS_BYTES, TEST_BODY_BYTES, 0, TEST_BODY_BYTES.length);
+        verifyBytes(TEST_HEADERS_BYTES, 0, r, 0, TEST_HEADERS_BYTES.length);
+        verifyBytes(CRLF, 0, r, TEST_HEADERS_BYTES.length, 2);
+        verifyBytes(TEST_BODY_BYTES, 0, r, TEST_HEADERS_BYTES.length + 2, TEST_BODY_BYTES.length);
+        assertEquals(TEST_HEADERS_BYTES.length + 2 + TEST_BODY_BYTES.length, r.length);
+        
+        r = WebSocketUtils.buildResponse(TEST_HEADERS_MAP, TEST_BODY_BYTES, 0, TEST_BODY_BYTES.length);
+        verifyBytes(TEST_HEADERS_BYTES, 0, r, 0, TEST_HEADERS_BYTES.length);
+        verifyBytes(TEST_ID_BYTES, 0, r, TEST_HEADERS_BYTES.length, TEST_ID_BYTES.length);
+        verifyBytes(CRLF, 0, r, TEST_HEADERS_BYTES.length + TEST_ID_BYTES.length, 2);
+        verifyBytes(TEST_BODY_BYTES, 0, r, 
+                    TEST_HEADERS_BYTES.length + TEST_ID_BYTES.length + 2, TEST_BODY_BYTES.length);
+        assertEquals(TEST_HEADERS_BYTES.length + TEST_ID_BYTES.length + 2 + TEST_BODY_BYTES.length,
r.length);
+        
+        // with some offset
+        r = WebSocketUtils.buildResponse(TEST_BODY_BYTES, 3, 3);
+        verifyBytes(CRLF, 0, r, 0, 2);
+        verifyBytes(TEST_BODY_BYTES, 3, r, 2, 3);
+        assertEquals(2 + 3, r.length);
+    }
+
+    private void verifyBytes(byte[] expected, int epos, byte[] result, int rpos, int length)
{
+        for (int i = 0; i < length; i++) {
+            if (result[rpos + i] != expected[epos + i]) {
+                fail("Wrong byte at position result[" + (rpos + i) + "]. Expected " 
+                    + expected[epos + i] + " but was " + result[rpos + i]);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/fa8a6142/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java
index 857bccc..eed72a3 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java
@@ -23,12 +23,16 @@ package org.apache.cxf.systest.jaxrs.websocket;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.GET;
+import javax.ws.rs.HeaderParam;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
@@ -39,11 +43,12 @@ import javax.ws.rs.core.StreamingOutput;
 
 import org.apache.cxf.jaxrs.ext.StreamingResponse;
 import org.apache.cxf.systest.jaxrs.Book;
+import org.apache.cxf.transport.websocket.WebSocketConstants;
 
 @Path("/web/bookstore")
 public class BookStoreWebSocket {
     private static ExecutorService executor = Executors.newSingleThreadExecutor();
-
+    private Map<String, OutputStream> eventsStreams = new HashMap<String, OutputStream>();
     
     @GET
     @Path("/booknames")
@@ -136,6 +141,43 @@ public class BookStoreWebSocket {
         }
         return "Held from " + from + " for " + t + " ms";
     }
+    
+    @GET
+    @Path("/events/register")
+    @Produces("text/plain")
+    public StreamingOutput registerEventsStream(@HeaderParam(WebSocketConstants.DEFAULT_REQUEST_ID_KEY)
String reqid) {
+        final String key = reqid == null ? "*" : reqid;
+        return new StreamingOutput() {
+            public void write(final OutputStream out) throws IOException, WebApplicationException
{
+                eventsStreams.put(key, out);
+                out.write(("Registered " + key + " at " + new java.util.Date()).getBytes());
+            }
+        };
+
+    }
+
+    @GET
+    @Path("/events/create/{name}")
+    @Produces("text/plain")
+    public String createEvent(@PathParam("name") String name) {
+        for (Iterator<OutputStream> it = eventsStreams.values().iterator(); it.hasNext();)
{
+            OutputStream out = it.next();
+            try {
+                out.write(("News: event " + name + " created").getBytes());
+            } catch (IOException e) {
+                it.remove();
+                e.printStackTrace();
+            }
+        }
+        return name + " created";
+    }
+
+    @GET
+    @Path("/events/unregister/{key}")
+    @Produces("text/plain")
+    public String unregisterEventsStream(@PathParam("key") String key) {
+        return (eventsStreams.remove(key) != null ? "Unregistered: " : "Already Unregistered: ") + key;
+    }
 }
 
 

http://git-wip-us.apache.org/repos/asf/cxf/blob/fa8a6142/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientServerWebSocketTest.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientServerWebSocketTest.java
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientServerWebSocketTest.java
index 01e4ccb..006bcbc 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientServerWebSocketTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientServerWebSocketTest.java
@@ -159,6 +159,39 @@ public class JAXRSClientServerWebSocketTest extends AbstractBusClientServerTestB
         }
     }
     
+    @Test
+    public void testGetBookStreamWithIDReferences() throws Exception {
+        String address = "ws://localhost:" + getPort() + getContext() + "/websocket/web/bookstore";
+
+        WebSocketTestClient wsclient = new WebSocketTestClient(address);
+        wsclient.connect();
+        try {
+            wsclient.reset(5);
+            String reqid = UUID.randomUUID().toString();
+            wsclient.sendMessage(
+                ("GET " +  getContext() + "/websocket/web/bookstore/bookstream\r\nAccept: application/json\r\n"
+                    + WebSocketConstants.DEFAULT_REQUEST_ID_KEY + ": " + reqid + "\r\n\r\n")
+                .getBytes());
+            assertTrue("response expected", wsclient.await(5));
+            List<WebSocketTestClient.Response> received = wsclient.getReceivedResponses();
+            assertEquals(5, received.size());
+            WebSocketTestClient.Response resp = received.get(0);
+            assertEquals(200, resp.getStatusCode());
+            assertEquals("application/json", resp.getContentType());
+            String value = resp.getTextEntity();
+            assertEquals(value, getBookJson(1));
+            for (int i = 2; i <= 5; i++) {
+                // subsequent data should not carry the status but the id header
+                resp = received.get(i - 1);
+                assertEquals(0, resp.getStatusCode());
+                assertEquals(reqid, resp.getId());
+                assertEquals(resp.getTextEntity(), getBookJson(i));
+            }
+        } finally {
+            wsclient.close();
+        }
+    }
+    
     private String getBookJson(int index) {
         return "{\"Book\":{\"id\":" + index + ",\"name\":\"WebSocket" + index + "\"}}";
     }
@@ -330,6 +363,117 @@ public class JAXRSClientServerWebSocketTest extends AbstractBusClientServerTestB
         }
     }
 
+    @Test
+    public void testStreamRegisterAndUnregister() throws Exception {
+        String address = "ws://localhost:" + getPort() + getContext() + "/websocket/web/bookstore";
+
+        WebSocketTestClient wsclient1 = new WebSocketTestClient(address);
+        WebSocketTestClient wsclient2 = new WebSocketTestClient(address);
+        wsclient1.connect();
+        wsclient2.connect();
+        try {
+            String regkey = UUID.randomUUID().toString();
+
+            EventCreatorRunner runner = new EventCreatorRunner(wsclient2, regkey, 1000, 1000);
+            new Thread(runner).start();
+            
+            // register for the event stream with requestId ane expect to get 2 messages
+            wsclient1.reset(3);
+            wsclient1.sendTextMessage(
+                "GET " +  getContext() + "/websocket/web/bookstore/events/register\r\n"
+                    + WebSocketConstants.DEFAULT_REQUEST_ID_KEY + ": " + regkey + "\r\n\r\n");
+            assertFalse("only 2 responses expected", wsclient1.await(5));
+            List<WebSocketTestClient.Response> received = wsclient1.getReceivedResponses();
+            assertEquals(2, received.size());
+            
+            // the first response is the registration confirmation
+            WebSocketTestClient.Response resp = received.get(0);
+            assertEquals(200, resp.getStatusCode());
+            assertEquals("text/plain", resp.getContentType());
+            String value = resp.getTextEntity();
+            assertTrue(value.startsWith("Registered " + regkey));
+            String id = resp.getId();
+            assertEquals("unexpected responseId", regkey, id);
+
+            // the second response is the event news
+            resp = received.get(1);
+            assertEquals(0, resp.getStatusCode());
+            value = resp.getTextEntity();
+            assertEquals("News: event Hello created", value);
+            id = resp.getId();
+            assertEquals("unexpected responseId", regkey, id);
+
+            String[] values = runner.getValues();
+            assertTrue(runner.isCompleted());
+            assertEquals("Hello created", values[0]);
+            assertTrue(values[1].startsWith("Unregistered: " + regkey));
+            assertEquals("Hola created", values[2]);
+        } finally {
+            wsclient1.close();
+            wsclient2.close();
+        }        
+    }
+
+    private class EventCreatorRunner implements Runnable {
+        private WebSocketTestClient wsclient;
+        private String key;
+        private long delay1;
+        private long delay2;
+        private String[] values = new String[3];
+        private boolean completed;
+
+        public EventCreatorRunner(WebSocketTestClient wsclient, String key, long delay1, long delay2) {
+            this.wsclient = wsclient;
+            this.key = key;
+            this.delay1 = delay1;
+            this.delay2 = delay2;
+        }
+
+        public void run() {
+            try {
+                Thread.sleep(delay1);
+                // creating an event and the event stream will see this event
+                wsclient.sendTextMessage(
+                    "GET " +  getContext() + "/websocket/web/bookstore/events/create/Hello\r\n\r\n");
+                assertTrue("response expected", wsclient.await(3));
+                List<WebSocketTestClient.Response> received = wsclient.getReceivedResponses();
+                WebSocketTestClient.Response resp = received.get(0);
+                values[0] = resp.getTextEntity();
+
+                Thread.sleep(delay2);
+                wsclient.reset(1);
+                // unregistering the event stream
+                wsclient.sendTextMessage(
+                    "GET " +  getContext() + "/websocket/web/bookstore/events/unregister/" + key + "\r\n\r\n");
+                assertTrue("response expected", wsclient.await(3));
+                received = wsclient.getReceivedResponses();
+                resp = received.get(0);
+                values[1] = resp.getTextEntity();
+
+                wsclient.reset(1);
+                // creating another event and the event stream will not see this event
+                wsclient.sendTextMessage(
+                    "GET " +  getContext() + "/websocket/web/bookstore/events/create/Hola\r\n\r\n");
+                assertTrue("response expected", wsclient.await(3));
+                received = wsclient.getReceivedResponses();
+                resp = received.get(0);
+                values[2] = resp.getTextEntity();
+            } catch (InterruptedException e) {
+                // ignore
+            } finally {
+                completed = true;
+            }
+        }
+
+        public String[] getValues() {
+            return values;
+        }
+
+        public boolean isCompleted() {
+            return completed;
+        }
+    }
+
     protected String getPort() {
         return PORT;
     }

http://git-wip-us.apache.org/repos/asf/cxf/blob/fa8a6142/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/WebSocketTestClient.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/WebSocketTestClient.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/WebSocketTestClient.java
index cfe73a5..69ad06c 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/WebSocketTestClient.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/WebSocketTestClient.java
@@ -228,20 +228,23 @@ class WebSocketTestClient {
         
         public Response(Object data) {
             this.data = data;
-            String line = readLine();
-            if (line != null) {
-                statusCode = Integer.parseInt(line);
-                while ((line = readLine()) != null) {
-                    if (line.length() > 0) {
-                        int del = line.indexOf(':');
-                        String h = line.substring(0, del).trim();
-                        String v = line.substring(del + 1).trim();
-                        if ("Content-Type".equalsIgnoreCase(h)) {
-                            contentType = v;
-                        } else if (WebSocketConstants.DEFAULT_RESPONSE_ID_KEY.equals(h)) {
-                            id = v;
-                        }
-                    }
+            String line;
+            boolean first = true;
+            while ((line = readLine()) != null) {
+                if (first && isStatusCode(line)) {
+                    statusCode = Integer.parseInt(line);
+                    continue;
+                } else {
+                    first = false;
+                }
+
+                int del = line.indexOf(':');
+                String h = line.substring(0, del).trim();
+                String v = line.substring(del + 1).trim();
+                if ("Content-Type".equalsIgnoreCase(h)) {
+                    contentType = v;
+                } else if (WebSocketConstants.DEFAULT_RESPONSE_ID_KEY.equals(h)) {
+                    id = v;
                 }
             }
             if (data instanceof String) {
@@ -252,7 +255,11 @@ class WebSocketTestClient {
             }
         }
             
-        
+        private static boolean isStatusCode(String line) {
+            char c = line.charAt(0);
+            return '0' <= c && c <= '9';
+        }
+
         public int getStatusCode() {
             return statusCode;
         }


Mime
View raw message