cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject cxf git commit: [CXF-6232] Reactor CXF's Atmosphere based WebSocket transport (adjusted for 3.0.x)
Date Wed, 18 Mar 2015 15:24:05 GMT
Repository: cxf
Updated Branches:
  refs/heads/3.0.x-fixes 467d32055 -> 08fc8aadb


[CXF-6232] Reactor CXF's Atmosphere based WebSocket transport (adjusted for 3.0.x)


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/08fc8aad
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/08fc8aad
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/08fc8aad

Branch: refs/heads/3.0.x-fixes
Commit: 08fc8aadb9e886323533b657604912b33246efe9
Parents: 467d320
Author: Akitoshi Yoshida <ay@apache.org>
Authored: Wed Feb 4 23:47:20 2015 +0100
Committer: Akitoshi Yoshida <ay@apache.org>
Committed: Wed Mar 18 16:23:58 2015 +0100

----------------------------------------------------------------------
 .../websocket/WebSocketServletHolder.java       |   2 +-
 .../cxf/transport/websocket/WebSocketUtils.java |  91 ++--
 .../WebSocketVirtualServletRequest.java         |   2 +-
 .../WebSocketVirtualServletResponse.java        |   4 +-
 .../atmosphere/AtmosphereWebSocketHandler.java  |   3 +-
 .../AtmosphereWebSocketServletDestination.java  |  74 ++-
 .../AtmosphereWebSocketStreamHandler.java       |   3 +-
 .../atmosphere/DefaultProtocolInterceptor.java  | 254 +++++++++
 .../websocket/jetty/JettyWebSocket.java         |   3 -
 .../websocket/jetty/WebSocketServletHolder.java |  59 +++
 .../jetty/WebSocketVirtualServletRequest.java   | 527 +++++++++++++++++++
 .../jetty/WebSocketVirtualServletResponse.java  | 367 +++++++++++++
 .../jaxrs/websocket/WebSocketTestClient.java    |  10 +-
 13 files changed, 1324 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/08fc8aad/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketServletHolder.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketServletHolder.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketServletHolder.java
index 8385fa8..6bcb72a 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketServletHolder.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketServletHolder.java
@@ -28,7 +28,7 @@ import javax.servlet.DispatcherType;
 import javax.servlet.ServletContext;
 
 /**
- * 
+ * @deprecated This class is only used by jetty, it has been moved to org.apache.cxf.transport.websocket.jetty
  */
 public interface WebSocketServletHolder {
     String getAuthType();

http://git-wip-us.apache.org/repos/asf/cxf/blob/08fc8aad/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
index a55639c..5dbb930 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
@@ -30,14 +30,13 @@ import java.util.TreeMap;
  * 
  */
 public final class WebSocketUtils {
-    static final String URI_KEY = "$uri";
-    static final String METHOD_KEY = "$method";
-    static final String SC_KEY = "$sc";
-    static final String SM_KEY = "$sm";
-    static final String FLUSHED_KEY = "$flushed";
+    public static final String URI_KEY = "$uri";
+    public static final String METHOD_KEY = "$method";
+    public static final String SC_KEY = "$sc";
+    public static final String FLUSHED_KEY = "$flushed";
+
     private static final byte[] CRLF = "\r\n".getBytes();
     private static final byte[] COLSP = ": ".getBytes();
-    private static final String DEFAULT_SC = "200";
 
     private WebSocketUtils() {
     }
@@ -116,6 +115,15 @@ public final class WebSocketUtils {
         return buffer.toString();
     }
 
+    public static byte[] readBody(InputStream in) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        byte[] buf = new byte[8192];
+        for (int n = in.read(buf); n > -1; n = in.read(buf)) {
+            baos.write(buf, 0, n);
+        }
+        return baos.toByteArray();
+    }
+
     /**
      * Build response bytes with the status and type information specified in the headers.
      *
@@ -128,14 +136,15 @@ public final class WebSocketUtils {
     public static byte[] buildResponse(Map<String, String> headers, byte[] data, int offset, int length) {
         ByteArrayBuilder sb = new ByteArrayBuilder();
         String v = headers.get(SC_KEY);
-        sb.append(v == null ? DEFAULT_SC : v).append(CRLF);
-        appendHeaders(headers, sb);
+        if (v != null) {
+            sb.append(v).append(CRLF);
+        }
+        sb.append(headers);
         
-        byte[] longdata = sb.toByteArray();
         if (data != null && length > 0) {
-            longdata = buildResponse(longdata, data, offset, length);
+            sb.append(CRLF).append(data, offset, length);
         }
-        return longdata;
+        return sb.toByteArray();
     }
 
     /**
@@ -154,9 +163,10 @@ public final class WebSocketUtils {
         if (hlen > 0) {
             System.arraycopy(headers, 0, longdata, 0, hlen);
         }
-        longdata[hlen] = 0x0d;
-        longdata[hlen + 1] = 0x0a;
-        System.arraycopy(data, offset, longdata, hlen + 2, length);
+        if (data != null && length > 0) {
+            System.arraycopy(CRLF, 0, longdata, hlen, CRLF.length);
+            System.arraycopy(data, offset, longdata, hlen + CRLF.length, length);
+        }
         return longdata;
     }
 
@@ -172,8 +182,9 @@ public final class WebSocketUtils {
     public static byte[] buildResponse(byte[] data, int offset, int length) {
         return buildResponse((byte[])null, data, offset, length);
     }
-    
-    static byte[] buildHeaderLine(String name, String value) {
+
+    //FIXME (consolidate the response building code)
+    public static byte[] buildHeaderLine(String name, String value) {
         byte[] hl = new byte[name.length() + COLSP.length + value.length() + CRLF.length];
         System.arraycopy(name.getBytes(), 0, hl, 0, name.length());
         System.arraycopy(COLSP, 0, hl, name.length(), COLSP.length);
@@ -181,7 +192,7 @@ public final class WebSocketUtils {
         System.arraycopy(CRLF, 0, hl, name.length() + COLSP.length + value.length(), CRLF.length);
         return hl;
     }
-        
+
     /**
      * Build request bytes with the specified method, url, headers, and content entity.
      * 
@@ -196,34 +207,20 @@ public final class WebSocketUtils {
     public static byte[] buildRequest(String method, String url, Map<String, String> headers,
                                       byte[] data, int offset, int length) {
         ByteArrayBuilder sb = new ByteArrayBuilder();
-        sb.append(method).append(' ').append(url).append(CRLF);
-        appendHeaders(headers, sb);
-        sb.append(CRLF);
+        sb.append(method).append(' ').append(url).append(CRLF).append(headers);
 
-        byte[] longdata = sb.toByteArray();
         if (data != null && length > 0) {
-            final byte[] hb = longdata;
-            longdata = new byte[hb.length + length];
-            System.arraycopy(hb, 0, longdata, 0, hb.length);
-            System.arraycopy(data, offset, longdata, hb.length, length);
+            sb.append(CRLF).append(data, offset, length);
         }
-        return longdata;
+        return sb.toByteArray();
     }
 
-    private static void appendHeaders(Map<String, String> headers, ByteArrayBuilder sb) {
-        for (Entry<String, String> header : headers.entrySet()) {
-            if (!header.getKey().startsWith("$")) {
-                sb.append(header.getKey()).append(COLSP).append(header.getValue()).append(CRLF);
-            }
-        }
-    }
-    
     private static class ByteArrayBuilder {
         private ByteArrayOutputStream baos;
         public ByteArrayBuilder() {
             baos = new ByteArrayOutputStream();
         }
-        
+
         public ByteArrayBuilder append(byte[] b) {
             try {
                 baos.write(b);
@@ -232,21 +229,35 @@ public final class WebSocketUtils {
             }
             return this;
         }
-        
+
+        public ByteArrayBuilder append(byte[] b, int offset, int length) {
+            baos.write(b, offset, length);
+            return this;
+        }
+
         public ByteArrayBuilder append(String s) {
             try {
-                baos.write(s.getBytes());
+                baos.write(s.getBytes("utf-8"));
             } catch (IOException e) {
                 // ignore
             }
             return this;
         }
-        
-        public ByteArrayBuilder append(char c) {
+
+        public ByteArrayBuilder append(int c) {
             baos.write(c);
             return this;
         }
-        
+
+        public ByteArrayBuilder append(Map<String, String> map) {
+            for (Entry<String, String> m : map.entrySet()) {
+                if (!m.getKey().startsWith("$")) {
+                    append(m.getKey()).append(COLSP).append(m.getValue()).append(CRLF);
+                }
+            }
+            return this;
+        }
+
         public byte[] toByteArray() {
             return baos.toByteArray();
         }

http://git-wip-us.apache.org/repos/asf/cxf/blob/08fc8aad/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletRequest.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletRequest.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletRequest.java
index 9109aed..137b74b 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletRequest.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletRequest.java
@@ -52,7 +52,7 @@ import javax.servlet.http.Part;
 import org.apache.cxf.common.logging.LogUtils;
 
 /**
- * 
+ * @deprecated This class is only used by jetty, it has been moved to org.apache.cxf.transport.websocket.jetty
  */
 public class WebSocketVirtualServletRequest implements HttpServletRequest {
     private static final Logger LOG = LogUtils.getL7dLogger(WebSocketVirtualServletRequest.class);

http://git-wip-us.apache.org/repos/asf/cxf/blob/08fc8aad/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletResponse.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletResponse.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletResponse.java
index 149e377..026f31f 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletResponse.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletResponse.java
@@ -36,7 +36,7 @@ import javax.servlet.http.HttpServletResponse;
 import org.apache.cxf.common.logging.LogUtils;
 
 /**
- * 
+ * @deprecated This class is only used by jetty, it has been moved to org.apache.cxf.transport.websocket.jetty
  */
 public class WebSocketVirtualServletResponse implements HttpServletResponse {
     private static final Logger LOG = LogUtils.getL7dLogger(WebSocketVirtualServletResponse.class);
@@ -257,7 +257,6 @@ public class WebSocketVirtualServletResponse implements HttpServletResponse {
             LOG.log(Level.FINE, "sendError({0}, {1})", new Object[]{sc, msg});
         }
         responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc));
-        responseHeaders.put(WebSocketUtils.SM_KEY, msg);
     }
 
     @Override
@@ -304,7 +303,6 @@ public class WebSocketVirtualServletResponse implements HttpServletResponse {
             LOG.log(Level.FINE, "setStatus({0}, {1})", new Object[]{sc, sm});
         }
         responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc));
-        responseHeaders.put(WebSocketUtils.SM_KEY, sm);
     }
 
     private ServletOutputStream createOutputStream() {

http://git-wip-us.apache.org/repos/asf/cxf/blob/08fc8aad/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
index 38e6599..1cf1124 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
@@ -50,7 +50,8 @@ import org.atmosphere.websocket.WebSocketProcessor.WebSocketException;
 import org.atmosphere.websocket.WebSocketProtocol;
 
 /**
- * 
+ * @deprecated No longer used as the protocol handling is done by Atmosphere's protocol intercepter
+ * such as org.apache.cxf.transport.websocket.atmosphere.DefaultProtocolInterceptor.
  */
 public class AtmosphereWebSocketHandler implements WebSocketProtocol {
     private static final Logger LOG = LogUtils.getL7dLogger(AtmosphereWebSocketHandler.class);

http://git-wip-us.apache.org/repos/asf/cxf/blob/08fc8aad/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java
index c8e5fae..7aa4cd3 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java
@@ -21,15 +21,18 @@ package org.apache.cxf.transport.websocket.atmosphere;
 
 import java.io.IOException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import javax.servlet.ServletConfig;
 import javax.servlet.ServletContext;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletRequestWrapper;
 import javax.servlet.http.HttpServletResponse;
 
 import org.apache.cxf.Bus;
+import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.transport.http.DestinationRegistry;
 import org.apache.cxf.transport.servlet.ServletDestination;
@@ -37,16 +40,20 @@ import org.apache.cxf.transport.websocket.WebSocketDestinationService;
 import org.apache.cxf.workqueue.WorkQueueManager;
 import org.atmosphere.cpr.ApplicationConfig;
 import org.atmosphere.cpr.AtmosphereFramework;
+import org.atmosphere.cpr.AtmosphereInterceptor;
 import org.atmosphere.cpr.AtmosphereRequest;
+import org.atmosphere.cpr.AtmosphereResource;
 import org.atmosphere.cpr.AtmosphereResponse;
+import org.atmosphere.handler.AbstractReflectorAtmosphereHandler;
 import org.atmosphere.util.Utils;
-import org.atmosphere.websocket.WebSocketProtocol;
 
 /**
  * 
  */
 public class AtmosphereWebSocketServletDestination extends ServletDestination implements
     WebSocketDestinationService {
+    private static final Logger LOG = LogUtils.getL7dLogger(AtmosphereWebSocketServletDestination.class);
+
     private AtmosphereFramework framework;
     private Executor executor;
 
@@ -54,19 +61,14 @@ public class AtmosphereWebSocketServletDestination extends ServletDestination im
                                                  String path) throws IOException {
         super(bus, registry, ei, path);
         this.framework = new AtmosphereFramework(false, true);
-
         framework.setUseNativeImplementation(false);
+        framework.addInitParameter(ApplicationConfig.PROPERTY_NATIVE_COMETSUPPORT, "true");
+        framework.addInitParameter(ApplicationConfig.PROPERTY_SESSION_SUPPORT, "true");
         framework.addInitParameter(ApplicationConfig.WEBSOCKET_SUPPORT, "true");
-        //TODO provide a way to switch between the non-stream handler and the stream handler
-        framework.addInitParameter(ApplicationConfig.WEBSOCKET_PROTOCOL, 
-                                   AtmosphereWebSocketHandler.class.getName());
+        framework.interceptor(getInterceptor(bus));
+        framework.addAtmosphereHandler("/", new DestinationHandler());
         framework.init();
 
-        WebSocketProtocol wsp = framework.getWebSocketProtocol();
-        if (wsp instanceof AtmosphereWebSocketHandler) {
-            ((AtmosphereWebSocketHandler)wsp).setDestination(this);
-        }
-
         // the executor for decoupling the service invocation from websocket's onMessage call which is
         // synchronously blocked
         executor = bus.getExtension(WorkQueueManager.class).getAutomaticWorkQueue();
@@ -77,7 +79,7 @@ public class AtmosphereWebSocketServletDestination extends ServletDestination im
                        HttpServletResponse resp) throws IOException {
         if (Utils.webSocketEnabled(req)) {
             try {
-                framework.doCometSupport(AtmosphereRequest.wrap(new HttpServletRequestFilter(req)), 
+                framework.doCometSupport(AtmosphereRequest.wrap(req), 
                                          AtmosphereResponse.wrap(resp));
             } catch (ServletException e) {
                 throw new IOException(e);
@@ -96,20 +98,44 @@ public class AtmosphereWebSocketServletDestination extends ServletDestination im
     Executor getExecutor() {
         return executor;
     }
+
+    private class DestinationHandler extends AbstractReflectorAtmosphereHandler {
+
+        @Override
+        public void onRequest(final AtmosphereResource resource) throws IOException {
+            LOG.fine("onRequest");
+            executeHandlerTask(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        invokeInternal(null, 
+                            resource.getRequest().getServletContext(), resource.getRequest(), resource.getResponse());
+                    } catch (Exception e) {
+                        LOG.log(Level.WARNING, "Failed to invoke service", e);
+                    }
+                }
+            });
+        }
+    }
     
-    private static class HttpServletRequestFilter extends HttpServletRequestWrapper {
-        private static final String TRANSPORT_ADDRESS 
-            = "org.apache.cxf.transport.endpoint.address";
-        private String transportAddress; 
-        public HttpServletRequestFilter(HttpServletRequest request) {
-            super(request);
-            transportAddress = (String)request.getAttribute(TRANSPORT_ADDRESS);
+    private void executeHandlerTask(Runnable r) {
+        try {
+            executor.execute(r);
+        } catch (RejectedExecutionException e) {
+            LOG.warning(
+                "Executor queue is full, run the service invocation task in caller thread." 
+                + "  Users can specify a larger executor queue to avoid this.");
+            r.run();
         }
-        
-        @Override
-        public Object getAttribute(String name) {
-            return TRANSPORT_ADDRESS.equals(name) ? transportAddress : super.getAttribute(name);
+    }
+
+    //FIXME a temporary workaround until we decide how to customize atmosphere using cxf's destination configuration
+    private AtmosphereInterceptor getInterceptor(Bus bus) {
+        AtmosphereInterceptor ai = (AtmosphereInterceptor)bus.getProperty("atmosphere.interceptor");
+        if (ai == null) {
+            ai = new DefaultProtocolInterceptor(); 
         }
-        
+        LOG.info("AtmosphereInterceptor: " + ai);
+        return ai;
     }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/08fc8aad/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketStreamHandler.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketStreamHandler.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketStreamHandler.java
index 1f4cc00..ac14b0a 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketStreamHandler.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketStreamHandler.java
@@ -30,7 +30,8 @@ import org.atmosphere.websocket.WebSocket;
 import org.atmosphere.websocket.WebSocketProtocolStream;
 
 /**
- * 
+ * @deprecated No longer used as the protocol handling is done by Atmosphere's protocol intercepter
+ * such as org.apache.cxf.transport.websocket.atmosphere.DefaultProtocolInterceptor.
  */
 public class AtmosphereWebSocketStreamHandler extends AtmosphereWebSocketHandler implements 
     WebSocketProtocolStream {

http://git-wip-us.apache.org/repos/asf/cxf/blob/08fc8aad/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java
new file mode 100644
index 0000000..7c4c6e5
--- /dev/null
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.websocket.atmosphere;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.transport.websocket.InvalidPathException;
+import org.apache.cxf.transport.websocket.WebSocketConstants;
+import org.apache.cxf.transport.websocket.WebSocketUtils;
+import org.atmosphere.config.service.AtmosphereInterceptorService;
+import org.atmosphere.cpr.Action;
+import org.atmosphere.cpr.AsyncIOInterceptor;
+import org.atmosphere.cpr.AsyncIOInterceptorAdapter;
+import org.atmosphere.cpr.AsyncIOWriter;
+import org.atmosphere.cpr.AtmosphereFramework;
+import org.atmosphere.cpr.AtmosphereInterceptorAdapter;
+import org.atmosphere.cpr.AtmosphereInterceptorWriter;
+import org.atmosphere.cpr.AtmosphereRequest;
+import org.atmosphere.cpr.AtmosphereResource;
+import org.atmosphere.cpr.AtmosphereResponse;
+import org.atmosphere.cpr.FrameworkConfig;
+
+/**
+ * 
+ */
+@AtmosphereInterceptorService
+public class DefaultProtocolInterceptor extends AtmosphereInterceptorAdapter {
+    private static final Logger LOG = LogUtils.getL7dLogger(DefaultProtocolInterceptor.class);
+
+    private static final String REQUEST_DISPATCHED = "request.dispatched";
+    private static final String RESPONSE_PARENT = "response.parent";
+
+    private final AsyncIOInterceptor interceptor = new Interceptor();
+
+    @Override
+    public Action inspect(final AtmosphereResource r) {
+        LOG.log(Level.FINE, "inspect");
+        AtmosphereRequest request = r.getRequest();
+
+        if (request.getAttribute(REQUEST_DISPATCHED) == null) {
+            AtmosphereResponse response = new WrappedAtmosphereResponse(r.getResponse(), request);
+
+            AtmosphereFramework framework = r.getAtmosphereConfig().framework();
+            try {
+                byte[] data = WebSocketUtils.readBody(request.getInputStream());
+                if (data.length == 0) {
+                    return Action.CANCELLED;
+                }
+                
+                if (LOG.isLoggable(Level.INFO)) {
+                    LOG.log(Level.INFO, "inspecting data {0}", new String(data));
+                }
+                try {
+                    AtmosphereRequest ar = createAtmosphereRequest(request, data);
+                    ar.setAttribute(REQUEST_DISPATCHED, "true");
+                    String refid = ar.getHeader(WebSocketConstants.DEFAULT_REQUEST_ID_KEY);
+                    if (refid != null) {
+                        ar.setAttribute(WebSocketConstants.DEFAULT_REQUEST_ID_KEY, refid);
+                    }
+                    // This is a new request, we must clean the Websocket AtmosphereResource.
+                    request.removeAttribute(FrameworkConfig.INJECTED_ATMOSPHERE_RESOURCE);
+                    response.request(ar);
+                    attachWriter(r);
+
+                    Action action = framework.doCometSupport(ar, response);
+                    if (action.type() == Action.TYPE.SUSPEND) {
+                        ar.destroyable(false);
+                        response.destroyable(false);
+                    }
+                } catch (Exception e) {
+                    LOG.log(Level.WARNING, "Error during request dispatching", e);
+                    if (e instanceof InvalidPathException) {
+                        response.setStatus(400);
+                    } else {
+                        response.setStatus(500);
+                    }
+                    response.getOutputStream().write(createResponse(response, null, true));
+                }
+                return Action.CANCELLED;
+            } catch (IOException e) {
+                LOG.log(Level.WARNING, "Error during protocol processing", e);
+                return Action.CONTINUE;
+            }           
+        } else {
+            request.setAttribute(REQUEST_DISPATCHED, null);
+            request.setAttribute(RESPONSE_PARENT, null);
+            request.destroyable(false);
+        }
+        return Action.CONTINUE;
+    }
+
+    private void attachWriter(final AtmosphereResource r) {
+        AtmosphereResponse res = r.getResponse();
+        AsyncIOWriter writer = res.getAsyncIOWriter();
+
+        if (writer instanceof AtmosphereInterceptorWriter) {
+            //REVIST need a better way to add a custom filter at the first entry and not at the last as
+            // e.g. interceptor(AsyncIOInterceptor interceptor, int position)
+            Deque<AsyncIOInterceptor> filters = AtmosphereInterceptorWriter.class.cast(writer).filters();
+            if (!filters.contains(interceptor)) {
+                filters.addFirst(interceptor);
+            }
+        }
+    }
+
+    private static AtmosphereRequest createAtmosphereRequest(AtmosphereRequest r, byte[] data) throws IOException {
+        AtmosphereRequest.Builder b = new AtmosphereRequest.Builder();
+        ByteArrayInputStream in = new ByteArrayInputStream(data);
+        Map<String, String> hdrs = WebSocketUtils.readHeaders(in);
+        String path = hdrs.get(WebSocketUtils.URI_KEY);
+        String origin = r.getRequestURI();
+        if (!path.startsWith(origin)) {
+            LOG.log(Level.WARNING, "invalid path: {0} not within {1}", new Object[]{path, origin});
+            throw new InvalidPathException();
+        }
+
+        String requestURI = path;
+        String requestURL = r.getRequestURL() + requestURI.substring(r.getRequestURI().length());
+        String contentType = hdrs.get("Content-Type");
+        
+        String method = hdrs.get(WebSocketUtils.METHOD_KEY);
+        b.pathInfo(path)
+                .contentType(contentType)
+                .headers(hdrs)
+                .method(method)
+                .requestURI(requestURI)
+                .requestURL(requestURL)
+                .request(r);
+        // add the body only if it is present
+        byte[] body = WebSocketUtils.readBody(in);
+        if (body.length > 0) {
+            b.body(body);
+        }
+        return b.build();
+    }
+
+    private final class Interceptor extends AsyncIOInterceptorAdapter {
+
+        @Override
+        public byte[] transformPayload(AtmosphereResponse response, byte[] responseDraft, byte[] data) 
+            throws IOException {
+            if (LOG.isLoggable(Level.INFO)) {
+                LOG.log(Level.INFO, "transformPayload with draft={0}", new String(responseDraft));
+            }
+            AtmosphereRequest request = response.request();
+            if (request.getAttribute(RESPONSE_PARENT) == null) {
+                request.setAttribute(RESPONSE_PARENT, "true");
+                return createResponse(response, responseDraft, true);
+            } else {
+                return createResponse(response, responseDraft, false);
+            }
+        }
+
+        @Override
+        public byte[] error(AtmosphereResponse response, int statusCode, String reasonPhrase) {
+            if (LOG.isLoggable(Level.INFO)) {
+                LOG.log(Level.INFO, "status={0}", statusCode);
+            }
+            response.setStatus(statusCode, reasonPhrase);
+            return createResponse(response, null, true);
+        }
+    }
+
+    private static byte[] createResponse(AtmosphereResponse response, byte[] payload, boolean parent) {
+        AtmosphereRequest request = response.request();
+        String refid = (String)request.getAttribute(WebSocketConstants.DEFAULT_REQUEST_ID_KEY);
+
+        Map<String, String> headers = new HashMap<String, String>();
+        if (refid != null) {
+            response.addHeader(WebSocketConstants.DEFAULT_RESPONSE_ID_KEY, refid);
+            headers.put(WebSocketConstants.DEFAULT_RESPONSE_ID_KEY, refid);
+        }
+        if (parent) {
+            headers.put(WebSocketUtils.SC_KEY, Integer.toString(response.getStatus()));
+            if (payload != null && payload.length > 0) {
+                headers.put("Content-Type",  response.getContentType());
+            }
+        }
+        return WebSocketUtils.buildResponse(headers, payload, 0, payload == null ? 0 : payload.length);
+    }
+
+    // a workaround to flush the header data upon close when no write operation occurs  
+    private static class WrappedAtmosphereResponse extends AtmosphereResponse {
+        public WrappedAtmosphereResponse(AtmosphereResponse resp, AtmosphereRequest req) {
+            super((HttpServletResponse)resp.getResponse(), resp.getAsyncIOWriter(), req, resp.isDestroyable());
+        }
+
+        @Override
+        public ServletOutputStream getOutputStream() throws IOException {
+            final ServletOutputStream delegate = super.getOutputStream();
+            return new ServletOutputStream() {
+                private boolean written;
+
+                @Override
+                public void write(int i) throws IOException {
+                    written = true;
+                    delegate.write(i);
+                }
+
+                @Override
+                public void close() throws IOException {
+                    if (!written) {
+                        delegate.write(createResponse(WrappedAtmosphereResponse.this, null, true));
+                    }
+                    delegate.close();
+                }
+
+                @Override
+                public void flush() throws IOException {
+                    delegate.flush();
+                }
+
+                @Override
+                public void write(byte[] b, int off, int len) throws IOException {
+                    written = true;
+                    delegate.write(b, off, len);
+                }
+
+                @Override
+                public void write(byte[] b) throws IOException {
+                    written = true;
+                    delegate.write(b);
+                }
+            };
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/08fc8aad/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java
index 6ae3c9f..f48efb5 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java
@@ -41,9 +41,6 @@ import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.helpers.CastUtils;
 import org.apache.cxf.transport.websocket.InvalidPathException;
 import org.apache.cxf.transport.websocket.WebSocketConstants;
-import org.apache.cxf.transport.websocket.WebSocketServletHolder;
-import org.apache.cxf.transport.websocket.WebSocketVirtualServletRequest;
-import org.apache.cxf.transport.websocket.WebSocketVirtualServletResponse;
 import org.eclipse.jetty.websocket.WebSocket;
 
 class JettyWebSocket implements WebSocket.OnBinaryMessage, WebSocket.OnTextMessage {

http://git-wip-us.apache.org/repos/asf/cxf/blob/08fc8aad/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/WebSocketServletHolder.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/WebSocketServletHolder.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/WebSocketServletHolder.java
new file mode 100644
index 0000000..44eb7b8
--- /dev/null
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/WebSocketServletHolder.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.websocket.jetty;
+
+import java.io.IOException;
+import java.security.Principal;
+import java.util.Enumeration;
+import java.util.Locale;
+
+import javax.servlet.DispatcherType;
+import javax.servlet.ServletContext;
+
+/**
+ * 
+ */
+interface WebSocketServletHolder {
+    String getAuthType();
+    String getContextPath();
+    String getLocalAddr();
+    String getLocalName();
+    int getLocalPort();
+    Locale getLocale();
+    Enumeration<Locale> getLocales();
+    String getProtocol();
+    String getRemoteAddr();
+    String getRemoteHost();
+    int getRemotePort();
+    String getRequestURI();
+    StringBuffer getRequestURL();
+    DispatcherType getDispatcherType();
+    boolean isSecure();
+    String getPathInfo();
+    String getPathTranslated();
+    String getScheme();
+    String getServerName();
+    String getServletPath();
+    ServletContext getServletContext();
+    int getServerPort();
+    Principal getUserPrincipal();
+    Object getAttribute(String name);
+    void write(byte[] data, int offset, int length) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/08fc8aad/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/WebSocketVirtualServletRequest.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/WebSocketVirtualServletRequest.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/WebSocketVirtualServletRequest.java
new file mode 100644
index 0000000..e2b3c33
--- /dev/null
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/WebSocketVirtualServletRequest.java
@@ -0,0 +1,527 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.websocket.jetty;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.UnsupportedEncodingException;
+import java.security.Principal;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.DispatcherType;
+import javax.servlet.RequestDispatcher;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.ServletInputStream;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.Cookie;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.HttpSession;
+import javax.servlet.http.Part;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.transport.websocket.InvalidPathException;
+import org.apache.cxf.transport.websocket.WebSocketUtils;
+
+/**
+ * 
+ */
+class WebSocketVirtualServletRequest implements HttpServletRequest {
+    private static final Logger LOG = LogUtils.getL7dLogger(WebSocketVirtualServletRequest.class);
+
+    private WebSocketServletHolder webSocketHolder;
+    private InputStream in;
+    private Map<String, String> requestHeaders;
+    private Map<String, Object> attributes;
+    
+    public WebSocketVirtualServletRequest(WebSocketServletHolder websocket, InputStream in) 
+        throws IOException {
+        this.webSocketHolder = websocket;
+        this.in = in;
+
+        this.requestHeaders = WebSocketUtils.readHeaders(in);
+        String path = requestHeaders.get(WebSocketUtils.URI_KEY);
+        String origin = websocket.getRequestURI();
+        if (!path.startsWith(origin)) {
+            LOG.log(Level.WARNING, "invalid path: {0} not within {1}", new Object[]{path, origin});
+            throw new InvalidPathException();
+        }
+        this.attributes = new TreeMap<String, Object>(String.CASE_INSENSITIVE_ORDER);
+        Object v = websocket.getAttribute("org.apache.cxf.transport.endpoint.address");
+        if (v != null) {
+            attributes.put("org.apache.cxf.transport.endpoint.address", v);
+        }
+    }
+
+    @Override
+    public AsyncContext getAsyncContext() {
+        return null;
+    }
+
+    @Override
+    public Object getAttribute(String name) {
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.log(Level.FINE, "getAttribute({0}) -> {1}", new Object[] {name , attributes.get(name)});
+        }
+        return attributes.get(name);
+    }
+
+    @Override
+    public Enumeration<String> getAttributeNames() {
+        LOG.log(Level.FINE, "getAttributeNames()");
+        return Collections.enumeration(attributes.keySet());
+    }
+
+    @Override
+    public String getCharacterEncoding() {
+        // TODO Auto-generated method stub
+        LOG.log(Level.FINE, "getCharacterEncoding()");
+        return null;
+    }
+
+    @Override
+    public int getContentLength() {
+        LOG.log(Level.FINE, "getContentLength()");
+        return 0;
+    }
+
+    @Override
+    public String getContentType() {
+        LOG.log(Level.FINE, "getContentType()");
+        return requestHeaders.get("Content-Type");
+    }
+
+    @Override
+    public DispatcherType getDispatcherType() {
+        LOG.log(Level.FINE, "getDispatcherType()");
+        return webSocketHolder.getDispatcherType();
+    }
+
+    @Override
+    public ServletInputStream getInputStream() throws IOException {
+        return new ServletInputStream() {
+            @Override
+            public int read() throws IOException {
+                return in.read();
+            }
+
+            @Override
+            public int read(byte[] b, int off, int len) throws IOException {
+                return in.read(b, off, len);
+            }
+        };
+    }
+
+    @Override
+    public String getLocalAddr() {
+        LOG.log(Level.FINE, "getLocalAddr()");
+        return webSocketHolder.getLocalAddr();
+    }
+
+    @Override
+    public String getLocalName() {
+        LOG.log(Level.FINE, "getLocalName()");
+        return webSocketHolder.getLocalName();
+    }
+
+    @Override
+    public int getLocalPort() {
+        LOG.log(Level.FINE, "getLocalPort()");
+        return webSocketHolder.getLocalPort();
+    }
+
+    @Override
+    public Locale getLocale() {
+        LOG.log(Level.FINE, "getLocale()");
+        return webSocketHolder.getLocale();
+    }
+
+    @Override
+    public Enumeration<Locale> getLocales() {
+        LOG.log(Level.FINE, "getLocales()");
+        return webSocketHolder.getLocales();
+    }
+
+    @Override
+    public String getParameter(String name) {
+        // TODO Auto-generated method stub
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.log(Level.FINE, "getParameter({0})", name);
+        }
+        return null;
+    }
+
+    @Override
+    public Map<String, String[]> getParameterMap() {
+        // TODO Auto-generated method stub
+        LOG.log(Level.FINE, "getParameterMap()");
+        return null;
+    }
+
+    @Override
+    public Enumeration<String> getParameterNames() {
+        // TODO Auto-generated method stub
+        LOG.log(Level.FINE, "getParameterNames()");
+        return null;
+    }
+
+    @Override
+    public String[] getParameterValues(String name) {
+        // TODO Auto-generated method stub
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.log(Level.FINE, "getParameterValues({0})", name);
+        }
+        return null;
+    }
+
+    @Override
+    public String getProtocol() {
+        LOG.log(Level.FINE, "getProtocol");
+        return webSocketHolder.getProtocol();
+    }
+
+    @Override
+    public BufferedReader getReader() throws IOException {
+        LOG.log(Level.FINE, "getReader");
+        return new BufferedReader(new InputStreamReader(in, "utf-8"));
+    }
+
+    @Override
+    public String getRealPath(String path) {
+        // TODO Auto-generated method stub
+        LOG.log(Level.FINE, "getRealPath");
+        return null;
+    }
+
+    @Override
+    public String getRemoteAddr() {
+        LOG.log(Level.FINE, "getRemoteAddr");
+        return webSocketHolder.getRemoteAddr();
+    }
+
+    @Override
+    public String getRemoteHost() {
+        LOG.log(Level.FINE, "getRemoteHost");
+        return webSocketHolder.getRemoteHost();
+    }
+
+    @Override
+    public int getRemotePort() {
+        LOG.log(Level.FINE, "getRemotePort");
+        return webSocketHolder.getRemotePort();
+    }
+
+    @Override
+    public RequestDispatcher getRequestDispatcher(String path) {
+        // TODO Auto-generated method stub
+        LOG.log(Level.FINE, "getRequestDispatcher");
+        return null;
+    }
+
+    @Override
+    public String getScheme() {
+        LOG.log(Level.FINE, "getScheme");
+        return webSocketHolder.getScheme();
+    }
+
+    @Override
+    public String getServerName() {
+        return webSocketHolder.getServerName();
+    }
+
+    @Override
+    public int getServerPort() {
+        LOG.log(Level.FINE, "getServerPort");
+        return webSocketHolder.getServerPort();
+    }
+
+    @Override
+    public ServletContext getServletContext() {
+        LOG.log(Level.FINE, "getServletContext");
+        return webSocketHolder.getServletContext();
+    }
+
+    @Override
+    public boolean isAsyncStarted() {
+        LOG.log(Level.FINE, "isAsyncStarted");
+        return false;
+    }
+
+    @Override
+    public boolean isAsyncSupported() {
+        LOG.log(Level.FINE, "isAsyncSupported");
+        return false;
+    }
+
+    @Override
+    public boolean isSecure() {
+        LOG.log(Level.FINE, "isSecure");
+        return webSocketHolder.isSecure();
+    }
+
+    @Override
+    public void removeAttribute(String name) {
+        LOG.log(Level.FINE, "removeAttribute");
+        attributes.remove(name);
+    }
+
+    @Override
+    public void setAttribute(String name, Object o) {
+        LOG.log(Level.FINE, "setAttribute");
+        attributes.put(name,  o);
+    }
+
+    @Override
+    public void setCharacterEncoding(String env) throws UnsupportedEncodingException {
+        LOG.log(Level.FINE, "setCharacterEncoding");
+        // ignore as we stick to utf-8.
+    }
+
+    @Override
+    public AsyncContext startAsync() {
+        LOG.log(Level.FINE, "startAsync");
+        return null;
+    }
+
+    @Override
+    public AsyncContext startAsync(ServletRequest servletRequest, ServletResponse servletResponse) {
+        // TODO Auto-generated method stub
+        LOG.log(Level.FINE, "startAsync");
+        return null;
+    }
+
+    @Override
+    public boolean authenticate(HttpServletResponse servletResponse) throws IOException, ServletException {
+        // TODO Auto-generated method stub
+        LOG.log(Level.FINE, "authenticate");
+        return false;
+    }
+
+    @Override
+    public String getAuthType() {
+        LOG.log(Level.FINE, "getAuthType");
+        return webSocketHolder.getAuthType();
+    }
+
+    @Override
+    public String getContextPath() {
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.log(Level.FINE, "getContextPath -> " + webSocketHolder.getContextPath());
+        }
+        return webSocketHolder.getContextPath();
+    }
+
+    @Override
+    public Cookie[] getCookies() {
+        LOG.log(Level.FINE, "getCookies");
+        return null;
+    }
+
+    @Override
+    public long getDateHeader(String name) {
+        LOG.log(Level.FINE, "getDateHeader");
+        return 0;
+    }
+
+    @Override
+    public String getHeader(String name) {
+        LOG.log(Level.FINE, "getHeader");
+        return requestHeaders.get(name);
+    }
+
+    @Override
+    public Enumeration<String> getHeaderNames() {
+        LOG.log(Level.FINE, "getHeaderNames");
+        return Collections.enumeration(requestHeaders.keySet());
+    }
+
+    @Override
+    public Enumeration<String> getHeaders(String name) {
+        LOG.log(Level.FINE, "getHeaders");
+        // our protocol assumes no multiple headers
+        return Collections.enumeration(Arrays.asList(requestHeaders.get(name)));
+    }
+
+    @Override
+    public int getIntHeader(String name) {
+        LOG.log(Level.FINE, "getIntHeader");
+        String v = requestHeaders.get(name);
+        return v == null ? -1 : Integer.parseInt(v);
+    }
+
+    @Override
+    public String getMethod() {
+        LOG.log(Level.FINE, "getMethod");
+        return requestHeaders.get(WebSocketUtils.METHOD_KEY);
+    }
+
+    @Override
+    public Part getPart(String name) throws IOException, ServletException {
+        LOG.log(Level.FINE, "getPart");
+        return null;
+    }
+
+    @Override
+    public Collection<Part> getParts() throws IOException, ServletException {
+        LOG.log(Level.FINE, "getParts");
+        return null;
+    }
+
+    @Override
+    public String getPathInfo() {
+        String uri = requestHeaders.get(WebSocketUtils.URI_KEY);
+        String servletpath = webSocketHolder.getServletPath(); 
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.log(Level.FINE, "getPathInfo " + servletpath + " " + uri);
+        }
+        //TODO remove the query string part
+        //REVISIT may cache this value in requstHeaders?
+        return uri.substring(servletpath.length());
+    }
+
+    @Override
+    public String getPathTranslated() {
+        String path = getPathInfo();
+        String opathtrans = webSocketHolder.getPathTranslated();
+        // some container may choose not to return this value
+        if (opathtrans == null) {
+            return null;
+        }
+        String opathinfo = webSocketHolder.getPathInfo();
+        LOG.log(Level.FINE, "getPathTranslated " + path + " " + opathinfo);
+        int pos = opathtrans.indexOf(opathinfo);
+        //REVISIT may cache this value in requstHeaders?
+        return new StringBuilder().append(opathtrans.substring(0, pos)).append(path).toString();
+    }
+
+    @Override
+    public String getQueryString() {
+        LOG.log(Level.FINE, "getQueryString");
+        return null;
+    }
+
+    @Override
+    public String getRemoteUser() {
+        LOG.log(Level.FINE, "getRemoteUser");
+        return null;
+    }
+
+    @Override
+    public String getRequestURI() {
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.log(Level.FINE, "getRequestURI " + requestHeaders.get(WebSocketUtils.URI_KEY));
+        }
+        return requestHeaders.get(WebSocketUtils.URI_KEY);
+    }
+
+    @Override
+    public StringBuffer getRequestURL() {
+        StringBuffer sb = webSocketHolder.getRequestURL();
+        String ouri = webSocketHolder.getRequestURI();
+        String uri = getRequestURI();
+        sb.append(uri.substring(ouri.length()));
+        LOG.log(Level.FINE, "getRequestURL " + uri);
+        return sb;
+    }
+
+    @Override
+    public String getRequestedSessionId() {
+        LOG.log(Level.FINE, "getRequestedSessionId");
+        return null;
+    }
+
+    @Override
+    public String getServletPath() {
+        LOG.log(Level.FINE, "getServletPath " + webSocketHolder.getServletPath());
+        return webSocketHolder.getServletPath();
+    }
+
+    @Override
+    public HttpSession getSession() {
+        LOG.log(Level.FINE, "getSession");
+        return null;
+    }
+
+    @Override
+    public HttpSession getSession(boolean create) {
+        LOG.log(Level.FINE, "getSession");
+        return null;
+    }
+
+    @Override
+    public Principal getUserPrincipal() {
+        LOG.log(Level.FINE, "getUserPrincipal");
+        return webSocketHolder.getUserPrincipal();
+    }
+
+    @Override
+    public boolean isRequestedSessionIdFromCookie() {
+        LOG.log(Level.FINE, "isRequestedSessionIdFromCookie");
+        return false;
+    }
+
+    @Override
+    public boolean isRequestedSessionIdFromURL() {
+        LOG.log(Level.FINE, "isRequestedSessionIdFromURL");
+        return false;
+    }
+
+    @Override
+    public boolean isRequestedSessionIdFromUrl() {
+        LOG.log(Level.FINE, "isRequestedSessionIdFromUrl");
+        return false;
+    }
+
+    @Override
+    public boolean isRequestedSessionIdValid() {
+        LOG.log(Level.FINE, "isRequestedSessionIdValid");
+        return false;
+    }
+
+    @Override
+    public boolean isUserInRole(String role) {
+        LOG.log(Level.FINE, "isUserInRole");
+        return false;
+    }
+
+    @Override
+    public void login(String username, String password) throws ServletException {
+        LOG.log(Level.FINE, "login");
+        
+    }
+
+    @Override
+    public void logout() throws ServletException {
+        LOG.log(Level.FINE, "logout");
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/08fc8aad/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/WebSocketVirtualServletResponse.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/WebSocketVirtualServletResponse.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/WebSocketVirtualServletResponse.java
new file mode 100644
index 0000000..c736861
--- /dev/null
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/WebSocketVirtualServletResponse.java
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.websocket.jetty;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Collection;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.Cookie;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.transport.websocket.WebSocketConstants;
+import org.apache.cxf.transport.websocket.WebSocketUtils;
+
+/**
+ * 
+ */
+class WebSocketVirtualServletResponse implements HttpServletResponse {
+    private static final Logger LOG = LogUtils.getL7dLogger(WebSocketVirtualServletResponse.class);
+    private WebSocketServletHolder webSocketHolder;
+    private Map<String, String> responseHeaders;
+    private ServletOutputStream outputStream;
+
+    public WebSocketVirtualServletResponse(WebSocketServletHolder websocket) {
+        this.webSocketHolder = websocket;
+        this.responseHeaders = new TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER);
+        this.outputStream = createOutputStream();
+    }
+
+    @Override
+    public void flushBuffer() throws IOException {
+        LOG.log(Level.FINE, "flushBuffer()");
+        outputStream.flush();
+    }
+
+    @Override
+    public int getBufferSize() {
+        LOG.log(Level.FINE, "getBufferSize()");
+        return 0;
+    }
+
+    @Override
+    public String getCharacterEncoding() {
+        LOG.log(Level.FINE, "getCharacterEncoding()");
+        return null;
+    }
+
+    @Override
+    public String getContentType() {
+        LOG.log(Level.FINE, "getContentType()");
+        return responseHeaders.get("Content-Type");
+    }
+
+    @Override
+    public Locale getLocale() {
+        LOG.log(Level.FINE, "getLocale");
+        return null;
+    }
+
+    @Override
+    public ServletOutputStream getOutputStream() throws IOException {
+        return outputStream;
+    }
+
+    @Override
+    public PrintWriter getWriter() throws IOException {
+        LOG.log(Level.FINE, "getWriter()");
+        return new PrintWriter(getOutputStream());
+    }
+
+    @Override
+    public boolean isCommitted() {
+        return false;
+    }
+
+    @Override
+    public void reset() {
+    }
+
+    @Override
+    public void resetBuffer() {
+        LOG.log(Level.FINE, "resetBuffer()");
+    }
+
+    @Override
+    public void setBufferSize(int size) {
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.log(Level.FINE, "setBufferSize({0})", size);
+        }
+    }
+
+    @Override
+    public void setCharacterEncoding(String charset) {
+        // TODO 
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.log(Level.FINE, "setCharacterEncoding({0})", charset);
+        }
+    }
+
+    @Override
+    public void setContentLength(int len) {
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.log(Level.FINE, "setContentLength({0})", len);
+        }
+        responseHeaders.put("Content-Length", Integer.toString(len));
+    }
+
+    @Override
+    public void setContentType(String type) {
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.log(Level.FINE, "setContentType({0})", type);
+        }
+        responseHeaders.put("Content-Type", type);
+    }
+
+    @Override
+    public void setLocale(Locale loc) {
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.log(Level.FINE, "setLocale({0})", loc);
+        }
+    }
+
+    @Override
+    public void addCookie(Cookie cookie) {
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.log(Level.FINE, "addCookie({0})", cookie);
+        }
+    }
+
+    @Override
+    public void addDateHeader(String name, long date) {
+        // TODO
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.log(Level.FINE, "addDateHeader({0}, {1})", new Object[]{name, date});
+        }
+    }
+
+    @Override
+    public void addHeader(String name, String value) {
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.log(Level.FINE, "addHeader({0}, {1})", new Object[]{name, value});
+        }
+        responseHeaders.put(name, value);
+    }
+
+    @Override
+    public void addIntHeader(String name, int value) {
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.log(Level.FINE, "addIntHeader({0}, {1})", new Object[]{name, value});
+        }
+        responseHeaders.put(name, Integer.toString(value));
+    }
+
+    @Override
+    public boolean containsHeader(String name) {
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.log(Level.FINE, "containsHeader({0})", name);
+        }
+        return responseHeaders.containsKey(name);
+    }
+
+    @Override
+    public String encodeRedirectURL(String url) {
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.log(Level.FINE, "encodeRedirectURL({0})", url);
+        }
+        return null;
+    }
+
+    @Override
+    public String encodeRedirectUrl(String url) {
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.log(Level.FINE, "encodeRedirectUrl({0})", url);
+        }
+        return null;
+    }
+
+    @Override
+    public String encodeURL(String url) {
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.log(Level.FINE, "encodeURL({0})", url);
+        }
+        return null;
+    }
+
+    @Override
+    public String encodeUrl(String url) {
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.log(Level.FINE, "encodeUrl({0})", url);
+        }
+        return null;
+    }
+
+    @Override
+    public String getHeader(String name) {
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.log(Level.FINE, "getHeader({0})", name);
+        }
+        return null;
+    }
+
+    @Override
+    public Collection<String> getHeaderNames() {
+        LOG.log(Level.FINE, "getHeaderNames()");
+        return null;
+    }
+
+    @Override
+    public Collection<String> getHeaders(String name) {
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.log(Level.FINE, "getHeaders({0})", name);
+        }
+        return null;
+    }
+
+    @Override
+    public int getStatus() {
+        LOG.log(Level.FINE, "getStatus()");
+        String v = responseHeaders.get(WebSocketUtils.SC_KEY);
+        return v == null ? 200 : Integer.parseInt(v);
+    }
+
+    @Override
+    public void sendError(int sc) throws IOException {
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.log(Level.FINE, "sendError{0}", sc);
+        }
+        responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc));
+    }
+
+    @Override
+    public void sendError(int sc, String msg) throws IOException {
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.log(Level.FINE, "sendError({0}, {1})", new Object[]{sc, msg});
+        }
+        responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc));
+    }
+
+    @Override
+    public void sendRedirect(String location) throws IOException {
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.log(Level.FINE, "sendRedirect({0})", location);
+        }
+    }
+
+    @Override
+    public void setDateHeader(String name, long date) {
+        // ignore
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.log(Level.FINE, "setDateHeader({0}, {1})", new Object[]{name, date});
+        }
+    }
+
+    @Override
+    public void setHeader(String name, String value) {
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.log(Level.FINE, "setHeader({0}, {1})", new Object[]{name, value});
+        }
+        responseHeaders.put(name, value);
+    }
+
+    @Override
+    public void setIntHeader(String name, int value) {
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.log(Level.FINE, "setIntHeader({0}, {1})", new Object[]{name, value});
+        }
+    }
+
+    @Override
+    public void setStatus(int sc) {
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.log(Level.FINE, "setStatus({0})", sc);
+        }
+        responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc));
+    }
+
+    @Override
+    public void setStatus(int sc, String sm) {
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.log(Level.FINE, "setStatus({0}, {1})", new Object[]{sc, sm});
+        }
+        responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc));
+    }
+
+    private ServletOutputStream createOutputStream() {
+        //REVISIT
+        // This output buffering is needed as the server side websocket does
+        // not support the fragment transmission mode when sending back a large data.
+        // And this buffering is only used for the response for the initial service innovation.
+        // For the subsequently pushed data to the socket are sent back
+        // unbuffered as individual websocket messages.
+        // the things to consider :
+        // - provide a size limit if we are use this buffering
+        // - add a chunking mode in the cxf websocket's binding.
+        return new ServletOutputStream() {
+            private InternalByteArrayOutputStream buffer = new InternalByteArrayOutputStream();
+
+            @Override
+            public void write(int b) throws IOException {
+                byte[] data = new byte[1];
+                data[0] = (byte)b;
+                write(data, 0, 1);
+            }
+
+            @Override
+            public void write(byte[] data) throws IOException {
+                write(data, 0, data.length);
+            }
+
+            @Override
+            public void write(byte[] data, int offset, int length) throws IOException {
+                if (responseHeaders.get(WebSocketUtils.FLUSHED_KEY) == null) {
+                    // buffer the data until it gets flushed for the first time
+                    buffer.write(data, offset, length);
+                } else {
+                    // unbuffered write to the socket
+                    String respid = responseHeaders.get(WebSocketConstants.DEFAULT_RESPONSE_ID_KEY);
+                    byte[] headers = respid != null 
+                        ? WebSocketUtils.buildHeaderLine(WebSocketConstants.DEFAULT_RESPONSE_ID_KEY, respid) : null;
+                    data = WebSocketUtils.buildResponse(headers, data, offset, length);
+                    webSocketHolder.write(data, 0, data.length);
+                }
+            }
+
+            @Override
+            public void close() throws IOException {
+                if (responseHeaders.get(WebSocketUtils.FLUSHED_KEY) == null) {
+                    byte[] data = WebSocketUtils.buildResponse(responseHeaders, buffer.getBytes(), 0, buffer.size());
+                    webSocketHolder.write(data, 0, data.length);
+                    responseHeaders.put(WebSocketUtils.FLUSHED_KEY, "true");
+                }
+                super.close();
+            }
+        };
+    }
+
+    private static class InternalByteArrayOutputStream extends ByteArrayOutputStream {
+        public byte[] getBytes() {
+            return buf;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/08fc8aad/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/WebSocketTestClient.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/WebSocketTestClient.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/WebSocketTestClient.java
index 09e61c2..9795e1c 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/WebSocketTestClient.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/WebSocketTestClient.java
@@ -300,7 +300,15 @@ class WebSocketTestClient {
         }
 
         private int length(Object o) {
-            return o instanceof char[] ? ((String)o).length() : (o instanceof byte[] ? ((byte[])o).length : 0);
+            if (o instanceof String) {
+                return ((String)o).length();
+            } else if (o instanceof char[]) {
+                return ((char[])o).length;
+            } else if (o instanceof byte[]) {
+                return ((byte[])o).length;
+            } else {
+                return 0;
+            }
         }
 
         private int getchar(Object o, int p) {


Mime
View raw message