cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject git commit: [CXF-5746] WebsocketConduit to use the id header so that it can correlate requests and responses
Date Mon, 19 May 2014 10:09:58 GMT
Repository: cxf
Updated Branches:
  refs/heads/master e43f4527e -> 846451664


[CXF-5746] WebsocketConduit to use the id header so that it can correlate requests and responses


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/84645166
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/84645166
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/84645166

Branch: refs/heads/master
Commit: 84645166440d3053558cd83ae2bec86c2c88ed8d
Parents: e43f452
Author: Akitoshi Yoshida <ay@apache.org>
Authored: Thu May 15 17:28:50 2014 +0200
Committer: Akitoshi Yoshida <ay@apache.org>
Committed: Mon May 19 12:08:37 2014 +0200

----------------------------------------------------------------------
 .../transport/websocket/WebSocketConstants.java |  32 ++++
 .../cxf/transport/websocket/WebSocketUtils.java |  19 ++-
 .../WebSocketVirtualServletResponse.java        |   1 +
 .../websocket/ahc/AhcWebSocketConduit.java      | 151 +++++++++++--------
 .../ahc/AhcWebSocketConduitRequest.java         |  88 +++++++++++
 .../websocket/ahc/AhcWebSocketRequest.java      |  52 -------
 .../atmosphere/AtmosphereWebSocketHandler.java  |  33 ++--
 .../AtmosphereWebSocketStreamHandler.java       |  25 +--
 .../websocket/jetty/JettyWebSocket.java         |   9 ++
 .../JAXRSClientConduitWebSocketTest.java        |  58 ++++++-
 .../JAXRSClientServerWebSocketTest.java         |  51 +++++++
 .../jaxrs/websocket/WebSocketTestClient.java    |  14 +-
 12 files changed, 374 insertions(+), 159 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/84645166/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketConstants.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketConstants.java
b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketConstants.java
new file mode 100644
index 0000000..64efc1d
--- /dev/null
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketConstants.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.websocket;
+
+/**
+ * 
+ */
+public final class WebSocketConstants {
+    public static final String DEFAULT_REQUEST_ID_KEY = "requestId";
+    public static final String DEFAULT_RESPONSE_ID_KEY = "responseId";
+
+    private WebSocketConstants() {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/84645166/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
index 0e2c5ee..7bb27d2 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
@@ -22,6 +22,7 @@ package org.apache.cxf.transport.websocket;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.TreeMap;
 
 /**
@@ -34,6 +35,7 @@ public final class WebSocketUtils {
     static final String SM_KEY = "$sm";
     static final String FLUSHED_KEY = "$flushed";
     private static final String CRLF = "\r\n";
+    private static final String COLSP = ": ";
     private static final String DEFAULT_SC = "200";
 
     private WebSocketUtils() {
@@ -126,10 +128,7 @@ public final class WebSocketUtils {
         StringBuilder sb = new StringBuilder();
         String v = headers.get(SC_KEY);
         sb.append(v == null ? DEFAULT_SC : v).append(CRLF);
-        v = headers.get("Content-Type");
-        if (v != null) {
-            sb.append("Content-Type: ").append(v).append(CRLF);
-        }
+        appendHeaders(headers, sb);
         sb.append(CRLF);
         
         byte[] longdata = sb.toString().getBytes();
@@ -173,10 +172,7 @@ public final class WebSocketUtils {
                                       byte[] data, int offset, int length) {
         StringBuilder sb = new StringBuilder();
         sb.append(method).append(' ').append(url).append(CRLF);
-        String v = headers.get("Content-Type");
-        if (v != null) {
-            sb.append("Content-Type: ").append(v).append(CRLF);
-        }
+        appendHeaders(headers, sb);
         sb.append(CRLF);
 
         byte[] longdata = sb.toString().getBytes();
@@ -189,4 +185,11 @@ public final class WebSocketUtils {
         return longdata;
     }
 
+    private static void appendHeaders(Map<String, String> headers, StringBuilder sb)
{
+        for (Entry<String, String> header : headers.entrySet()) {
+            if (!header.getKey().startsWith("$")) {
+                sb.append(header.getKey()).append(COLSP).append(header.getValue()).append(CRLF);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/84645166/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletResponse.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletResponse.java
b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletResponse.java
index f4260ac..1594dab 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletResponse.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletResponse.java
@@ -321,6 +321,7 @@ public class WebSocketVirtualServletResponse implements HttpServletResponse
{
         if (LOG.isLoggable(Level.INFO)) {
             LOG.log(Level.INFO, "setHeader({0}, {1})", new Object[]{name, value});
         }
+        responseHeaders.put(name, value);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cxf/blob/84645166/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketConduit.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketConduit.java
b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketConduit.java
index 5c6ebad..3a9ab55 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketConduit.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketConduit.java
@@ -25,8 +25,10 @@ import java.io.OutputStream;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.util.Collections;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -43,6 +45,7 @@ import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.transport.http.Headers;
 import org.apache.cxf.transport.http.URLConnectionHTTPConduit;
 import org.apache.cxf.transport.https.HttpsURLConnectionInfo;
+import org.apache.cxf.transport.websocket.WebSocketConstants;
 import org.apache.cxf.transport.websocket.WebSocketUtils;
 import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
@@ -56,8 +59,11 @@ public class AhcWebSocketConduit extends URLConnectionHTTPConduit {
     private AsyncHttpClient ahcclient;
     private WebSocket websocket;
 
-    //FIXME use a ref-id based request and response association instead of using this sequential
queue
-    private BlockingQueue<Response> responseQueue = new ArrayBlockingQueue<Response>(4096);
+    //REVISIT make these keys configurable
+    private String requestIdKey = WebSocketConstants.DEFAULT_REQUEST_ID_KEY;
+    private String responseIdKey = WebSocketConstants.DEFAULT_RESPONSE_ID_KEY;
+    
+    private Map<String, RequestResponse> uncorrelatedRequests = new ConcurrentHashMap<String,
RequestResponse>();
 
     public AhcWebSocketConduit(Bus b, EndpointInfo ei, EndpointReferenceType t) throws IOException
{
         super(b, ei, t);
@@ -72,6 +78,7 @@ public class AhcWebSocketConduit extends URLConnectionHTTPConduit {
         if (!"ws".equals(s) && !"wss".equals(s)) {
             throw new MalformedURLException("unknown protocol: " + s);
         }
+        
         message.put("http.scheme", currentURL.getScheme());
         String httpRequestMethod =
                 (String)message.get(Message.HTTP_REQUEST_METHOD);
@@ -80,14 +87,16 @@ public class AhcWebSocketConduit extends URLConnectionHTTPConduit {
             message.put(Message.HTTP_REQUEST_METHOD, httpRequestMethod);
         }
 
-        final AhcWebSocketRequest request = new AhcWebSocketRequest(currentURL, httpRequestMethod);
-        message.put(AhcWebSocketRequest.class, request);
+        final AhcWebSocketConduitRequest request = new AhcWebSocketConduitRequest(currentURL,
httpRequestMethod);
+        final int rtimeout = determineReceiveTimeout(message, csPolicy);
+        request.setReceiveTimeout(rtimeout);
+        message.put(AhcWebSocketConduitRequest.class, request);
     }
 
     @Override
     protected OutputStream createOutputStream(Message message, boolean needToCacheRequest,
                                               boolean isChunking, int chunkThreshold) throws
IOException {
-        AhcWebSocketRequest entity = message.get(AhcWebSocketRequest.class);
+        AhcWebSocketConduitRequest entity = message.get(AhcWebSocketConduitRequest.class);
         AhcWebSocketWrappedOutputStream out =
             new AhcWebSocketWrappedOutputStream(message, needToCacheRequest, isChunking,
chunkThreshold,
                                                 getConduitName(), entity.getUri());
@@ -95,17 +104,18 @@ public class AhcWebSocketConduit extends URLConnectionHTTPConduit {
     }
 
     public class AhcWebSocketWrappedOutputStream extends WrappedOutputStream {
-        private Request request;
+        private AhcWebSocketConduitRequest entity;
         private Response response;
 
         protected AhcWebSocketWrappedOutputStream(Message message, boolean possibleRetransmit,
                                                   boolean isChunking, int chunkThreshold,
String conduitName, URI url) {
             super(message, possibleRetransmit, isChunking, chunkThreshold, conduitName, url);
 
+            entity = message.get(AhcWebSocketConduitRequest.class);
             //REVISIT how we prepare the request
-            this.request = new Request();
-            request.setMethod((String)outMessage.get(Message.HTTP_REQUEST_METHOD));
-            request.setPath(url.getPath() + (String)outMessage.getContextualProperty("org.apache.cxf.request.uri"));
+            entity.setPath(url.getPath() + (String)message.getContextualProperty("org.apache.cxf.request.uri"));
+            entity.setId(UUID.randomUUID().toString());
+            uncorrelatedRequests.put(entity.getId(), new RequestResponse(entity));
         }
 
         @Override
@@ -117,9 +127,13 @@ public class AhcWebSocketConduit extends URLConnectionHTTPConduit {
                 @Override
                 public void write(byte b[], int off, int len) throws IOException {
                     //REVISIT support multiple writes and flush() to write the entire block
data?
+                    // or provides the fragment mode?
+                    Map<String, String> headers = new HashMap<String, String>();
+                    headers.put("Content-Type", entity.getContentType());
+                    headers.put(requestIdKey, entity.getId());
                     websocket.sendMessage(WebSocketUtils.buildRequest(
-                        request.getMethod(), request.getPath(),
-                        Collections.<String, String>singletonMap("Content-Type", request.getContentType()),
+                        entity.getMethod(), entity.getPath(),
+                        headers,
                         b, off, len));
                 }
 
@@ -137,9 +151,11 @@ public class AhcWebSocketConduit extends URLConnectionHTTPConduit {
         @Override
         protected void handleNoOutput() throws IOException {
             connect();
+            Map<String, String> headers = new HashMap<String, String>();
+            headers.put(requestIdKey, entity.getId());
             websocket.sendMessage(WebSocketUtils.buildRequest(
-                request.getMethod(), request.getPath(),
-                Collections.<String, String>emptyMap(),
+                entity.getMethod(), entity.getPath(),
+                headers,
                 null, 0, 0));
         }
 
@@ -151,8 +167,7 @@ public class AhcWebSocketConduit extends URLConnectionHTTPConduit {
         @Override
         protected void setProtocolHeaders() throws IOException {
             Headers h = new Headers(outMessage);
-            request.setContentType(h.determineContentType());
-
+            entity.setContentType(h.determineContentType());
             //REVISIT may provide an option to add other headers
 //          boolean addHeaders = MessageUtils.isTrue(outMessage.getContextualProperty(Headers.ADD_HEADERS_PROPERTY));
         }
@@ -255,17 +270,25 @@ public class AhcWebSocketConduit extends URLConnectionHTTPConduit {
             }
         }
 
-        synchronized Response getResponse() throws IOException {
+        Response getResponse() throws IOException {
             if (response == null) {
-                //TODO add a configurable timeout
-                try {
-                    response = responseQueue.take();
-                } catch (InterruptedException e) {
-                    // ignore
+                String rid = entity.getId();
+                RequestResponse rr = uncorrelatedRequests.get(rid);
+                synchronized (rr) {
+                    try {
+                        long timetowait = entity.getReceiveTimeout();
+                        response = rr.getResponse();
+                        if (response == null) {
+                            rr.wait(timetowait);
+                            response = rr.getResponse();
+                        }
+                    } catch (InterruptedException e) {
+                        // ignore
+                    }
+                }
+                if (response == null) {
+                    throw new IOException("timeout");
                 }
-            }
-            if (response == null) {
-                throw new IOException("timeout");
             }
             return response;
         }
@@ -293,8 +316,14 @@ public class AhcWebSocketConduit extends URLConnectionHTTPConduit {
             if (LOG.isLoggable(Level.INFO)) {
                 LOG.log(Level.INFO, "onMessage({0})", message);
             }
-
-            responseQueue.add(new Response(message));
+            Response resp = new Response(responseIdKey, message);
+            RequestResponse rr = uncorrelatedRequests.get(resp.getId());
+            if (rr != null) {
+                synchronized (rr) {
+                    rr.setResponse(resp);
+                    rr.notifyAll();
+                }
+            }
         }
 
         public void onFragment(byte[] fragment, boolean last) {
@@ -306,8 +335,14 @@ public class AhcWebSocketConduit extends URLConnectionHTTPConduit {
             if (LOG.isLoggable(Level.INFO)) {
                 LOG.log(Level.INFO, "onMessage({0})", message);
             }
-
-            responseQueue.add(new Response(message));
+            Response resp = new Response(responseIdKey, message);
+            RequestResponse rr = uncorrelatedRequests.get(resp.getId());
+            if (rr != null) {
+                synchronized (rr) {
+                    rr.setResponse(resp);
+                    rr.notifyAll();
+                }
+            }
         }
 
         public void onFragment(String fragment, boolean last) {
@@ -316,43 +351,17 @@ public class AhcWebSocketConduit extends URLConnectionHTTPConduit {
         }
     }
 
-    // Request and Response are used to represent request and response messages trasfered
over the websocket
-    //REVIST move these classes to be used in other places
-    static class Request {
-        private String method;
-        private String url;
-        private String contentType;
-
-        public String getMethod() {
-            return method;
-        }
-        public void setMethod(String method) {
-            this.method = method;
-        }
-
-        public String getPath() {
-            return url;
-        }
-        public void setPath(String path) {
-            this.url = path;
-        }
-
-        public String getContentType() {
-            return contentType;
-        }
-        public void setContentType(String contentType) {
-            this.contentType = contentType;
-        }
-    }
-
+    // Request and Response are used to represent request and response messages transfered
over the websocket
+    //REVIST move these classes to be used in other places after finalizing their contained
information.
     static class Response {
         private Object data;
         private int pos;
         private int statusCode;
         private String contentType;
+        private String id;
         private Object entity;
 
-        public Response(Object data) {
+        public Response(String idKey, Object data) {
             this.data = data;
             String line = readLine();
             if (line != null) {
@@ -364,6 +373,8 @@ public class AhcWebSocketConduit extends URLConnectionHTTPConduit {
                         String v = line.substring(del + 1).trim();
                         if ("Content-Type".equalsIgnoreCase(h)) {
                             contentType = v;
+                        } else if (idKey.equals(h)) {
+                            id = v;
                         }
                     }
                 }
@@ -384,6 +395,10 @@ public class AhcWebSocketConduit extends URLConnectionHTTPConduit {
             return contentType;
         }
 
+        public String getId() {
+            return id;
+        }
+
         public Object getEntity() {
             return entity;
         }
@@ -423,4 +438,20 @@ public class AhcWebSocketConduit extends URLConnectionHTTPConduit {
         }
     }
 
+    static class RequestResponse {
+        private AhcWebSocketConduitRequest request;
+        private Response response;
+        public RequestResponse(AhcWebSocketConduitRequest request) {
+            this.request = request;
+        }
+        public AhcWebSocketConduitRequest getRequest() {
+            return request;
+        }
+        public Response getResponse() {
+            return response;
+        }
+        public void setResponse(Response response) {
+            this.response = response;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/84645166/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketConduitRequest.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketConduitRequest.java
b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketConduitRequest.java
new file mode 100644
index 0000000..e4396b3
--- /dev/null
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketConduitRequest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.websocket.ahc;
+
+import java.net.URI;
+
+/**
+ * TODO may rename it to WebSocketConduitRequest and move it to a generic package
+ */
+class AhcWebSocketConduitRequest {
+    private URI uri;
+    private String method;
+    private String path;
+    private String contentType;
+    private String id;
+    private int receiveTimeout;
+    
+    public AhcWebSocketConduitRequest(URI uri, String method) {
+        this.uri = uri;
+        this.method = method;
+    }
+
+    public URI getUri() {
+        return uri;
+    }
+
+    public void setUri(URI uri) {
+        this.uri = uri;
+    }
+
+    public String getMethod() {
+        return method;
+    }
+
+    public void setMethod(String method) {
+        this.method = method;
+    }
+
+    public String getPath() {
+        return path;
+    }
+
+    public void setPath(String path) {
+        this.path = path;
+    }
+
+    public String getContentType() {
+        return contentType;
+    }
+
+    public void setContentType(String contentType) {
+        this.contentType = contentType;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public int getReceiveTimeout() {
+        return receiveTimeout;
+    }
+
+    public void setReceiveTimeout(int receiveTimeout) {
+        this.receiveTimeout = receiveTimeout;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/84645166/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketRequest.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketRequest.java
b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketRequest.java
deleted file mode 100644
index 520ff04..0000000
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketRequest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.transport.websocket.ahc;
-
-import java.net.URI;
-
-/**
- * TODO consolidate this class with the local Request class of AhcWebSocketConduit
- */
-class AhcWebSocketRequest {
-    private URI uri;
-    private String method;
-    
-    public AhcWebSocketRequest(URI uri, String method) {
-        this.uri = uri;
-        this.method = method;
-    }
-
-    public URI getUri() {
-        return uri;
-    }
-
-    public void setUri(URI uri) {
-        this.uri = uri;
-    }
-
-    public String getMethod() {
-        return method;
-    }
-
-    public void setMethod(String method) {
-        this.method = method;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/84645166/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
index 9216dad..6203234 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
@@ -37,6 +37,7 @@ import javax.servlet.http.HttpServletResponse;
 
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.transport.websocket.InvalidPathException;
+import org.apache.cxf.transport.websocket.WebSocketConstants;
 import org.apache.cxf.transport.websocket.WebSocketDestinationService;
 import org.apache.cxf.transport.websocket.WebSocketServletHolder;
 import org.apache.cxf.transport.websocket.WebSocketVirtualServletRequest;
@@ -55,6 +56,9 @@ public class AtmosphereWebSocketHandler implements WebSocketProtocol {
 
     protected AtmosphereWebSocketServletDestination destination;
     
+    //REVISIT make these keys configurable
+    private String requestIdKey = WebSocketConstants.DEFAULT_REQUEST_ID_KEY;
+    private String responseIdKey = WebSocketConstants.DEFAULT_RESPONSE_ID_KEY;
     
     public AtmosphereWebSocketServletDestination getDestination() {
         return destination;
@@ -88,14 +92,22 @@ public class AtmosphereWebSocketHandler implements WebSocketProtocol {
     /** {@inheritDoc}*/
     @Override
     public List<AtmosphereRequest> onMessage(WebSocket webSocket, byte[] data, int
offset, int length) {
-        LOG.info("onMessage(WebSocket, byte[], int, int)");
+        return invokeService(webSocket, new ByteArrayInputStream(data, offset, length));
+    }
+    
+    protected List<AtmosphereRequest> invokeService(WebSocket webSocket,  InputStream
stream) {
+        LOG.info("invokeService(WebSocket, InputStream)");
         HttpServletRequest request = null;
-        HttpServletResponse response = null;
+        HttpServletResponse response = null;        
         try {
             WebSocketServletHolder webSocketHolder = new AtmosphereWebSocketServletHolder(webSocket);
             response = createServletResponse(webSocketHolder);
-            request = createServletRequest(webSocketHolder, data, offset, length);
+            request = createServletRequest(webSocketHolder, stream);
             if (destination != null) {
+                String reqid = request.getHeader(requestIdKey);
+                if (reqid != null) {
+                    response.setHeader(responseIdKey, reqid);
+                }
                 ((WebSocketDestinationService)destination).invokeInternal(null, 
                     webSocket.resource().getRequest().getServletContext(),
                     request, response);
@@ -104,13 +116,12 @@ public class AtmosphereWebSocketHandler implements WebSocketProtocol
{
             reportErrorStatus(response, 400);
         } catch (Exception e) {
             LOG.log(Level.WARNING, "Failed to invoke service", e);
-            reportErrorStatus(response, 500);
         }
         return null;
     }
 
     // may want to move this error reporting code to WebSocketServletHolder
-    private void reportErrorStatus(HttpServletResponse response, int status) {
+    protected void reportErrorStatus(HttpServletResponse response, int status) {
         if (response != null) {
             response.setStatus(status);
             try {
@@ -142,12 +153,12 @@ public class AtmosphereWebSocketHandler implements WebSocketProtocol
{
         LOG.info("onError(WebSocket, WebSocketException)");
     }
 
-    protected WebSocketVirtualServletRequest createServletRequest(WebSocketServletHolder
webSocketHolder, 
-                                                                  byte[] data, int offset,
int length) 
-        throws IOException {
-        return new WebSocketVirtualServletRequest(webSocketHolder, 
-                                                  new ByteArrayInputStream(data, offset,
length));
-    }
+//    protected WebSocketVirtualServletRequest createServletRequest(WebSocketServletHolder
webSocketHolder, 
+//                                                                  byte[] data, int offset,
int length) 
+//        throws IOException {
+//        return new WebSocketVirtualServletRequest(webSocketHolder, 
+//                                                  new ByteArrayInputStream(data, offset,
length));
+//    }
     
     protected WebSocketVirtualServletRequest createServletRequest(WebSocketServletHolder
webSocketHolder, 
                                                                   InputStream stream)

http://git-wip-us.apache.org/repos/asf/cxf/blob/84645166/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketStreamHandler.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketStreamHandler.java
b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketStreamHandler.java
index 8574858..1ddc652 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketStreamHandler.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketStreamHandler.java
@@ -22,15 +22,9 @@ package org.apache.cxf.transport.websocket.atmosphere;
 import java.io.InputStream;
 import java.io.Reader;
 import java.util.List;
-import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
 import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.transport.websocket.WebSocketDestinationService;
-import org.apache.cxf.transport.websocket.WebSocketServletHolder;
 import org.atmosphere.cpr.AtmosphereRequest;
 import org.atmosphere.websocket.WebSocket;
 import org.atmosphere.websocket.WebSocketProtocolStream;
@@ -51,23 +45,6 @@ public class AtmosphereWebSocketStreamHandler extends AtmosphereWebSocketHandler
 
     @Override
     public List<AtmosphereRequest> onBinaryStream(WebSocket webSocket, InputStream
stream) {
-        LOG.info("onBinaryStream(WebSocket, InputStream)");
-        
-        try {
-            WebSocketServletHolder webSocketHolder = new AtmosphereWebSocketServletHolder(webSocket);
-            HttpServletRequest request = createServletRequest(webSocketHolder, stream);
-            HttpServletResponse response = createServletResponse(webSocketHolder);
-            if (destination != null) {
-                ((WebSocketDestinationService)destination).invokeInternal(null, 
-                    webSocket.resource().getRequest().getServletContext(),
-                    request, response);
-            }
-        } catch (Exception e) {
-            LOG.log(Level.WARNING, "Failed to invoke service", e);
-        }
-        return null;
+        return invokeService(webSocket, stream);
     }
-
-
-
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/84645166/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java
b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java
index a9ff973..4450a2b 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java
@@ -38,6 +38,7 @@ import javax.servlet.http.HttpServletResponse;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.helpers.CastUtils;
 import org.apache.cxf.transport.websocket.InvalidPathException;
+import org.apache.cxf.transport.websocket.WebSocketConstants;
 import org.apache.cxf.transport.websocket.WebSocketServletHolder;
 import org.apache.cxf.transport.websocket.WebSocketVirtualServletRequest;
 import org.apache.cxf.transport.websocket.WebSocketVirtualServletResponse;
@@ -50,6 +51,10 @@ class JettyWebSocket implements WebSocket.OnBinaryMessage, WebSocket.OnTextMessa
     private Connection webSocketConnection;
     private WebSocketServletHolder webSocketHolder;
     private String protocol;
+
+    //REVISIT make these keys configurable
+    private String requestIdKey = WebSocketConstants.DEFAULT_REQUEST_ID_KEY;
+    private String responseIdKey = WebSocketConstants.DEFAULT_RESPONSE_ID_KEY;
     
     public JettyWebSocket(JettyWebSocketManager manager, HttpServletRequest request, String
protocol) {
         this.manager = manager;
@@ -103,6 +108,10 @@ class JettyWebSocket implements WebSocket.OnBinaryMessage, WebSocket.OnTextMessa
             response = createServletResponse();
             request = createServletRequest(data, offset, length);
             if (manager != null) {
+                String reqid = request.getHeader(requestIdKey);
+                if (reqid != null) {
+                    response.setHeader(responseIdKey, reqid);
+                }
                 manager.service(request, response);
             }
         } catch (InvalidPathException ex) { 

http://git-wip-us.apache.org/repos/asf/cxf/blob/84645166/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientConduitWebSocketTest.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientConduitWebSocketTest.java
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientConduitWebSocketTest.java
index 2552b70..02491f1 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientConduitWebSocketTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientConduitWebSocketTest.java
@@ -75,7 +75,63 @@ public class JAXRSClientConduitWebSocketTest extends AbstractBusClientServerTest
         //resource.getBookBought();
 
     }
-    
+
+    @Test
+    public void testCallsWithIDReferences() throws Exception {
+        String address = "ws://localhost:" + getPort() + "/websocket";
+
+        BookStoreWebSocket resource = JAXRSClientFactory.create(address, BookStoreWebSocket.class,
null, true);
+        Client client = WebClient.client(resource);
+        client.header(HttpHeaders.USER_AGENT, JAXRSClientConduitWebSocketTest.class.getName());
+        
+        // call the POST service twice (a unique requestId is automatically included to correlate
the response)
+        EchoBookIdRunner[] runners = new EchoBookIdRunner[2];
+        runners[0] = new EchoBookIdRunner(resource, 549);
+        runners[1] = new EchoBookIdRunner(resource, 495);
+        
+        new Thread(runners[0]).start();
+        new Thread(runners[1]).start();
+        
+        long timetowait = 5000;
+        while (timetowait > 0) {
+            if (runners[0].isCompleted() && runners[1].isCompleted()) {
+                break;
+            }
+            Thread.sleep(500);
+            timetowait -= 500;
+        }
+        assertEquals(Long.valueOf(549), runners[0].getValue());
+        assertEquals(Long.valueOf(495), runners[1].getValue());
+    }    
+
+    private static class EchoBookIdRunner implements Runnable {
+        private BookStoreWebSocket resource;
+        private long input;
+        private Long value;
+        private boolean completed;
+
+        public EchoBookIdRunner(BookStoreWebSocket resource, long input) {
+            this.resource = resource;
+            this.input = input;
+        }
+
+        public void run() {
+            try {
+                value = resource.echoBookId(input);
+            } finally {
+                completed = true;
+            }
+        }
+
+        public Long getValue() {
+            return value;
+        }
+
+        public boolean isCompleted() {
+            return completed;
+        }
+    }
+
     protected String getPort() {
         return PORT;
     }

http://git-wip-us.apache.org/repos/asf/cxf/blob/84645166/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientServerWebSocketTest.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientServerWebSocketTest.java
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientServerWebSocketTest.java
index 1c32f85..46d8fc2 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientServerWebSocketTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientServerWebSocketTest.java
@@ -20,11 +20,13 @@
 package org.apache.cxf.systest.jaxrs.websocket;
 
 import java.util.List;
+import java.util.UUID;
 
 import org.apache.cxf.jaxrs.client.WebClient;
 import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
 import org.apache.cxf.systest.jaxrs.Book;
 import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.apache.cxf.transport.websocket.WebSocketConstants;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -254,6 +256,55 @@ public class JAXRSClientServerWebSocketTest extends AbstractBusClientServerTestB
         }
     }
     
+    @Test
+    public void testCallsWithIDReferences() throws Exception {
+        String address = "ws://localhost:" + getPort() + "/websocket/web/bookstore";
+
+        WebSocketTestClient wsclient = new WebSocketTestClient(address);
+        wsclient.connect();
+        try {
+            // call the POST service without requestId
+            wsclient.sendTextMessage(
+                "POST /websocket/web/bookstore/booksplain\r\nContent-Type: text/plain\r\n\r\n459");
+            assertTrue("response expected", wsclient.await(3));
+            List<WebSocketTestClient.Response> received = wsclient.getReceivedResponses();
+            WebSocketTestClient.Response resp = received.get(0);
+            assertEquals(200, resp.getStatusCode());
+            assertEquals("text/plain", resp.getContentType());
+            String value = resp.getTextEntity();
+            assertEquals("459", value);
+            String id = resp.getId();
+            assertNull("response id is incorrect", id);
+            
+            // call the POST service twice with a unique requestId 
+            wsclient.reset(2);
+            String reqid1 = UUID.randomUUID().toString();
+            String reqid2 = UUID.randomUUID().toString();
+            wsclient.sendTextMessage(
+                "POST /websocket/web/bookstore/booksplain\r\nContent-Type: text/plain\r\n"

+                + WebSocketConstants.DEFAULT_REQUEST_ID_KEY + ": " + reqid1 + "\r\n\r\n549");
+            wsclient.sendTextMessage(
+                "POST /websocket/web/bookstore/booksplain\r\nContent-Type: text/plain\r\n"

+                + WebSocketConstants.DEFAULT_REQUEST_ID_KEY + ": " + reqid2 + "\r\n\r\n495");
+            assertTrue("response expected", wsclient.await(3));
+            received = wsclient.getReceivedResponses();
+            for (WebSocketTestClient.Response r : received) {
+                assertEquals(200, r.getStatusCode());
+                assertEquals("text/plain", r.getContentType());
+                value = r.getTextEntity();
+                id = r.getId();
+                if (reqid1.equals(id)) {
+                    assertEquals("549", value);
+                } else if (reqid2.equals(id)) {
+                    assertEquals("495", value);
+                } else {
+                    fail("unexpected responseId: " + id);
+                }
+            }
+        } finally {
+            wsclient.close();
+        }
+    }
     
     protected String getPort() {
         return PORT;

http://git-wip-us.apache.org/repos/asf/cxf/blob/84645166/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/WebSocketTestClient.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/WebSocketTestClient.java
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/WebSocketTestClient.java
index b28a1c6..7e45544 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/WebSocketTestClient.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/WebSocketTestClient.java
@@ -22,6 +22,7 @@ package org.apache.cxf.systest.jaxrs.websocket;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
@@ -36,6 +37,7 @@ import com.ning.http.client.websocket.WebSocketTextListener;
 import com.ning.http.client.websocket.WebSocketUpgradeHandler;
 
 import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.transport.websocket.WebSocketConstants;
 
 
 
@@ -57,8 +59,8 @@ class WebSocketTestClient {
     private String url;
     
     public WebSocketTestClient(String url) {
-        this.received = new ArrayList<Object>();
-        this.fragments = new ArrayList<Object>();
+        this.received = Collections.synchronizedList(new ArrayList<Object>());
+        this.fragments = Collections.synchronizedList(new ArrayList<Object>());
         this.latch = new CountDownLatch(1);
         this.client = new AsyncHttpClient();
         this.url = url;
@@ -214,6 +216,7 @@ class WebSocketTestClient {
         private int pos; 
         private int statusCode;
         private String contentType;
+        private String id;
         private Object entity;
         
         public Response(Object data) {
@@ -228,6 +231,8 @@ class WebSocketTestClient {
                         String v = line.substring(del + 1).trim();
                         if ("Content-Type".equalsIgnoreCase(h)) {
                             contentType = v;
+                        } else if (WebSocketConstants.DEFAULT_RESPONSE_ID_KEY.equals(h))
{
+                            id = v;
                         }
                     }
                 }
@@ -239,7 +244,6 @@ class WebSocketTestClient {
                 System.arraycopy((byte[])data, pos, (byte[])entity, 0, ((byte[])entity).length);
             }
         }
-                
             
         
         public int getStatusCode() {
@@ -258,6 +262,10 @@ class WebSocketTestClient {
             return gettext(entity);
         }
 
+        public String getId() {
+            return id;
+        }
+
         public String toString() {
             StringBuffer sb = new StringBuffer();
             sb.append("Status: ").append(statusCode).append("\r\n");


Mime
View raw message