cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject cxf git commit: [CXF-6314] WebSocket transport should not require a dependency on Jetty; includes refactoring specific to CXF 3.0.x
Date Tue, 24 Mar 2015 15:53:07 GMT
Repository: cxf
Updated Branches:
  refs/heads/3.0.x-fixes c415e11d0 -> e3580d7a6


[CXF-6314] WebSocket transport should not require a dependency on Jetty; includes refactoring specific to CXF 3.0.x


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/e3580d7a
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/e3580d7a
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/e3580d7a

Branch: refs/heads/3.0.x-fixes
Commit: e3580d7a6bb5ec330b870ec68d561566fac5be5c
Parents: c415e11
Author: Akitoshi Yoshida <ay@apache.org>
Authored: Tue Mar 24 16:52:05 2015 +0100
Committer: Akitoshi Yoshida <ay@apache.org>
Committed: Tue Mar 24 16:52:38 2015 +0100

----------------------------------------------------------------------
 rt/transports/websocket/pom.xml                 |   4 +-
 .../websocket/WebSocketDestinationFactory.java  |  40 +-
 .../websocket/jetty/JettyWebSocket.java         | 361 ------------------
 .../jetty/JettyWebSocketDestination.java        | 381 ++++++++++++++++++-
 .../websocket/jetty/JettyWebSocketHandler.java  |   9 +-
 .../websocket/jetty/JettyWebSocketManager.java  |  93 -----
 .../jetty/JettyWebSocketServletDestination.java | 362 +++++++++++++++++-
 .../jetty/JettyWebSocketManagerTest.java        | 126 ------
 .../JettyWebSocketServletDestinationTest.java   |  79 ----
 9 files changed, 757 insertions(+), 698 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/e3580d7a/rt/transports/websocket/pom.xml
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/pom.xml b/rt/transports/websocket/pom.xml
index f3a0def..7148b13 100644
--- a/rt/transports/websocket/pom.xml
+++ b/rt/transports/websocket/pom.xml
@@ -38,7 +38,7 @@
 -->
         <cxf.osgi.import>
             javax.servlet*;version="${cxf.osgi.javax.servlet.version}",
-            org.eclipse.jetty*;version="${cxf.jetty.osgi.version}",
+            org.eclipse.jetty*;version="${cxf.jetty.osgi.version}";resolution:=optional,
             org.apache.aries*;version="${cxf.aries.version.range}";resolution:=optional,
             org.atmosphere*;version="${cxf.atmosphere.version.range}";resolution:=optional,
             org.springframework*;resolution:="optional";version="${cxf.osgi.spring.version}"
@@ -101,11 +101,13 @@
         <dependency>
             <groupId>org.eclipse.jetty</groupId>
             <artifactId>jetty-server</artifactId>
+            <optional>true</optional>
         </dependency>
         <dependency>
             <groupId>org.eclipse.jetty</groupId>
             <artifactId>jetty-security</artifactId>
             <version>${cxf.jetty.version}</version>
+            <optional>true</optional>
         </dependency>
         <dependency>
             <groupId>org.eclipse.jetty</groupId>

http://git-wip-us.apache.org/repos/asf/cxf/blob/e3580d7a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationFactory.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationFactory.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationFactory.java
index 19d6dbf..f4f2d51 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationFactory.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationFactory.java
@@ -19,6 +19,7 @@
 package org.apache.cxf.transport.websocket;
 
 import java.io.IOException;
+import java.lang.reflect.Constructor;
 
 import org.apache.cxf.Bus;
 import org.apache.cxf.common.injection.NoJSR250Annotations;
@@ -30,15 +31,18 @@ import org.apache.cxf.transport.http.DestinationRegistry;
 import org.apache.cxf.transport.http.HTTPTransportFactory;
 import org.apache.cxf.transport.http.HttpDestinationFactory;
 import org.apache.cxf.transport.http_jetty.JettyHTTPServerEngineFactory;
-import org.apache.cxf.transport.websocket.atmosphere.AtmosphereWebSocketJettyDestination;
 import org.apache.cxf.transport.websocket.atmosphere.AtmosphereWebSocketServletDestination;
-import org.apache.cxf.transport.websocket.jetty.JettyWebSocketDestination;
 import org.apache.cxf.transport.websocket.jetty.JettyWebSocketServletDestination;
+//import org.apache.cxf.transport.websocket.jetty.JettyWebSocketServletDestination;
 
 @NoJSR250Annotations()
 public class WebSocketDestinationFactory implements HttpDestinationFactory {
     private static final boolean ATMOSPHERE_AVAILABLE = probeClass("org.atmosphere.cpr.ApplicationConfig");
-    
+    private static final Constructor<?> JETTY_WEBSOCKET_DESTINATION_CTR = 
+        probeConstructor("org.apache.cxf.transport.websocket.jetty.JettyWebSocketDestination");    
+    private static final Constructor<?> ATMOSPHERE_WEBSOCKET_JETTY_DESTINATION_CTR = 
+        probeConstructor("org.apache.cxf.transport.websocket.atmosphere.AtmosphereWebSocketJettyDestination");    
+
     private static boolean probeClass(String name) {
         try {
             Class.forName(name, true, WebSocketDestinationFactory.class.getClassLoader());
@@ -48,6 +52,16 @@ public class WebSocketDestinationFactory implements HttpDestinationFactory {
         }
     }
     
+    private static Constructor<?> probeConstructor(String name) {
+        try {
+            Class<?> clz = Class.forName(name, true, WebSocketDestinationFactory.class.getClassLoader());
+            return clz.getConstructor(Bus.class, DestinationRegistry.class, 
+                                      EndpointInfo.class, JettyHTTPServerEngineFactory.class);
+        } catch (Throwable t) {
+            return null;
+        }
+    }
+    
     public AbstractHTTPDestination createDestination(EndpointInfo endpointInfo, Bus bus,
                                                      DestinationRegistry registry) throws IOException {
         if (endpointInfo.getAddress().startsWith("ws")) {
@@ -56,9 +70,11 @@ public class WebSocketDestinationFactory implements HttpDestinationFactory {
                 .getExtension(JettyHTTPServerEngineFactory.class);
             if (ATMOSPHERE_AVAILABLE) {
                 // use atmosphere if available
-                return new AtmosphereWebSocketJettyDestination(bus, registry, endpointInfo, serverEngineFactory);
+                return createJettyHTTPDestination(ATMOSPHERE_WEBSOCKET_JETTY_DESTINATION_CTR,
+                                                  bus, registry, endpointInfo, serverEngineFactory);
             } else {
-                return new JettyWebSocketDestination(bus, registry, endpointInfo, serverEngineFactory);
+                return createJettyHTTPDestination(JETTY_WEBSOCKET_DESTINATION_CTR,
+                                                  bus, registry, endpointInfo, serverEngineFactory);
             }
         } else {
             //REVISIT other way of getting the registry of http so that the plain cxf servlet finds the destination?
@@ -92,5 +108,17 @@ public class WebSocketDestinationFactory implements HttpDestinationFactory {
         return null;
     }
     
-
+    private AbstractHTTPDestination createJettyHTTPDestination(Constructor<?> ctr, Bus bus, 
+                                                               DestinationRegistry registry, EndpointInfo ei,
+                                                               JettyHTTPServerEngineFactory jhsef) throws IOException {
+        if (ctr != null) {
+            try {
+                return (AbstractHTTPDestination) ctr.newInstance(bus, registry, ei, jhsef);
+            } catch (Throwable t) {
+                // log
+                t.printStackTrace();
+            }
+        }
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/e3580d7a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java
deleted file mode 100644
index f48efb5..0000000
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java
+++ /dev/null
@@ -1,361 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.transport.websocket.jetty;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.security.Principal;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Locale;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.servlet.DispatcherType;
-import javax.servlet.ServletContext;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.helpers.CastUtils;
-import org.apache.cxf.transport.websocket.InvalidPathException;
-import org.apache.cxf.transport.websocket.WebSocketConstants;
-import org.eclipse.jetty.websocket.WebSocket;
-
-class JettyWebSocket implements WebSocket.OnBinaryMessage, WebSocket.OnTextMessage {
-    private static final Logger LOG = LogUtils.getL7dLogger(JettyWebSocket.class);
-
-    private JettyWebSocketManager manager;
-    private Connection webSocketConnection;
-    private WebSocketServletHolder webSocketHolder;
-    private String protocol;
-
-    //REVISIT make these keys configurable
-    private String requestIdKey = WebSocketConstants.DEFAULT_REQUEST_ID_KEY;
-    private String responseIdKey = WebSocketConstants.DEFAULT_RESPONSE_ID_KEY;
-    
-    public JettyWebSocket(JettyWebSocketManager manager, HttpServletRequest request, String protocol) {
-        this.manager = manager;
-        this.protocol = protocol;
-        this.webSocketHolder = new JettyWebSocketServletHolder(this, request);
-    }
-    
-
-    @Override
-    public void onClose(int closeCode, String message) {
-        if (LOG.isLoggable(Level.FINE)) {
-            LOG.log(Level.FINE, "onClose({0}, {1})", new Object[]{closeCode, message});
-        }
-        this.webSocketConnection = null;
-    }
-
-    @Override
-    public void onOpen(Connection connection) {
-        if (LOG.isLoggable(Level.FINE)) {
-            LOG.log(Level.FINE, "onOpen({0}))", connection);
-        }
-        this.webSocketConnection = connection;
-    }
-
-    @Override
-    public void onMessage(String data) {
-        if (LOG.isLoggable(Level.FINE)) {
-            LOG.log(Level.FINE, "onMessage({0})", data);
-        }
-        try {
-            //TODO may want use string directly instead of converting it to byte[]
-            byte[] bdata = data.getBytes("utf-8");
-            invokeService(bdata, 0, bdata.length);
-        } catch (UnsupportedEncodingException e) {
-            // will not happen
-        }            
-    }
-
-    @Override
-    public void onMessage(byte[] data, int offset, int length) {
-        if (LOG.isLoggable(Level.FINE)) {
-            LOG.log(Level.FINE, "onMessage({0}, {1}, {2})", new Object[]{data, offset, length});
-        }
-        final byte[] safedata = new byte[length];
-        System.arraycopy(data, offset, safedata, 0, length);
-        invokeService(safedata, 0, safedata.length);
-    }
-    
-    private void invokeService(final byte[] data, final int offset, final int length) {
-        // invoke the service asynchronously as the jetty websocket's onMessage is synchronously blocked
-        // make sure the byte array passed to this method is immutable, as the websocket framework
-        // may corrupt the byte array after this method is returned (i.e., before the data is returned in
-        // the executor's thread.
-        executeServiceTask(new Runnable() {
-            @Override
-            public void run() {
-                HttpServletRequest request = null;
-                HttpServletResponse response = null;
-                try {
-                    response = createServletResponse();
-                    request = createServletRequest(data, offset, length);
-                    if (manager != null) {
-                        String reqid = request.getHeader(requestIdKey);
-                        if (reqid != null) {
-                            response.setHeader(responseIdKey, reqid);
-                        }
-                        manager.service(request, response);
-                    }
-                } catch (InvalidPathException ex) { 
-                    reportErrorStatus(response, 400);
-                } catch (Exception e) {
-                    LOG.log(Level.WARNING, "Failed to invoke service", e);
-                    reportErrorStatus(response, 500);
-                }
-            }
-        });
-    }
-
-    private void executeServiceTask(Runnable r) {
-        try {
-            manager.getExecutor().execute(r);
-        } catch (RejectedExecutionException e) {
-            LOG.warning(
-                "Executor queue is full, run the service invocation task in caller thread." 
-                + "  Users can specify a larger executor queue to avoid this.");
-            r.run();
-        }
-    }
-
-    // may want to move this error reporting code to WebSocketServletHolder
-    private void reportErrorStatus(HttpServletResponse response, int status) {
-        if (response != null) {
-            response.setStatus(status);
-            try {
-                response.getWriter().write("\r\n");
-                response.getWriter().close();
-                response.flushBuffer();
-            } catch (IOException ex) {
-                throw new RuntimeException(ex);
-            }
-        }
-    }
-    
-    private WebSocketVirtualServletRequest createServletRequest(byte[] data, int offset, int length) 
-        throws IOException {
-        return new WebSocketVirtualServletRequest(webSocketHolder, new ByteArrayInputStream(data, offset, length));
-    }
-
-    private WebSocketVirtualServletResponse createServletResponse() throws IOException {
-        return new WebSocketVirtualServletResponse(webSocketHolder);
-    }
-    
-    /**
-     * Writes to the underlining socket.
-     * 
-     * @param data
-     * @param offset
-     * @param length
-     */
-    void write(byte[] data, int offset, int length) throws IOException {
-        LOG.log(Level.FINE, "write(byte[], offset, length)");
-        webSocketConnection.sendMessage(data, offset, length);
-    }
-    
-    String getProtocol() {
-        return protocol;
-    }
-    
-    private static class JettyWebSocketServletHolder implements WebSocketServletHolder {
-        private JettyWebSocket webSocket;
-        private Map<String, Object> requestProperties;
-        private Map<String, Object> requestAttributes;
-        
-        public JettyWebSocketServletHolder(JettyWebSocket webSocket, HttpServletRequest request) {
-            this.webSocket = webSocket;
-            this.requestProperties = readProperties(request);
-            this.requestAttributes = new TreeMap<String, Object>(String.CASE_INSENSITIVE_ORDER);
-            // attributes that are needed for finding the operation in some cases
-            Object v = request.getAttribute("org.apache.cxf.transport.endpoint.address");
-            if (v != null) {
-                requestAttributes.put("org.apache.cxf.transport.endpoint.address", v);
-            }
-        }
-
-        @SuppressWarnings("unchecked")
-        private <T> T getRequestProperty(String name, Class<T> cls) {
-            return (T)requestProperties.get(name);
-        }
-
-        private Map<String, Object> readProperties(HttpServletRequest request) {
-            Map<String, Object> properties = new HashMap<String, Object>();
-            properties.put("servletPath", request.getServletPath());
-            properties.put("requestURI", request.getRequestURI());
-            properties.put("requestURL", request.getRequestURL());
-            properties.put("contextPath", request.getContextPath());
-            properties.put("servletPath", request.getServletPath());
-            properties.put("servletContext", request.getServletContext());
-            properties.put("pathInfo", request.getPathInfo());
-            properties.put("pathTranslated", request.getPathTranslated());
-            properties.put("protocol", request.getProtocol());
-            properties.put("scheme", request.getScheme());
-            // some additional ones
-            properties.put("localAddr", request.getLocalAddr());
-            properties.put("localName", request.getLocalName());
-            properties.put("localPort", request.getLocalPort());
-            properties.put("locale", request.getLocale());
-            properties.put("locales", request.getLocales());
-            properties.put("remoteHost", request.getRemoteHost());
-            properties.put("remotePort", request.getRemotePort());
-            properties.put("remoteAddr", request.getRemoteAddr());
-            properties.put("serverName", request.getServerName());
-            properties.put("serverPort", request.getServerPort());
-            properties.put("secure", request.isSecure());
-            properties.put("authType", request.getAuthType());
-            properties.put("dispatcherType", request.getDispatcherType());
-
-            return properties;
-        }
-
-        @Override
-        public String getAuthType() {
-            return getRequestProperty("authType", String.class);
-        }
-
-        @Override
-        public String getContextPath() {
-            return getRequestProperty("contextPath", String.class);
-        }
-
-        @Override
-        public String getLocalAddr() {
-            return getRequestProperty("LocalAddr", String.class);
-        }
-
-        @Override
-        public String getLocalName() {
-            return getRequestProperty("localName", String.class);
-        }
-
-        @Override
-        public int getLocalPort() {
-            return getRequestProperty("localPort", int.class);
-        }
-
-        @Override
-        public Locale getLocale() {
-            return getRequestProperty("locale", Locale.class);
-        }
-
-        @Override
-        public Enumeration<Locale> getLocales() {
-            return CastUtils.cast(getRequestProperty("locales", Enumeration.class));
-        }
-
-        @Override
-        public String getProtocol() {
-            return getRequestProperty("protocol", String.class);
-        }
-
-        @Override
-        public String getRemoteAddr() {
-            return getRequestProperty("remoteAddr", String.class);
-        }
-
-        @Override
-        public String getRemoteHost() {
-            return getRequestProperty("remoteHost", String.class);
-        }
-
-        @Override
-        public int getRemotePort() {
-            return getRequestProperty("remotePort", int.class);
-        }
-
-        @Override
-        public String getRequestURI() {
-            return getRequestProperty("requestURI", String.class);
-        }
-
-        @Override
-        public StringBuffer getRequestURL() {
-            return getRequestProperty("requestURL", StringBuffer.class);
-        }
-
-        @Override
-        public DispatcherType getDispatcherType() {
-            return getRequestProperty("dispatcherType", DispatcherType.class);
-        }
-
-        @Override
-        public boolean isSecure() {
-            return getRequestProperty("secure", boolean.class);
-        }
-
-        @Override
-        public String getPathInfo() {
-            return getRequestProperty("pathInfo", String.class);
-        }
-
-        @Override
-        public String getPathTranslated() {
-            return getRequestProperty("pathTranslated", String.class);
-        }
-
-        @Override
-        public String getScheme() {
-            return getRequestProperty("scheme", String.class);
-        }
-
-        @Override
-        public String getServerName() {
-            return getRequestProperty("serverName", String.class);
-        }
-
-        @Override
-        public String getServletPath() {
-            return getRequestProperty("servletPath", String.class);
-        }
-
-        @Override
-        public int getServerPort() {
-            return getRequestProperty("serverPort", int.class);
-        }
-
-        @Override
-        public ServletContext getServletContext() {
-            return getRequestProperty("serverContext", ServletContext.class);
-        }
-
-        @Override
-        public Principal getUserPrincipal() {
-            return getRequestProperty("userPrincipal", Principal.class);
-        }
-
-        @Override
-        public Object getAttribute(String name) {
-            return requestAttributes.get(name);
-        }
-
-        @Override
-        public void write(byte[] data, int offset, int length) throws IOException {
-            webSocket.write(data, offset, length);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/e3580d7a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketDestination.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketDestination.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketDestination.java
index c2c2234..69ffeca 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketDestination.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketDestination.java
@@ -19,22 +19,41 @@
 
 package org.apache.cxf.transport.websocket.jetty;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.net.URL;
+import java.security.Principal;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
+import javax.servlet.DispatcherType;
 import javax.servlet.ServletConfig;
 import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
 import org.apache.cxf.Bus;
+import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.common.util.StringUtils;
+import org.apache.cxf.helpers.CastUtils;
 import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.transport.http.DestinationRegistry;
 import org.apache.cxf.transport.http_jetty.JettyHTTPDestination;
 import org.apache.cxf.transport.http_jetty.JettyHTTPHandler;
 import org.apache.cxf.transport.http_jetty.JettyHTTPServerEngineFactory;
+import org.apache.cxf.transport.websocket.InvalidPathException;
+import org.apache.cxf.transport.websocket.WebSocketConstants;
 import org.apache.cxf.transport.websocket.WebSocketDestinationService;
+import org.apache.cxf.workqueue.WorkQueueManager;
+import org.eclipse.jetty.server.Request;
 import org.eclipse.jetty.websocket.WebSocket;
 import org.eclipse.jetty.websocket.WebSocketFactory;
 
@@ -42,14 +61,21 @@ import org.eclipse.jetty.websocket.WebSocketFactory;
  * 
  */
 public class JettyWebSocketDestination extends JettyHTTPDestination implements 
-    WebSocketDestinationService, WebSocketFactory.Acceptor {
-    private JettyWebSocketManager webSocketManager;
+    WebSocketDestinationService {
+    private static final Logger LOG = LogUtils.getL7dLogger(JettyWebSocketDestination.class);
+
+    //REVISIT make these keys configurable
+    private String requestIdKey = WebSocketConstants.DEFAULT_REQUEST_ID_KEY;
+    private String responseIdKey = WebSocketConstants.DEFAULT_RESPONSE_ID_KEY;
+
+    private WebSocketFactory webSocketFactory;
+    private final Executor executor;
 
     public JettyWebSocketDestination(Bus bus, DestinationRegistry registry, EndpointInfo ei,
                                      JettyHTTPServerEngineFactory serverEngineFactory) throws IOException {
         super(bus, registry, ei, serverEngineFactory);
-        webSocketManager = new JettyWebSocketManager();
-        webSocketManager.init(this);
+        webSocketFactory = new WebSocketFactory(new Acceptor(), 8192);
+        executor = bus.getExtension(WorkQueueManager.class).getAutomaticWorkQueue();
     }
     
     @Override
@@ -57,7 +83,16 @@ public class JettyWebSocketDestination extends JettyHTTPDestination implements
                                HttpServletResponse resp) throws IOException {
         super.invoke(config, context, req, resp);
     }
-
+    public void invoke(final ServletConfig config, 
+                       final ServletContext context, 
+                       final HttpServletRequest request, 
+                       final HttpServletResponse response) throws IOException {
+        if (webSocketFactory.acceptWebSocket(request, response)) {
+            ((Request)request).setHandled(true);
+            return;
+        }
+        super.invoke(config, context, request, response);
+    }
     @Override
     protected String getAddress(EndpointInfo endpointInfo) {
         String address = endpointInfo.getAddress();
@@ -66,8 +101,7 @@ public class JettyWebSocketDestination extends JettyHTTPDestination implements
         }
         return address;
     }
-
-
+        
     @Override
     protected String getBasePath(String contextPath) throws IOException {
         if (StringUtils.isEmpty(endpointInfo.getAddress())) {
@@ -75,26 +109,16 @@ public class JettyWebSocketDestination extends JettyHTTPDestination implements
         }
         return new URL(getAddress(endpointInfo)).getPath();
     }
-    
-    @Override
-    protected JettyHTTPHandler createJettyHTTPHandler(JettyHTTPDestination jhd, boolean cmExact) {
-        return new JettyWebSocketHandler(jhd, cmExact, webSocketManager);
-    }
-
-    @Override
-    public boolean checkOrigin(HttpServletRequest arg0, String arg1) {
-        return true;
-    }
 
     @Override
-    public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol) {
-        return new JettyWebSocket(webSocketManager, request, protocol);
+    protected JettyHTTPHandler createJettyHTTPHandler(JettyHTTPDestination jhd, boolean cmExact) {
+        return new JettyWebSocketHandler(jhd, cmExact, webSocketFactory);
     }
-
+    
     @Override
     public void shutdown() {
         try {
-            webSocketManager.destroy();
+            webSocketFactory.stop();
         } catch (Exception e) {
             // ignore
         } finally {
@@ -102,4 +126,319 @@ public class JettyWebSocketDestination extends JettyHTTPDestination implements
         }
     }
 
+    private class Acceptor implements WebSocketFactory.Acceptor {
+
+        @Override
+        public boolean checkOrigin(HttpServletRequest request, String protocol) {
+            return true;
+        }
+
+        @Override
+        public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol) {
+            return new JettyWebSocket(request, protocol);
+        }
+    }
+
+    private class JettyWebSocket implements WebSocket.OnBinaryMessage, WebSocket.OnTextMessage {
+        private Connection webSocketConnection;
+        private WebSocketServletHolder webSocketHolder;
+//        private String protocol;
+
+        public JettyWebSocket(HttpServletRequest request, String protocol) {
+//            this.protocol = protocol;
+            this.webSocketHolder = new JettyWebSocketServletHolder(this, request);
+        }
+        
+        @Override
+        public void onClose(int closeCode, String message) {
+            if (LOG.isLoggable(Level.FINE)) {
+                LOG.log(Level.FINE, "onClose({0}, {1})", new Object[]{closeCode, message});
+            }
+            this.webSocketConnection = null;
+        }
+
+        @Override
+        public void onOpen(Connection connection) {
+            if (LOG.isLoggable(Level.FINE)) {
+                LOG.log(Level.FINE, "onOpen({0})", connection);
+            }
+            this.webSocketConnection = connection; // write() sends through this connection
+        }
+
+        @Override
+        public void onMessage(String data) {
+            if (LOG.isLoggable(Level.FINE)) {
+                LOG.log(Level.FINE, "onMessage({0})", data);
+            }
+            try {
+                //TODO may want use string directly instead of converting it to byte[]
+                byte[] bdata = data.getBytes("utf-8");
+                invokeService(bdata, 0, bdata.length);
+            } catch (UnsupportedEncodingException e) {
+                // will not happen
+            }            
+        }
+
+        @Override
+        public void onMessage(byte[] data, int offset, int length) {
+            if (LOG.isLoggable(Level.FINE)) {
+                LOG.log(Level.FINE, "onMessage({0}, {1}, {2})", new Object[]{data, offset, length});
+            }
+            final byte[] safedata = new byte[length];
+            System.arraycopy(data, offset, safedata, 0, length);
+            invokeService(safedata, 0, safedata.length);
+        }
+        
+        private void invokeService(final byte[] data, final int offset, final int length) {
+            // Invoke the service asynchronously, as jetty's websocket onMessage call is synchronously blocked.
+            // The byte array passed to this method must not be modified afterwards, as the websocket
+            // framework may corrupt its own buffer after this method has returned (i.e., before the data
+            // is read in the executor's thread).
+            executeServiceTask(new Runnable() {
+                @Override
+                public void run() {
+                    HttpServletRequest request = null;
+                    HttpServletResponse response = null;
+                    try {
+                        response = createServletResponse();
+                        request = createServletRequest(data, offset, length);
+                        String reqid = request.getHeader(requestIdKey);
+                        if (reqid != null) {
+                            response.setHeader(responseIdKey, reqid);
+                        }
+                        invoke(null, null, request, response);
+                    } catch (InvalidPathException ex) { 
+                        reportErrorStatus(response, 400);
+                    } catch (Exception e) {
+                        LOG.log(Level.WARNING, "Failed to invoke service", e);
+                        reportErrorStatus(response, 500);
+                    }
+                }
+            });
+        }
+
+        private void executeServiceTask(Runnable r) {
+            try {
+                executor.execute(r);
+            } catch (RejectedExecutionException e) {
+                LOG.warning(
+                    "Executor queue is full, run the service invocation task in caller thread." 
+                    + "  Users can specify a larger executor queue to avoid this.");
+                r.run();
+            }
+        }
+
+        // may want to move this error reporting code to WebSocketServletHolder
+        private void reportErrorStatus(HttpServletResponse response, int status) {
+            if (response != null) {
+                response.setStatus(status);
+                try {
+                    response.getWriter().write("\r\n");
+                    response.getWriter().close();
+                    response.flushBuffer();
+                } catch (IOException ex) {
+                    throw new RuntimeException(ex);
+                }
+            }
+        }
+        
+        private WebSocketVirtualServletRequest createServletRequest(byte[] data, int offset, int length) 
+            throws IOException {
+            return new WebSocketVirtualServletRequest(webSocketHolder, new ByteArrayInputStream(data, offset, length));
+        }
+
+        private WebSocketVirtualServletResponse createServletResponse() throws IOException {
+            return new WebSocketVirtualServletResponse(webSocketHolder);
+        }
+        
+        /**
+         * Writes to the underlying socket.
+         * 
+         * @param data the buffer holding the bytes to send
+         * @param offset the index in data of the first byte to send
+         * @param length the number of bytes to send
+         */
+        void write(byte[] data, int offset, int length) throws IOException {
+            LOG.log(Level.FINE, "write(byte[], offset, length)");
+            webSocketConnection.sendMessage(data, offset, length);
+        }
+    }
+    
+    private static class JettyWebSocketServletHolder implements WebSocketServletHolder {
+        private JettyWebSocket webSocket;
+        private Map<String, Object> requestProperties;
+        private Map<String, Object> requestAttributes;
+        
+        public JettyWebSocketServletHolder(JettyWebSocket webSocket, HttpServletRequest request) {
+            this.webSocket = webSocket;
+            this.requestProperties = readProperties(request);
+            this.requestAttributes = new TreeMap<String, Object>(String.CASE_INSENSITIVE_ORDER);
+            // attributes that are needed for finding the operation in some cases
+            Object v = request.getAttribute("org.apache.cxf.transport.endpoint.address");
+            if (v != null) {
+                requestAttributes.put("org.apache.cxf.transport.endpoint.address", v);
+            }
+        }
+
+        @SuppressWarnings("unchecked")
+        private <T> T getRequestProperty(String name, Class<T> cls) {
+            return (T)requestProperties.get(name);
+        }
+
+        private Map<String, Object> readProperties(HttpServletRequest request) {
+            // copy the connect request's values; the getters look these keys up case-sensitively
+            Map<String, Object> properties = new HashMap<String, Object>();
+            properties.put("servletPath", request.getServletPath());
+            properties.put("requestURI", request.getRequestURI());
+            properties.put("requestURL", request.getRequestURL());
+            properties.put("contextPath", request.getContextPath());
+            properties.put("servletContext", request.getServletContext());
+            properties.put("pathInfo", request.getPathInfo());
+            properties.put("pathTranslated", request.getPathTranslated());
+            properties.put("protocol", request.getProtocol());
+            properties.put("scheme", request.getScheme());
+            // some additional ones
+            properties.put("localAddr", request.getLocalAddr());
+            properties.put("localName", request.getLocalName());
+            properties.put("localPort", request.getLocalPort());
+            properties.put("locale", request.getLocale());
+            properties.put("locales", request.getLocales());
+            properties.put("remoteHost", request.getRemoteHost());
+            properties.put("remotePort", request.getRemotePort());
+            properties.put("remoteAddr", request.getRemoteAddr());
+            properties.put("serverName", request.getServerName());
+            properties.put("serverPort", request.getServerPort());
+            properties.put("secure", request.isSecure());
+            properties.put("authType", request.getAuthType());
+            properties.put("userPrincipal", request.getUserPrincipal());
+            properties.put("dispatcherType", request.getDispatcherType());
+            return properties;
+        }
+
+        @Override
+        public String getAuthType() {
+            return getRequestProperty("authType", String.class);
+        }
+
+        @Override
+        public String getContextPath() {
+            return getRequestProperty("contextPath", String.class);
+        }
+
+        @Override
+        public String getLocalAddr() {
+            return getRequestProperty("localAddr", String.class); // keys are case-sensitive
+        }
+
+        @Override
+        public String getLocalName() {
+            return getRequestProperty("localName", String.class);
+        }
+
+        @Override
+        public int getLocalPort() {
+            return getRequestProperty("localPort", int.class);
+        }
+
+        @Override
+        public Locale getLocale() {
+            return getRequestProperty("locale", Locale.class);
+        }
+
+        @Override
+        public Enumeration<Locale> getLocales() {
+            return CastUtils.cast(getRequestProperty("locales", Enumeration.class));
+        }
+
+        @Override
+        public String getProtocol() {
+            return getRequestProperty("protocol", String.class);
+        }
+
+        @Override
+        public String getRemoteAddr() {
+            return getRequestProperty("remoteAddr", String.class);
+        }
+
+        @Override
+        public String getRemoteHost() {
+            return getRequestProperty("remoteHost", String.class);
+        }
+
+        @Override
+        public int getRemotePort() {
+            return getRequestProperty("remotePort", int.class);
+        }
+
+        @Override
+        public String getRequestURI() {
+            return getRequestProperty("requestURI", String.class);
+        }
+
+        @Override
+        public StringBuffer getRequestURL() {
+            return getRequestProperty("requestURL", StringBuffer.class);
+        }
+
+        @Override
+        public DispatcherType getDispatcherType() {
+            return getRequestProperty("dispatcherType", DispatcherType.class);
+        }
+
+        @Override
+        public boolean isSecure() {
+            return getRequestProperty("secure", boolean.class);
+        }
+
+        @Override
+        public String getPathInfo() {
+            return getRequestProperty("pathInfo", String.class);
+        }
+
+        @Override
+        public String getPathTranslated() {
+            return getRequestProperty("pathTranslated", String.class);
+        }
+
+        @Override
+        public String getScheme() {
+            return getRequestProperty("scheme", String.class);
+        }
+
+        @Override
+        public String getServerName() {
+            return getRequestProperty("serverName", String.class);
+        }
+
+        @Override
+        public String getServletPath() {
+            return getRequestProperty("servletPath", String.class);
+        }
+
+        @Override
+        public int getServerPort() {
+            return getRequestProperty("serverPort", int.class);
+        }
+
+        @Override
+        public ServletContext getServletContext() {
+            return getRequestProperty("servletContext", ServletContext.class); // key set in readProperties
+        }
+
+        @Override
+        public Principal getUserPrincipal() {
+            return getRequestProperty("userPrincipal", Principal.class);
+        }
+
+        @Override
+        public Object getAttribute(String name) {
+            return requestAttributes.get(name);
+        }
+
+        @Override
+        public void write(byte[] data, int offset, int length) throws IOException {
+            webSocket.write(data, offset, length);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/e3580d7a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketHandler.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketHandler.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketHandler.java
index 33da833..57231c0 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketHandler.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketHandler.java
@@ -27,23 +27,24 @@ import javax.servlet.http.HttpServletResponse;
 import org.apache.cxf.transport.http_jetty.JettyHTTPDestination;
 import org.apache.cxf.transport.http_jetty.JettyHTTPHandler;
 import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.websocket.WebSocketFactory;
 
 /**
  * The extended version of JettyHTTPHandler that can support websocket.
  */
 class JettyWebSocketHandler extends JettyHTTPHandler {
-    private JettyWebSocketManager webSocketManager;
+    private WebSocketFactory webSocketFactory;
 
     public JettyWebSocketHandler(JettyHTTPDestination jhd, boolean cmExact,
-                                 JettyWebSocketManager webSocketManager) {
+                                 WebSocketFactory webSocketFactory) {
         super(jhd, cmExact);
-        this.webSocketManager = webSocketManager;
+        this.webSocketFactory = webSocketFactory;
     }
     
     @Override
     public void handle(String target, Request baseRequest, HttpServletRequest request,
                        HttpServletResponse response) throws IOException, ServletException {
-        if (webSocketManager.acceptWebSocket(request, response)) {
+        if (webSocketFactory.acceptWebSocket(request, response)) {
             baseRequest.setHandled(true);
         } else {
             super.handle(target, baseRequest, request, response);

http://git-wip-us.apache.org/repos/asf/cxf/blob/e3580d7a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketManager.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketManager.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketManager.java
deleted file mode 100644
index 7aef4b7..0000000
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketManager.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.transport.websocket.jetty;
-
-import java.io.IOException;
-import java.util.concurrent.Executor;
-
-import javax.servlet.ServletContext;
-import javax.servlet.ServletException;
-import javax.servlet.ServletRequest;
-import javax.servlet.ServletResponse;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.cxf.transport.http.AbstractHTTPDestination;
-import org.apache.cxf.transport.websocket.WebSocketDestinationService;
-import org.apache.cxf.workqueue.WorkQueueManager;
-import org.eclipse.jetty.websocket.WebSocketFactory;
-import org.eclipse.jetty.websocket.WebSocketFactory.Acceptor;
-
-/**
- * This class is used to provide the common functionality used by
- * the two jetty websocket destination classes: JettyWebSocketDestination and JettyWebSocketServletDestination.
- */
-class JettyWebSocketManager {
-    private WebSocketFactory webSocketFactory;
-    private AbstractHTTPDestination destination;
-    private ServletContext servletContext;
-    private Executor executor;
-
-    public void init(AbstractHTTPDestination dest) {
-        this.destination = dest;
-
-        //TODO customize websocket factory configuration options when using the destination.
-        webSocketFactory = new WebSocketFactory((Acceptor)dest, 8192);
-
-        // the executor for decoupling the service invocation from websocket's onMessage call which is
-        // synchronously blocked
-        executor = dest.getBus().getExtension(WorkQueueManager.class).getAutomaticWorkQueue();
-    }
-
-    void setServletContext(ServletContext servletContext) {
-        this.servletContext = servletContext;
-    }
-
-    public void destroy() {
-        try {
-            webSocketFactory.stop();
-        } catch (Exception e) {
-            // ignore
-        }
-    }
-
-    public boolean acceptWebSocket(ServletRequest req, ServletResponse res) throws IOException {
-        try {
-            HttpServletRequest request = (HttpServletRequest) req;
-            HttpServletResponse response = (HttpServletResponse) res;
-            if (webSocketFactory.acceptWebSocket(request, response) || response.isCommitted()) {
-                return true;
-            }
-        } catch (ClassCastException e) {
-            // ignore
-        }
-        return false;
-    }
-
-    void service(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
-        if (destination != null) {
-            ((WebSocketDestinationService)destination).invokeInternal(null, servletContext, request, response);
-        }
-    }
-    
-    Executor getExecutor() {
-        return executor;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/e3580d7a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketServletDestination.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketServletDestination.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketServletDestination.java
index 296410b..4ed2c40 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketServletDestination.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketServletDestination.java
@@ -19,42 +19,74 @@
 
 package org.apache.cxf.transport.websocket.jetty;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.security.Principal;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
+import javax.servlet.DispatcherType;
 import javax.servlet.ServletConfig;
 import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
 import org.apache.cxf.Bus;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.helpers.CastUtils;
 import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.transport.http.DestinationRegistry;
 import org.apache.cxf.transport.servlet.ServletDestination;
+import org.apache.cxf.transport.websocket.InvalidPathException;
+import org.apache.cxf.transport.websocket.WebSocketConstants;
 import org.apache.cxf.transport.websocket.WebSocketDestinationService;
+import org.apache.cxf.workqueue.WorkQueueManager;
 import org.eclipse.jetty.websocket.WebSocket;
 import org.eclipse.jetty.websocket.WebSocketFactory;
 
 /**
+ * This class is only used in CXF 3.0.x in the servlet container mode when atmosphere is not available.
+ * In CXF 3.1.x, JettyWebSocketDestination can be directly used for the servlet container mode when
+ * atmosphere is not available.
  * 
+ * To keep JettyWebSocketDestination in cxf 3.1.x and 3.0.x aligned, this class 
+ * duplicates some parts of JettyWebSocketDestination instead of creating the common parts as
+ * separate shared classes for now.
  */
 public class JettyWebSocketServletDestination extends ServletDestination implements 
     WebSocketDestinationService, WebSocketFactory.Acceptor {
-    private JettyWebSocketManager webSocketManager;
+    private static final Logger LOG = LogUtils.getL7dLogger(JettyWebSocketServletDestination.class);
+
+    //REVISIT make these keys configurable
+    private String requestIdKey = WebSocketConstants.DEFAULT_REQUEST_ID_KEY;
+    private String responseIdKey = WebSocketConstants.DEFAULT_RESPONSE_ID_KEY;
+
+    private WebSocketFactory webSocketFactory;
+    private final Executor executor;
+
 
     public JettyWebSocketServletDestination(Bus bus, DestinationRegistry registry, EndpointInfo ei,
                                             String path) throws IOException {
         super(bus, registry, ei, path);
-        webSocketManager = new JettyWebSocketManager();
-        webSocketManager.init(this);
+        webSocketFactory = new WebSocketFactory(new Acceptor(), 8192);
+        executor = bus.getExtension(WorkQueueManager.class).getAutomaticWorkQueue();
     }
 
     @Override
     public void invoke(ServletConfig config, ServletContext context, HttpServletRequest req,
                        HttpServletResponse resp) throws IOException {
-        if (webSocketManager.acceptWebSocket(req, resp)) {
+        if (webSocketFactory.acceptWebSocket(req, resp)) {
             return;
         }
-
+    
         super.invoke(config, context, req, resp);
     }
 
@@ -71,17 +103,333 @@ public class JettyWebSocketServletDestination extends ServletDestination impleme
 
     @Override
     public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol) {
-        return new JettyWebSocket(webSocketManager, request, protocol);
+        return new JettyWebSocket(request, protocol);
     }
 
     @Override
     public void shutdown() {
         try {
-            webSocketManager.destroy();
+            webSocketFactory.stop();
         } catch (Exception e) {
             // ignore
         } finally {
             super.shutdown();
         }
     }
+
+    private class Acceptor implements WebSocketFactory.Acceptor {
+
+        @Override
+        public boolean checkOrigin(HttpServletRequest request, String protocol) {
+            return true;
+        }
+
+        @Override
+        public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol) {
+            return new JettyWebSocket(request, protocol);
+        }
+    }
+
+    private class JettyWebSocket implements WebSocket.OnBinaryMessage, WebSocket.OnTextMessage {
+        private Connection webSocketConnection;
+        private WebSocketServletHolder webSocketHolder;
+//        private String protocol;
+
+        public JettyWebSocket(HttpServletRequest request, String protocol) {
+//            this.protocol = protocol;
+            this.webSocketHolder = new JettyWebSocketServletHolder(this, request);
+        }
+        
+        @Override
+        public void onClose(int closeCode, String message) {
+            if (LOG.isLoggable(Level.FINE)) {
+                LOG.log(Level.FINE, "onClose({0}, {1})", new Object[]{closeCode, message});
+            }
+            this.webSocketConnection = null;
+        }
+
+        @Override
+        public void onOpen(Connection connection) {
+            if (LOG.isLoggable(Level.FINE)) {
+                LOG.log(Level.FINE, "onOpen({0})", connection);
+            }
+            this.webSocketConnection = connection; // write() sends through this connection
+        }
+
+        @Override
+        public void onMessage(String data) {
+            if (LOG.isLoggable(Level.FINE)) {
+                LOG.log(Level.FINE, "onMessage({0})", data);
+            }
+            try {
+                //TODO may want use string directly instead of converting it to byte[]
+                byte[] bdata = data.getBytes("utf-8");
+                invokeService(bdata, 0, bdata.length);
+            } catch (UnsupportedEncodingException e) {
+                // will not happen
+            }            
+        }
+
+        @Override
+        public void onMessage(byte[] data, int offset, int length) {
+            if (LOG.isLoggable(Level.FINE)) {
+                LOG.log(Level.FINE, "onMessage({0}, {1}, {2})", new Object[]{data, offset, length});
+            }
+            final byte[] safedata = new byte[length];
+            System.arraycopy(data, offset, safedata, 0, length);
+            invokeService(safedata, 0, safedata.length);
+        }
+        
+        private void invokeService(final byte[] data, final int offset, final int length) {
+            // Invoke the service asynchronously, as jetty's websocket onMessage call is synchronously blocked.
+            // The byte array passed to this method must not be modified afterwards, as the websocket
+            // framework may corrupt its own buffer after this method has returned (i.e., before the data
+            // is read in the executor's thread).
+            executeServiceTask(new Runnable() {
+                @Override
+                public void run() {
+                    HttpServletRequest request = null;
+                    HttpServletResponse response = null;
+                    try {
+                        response = createServletResponse();
+                        request = createServletRequest(data, offset, length);
+                        String reqid = request.getHeader(requestIdKey);
+                        if (reqid != null) {
+                            response.setHeader(responseIdKey, reqid);
+                        }
+                        invoke(null, null, request, response);
+                    } catch (InvalidPathException ex) { 
+                        reportErrorStatus(response, 400);
+                    } catch (Exception e) {
+                        LOG.log(Level.WARNING, "Failed to invoke service", e);
+                        reportErrorStatus(response, 500);
+                    }
+                }
+            });
+        }
+
+        private void executeServiceTask(Runnable r) {
+            try {
+                executor.execute(r);
+            } catch (RejectedExecutionException e) {
+                LOG.warning(
+                    "Executor queue is full, run the service invocation task in caller thread." 
+                    + "  Users can specify a larger executor queue to avoid this.");
+                r.run();
+            }
+        }
+
+        // may want to move this error reporting code to WebSocketServletHolder
+        private void reportErrorStatus(HttpServletResponse response, int status) {
+            if (response != null) {
+                response.setStatus(status);
+                try {
+                    response.getWriter().write("\r\n");
+                    response.getWriter().close();
+                    response.flushBuffer();
+                } catch (IOException ex) {
+                    throw new RuntimeException(ex);
+                }
+            }
+        }
+        
+        private WebSocketVirtualServletRequest createServletRequest(byte[] data, int offset, int length) 
+            throws IOException {
+            return new WebSocketVirtualServletRequest(webSocketHolder, new ByteArrayInputStream(data, offset, length));
+        }
+
+        private WebSocketVirtualServletResponse createServletResponse() throws IOException {
+            return new WebSocketVirtualServletResponse(webSocketHolder);
+        }
+        
+        /**
+         * Writes to the underlying socket.
+         * 
+         * @param data the buffer holding the bytes to send
+         * @param offset the index in data of the first byte to send
+         * @param length the number of bytes to send
+         */
+        void write(byte[] data, int offset, int length) throws IOException {
+            LOG.log(Level.FINE, "write(byte[], offset, length)");
+            webSocketConnection.sendMessage(data, offset, length);
+        }
+    }
+    
+    private static class JettyWebSocketServletHolder implements WebSocketServletHolder {
+        private JettyWebSocket webSocket;
+        private Map<String, Object> requestProperties;
+        private Map<String, Object> requestAttributes;
+        
+        public JettyWebSocketServletHolder(JettyWebSocket webSocket, HttpServletRequest request) {
+            this.webSocket = webSocket;
+            this.requestProperties = readProperties(request);
+            this.requestAttributes = new TreeMap<String, Object>(String.CASE_INSENSITIVE_ORDER);
+            // attributes that are needed for finding the operation in some cases
+            Object v = request.getAttribute("org.apache.cxf.transport.endpoint.address");
+            if (v != null) {
+                requestAttributes.put("org.apache.cxf.transport.endpoint.address", v);
+            }
+        }
+
+        @SuppressWarnings("unchecked")
+        private <T> T getRequestProperty(String name, Class<T> cls) {
+            return (T)requestProperties.get(name);
+        }
+
+        private Map<String, Object> readProperties(HttpServletRequest request) {
+            // copy the connect request's values; the getters look these keys up case-sensitively
+            Map<String, Object> properties = new HashMap<String, Object>();
+            properties.put("servletPath", request.getServletPath());
+            properties.put("requestURI", request.getRequestURI());
+            properties.put("requestURL", request.getRequestURL());
+            properties.put("contextPath", request.getContextPath());
+            properties.put("servletContext", request.getServletContext());
+            properties.put("pathInfo", request.getPathInfo());
+            properties.put("pathTranslated", request.getPathTranslated());
+            properties.put("protocol", request.getProtocol());
+            properties.put("scheme", request.getScheme());
+            // some additional ones
+            properties.put("localAddr", request.getLocalAddr());
+            properties.put("localName", request.getLocalName());
+            properties.put("localPort", request.getLocalPort());
+            properties.put("locale", request.getLocale());
+            properties.put("locales", request.getLocales());
+            properties.put("remoteHost", request.getRemoteHost());
+            properties.put("remotePort", request.getRemotePort());
+            properties.put("remoteAddr", request.getRemoteAddr());
+            properties.put("serverName", request.getServerName());
+            properties.put("serverPort", request.getServerPort());
+            properties.put("secure", request.isSecure());
+            properties.put("authType", request.getAuthType());
+            properties.put("userPrincipal", request.getUserPrincipal());
+            properties.put("dispatcherType", request.getDispatcherType());
+            return properties;
+        }
+
+        /** Returns the request property stored under the {@code "authType"} key. */
+        @Override
+        public String getAuthType() {
+            final String authType = getRequestProperty("authType", String.class);
+            return authType;
+        }
+
+        /** Returns the request property stored under the {@code "contextPath"} key. */
+        @Override
+        public String getContextPath() {
+            final String contextPath = getRequestProperty("contextPath", String.class);
+            return contextPath;
+        }
+
+        /**
+         * Returns the request property stored under the {@code "localAddr"} key.
+         * The key spelling has to match the entry written by readProperties
+         * exactly; a differently cased key such as "LocalAddr" finds nothing.
+         *
+         * @return the stored local address, or null when no such entry exists
+         */
+        @Override
+        public String getLocalAddr() {
+            return getRequestProperty("localAddr", String.class);
+        }
+
+        /** Returns the request property stored under the {@code "localName"} key. */
+        @Override
+        public String getLocalName() {
+            final String localName = getRequestProperty("localName", String.class);
+            return localName;
+        }
+
+        /**
+         * Returns the request property stored under the {@code "localPort"} key,
+         * unboxed to an int.
+         */
+        @Override
+        public int getLocalPort() {
+            final int localPort = getRequestProperty("localPort", int.class);
+            return localPort;
+        }
+
+        /** Returns the request property stored under the {@code "locale"} key. */
+        @Override
+        public Locale getLocale() {
+            final Locale locale = getRequestProperty("locale", Locale.class);
+            return locale;
+        }
+
+        /**
+         * Returns the request property stored under the {@code "locales"} key.
+         * The value is fetched as a raw Enumeration and passed through
+         * CastUtils.cast to obtain the typed result (presumably a pure generic
+         * re-typing without copying; confirm against CastUtils).
+         */
+        @Override
+        public Enumeration<Locale> getLocales() {
+            return CastUtils.cast(getRequestProperty("locales", Enumeration.class));
+        }
+
+        /** Returns the request property stored under the {@code "protocol"} key. */
+        @Override
+        public String getProtocol() {
+            final String protocol = getRequestProperty("protocol", String.class);
+            return protocol;
+        }
+
+        /** Returns the request property stored under the {@code "remoteAddr"} key. */
+        @Override
+        public String getRemoteAddr() {
+            final String remoteAddr = getRequestProperty("remoteAddr", String.class);
+            return remoteAddr;
+        }
+
+        /** Returns the request property stored under the {@code "remoteHost"} key. */
+        @Override
+        public String getRemoteHost() {
+            final String remoteHost = getRequestProperty("remoteHost", String.class);
+            return remoteHost;
+        }
+
+        /**
+         * Returns the request property stored under the {@code "remotePort"} key,
+         * unboxed to an int.
+         */
+        @Override
+        public int getRemotePort() {
+            final int remotePort = getRequestProperty("remotePort", int.class);
+            return remotePort;
+        }
+
+        /** Returns the request property stored under the {@code "requestURI"} key. */
+        @Override
+        public String getRequestURI() {
+            final String requestUri = getRequestProperty("requestURI", String.class);
+            return requestUri;
+        }
+
+        /**
+         * Returns the request property stored under the {@code "requestURL"} key.
+         * The stored StringBuffer instance itself is returned, not a copy.
+         */
+        @Override
+        public StringBuffer getRequestURL() {
+            final StringBuffer requestUrl = getRequestProperty("requestURL", StringBuffer.class);
+            return requestUrl;
+        }
+
+        /** Returns the request property stored under the {@code "dispatcherType"} key. */
+        @Override
+        public DispatcherType getDispatcherType() {
+            final DispatcherType dispatcherType = getRequestProperty("dispatcherType", DispatcherType.class);
+            return dispatcherType;
+        }
+
+        /**
+         * Returns the request property stored under the {@code "secure"} key,
+         * unboxed to a boolean.
+         */
+        @Override
+        public boolean isSecure() {
+            final boolean secure = getRequestProperty("secure", boolean.class);
+            return secure;
+        }
+
+        /** Returns the request property stored under the {@code "pathInfo"} key. */
+        @Override
+        public String getPathInfo() {
+            final String pathInfo = getRequestProperty("pathInfo", String.class);
+            return pathInfo;
+        }
+
+        /** Returns the request property stored under the {@code "pathTranslated"} key. */
+        @Override
+        public String getPathTranslated() {
+            final String pathTranslated = getRequestProperty("pathTranslated", String.class);
+            return pathTranslated;
+        }
+
+        /** Returns the request property stored under the {@code "scheme"} key. */
+        @Override
+        public String getScheme() {
+            final String scheme = getRequestProperty("scheme", String.class);
+            return scheme;
+        }
+
+        /** Returns the request property stored under the {@code "serverName"} key. */
+        @Override
+        public String getServerName() {
+            final String serverName = getRequestProperty("serverName", String.class);
+            return serverName;
+        }
+
+        /** Returns the request property stored under the {@code "servletPath"} key. */
+        @Override
+        public String getServletPath() {
+            final String servletPath = getRequestProperty("servletPath", String.class);
+            return servletPath;
+        }
+
+        /**
+         * Returns the request property stored under the {@code "serverPort"} key,
+         * unboxed to an int.
+         */
+        @Override
+        public int getServerPort() {
+            final int serverPort = getRequestProperty("serverPort", int.class);
+            return serverPort;
+        }
+
+        /**
+         * Returns the request property stored under the {@code "servletContext"}
+         * key. The key has to match the entry written by readProperties exactly;
+         * a look-alike key such as "serverContext" finds nothing.
+         *
+         * @return the stored servlet context, or null when no such entry exists
+         */
+        @Override
+        public ServletContext getServletContext() {
+            return getRequestProperty("servletContext", ServletContext.class);
+        }
+
+        /**
+         * Returns the request property stored under the {@code "userPrincipal"}
+         * key, or null when no such entry exists.
+         */
+        @Override
+        public Principal getUserPrincipal() {
+            final Principal principal = getRequestProperty("userPrincipal", Principal.class);
+            return principal;
+        }
+
+        /**
+         * Returns the entry of requestAttributes stored under the given name.
+         *
+         * @param name the attribute name to look up
+         * @return whatever requestAttributes holds for that name
+         */
+        @Override
+        public Object getAttribute(String name) {
+            return requestAttributes.get(name);
+        }
+
+        /**
+         * Forwards the given byte range unchanged to webSocket.write.
+         *
+         * @param data   the buffer holding the bytes to send
+         * @param offset the offset argument passed on to webSocket.write
+         * @param length the length argument passed on to webSocket.write
+         * @throws IOException if webSocket.write throws it
+         */
+        @Override
+        public void write(byte[] data, int offset, int length) throws IOException {
+            webSocket.write(data, offset, length);
+        }
+    }
+    
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/e3580d7a/rt/transports/websocket/src/test/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketManagerTest.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/test/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketManagerTest.java b/rt/transports/websocket/src/test/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketManagerTest.java
deleted file mode 100644
index f595fea..0000000
--- a/rt/transports/websocket/src/test/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketManagerTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.transport.websocket.jetty;
-
-import javax.servlet.ServletConfig;
-import javax.servlet.ServletContext;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.cxf.Bus;
-import org.apache.cxf.transport.http.AbstractHTTPDestination;
-import org.apache.cxf.workqueue.AutomaticWorkQueue;
-import org.apache.cxf.workqueue.WorkQueueManager;
-import org.easymock.EasyMock;
-import org.easymock.IMocksControl;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * 
- */
-public class JettyWebSocketManagerTest extends Assert {
-    private IMocksControl control;
-
-    @Before
-    public void setUp() {
-        control = EasyMock.createNiceControl();
-    }
-    
-    @Test
-    public void testServiceUsingJettyDestination() throws Exception {
-        JettyWebSocketManager jwsm = new JettyWebSocketManager();
-
-        JettyWebSocketDestination dest = control.createMock(JettyWebSocketDestination.class);
-        setupDestination(dest);
-
-        HttpServletRequest request = control.createMock(HttpServletRequest.class);
-        HttpServletResponse response = control.createMock(HttpServletResponse.class);
-        
-        dest.invokeInternal(EasyMock.isNull(ServletConfig.class), EasyMock.anyObject(ServletContext.class), 
-                    EasyMock.eq(request), EasyMock.eq(response));
-        EasyMock.expectLastCall();
-        
-        control.replay();
-        jwsm.init(dest);
-
-        jwsm.service(request, response);
-        control.verify();
-    }
-
-    @Test
-    public void testServiceUsingServletDestination() throws Exception {
-        JettyWebSocketManager jwsm = new JettyWebSocketManager();
-        
-        JettyWebSocketServletDestination dest = control.createMock(JettyWebSocketServletDestination.class);
-        setupDestination(dest);
-
-        HttpServletRequest request = control.createMock(HttpServletRequest.class);
-        HttpServletResponse response = control.createMock(HttpServletResponse.class);
-        
-        dest.invokeInternal(EasyMock.isNull(ServletConfig.class), EasyMock.anyObject(ServletContext.class), 
-                    EasyMock.eq(request), EasyMock.eq(response));
-        EasyMock.expectLastCall();
-        control.replay();
-        jwsm.init(dest);
-        
-        jwsm.service(request, response);
-        control.verify();
-    }
-
-
-    private void setupDestination(AbstractHTTPDestination dest) {
-        Bus bus = control.createMock(Bus.class);
-        WorkQueueManager wqm = control.createMock(WorkQueueManager.class);
-
-        EasyMock.expect(dest.getBus()).andReturn(bus);
-        EasyMock.expect(bus.getExtension(WorkQueueManager.class)).andReturn(wqm);
-        EasyMock.expect(wqm.getAutomaticWorkQueue()).andReturn(
-            new AutomaticWorkQueue() {
-                @Override
-                public void execute(Runnable work, long timeout) {
-                }
-
-                @Override
-                public void schedule(Runnable work, long delay) {
-                }
-
-                @Override
-                public void execute(Runnable command) {
-                }
-
-                @Override
-                public String getName() {
-                    return null;
-                }
-
-                @Override
-                public void shutdown(boolean processRemainingWorkItems) {
-                }
-
-                @Override
-                public boolean isShutdown() {
-                    return false;
-                }
-            });
-    }
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/e3580d7a/rt/transports/websocket/src/test/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketServletDestinationTest.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/test/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketServletDestinationTest.java b/rt/transports/websocket/src/test/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketServletDestinationTest.java
deleted file mode 100644
index 9e88d64..0000000
--- a/rt/transports/websocket/src/test/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketServletDestinationTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.transport.websocket.jetty;
-
-import java.io.IOException;
-
-import javax.xml.namespace.QName;
-
-import org.apache.cxf.Bus;
-import org.apache.cxf.bus.extension.ExtensionManagerBus;
-import org.apache.cxf.service.model.EndpointInfo;
-import org.apache.cxf.transport.http.DestinationRegistry;
-import org.apache.cxf.transport.http.HTTPTransportFactory;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * 
- */
-public class JettyWebSocketServletDestinationTest extends Assert {
-    private static final String ENDPOINT_ADDRESS = "/websocket/nada";
-    private static final QName ENDPOINT_NAME = new QName("urn:websocket:probe", "nada");
-
-    @Test
-    public void testRegisteration() throws Exception {
-        Bus bus = new ExtensionManagerBus();        
-        DestinationRegistry registry = new HTTPTransportFactory().getRegistry();
-        EndpointInfo endpoint = new EndpointInfo();
-        endpoint.setAddress(ENDPOINT_ADDRESS);
-        endpoint.setName(ENDPOINT_NAME);
-
-        TestJettyWebSocketServletDestination dest = 
-            new TestJettyWebSocketServletDestination(bus, registry, endpoint, ENDPOINT_ADDRESS);
-
-        dest.activate();
-        
-        assertNotNull(registry.getDestinationForPath(ENDPOINT_ADDRESS));
-        
-        dest.deactivate();
-
-        assertNull(registry.getDestinationForPath(ENDPOINT_ADDRESS));
-    }
-    
-    private static class TestJettyWebSocketServletDestination extends JettyWebSocketServletDestination {
-
-        public TestJettyWebSocketServletDestination(Bus bus, DestinationRegistry registry, EndpointInfo ei,
-                                                    String path) throws IOException {
-            super(bus, registry, ei, path);
-        }
-
-        @Override
-        public void activate() {
-            super.activate();
-        }
-
-        @Override
-        public void deactivate() {
-            super.deactivate();
-        }
-    }
-}


Mime
View raw message