cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject [2/2] git commit: [CXF-5604] Add atmosphere based implementation in websocket transport
Date Wed, 12 Mar 2014 19:08:49 GMT
[CXF-5604] Add atmosphere based implementation in websocket transport


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/2d8264af
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/2d8264af
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/2d8264af

Branch: refs/heads/master
Commit: 2d8264aff8c1b26d60f6d4dee3bcb572927cc81a
Parents: 6069d72
Author: Akitoshi Yoshida <ay@apache.org>
Authored: Wed Mar 12 20:05:38 2014 +0100
Committer: Akitoshi Yoshida <ay@apache.org>
Committed: Wed Mar 12 20:08:11 2014 +0100

----------------------------------------------------------------------
 parent/pom.xml                                  |   1 +
 rt/transports/websocket/pom.xml                 |   7 +-
 .../websocket/WebSocketDestinationFactory.java  |  29 +-
 .../websocket/WebSocketDestinationService.java  |  36 +
 .../websocket/WebSocketServletHolder.java       |  58 ++
 .../cxf/transport/websocket/WebSocketUtils.java | 144 +++
 .../WebSocketVirtualServletRequest.java         | 532 ++++++++++
 .../WebSocketVirtualServletResponse.java        | 379 +++++++
 .../atmosphere/AtmosphereWebSocketHandler.java  | 264 +++++
 .../atmosphere/AtmosphereWebSocketHandler.java~ | 293 ++++++
 .../AtmosphereWebSocketServletDestination.java  |  87 ++
 .../AtmosphereWebSocketStreamHandler.java       |  73 ++
 .../websocket/jetty/JettyWebSocket.java         | 980 ++-----------------
 .../jetty/JettyWebSocketDestination.java        |  13 +-
 .../websocket/jetty/JettyWebSocketHandler.java  |   1 -
 .../websocket/jetty/JettyWebSocketManager.java  |   3 +-
 .../jetty/JettyWebSocketServletDestination.java |  10 +-
 .../jetty/JettyWebSocketManagerTest.java        |  10 +-
 18 files changed, 2008 insertions(+), 912 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/2d8264af/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index b250214..32fb08f 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -74,6 +74,7 @@
         <cxf.abdera.version>1.1.3</cxf.abdera.version>
         <cxf.activemq.version>5.9.0</cxf.activemq.version>
         <cxf.ahc.version>1.8.1</cxf.ahc.version>
+        <cxf.atmosphere.version>2.1.0</cxf.atmosphere.version>
         <cxf.axiom.version>1.2.14</cxf.axiom.version>
         <cxf.bcprov.version>1.50</cxf.bcprov.version>
         <cxf.cglib.bundle.version>2.2_2</cxf.cglib.bundle.version>

http://git-wip-us.apache.org/repos/asf/cxf/blob/2d8264af/rt/transports/websocket/pom.xml
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/pom.xml b/rt/transports/websocket/pom.xml
index 346c32e..616fd25 100644
--- a/rt/transports/websocket/pom.xml
+++ b/rt/transports/websocket/pom.xml
@@ -47,7 +47,6 @@
         </cxf.osgi.dynamic.import>
         <!-- keep in sync with services exported in activator -->
         <cxf.export.service>
-            org.apache.aries.blueprint.NamespaceHandler;osgi.service.blueprint.namespace="http://cxf.apache.org/transports/websocket/configuration"
         </cxf.export.service>
     </properties>
     <dependencies>
@@ -120,6 +119,12 @@
             <optional>true</optional>
         </dependency>
         <dependency>
+            <groupId>org.atmosphere</groupId>
+            <artifactId>atmosphere-runtime</artifactId>
+            <version>${cxf.atmosphere.version}</version>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
             <scope>runtime</scope>

http://git-wip-us.apache.org/repos/asf/cxf/blob/2d8264af/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationFactory.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationFactory.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationFactory.java
index 4c6fdf1..14b5f8f 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationFactory.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationFactory.java
@@ -30,24 +30,44 @@ import org.apache.cxf.transport.http.DestinationRegistry;
 import org.apache.cxf.transport.http.HTTPTransportFactory;
 import org.apache.cxf.transport.http.HttpDestinationFactory;
 import org.apache.cxf.transport.http_jetty.JettyHTTPServerEngineFactory;
+import org.apache.cxf.transport.websocket.atmosphere.AtmosphereWebSocketServletDestination;
 import org.apache.cxf.transport.websocket.jetty.JettyWebSocketDestination;
 import org.apache.cxf.transport.websocket.jetty.JettyWebSocketServletDestination;
 
 @NoJSR250Annotations()
 public class WebSocketDestinationFactory implements HttpDestinationFactory {
-
+    private static final boolean ATMOSPHERE_AVAILABLE = probeClass("org.atmosphere.cpr.ApplicationConfig");
+    
+    private static boolean probeClass(String name) {
+        try {
+            Class.forName(name, true, WebSocketDestinationFactory.class.getClassLoader());
+            return true;
+        } catch (Throwable t) {
+            return false;
+        }
+    }
+    
     public AbstractHTTPDestination createDestination(EndpointInfo endpointInfo, Bus bus,
                                                      DestinationRegistry registry) throws IOException {
-
         if (endpointInfo.getAddress().startsWith("ws")) {
-            //TODO for now jetty specific, 
+            // for the embedded mode, we stick with jetty. 
             JettyHTTPServerEngineFactory serverEngineFactory = bus
                 .getExtension(JettyHTTPServerEngineFactory.class);
             return new JettyWebSocketDestination(bus, registry, endpointInfo, serverEngineFactory);
         } else {
             //REVISIT other way of getting the registry of http so that the plain cxf servlet finds the destination?
             registry = getDestinationRegistry(bus);
-            return new JettyWebSocketServletDestination(bus, registry, endpointInfo, endpointInfo.getAddress());
+            
+            // choose atmosphere if available, otherwise assume jetty is available
+            if (ATMOSPHERE_AVAILABLE) {
+                // use atmosphere
+                return new AtmosphereWebSocketServletDestination(bus, registry,
+                                                                 endpointInfo, endpointInfo.getAddress());
+            } else {
+                // use jetty-websocket
+                return new JettyWebSocketServletDestination(bus, registry,
+                                                            endpointInfo, endpointInfo.getAddress());
+            }
         }
     }
 
@@ -65,5 +85,6 @@ public class WebSocketDestinationFactory implements HttpDestinationFactory {
         }
         return null;
     }
+    
 
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/2d8264af/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationService.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationService.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationService.java
new file mode 100644
index 0000000..701bc00
--- /dev/null
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationService.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.websocket;
+
+import java.io.IOException;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+/**
+ * 
+ */
+public interface WebSocketDestinationService {
+    void invokeInternal(ServletConfig config, ServletContext context,
+                        HttpServletRequest req, HttpServletResponse resp) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/2d8264af/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketServletHolder.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketServletHolder.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketServletHolder.java
new file mode 100644
index 0000000..bd4dea9
--- /dev/null
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketServletHolder.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.websocket;
+
+import java.io.IOException;
+import java.security.Principal;
+import java.util.Enumeration;
+import java.util.Locale;
+
+import javax.servlet.DispatcherType;
+import javax.servlet.ServletContext;
+
+/**
+ * 
+ */
+public interface WebSocketServletHolder {
+    String getAuthType();
+    String getContextPath();
+    String getLocalAddr();
+    String getLocalName();
+    int getLocalPort();
+    Locale getLocale();
+    Enumeration<Locale> getLocales();
+    String getProtocol();
+    String getRemoteAddr();
+    String getRemoteHost();
+    int getRemotePort();
+    String getRequestURI();
+    StringBuffer getRequestURL();
+    DispatcherType getDispatcherType();
+    boolean isSecure();
+    String getPathInfo();
+    String getPathTranslated();
+    String getScheme();
+    String getServerName();
+    String getServletPath();
+    ServletContext getServletContext();
+    int getServerPort();
+    Principal getUserPrincipal();
+    void write(byte[] data, int offset, int length) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/2d8264af/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
new file mode 100644
index 0000000..188caf5
--- /dev/null
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.websocket;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * 
+ */
+public final class WebSocketUtils {
+    static final String URI_KEY = "$uri";
+    static final String METHOD_KEY = "$method";
+    static final String SC_KEY = "$sc";
+    static final String SM_KEY = "$sm";
+    static final String FLUSHED_KEY = "$flushed";
+    private static final String CRLF = "\r\n";
+    private static final String DEFAULT_SC = "200";
+
+    private WebSocketUtils() {
+    }
+    
+    /**
+     * Read header properties from the specified input stream.
+     *  
+     * Only a restricted syntax is allowed as the syntax is in our control.
+     * Not allowed are:
+     * - multiline or line-wrapped headers are not not
+     * - charset other than utf-8. (although i would have preferred iso-8859-1 ;-)
+     * 
+     * @param in the input stream
+     * @return a map of name value pairs.
+     * @throws IOException
+     */
+    public static Map<String, String> readHeaders(InputStream in) throws IOException {
+        Map<String, String> headers = new TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER);
+        // read the request line
+        String line = readLine(in);
+        int del = line.indexOf(' ');
+        if (del < 0) {
+            throw new IOException("invalid request: " + line);
+        }
+        headers.put(METHOD_KEY, line.substring(0, del).trim());
+        headers.put(URI_KEY, line.substring(del + 1).trim());
+        
+        // read headers
+        while ((line = readLine(in)) != null) {
+            if (line.length() > 0) {
+                del = line.indexOf(':');
+                if (del < 0) {
+                    headers.put(line.trim(), "");
+                } else {
+                    headers.put(line.substring(0, del).trim(), line.substring(del + 1).trim());
+                }
+            }
+        }
+
+        return headers;
+    }
+
+
+    /**
+     * Read a line terminated by '\n' optionally preceded by '\r' from the 
+     * specified input stream.
+     * @param in the input stream
+     * @return
+     * @throws IOException
+     */
+    // this is copied from AttachmentDeserializer with a minor change to restrict the line termination rule.
+    public static String readLine(InputStream in) throws IOException {
+        StringBuilder buffer = new StringBuilder(128);
+
+        int c;
+
+        while ((c = in.read()) != -1) {
+            // a linefeed is a terminator, always.
+            if (c == '\n') {
+                break;
+            } else if (c == '\r') {
+                //just ignore the CR.  The next character SHOULD be an NL.  If not, we're
+                //just going to discard this
+                continue;
+            } else {
+                // just add to the buffer
+                buffer.append((char)c);
+            }
+        }
+
+        // no characters found...this was either an eof or a null line.
+        if (buffer.length() == 0) {
+            return null;
+        }
+
+        return buffer.toString();
+    }
+
+    public static byte[] buildResponse(Map<String, String> headers, byte[] data, int offset, int length) {
+        StringBuilder sb = new StringBuilder();
+        String v = headers.get(SC_KEY);
+        sb.append(v == null ? DEFAULT_SC : v).append(CRLF);
+        v = headers.get("Content-Type");
+        if (v != null) {
+            sb.append("Content-Type: ").append(v).append(CRLF);
+        }
+        sb.append(CRLF);
+        
+        byte[] hb = sb.toString().getBytes();
+        byte[] longdata = new byte[hb.length + length];
+        System.arraycopy(hb, 0, longdata, 0, hb.length);
+        if (data != null && length > 0) {
+            System.arraycopy(data, offset, longdata, hb.length, length);
+        }
+        return longdata;
+    }
+
+    public static byte[] buildResponse(byte[] data, int offset, int length) {
+        byte[] longdata = new byte[length + 2];
+        longdata[0] = 0x0d;
+        longdata[1] = 0x0a;
+        System.arraycopy(data, offset, longdata, 2, length);
+        return longdata;
+    }
+    
+
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/2d8264af/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletRequest.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletRequest.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletRequest.java
new file mode 100644
index 0000000..c9d9e94
--- /dev/null
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletRequest.java
@@ -0,0 +1,532 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.websocket;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.UnsupportedEncodingException;
+import java.security.Principal;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.DispatcherType;
+import javax.servlet.RequestDispatcher;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.ServletInputStream;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.Cookie;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.HttpSession;
+import javax.servlet.http.Part;
+
+import org.apache.cxf.common.logging.LogUtils;
+
+/**
+ * 
+ */
+public class WebSocketVirtualServletRequest implements HttpServletRequest {
+    private static final Logger LOG = LogUtils.getL7dLogger(WebSocketVirtualServletRequest.class);
+
+    private WebSocketServletHolder webSocketHolder;
+    private InputStream in;
+    private Map<String, String> requestHeaders;
+    private Map<String, Object> attributes;
+    
+    public WebSocketVirtualServletRequest(WebSocketServletHolder websocket, InputStream in) 
+        throws IOException {
+        this.webSocketHolder = websocket;
+        this.in = in;
+
+        this.requestHeaders = WebSocketUtils.readHeaders(in);
+        String path = requestHeaders.get(WebSocketUtils.URI_KEY);
+        String origin = websocket.getRequestURI();
+        if (!path.startsWith(origin)) {
+            //REVISIT for now, log it here and reject the request later.  
+            LOG.log(Level.WARNING, "invalid path: {0} not within {1}", new Object[]{path, origin});
+        }
+        this.attributes = new TreeMap<String, Object>(String.CASE_INSENSITIVE_ORDER);
+    }
+
+    @Override
+    public AsyncContext getAsyncContext() {
+        return null;
+    }
+
+    @Override
+    public Object getAttribute(String name) {
+        if (LOG.isLoggable(Level.INFO)) {
+            LOG.log(Level.INFO, "getAttribute({0})", name);
+        }
+        return attributes.get(name);
+    }
+
+    @Override
+    public Enumeration<String> getAttributeNames() {
+        LOG.log(Level.INFO, "getAttributeNames()");
+        return Collections.enumeration(attributes.keySet());
+    }
+
+    @Override
+    public String getCharacterEncoding() {
+        // TODO Auto-generated method stub
+        LOG.log(Level.INFO, "getCharacterEncoding()");
+        return null;
+    }
+
+    @Override
+    public int getContentLength() {
+        LOG.log(Level.INFO, "getContentLength()");
+        return 0;
+    }
+
+    @Override
+    public String getContentType() {
+        LOG.log(Level.INFO, "getContentType()");
+        return requestHeaders.get("Content-Type");
+    }
+
+    @Override
+    public DispatcherType getDispatcherType() {
+        LOG.log(Level.INFO, "getDispatcherType()");
+        return webSocketHolder.getDispatcherType();
+    }
+
+    @Override
+    public ServletInputStream getInputStream() throws IOException {
+        return new ServletInputStream() {
+            @Override
+            public int read() throws IOException {
+                return in.read();
+            }
+
+            @Override
+            public int read(byte[] b, int off, int len) throws IOException {
+                return in.read(b, off, len);
+            }
+        };
+    }
+
+    @Override
+    public String getLocalAddr() {
+        LOG.log(Level.INFO, "getLocalAddr()");
+        return webSocketHolder.getLocalAddr();
+    }
+
+    @Override
+    public String getLocalName() {
+        LOG.log(Level.INFO, "getLocalName()");
+        return webSocketHolder.getLocalName();
+    }
+
+    @Override
+    public int getLocalPort() {
+        LOG.log(Level.INFO, "getLocalPort()");
+        return webSocketHolder.getLocalPort();
+    }
+
+    @Override
+    public Locale getLocale() {
+        LOG.log(Level.INFO, "getLocale()");
+        return webSocketHolder.getLocale();
+    }
+
+    @Override
+    public Enumeration<Locale> getLocales() {
+        LOG.log(Level.INFO, "getLocales()");
+        return webSocketHolder.getLocales();
+    }
+
+    @Override
+    public String getParameter(String name) {
+        // TODO Auto-generated method stub
+        if (LOG.isLoggable(Level.INFO)) {
+            LOG.log(Level.INFO, "getParameter({0})", name);
+        }
+        return null;
+    }
+
+    @Override
+    public Map<String, String[]> getParameterMap() {
+        // TODO Auto-generated method stub
+        LOG.log(Level.INFO, "getParameterMap()");
+        return null;
+    }
+
+    @Override
+    public Enumeration<String> getParameterNames() {
+        // TODO Auto-generated method stub
+        LOG.log(Level.INFO, "getParameterNames()");
+        return null;
+    }
+
+    @Override
+    public String[] getParameterValues(String name) {
+        // TODO Auto-generated method stub
+        if (LOG.isLoggable(Level.INFO)) {
+            LOG.log(Level.INFO, "getParameterValues({0})", name);
+        }
+        return null;
+    }
+
+    @Override
+    public String getProtocol() {
+        LOG.log(Level.INFO, "getProtocol");
+        return webSocketHolder.getProtocol();
+    }
+
+    @Override
+    public BufferedReader getReader() throws IOException {
+        LOG.log(Level.INFO, "getReader");
+        return new BufferedReader(new InputStreamReader(in, "utf-8"));
+    }
+
+    @Override
+    public String getRealPath(String path) {
+        // TODO Auto-generated method stub
+        LOG.log(Level.INFO, "getRealPath");
+        return null;
+    }
+
+    @Override
+    public String getRemoteAddr() {
+        LOG.log(Level.INFO, "getRemoteAddr");
+        return webSocketHolder.getRemoteAddr();
+    }
+
+    @Override
+    public String getRemoteHost() {
+        LOG.log(Level.INFO, "getRemoteHost");
+        return webSocketHolder.getRemoteHost();
+    }
+
+    @Override
+    public int getRemotePort() {
+        LOG.log(Level.INFO, "getRemotePort");
+        return webSocketHolder.getRemotePort();
+    }
+
+    @Override
+    public RequestDispatcher getRequestDispatcher(String path) {
+        // TODO Auto-generated method stub
+        LOG.log(Level.INFO, "getRequestDispatcher");
+        return null;
+    }
+
+    @Override
+    public String getScheme() {
+        LOG.log(Level.INFO, "getScheme");
+        return webSocketHolder.getScheme();
+    }
+
+    @Override
+    public String getServerName() {
+        return webSocketHolder.getServerName();
+    }
+
+    @Override
+    public int getServerPort() {
+        LOG.log(Level.INFO, "getServerPort");
+        return webSocketHolder.getServerPort();
+    }
+
+    @Override
+    public ServletContext getServletContext() {
+        LOG.log(Level.INFO, "getServletContext");
+        return webSocketHolder.getServletContext();
+    }
+
+    @Override
+    public boolean isAsyncStarted() {
+        LOG.log(Level.INFO, "isAsyncStarted");
+        return false;
+    }
+
+    @Override
+    public boolean isAsyncSupported() {
+        LOG.log(Level.INFO, "isAsyncSupported");
+        return false;
+    }
+
+    @Override
+    public boolean isSecure() {
+        LOG.log(Level.INFO, "isSecure");
+        return webSocketHolder.isSecure();
+    }
+
+    @Override
+    public void removeAttribute(String name) {
+        LOG.log(Level.INFO, "removeAttribute");
+        attributes.remove(name);
+    }
+
+    @Override
+    public void setAttribute(String name, Object o) {
+        LOG.log(Level.INFO, "setAttribute");
+        attributes.put(name,  o);
+    }
+
+    @Override
+    public void setCharacterEncoding(String env) throws UnsupportedEncodingException {
+        LOG.log(Level.INFO, "setCharacterEncoding");
+        // ignore as we stick to utf-8.
+    }
+
+    @Override
+    public AsyncContext startAsync() {
+        LOG.log(Level.INFO, "startAsync");
+        return null;
+    }
+
+    @Override
+    public AsyncContext startAsync(ServletRequest servletRequest, ServletResponse servletResponse) {
+        // TODO Auto-generated method stub
+        LOG.log(Level.INFO, "startAsync");
+        return null;
+    }
+
+    @Override
+    public boolean authenticate(HttpServletResponse servletResponse) throws IOException, ServletException {
+        // TODO Auto-generated method stub
+        LOG.log(Level.INFO, "authenticate");
+        return false;
+    }
+
+    @Override
+    public String getAuthType() {
+        LOG.log(Level.INFO, "getAuthType");
+        return webSocketHolder.getAuthType();
+    }
+
+    @Override
+    public String getContextPath() {
+        LOG.log(Level.INFO, "getContextPath");
+        return webSocketHolder.getContextPath();
+    }
+
+    @Override
+    public Cookie[] getCookies() {
+        LOG.log(Level.INFO, "getCookies");
+        return null;
+    }
+
+    @Override
+    public long getDateHeader(String name) {
+        LOG.log(Level.INFO, "getDateHeader");
+        return 0;
+    }
+
+    @Override
+    public String getHeader(String name) {
+        LOG.log(Level.INFO, "getHeader");
+        return requestHeaders.get(name);
+    }
+
+    @Override
+    public Enumeration<String> getHeaderNames() {
+        LOG.log(Level.INFO, "getHeaderNames");
+        return Collections.enumeration(requestHeaders.keySet());
+    }
+
+    @Override
+    public Enumeration<String> getHeaders(String name) {
+        LOG.log(Level.INFO, "getHeaders");
+        // our protocol assumes no multiple headers
+        return Collections.enumeration(Arrays.asList(requestHeaders.get(name)));
+    }
+
+    @Override
+    public int getIntHeader(String name) {
+        LOG.log(Level.INFO, "getIntHeader");
+        String v = requestHeaders.get(name);
+        return v == null ? -1 : Integer.parseInt(v);
+    }
+
+    @Override
+    public String getMethod() {
+        LOG.log(Level.INFO, "getMethod");
+        return requestHeaders.get(WebSocketUtils.METHOD_KEY);
+    }
+
+    @Override
+    public Part getPart(String name) throws IOException, ServletException {
+        // TODO Auto-generated method stub
+        LOG.log(Level.INFO, "getPart");
+        return null;
+    }
+
+    @Override
+    public Collection<Part> getParts() throws IOException, ServletException {
+        // TODO Auto-generated method stub
+        LOG.log(Level.INFO, "getParts");
+        return null;
+    }
+
+    @Override
+    public String getPathInfo() {
+        LOG.log(Level.INFO, "getPathInfo");
+        String uri = requestHeaders.get(WebSocketUtils.URI_KEY);
+        String servletpath = webSocketHolder.getServletPath();
+        //TODO remove the query string part
+        //REVISIT may cache this value in requstHeaders?
+        return uri.substring(servletpath.length());
+    }
+
+    @Override
+    public String getPathTranslated() {
+        LOG.log(Level.INFO, "getPathTranslated");
+        String path = getPathInfo();
+        String opathtrans = webSocketHolder.getPathTranslated();
+        String opathinfo = webSocketHolder.getPathInfo();
+        int pos = opathtrans.indexOf(opathinfo);
+        //REVISIT may cache this value in requstHeaders?
+        return new StringBuilder().append(opathtrans.substring(0, pos)).append(path).toString();
+    }
+
+    @Override
+    public String getQueryString() {
+        // TODO Auto-generated method stub
+        LOG.log(Level.INFO, "getQueryString");
+        return null;
+    }
+
+    @Override
+    public String getRemoteUser() {
+        // TODO Auto-generated method stub
+        LOG.log(Level.INFO, "getRemoteUser");
+        return null;
+    }
+
+    @Override
+    public String getRequestURI() {
+        LOG.log(Level.INFO, "getRequestURI");
+        return requestHeaders.get(WebSocketUtils.URI_KEY);
+    }
+
+    @Override
+    public StringBuffer getRequestURL() {
+        LOG.log(Level.INFO, "getRequestURL");
+        StringBuffer sb = webSocketHolder.getRequestURL();
+        String ouri = webSocketHolder.getRequestURI();
+        String uri = getRequestURI();
+        //REVISIT the way to reject the requeist uri that does not match the original request
+        if (!uri.startsWith(ouri)) {
+            sb.append("invalid").append(uri);
+        } else {
+            sb.append(uri.substring(ouri.length()));
+        }
+        
+        return sb;
+    }
+
+    @Override
+    public String getRequestedSessionId() {
+        // TODO Auto-generated method stub
+        LOG.log(Level.INFO, "getRequestedSessionId");
+        return null;
+    }
+
+    @Override
+    public String getServletPath() {
+        LOG.log(Level.INFO, "getServletPath");
+        return webSocketHolder.getServletPath();
+    }
+
+    @Override
+    public HttpSession getSession() {
+        // TODO Auto-generated method stub
+        LOG.log(Level.INFO, "getSession");
+        return null;
+    }
+
+    @Override
+    public HttpSession getSession(boolean create) {
+        // TODO Auto-generated method stub
+        LOG.log(Level.INFO, "getSession");
+        return null;
+    }
+
+    @Override
+    public Principal getUserPrincipal() {
+        LOG.log(Level.INFO, "getUserPrincipal");
+        return webSocketHolder.getUserPrincipal();
+    }
+
+    @Override
+    public boolean isRequestedSessionIdFromCookie() {
+        // TODO Auto-generated method stub
+        LOG.log(Level.INFO, "isRequestedSessionIdFromCookie");
+        return false;
+    }
+
+    @Override
+    public boolean isRequestedSessionIdFromURL() {
+        // TODO Auto-generated method stub
+        LOG.log(Level.INFO, "isRequestedSessionIdFromURL");
+        return false;
+    }
+
+    @Override
+    public boolean isRequestedSessionIdFromUrl() {
+        // TODO Auto-generated method stub
+        LOG.log(Level.INFO, "isRequestedSessionIdFromUrl");
+        return false;
+    }
+
+    @Override
+    public boolean isRequestedSessionIdValid() {
+        // TODO Auto-generated method stub
+        LOG.log(Level.INFO, "isRequestedSessionIdValid");
+        return false;
+    }
+
+    @Override
+    public boolean isUserInRole(String role) {
+        // TODO Auto-generated method stub
+        LOG.log(Level.INFO, "isUserInRole");
+        return false;
+    }
+
+    @Override
+    public void login(String username, String password) throws ServletException {
+        // TODO Auto-generated method stub
+        LOG.log(Level.INFO, "login");
+        
+    }
+
+    @Override
+    public void logout() throws ServletException {
+        // TODO Auto-generated method stub
+        LOG.log(Level.INFO, "logout");
+        
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/2d8264af/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletResponse.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletResponse.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletResponse.java
new file mode 100644
index 0000000..7693e02
--- /dev/null
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletResponse.java
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.websocket;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.util.Collection;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.Cookie;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.cxf.common.logging.LogUtils;
+
+/**
+ * 
+ */
+public class WebSocketVirtualServletResponse implements HttpServletResponse {
+    private static final Logger LOG = LogUtils.getL7dLogger(WebSocketVirtualServletResponse.class);
+    private WebSocketServletHolder webSocketHolder;
+    private Map<String, String> responseHeaders;
+    private boolean flushed;
+
+    public WebSocketVirtualServletResponse(WebSocketServletHolder websocket) {
+        this.webSocketHolder = websocket;
+        this.responseHeaders = new TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER);
+    }
+
+    @Override
+    public void flushBuffer() throws IOException {
+        LOG.log(Level.INFO, "flushBuffer()");
+        if (!flushed) {
+            //REVISIT this mechanism to determine if the headers have been flushed
+            if (responseHeaders.get(WebSocketUtils.FLUSHED_KEY) == null) {
+                byte[] data = WebSocketUtils.buildResponse(responseHeaders, null, 0, 0);
+                webSocketHolder.write(data, 0, data.length);
+                responseHeaders.put(WebSocketUtils.FLUSHED_KEY, "true");
+            }
+            flushed = true;
+        }
+    }
+
+    @Override
+    public int getBufferSize() {
+        LOG.log(Level.INFO, "getBufferSize()");
+        return 0;
+    }
+
+    @Override
+    public String getCharacterEncoding() {
+        // TODO Auto-generated method stub
+        LOG.log(Level.INFO, "getCharacterEncoding()");
+        return null;
+    }
+
+    @Override
+    public String getContentType() {
+        LOG.log(Level.INFO, "getContentType()");
+        return responseHeaders.get("Content-Type");
+    }
+
+    @Override
+    public Locale getLocale() {
+        // TODO Auto-generated method stub
+        LOG.log(Level.INFO, "getLocale");
+        return null;
+    }
+
+    @Override
+    public ServletOutputStream getOutputStream() throws IOException {
+        return new ServletOutputStream() {
+
+            @Override
+            public void write(int b) throws IOException {
+                byte[] data = new byte[1];
+                data[0] = (byte)b;
+                write(data, 0, 1);
+            }
+
+            @Override
+            public void write(byte[] data, int offset, int length) throws IOException {
+                if (responseHeaders.get(WebSocketUtils.FLUSHED_KEY) == null) {
+                    data = WebSocketUtils.buildResponse(responseHeaders, data, offset, length);
+                    responseHeaders.put(WebSocketUtils.FLUSHED_KEY, "true");
+                } else {
+                    data = WebSocketUtils.buildResponse(data, offset, length);
+                }
+                webSocketHolder.write(data, 0, data.length);
+            }
+
+            @Override
+            public void close() throws IOException {
+                if (responseHeaders.get(WebSocketUtils.FLUSHED_KEY) == null) {
+                    byte[] data = WebSocketUtils.buildResponse(responseHeaders, null, 0, 0);
+                    webSocketHolder.write(data, 0, data.length);
+                    responseHeaders.put(WebSocketUtils.FLUSHED_KEY, "true");
+                }
+                super.close();
+            }
+            
+        };
+    }
+
+    @Override
+    public PrintWriter getWriter() throws IOException {
+        LOG.log(Level.INFO, "getWriter()");
+        return new PrintWriter(new OutputStream() {
+
+            @Override
+            public void write(int b) throws IOException {
+                byte[] data = new byte[1];
+                data[0] = (byte)b;
+                write(data, 0, 1);
+            }
+            
+            @Override
+            public void write(byte[] data, int offset, int length) throws IOException {
+                if (responseHeaders.get(WebSocketUtils.FLUSHED_KEY) == null) {
+                    data = WebSocketUtils.buildResponse(responseHeaders, data, offset, length);
+                    responseHeaders.put(WebSocketUtils.FLUSHED_KEY, "true");
+                } else {
+                    data = WebSocketUtils.buildResponse(data, offset, length);
+                }
+                webSocketHolder.write(data, 0, data.length);
+            }
+
+            @Override
+            public void close() throws IOException {
+                if (responseHeaders.get(WebSocketUtils.FLUSHED_KEY) == null) {
+                    byte[] data = WebSocketUtils.buildResponse(responseHeaders, null, 0, 0);
+                    webSocketHolder.write(data, 0, data.length);
+                    responseHeaders.put(WebSocketUtils.FLUSHED_KEY, "true");
+                }                
+                super.close();
+            }
+        });
+    }
+
+    @Override
+    public boolean isCommitted() {
+        return false;
+    }
+
+    @Override
+    public void reset() {
+    }
+
+    @Override
+    public void resetBuffer() {
+        LOG.log(Level.INFO, "resetBuffer()");
+    }
+
+    @Override
+    public void setBufferSize(int size) {
+        if (LOG.isLoggable(Level.INFO)) {
+            LOG.log(Level.INFO, "setBufferSize({0})", size);
+        }
+    }
+
+    @Override
+    public void setCharacterEncoding(String charset) {
+        // TODO 
+        if (LOG.isLoggable(Level.INFO)) {
+            LOG.log(Level.INFO, "setCharacterEncoding({0})", charset);
+        }
+    }
+
+    @Override
+    public void setContentLength(int len) {
+        if (LOG.isLoggable(Level.INFO)) {
+            LOG.log(Level.INFO, "setContentLength({0})", len);
+        }
+        responseHeaders.put("Content-Length", Integer.toString(len));
+    }
+
+    @Override
+    public void setContentType(String type) {
+        if (LOG.isLoggable(Level.INFO)) {
+            LOG.log(Level.INFO, "setContentType({0})", type);
+        }
+        responseHeaders.put("Content-Type", type);
+    }
+
+    @Override
+    public void setLocale(Locale loc) {
+        if (LOG.isLoggable(Level.INFO)) {
+            LOG.log(Level.INFO, "setLocale({0})", loc);
+        }
+    }
+
+    @Override
+    public void addCookie(Cookie cookie) {
+        if (LOG.isLoggable(Level.INFO)) {
+            LOG.log(Level.INFO, "addCookie({0})", cookie);
+        }
+    }
+
+    @Override
+    public void addDateHeader(String name, long date) {
+        // TODO
+        if (LOG.isLoggable(Level.INFO)) {
+            LOG.log(Level.INFO, "addDateHeader({0}, {1})", new Object[]{name, date});
+        }
+    }
+
+    @Override
+    public void addHeader(String name, String value) {
+        if (LOG.isLoggable(Level.INFO)) {
+            LOG.log(Level.INFO, "addHeader({0}, {1})", new Object[]{name, value});
+        }
+        responseHeaders.put(name, value);
+    }
+
+    @Override
+    public void addIntHeader(String name, int value) {
+        if (LOG.isLoggable(Level.INFO)) {
+            LOG.log(Level.INFO, "addIntHeader({0}, {1})", new Object[]{name, value});
+        }
+        responseHeaders.put(name, Integer.toString(value));
+    }
+
+    @Override
+    public boolean containsHeader(String name) {
+        if (LOG.isLoggable(Level.INFO)) {
+            LOG.log(Level.INFO, "containsHeader({0})", name);
+        }
+        return responseHeaders.containsKey(name);
+    }
+
+    @Override
+    public String encodeRedirectURL(String url) {
+        if (LOG.isLoggable(Level.INFO)) {
+            LOG.log(Level.INFO, "encodeRedirectURL({0})", url);
+        }
+        return null;
+    }
+
+    @Override
+    public String encodeRedirectUrl(String url) {
+        if (LOG.isLoggable(Level.INFO)) {
+            LOG.log(Level.INFO, "encodeRedirectUrl({0})", url);
+        }
+        return null;
+    }
+
+    @Override
+    public String encodeURL(String url) {
+        if (LOG.isLoggable(Level.INFO)) {
+            LOG.log(Level.INFO, "encodeURL({0})", url);
+        }
+        return null;
+    }
+
+    @Override
+    public String encodeUrl(String url) {
+        if (LOG.isLoggable(Level.INFO)) {
+            LOG.log(Level.INFO, "encodeUrl({0})", url);
+        }
+        return null;
+    }
+
+    @Override
+    public String getHeader(String name) {
+        if (LOG.isLoggable(Level.INFO)) {
+            LOG.log(Level.INFO, "getHeader({0})", name);
+        }
+        return null;
+    }
+
+    @Override
+    public Collection<String> getHeaderNames() {
+        LOG.log(Level.INFO, "getHeaderNames()");
+        return null;
+    }
+
+    @Override
+    public Collection<String> getHeaders(String name) {
+        if (LOG.isLoggable(Level.INFO)) {
+            LOG.log(Level.INFO, "getHeaders({0})", name);
+        }
+        return null;
+    }
+
+    @Override
+    public int getStatus() {
+        LOG.log(Level.INFO, "getStatus()");
+        String v = responseHeaders.get(WebSocketUtils.SC_KEY);
+        return v == null ? 200 : Integer.parseInt(v);
+    }
+
+    @Override
+    public void sendError(int sc) throws IOException {
+        if (LOG.isLoggable(Level.INFO)) {
+            LOG.log(Level.INFO, "sendError{0}", sc);
+        }
+        responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc));
+    }
+
+    @Override
+    public void sendError(int sc, String msg) throws IOException {
+        if (LOG.isLoggable(Level.INFO)) {
+            LOG.log(Level.INFO, "sendError({0}, {1})", new Object[]{sc, msg});
+        }
+        responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc));
+        responseHeaders.put(WebSocketUtils.SM_KEY, msg);
+    }
+
+    @Override
+    public void sendRedirect(String location) throws IOException {
+        // TODO Auto-generated method stub
+        if (LOG.isLoggable(Level.INFO)) {
+            LOG.log(Level.INFO, "sendRedirect({0})", location);
+        }
+    }
+
+    @Override
+    public void setDateHeader(String name, long date) {
+        // ignore
+        if (LOG.isLoggable(Level.INFO)) {
+            LOG.log(Level.INFO, "setDateHeader({0}, {1})", new Object[]{name, date});
+        }
+    }
+
+    @Override
+    public void setHeader(String name, String value) {
+        // TODO Auto-generated method stub
+        if (LOG.isLoggable(Level.INFO)) {
+            LOG.log(Level.INFO, "setHeader({0}, {1})", new Object[]{name, value});
+        }
+    }
+
+    @Override
+    public void setIntHeader(String name, int value) {
+        // TODO Auto-generated method stub
+        if (LOG.isLoggable(Level.INFO)) {
+            LOG.log(Level.INFO, "setIntHeader({0}, {1})", new Object[]{name, value});
+        }
+    }
+
+    @Override
+    public void setStatus(int sc) {
+        if (LOG.isLoggable(Level.INFO)) {
+            LOG.log(Level.INFO, "setStatus({0})", sc);
+        }
+        responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc));
+    }
+
+    @Override
+    public void setStatus(int sc, String sm) {
+        if (LOG.isLoggable(Level.INFO)) {
+            LOG.log(Level.INFO, "setStatus({0}, {1})", new Object[]{sc, sm});
+        }
+        responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc));
+        responseHeaders.put(WebSocketUtils.SM_KEY, sm);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/2d8264af/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
new file mode 100644
index 0000000..f2f066a
--- /dev/null
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.websocket.atmosphere;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.Principal;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Locale;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.servlet.DispatcherType;
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.transport.websocket.WebSocketDestinationService;
+import org.apache.cxf.transport.websocket.WebSocketServletHolder;
+import org.apache.cxf.transport.websocket.WebSocketVirtualServletRequest;
+import org.apache.cxf.transport.websocket.WebSocketVirtualServletResponse;
+import org.atmosphere.cpr.AtmosphereConfig;
+import org.atmosphere.cpr.AtmosphereRequest;
+import org.atmosphere.websocket.WebSocket;
+import org.atmosphere.websocket.WebSocketProcessor.WebSocketException;
+import org.atmosphere.websocket.WebSocketProtocol;
+
+/**
+ * 
+ */
+public class AtmosphereWebSocketHandler implements WebSocketProtocol {
+    private static final Logger LOG = LogUtils.getL7dLogger(AtmosphereWebSocketHandler.class);
+
+    protected AtmosphereWebSocketServletDestination destination;
+    
+    
+    public AtmosphereWebSocketServletDestination getDestination() {
+        return destination;
+    }
+
+    public void setDestination(AtmosphereWebSocketServletDestination destination) {
+        this.destination = destination;
+    }
+
+    /** {@inheritDoc}*/
+    @Override
+    public void configure(AtmosphereConfig config) {
+        // TODO Auto-generated method stub
+
+    }
+
+    /** {@inheritDoc}*/
+    @Override
+    public List<AtmosphereRequest> onMessage(WebSocket webSocket, String data) {
+        LOG.info("onMessage(WebSocket, String)");
+        return null;
+    }
+
+    /** {@inheritDoc}*/
+    @Override
+    public List<AtmosphereRequest> onMessage(WebSocket webSocket, byte[] data, int offset, int length) {
+        LOG.info("onMessage(WebSocket, byte[], int, int)");
+        
+        try {
+            WebSocketServletHolder webSocketHolder = new AtmosphereWebSocketServletHolder(webSocket);
+            HttpServletRequest request = createServletRequest(webSocketHolder, data, offset, length);
+            HttpServletResponse response = createServletResponse(webSocketHolder);
+            if (destination != null) {
+                ((WebSocketDestinationService)destination).invokeInternal(null, 
+                    webSocket.resource().getRequest().getServletContext(),
+                    request, response);
+            }
+        } catch (Exception e) {
+            LOG.log(Level.WARNING, "Failed to invoke service", e);
+        }
+        return null;
+    }
+
+    /** {@inheritDoc}*/
+    @Override
+    public void onOpen(WebSocket webSocket) {
+        LOG.info("onOpen(WebSocket)");
+    }
+
+    /** {@inheritDoc}*/
+    @Override
+    public void onClose(WebSocket webSocket) {
+        LOG.info("onClose(WebSocket)");
+        
+    }
+
+    /** {@inheritDoc}*/
+    @Override
+    public void onError(WebSocket webSocket, WebSocketException t) {
+        LOG.info("onError(WebSocket, WebSocketException)");
+    }
+
+    protected WebSocketVirtualServletRequest createServletRequest(WebSocketServletHolder webSocketHolder, 
+                                                                  byte[] data, int offset, int length) 
+        throws IOException {
+        return new WebSocketVirtualServletRequest(webSocketHolder, 
+                                                  new ByteArrayInputStream(data, offset, length));
+    }
+    
+    protected WebSocketVirtualServletRequest createServletRequest(WebSocketServletHolder webSocketHolder, 
+                                                                  InputStream stream)
+        throws IOException {
+        return new WebSocketVirtualServletRequest(webSocketHolder, stream);
+    }
+
+    protected WebSocketVirtualServletResponse createServletResponse(WebSocketServletHolder webSocketHolder) 
+        throws IOException {
+        return new WebSocketVirtualServletResponse(webSocketHolder);
+    }
+    
+    protected static class AtmosphereWebSocketServletHolder implements WebSocketServletHolder {
+        private WebSocket webSocket;
+        
+        public AtmosphereWebSocketServletHolder(WebSocket webSocket) {
+            this.webSocket = webSocket;
+        }
+        
+        @Override
+        public String getAuthType() {
+            return webSocket.resource().getRequest().getAuthType();
+        }
+
+        @Override
+        public String getContextPath() {
+            return webSocket.resource().getRequest().getContextPath();
+        }
+
+        @Override
+        public String getLocalAddr() {
+            return webSocket.resource().getRequest().getLocalAddr();
+        }
+
+        @Override
+        public String getLocalName() {
+            return webSocket.resource().getRequest().getLocalName();
+        }
+
+        @Override
+        public int getLocalPort() {
+            return webSocket.resource().getRequest().getLocalPort();
+        }
+
+        @Override
+        public Locale getLocale() {
+            return webSocket.resource().getRequest().getLocale();
+        }
+
+        @Override
+        public Enumeration<Locale> getLocales() {
+            return webSocket.resource().getRequest().getLocales();
+        }
+
+        @Override
+        public String getProtocol() {
+            return webSocket.resource().getRequest().getProtocol();
+        }
+
+        @Override
+        public String getRemoteAddr() {
+            return webSocket.resource().getRequest().getRemoteAddr();
+        }
+
+        @Override
+        public String getRemoteHost() {
+            return webSocket.resource().getRequest().getRemoteHost();
+        }
+
+        @Override
+        public int getRemotePort() {
+            return webSocket.resource().getRequest().getRemotePort();
+        }
+
+        @Override
+        public String getRequestURI() {
+            return webSocket.resource().getRequest().getRequestURI();
+        }
+
+        @Override
+        public StringBuffer getRequestURL() {
+            return webSocket.resource().getRequest().getRequestURL();
+        }
+
+        @Override
+        public DispatcherType getDispatcherType() {
+            return webSocket.resource().getRequest().getDispatcherType();
+        }
+
+        @Override
+        public boolean isSecure() {
+            return webSocket.resource().getRequest().isSecure();
+        }
+
+        @Override
+        public String getPathInfo() {
+            return webSocket.resource().getRequest().getServletPath();
+        }
+
+        @Override
+        public String getPathTranslated() {
+            return webSocket.resource().getRequest().getPathTranslated();
+        }
+
+        @Override
+        public String getScheme() {
+            return webSocket.resource().getRequest().getScheme();
+        }
+
+        @Override
+        public String getServerName() {
+            return webSocket.resource().getRequest().getServerName();
+        }
+
+        @Override
+        public String getServletPath() {
+            return webSocket.resource().getRequest().getServletPath();
+        }
+
+        @Override
+        public int getServerPort() {
+            return webSocket.resource().getRequest().getServerPort();
+        }
+
+        @Override
+        public ServletContext getServletContext() {
+            return webSocket.resource().getRequest().getServletContext();
+        }
+        
+        @Override
+        public Principal getUserPrincipal() {
+            return webSocket.resource().getRequest().getUserPrincipal();
+        }
+
+        @Override
+        public void write(byte[] data, int offset, int length) throws IOException {
+            webSocket.write(data, offset, length);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/2d8264af/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java~
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java~ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java~
new file mode 100644
index 0000000..4ce80f1
--- /dev/null
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java~
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.websocket.atmosphere;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.security.Principal;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.servlet.DispatcherType;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.transport.websocket.WebSocketDestinationService;
+import org.apache.cxf.transport.websocket.WebSocketServletHolder;
+import org.apache.cxf.transport.websocket.WebSocketVirtualServletRequest;
+import org.apache.cxf.transport.websocket.WebSocketVirtualServletResponse;
+import org.atmosphere.cpr.AtmosphereConfig;
+import org.atmosphere.cpr.AtmosphereRequest;
+import org.atmosphere.websocket.WebSocket;
+import org.atmosphere.websocket.WebSocketProcessor.WebSocketException;
+import org.atmosphere.websocket.WebSocketProtocol;
+
+/**
+ * 
+ */
+public class AtmosphereWebSocketHandler implements WebSocketProtocol {
+    private static final Logger LOG = LogUtils.getL7dLogger(AtmosphereWebSocketHandler.class);
+
+    protected AtmosphereWebSocketServletDestination destination;
+    
+    
+    public AtmosphereWebSocketServletDestination getDestination() {
+        return destination;
+    }
+
+    public void setDestination(AtmosphereWebSocketServletDestination destination) {
+        this.destination = destination;
+    }
+
+    /** {@inheritDoc}*/
+    @Override
+    public void configure(AtmosphereConfig config) {
+        // TODO Auto-generated method stub
+
+    }
+
+    /** {@inheritDoc}*/
+    @Override
+    public List<AtmosphereRequest> onMessage(WebSocket webSocket, String data) {
+        LOG.info("onMessage(WebSocket, String)");
+        return null;
+    }
+
+    /** {@inheritDoc}*/
+    @Override
+    public List<AtmosphereRequest> onMessage(WebSocket webSocket, byte[] data, int offset, int length) {
+        LOG.info("onMessage(WebSocket, byte[], int, int)");
+        
+        try {
+            WebSocketServletHolder webSocketHolder = new AtmosphereWebSocketServletHolder(webSocket);
+            HttpServletRequest request = createServletRequest(webSocketHolder, data, offset, length);
+            HttpServletResponse response = createServletResponse(webSocketHolder);
+            if (destination != null) {
+                ((WebSocketDestinationService)destination).invokeInternal(null, webSocket.resource().getRequest().getServletContext(),
+                                                                          request, response);
+            }
+        } catch (Exception e) {
+            LOG.log(Level.WARNING, "Failed to invoke service", e);
+        }
+        return null;
+    }
+
+    /** {@inheritDoc}*/
+    @Override
+    public void onOpen(WebSocket webSocket) {
+        LOG.info("onOpen(WebSocket)");
+    }
+
+    /** {@inheritDoc}*/
+    @Override
+    public void onClose(WebSocket webSocket) {
+        LOG.info("onClose(WebSocket)");
+        
+    }
+
+    /** {@inheritDoc}*/
+    @Override
+    public void onError(WebSocket webSocket, WebSocketException t) {
+        // TODO Auto-generated method stub
+        LOG.info("onError(WebSocket, WebSocketException)");
+    }
+
+    protected WebSocketVirtualServletRequest createServletRequest(WebSocketServletHolder webSocketHolder, byte[] data, int offset, int length) 
+        throws IOException {
+        return new WebSocketVirtualServletRequest(null, webSocketHolder, new ByteArrayInputStream(data, offset, length));
+    }
+    
+    protected WebSocketVirtualServletRequest createServletRequest(WebSocketServletHolder webSocketHolder, InputStream stream)
+        throws IOException {
+        return new WebSocketVirtualServletRequest(null, webSocketHolder, stream);
+    }
+
+    protected WebSocketVirtualServletResponse createServletResponse(WebSocketServletHolder webSocketHolder) throws IOException {
+        return new WebSocketVirtualServletResponse(webSocketHolder);
+    }
+    
+    static class AtmosphereWebSocketServletHolder implements WebSocketServletHolder {
+        private WebSocket webSocket;
+        
+        public AtmosphereWebSocketServletHolder(WebSocket webSocket) {
+            this.webSocket = webSocket;
+        }
+        
+        @Override
+        public String getAuthType() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public String getContextPath() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public String getLocalAddr() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public String getLocalName() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public int getLocalPort() {
+            // TODO Auto-generated method stub
+            return 0;
+        }
+
+        @Override
+        public Locale getLocale() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public Enumeration<Locale> getLocales() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public String getProtocol() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public String getRemoteAddr() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public String getRemoteHost() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public int getRemotePort() {
+            // TODO Auto-generated method stub
+            return 0;
+        }
+
+        @Override
+        public String getRequestURI() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public StringBuffer getRequestURL() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public DispatcherType getDispatcherType() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public boolean isSecure() {
+            // TODO Auto-generated method stub
+            return false;
+        }
+
+        @Override
+        public String getPathInfo() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public String getPathTranslated() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public String getScheme() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public String getServerName() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public String getServletPath() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public int getServerPort() {
+            // TODO Auto-generated method stub
+            return 0;
+        }
+
+        @Override
+        public Principal getUserPrincipal() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public void write(byte[] data, int i, int length) throws IOException {
+            // TODO Auto-generated method stub
+            
+        }
+
+        @Override
+        public ServletOutputStream getServletOutputStream(Map<String, String> headers) {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public OutputStream getOutputStream(Map<String, String> headers) {
+            // TODO Auto-generated method stub
+            return null;
+        }
+        
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/2d8264af/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java
new file mode 100644
index 0000000..ffeeb5c
--- /dev/null
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.websocket.atmosphere;
+
+import java.io.IOException;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.http.DestinationRegistry;
+import org.apache.cxf.transport.servlet.ServletDestination;
+import org.apache.cxf.transport.websocket.WebSocketDestinationService;
+import org.atmosphere.cpr.ApplicationConfig;
+import org.atmosphere.cpr.AtmosphereFramework;
+import org.atmosphere.cpr.AtmosphereRequest;
+import org.atmosphere.cpr.AtmosphereResponse;
+import org.atmosphere.util.Utils;
+import org.atmosphere.websocket.WebSocketProtocol;
+
+/**
+ * 
+ */
+public class AtmosphereWebSocketServletDestination extends ServletDestination implements
+    WebSocketDestinationService {
+    private AtmosphereFramework framework;
+
+    public AtmosphereWebSocketServletDestination(Bus bus, DestinationRegistry registry, EndpointInfo ei, 
+                                                 String path) throws IOException {
+        super(bus, registry, ei, ei.toString());
+        this.framework = new AtmosphereFramework(false, true);
+
+        framework.setUseNativeImplementation(false);
+        framework.addInitParameter(ApplicationConfig.WEBSOCKET_SUPPORT, "true");
+        //TODO provide a way to switch between the non-stream handler and the stream handler
+        framework.addInitParameter(ApplicationConfig.WEBSOCKET_PROTOCOL, 
+                                   AtmosphereWebSocketHandler.class.getName());
+        framework.init();
+
+        WebSocketProtocol wsp = framework.getWebSocketProtocol();
+        if (wsp instanceof AtmosphereWebSocketHandler) {
+            ((AtmosphereWebSocketHandler)wsp).setDestination(this);
+        }
+    }
+
+    @Override
+    public void invoke(ServletConfig config, ServletContext context, HttpServletRequest req,
+                       HttpServletResponse resp) throws IOException {
+        if (Utils.webSocketEnabled(req)) {
+            try {
+                framework.doCometSupport(AtmosphereRequest.wrap(req), AtmosphereResponse.wrap(resp));
+            } catch (ServletException e) {
+                throw new IOException(e);
+            }
+            return;
+        }
+        super.invoke(config, context, req, resp);
+    }
+
+    @Override
+    public void invokeInternal(ServletConfig config, ServletContext context, HttpServletRequest req,
+                               HttpServletResponse resp) throws IOException {
+        super.invoke(config, context, req, resp);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/2d8264af/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketStreamHandler.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketStreamHandler.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketStreamHandler.java
new file mode 100644
index 0000000..8574858
--- /dev/null
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketStreamHandler.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.websocket.atmosphere;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.transport.websocket.WebSocketDestinationService;
+import org.apache.cxf.transport.websocket.WebSocketServletHolder;
+import org.atmosphere.cpr.AtmosphereRequest;
+import org.atmosphere.websocket.WebSocket;
+import org.atmosphere.websocket.WebSocketProtocolStream;
+
+/**
+ * 
+ */
+public class AtmosphereWebSocketStreamHandler extends AtmosphereWebSocketHandler implements 
+    WebSocketProtocolStream {
+    private static final Logger LOG = LogUtils.getL7dLogger(AtmosphereWebSocketStreamHandler.class);
+
+    @Override
+    public List<AtmosphereRequest> onTextStream(WebSocket webSocket, Reader r) {
+        LOG.info("onTextStream(WebSocket, Reader)");
+        //TODO add support for Reader
+        throw new IllegalArgumentException("not implemented");
+    }
+
+    @Override
+    public List<AtmosphereRequest> onBinaryStream(WebSocket webSocket, InputStream stream) {
+        LOG.info("onBinaryStream(WebSocket, InputStream)");
+        
+        try {
+            WebSocketServletHolder webSocketHolder = new AtmosphereWebSocketServletHolder(webSocket);
+            HttpServletRequest request = createServletRequest(webSocketHolder, stream);
+            HttpServletResponse response = createServletResponse(webSocketHolder);
+            if (destination != null) {
+                ((WebSocketDestinationService)destination).invokeInternal(null, 
+                    webSocket.resource().getRequest().getServletContext(),
+                    request, response);
+            }
+        } catch (Exception e) {
+            LOG.log(Level.WARNING, "Failed to invoke service", e);
+        }
+        return null;
+    }
+
+
+
+}


Mime
View raw message