cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject cxf git commit: [CXF-6308] Make WebSocket transport's embedded jetty mode to use atmosphere if available
Date Fri, 20 Mar 2015 23:52:02 GMT
Repository: cxf
Updated Branches:
  refs/heads/master 5988e70d4 -> fe4deb5b6


[CXF-6308] Make WebSocket transport's embedded jetty mode to use atmosphere if available


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/fe4deb5b
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/fe4deb5b
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/fe4deb5b

Branch: refs/heads/master
Commit: fe4deb5b67a1b3b595bc43e4176c9c311a08ed2f
Parents: 5988e70
Author: Akitoshi Yoshida <ay@apache.org>
Authored: Sat Mar 21 00:51:31 2015 +0100
Committer: Akitoshi Yoshida <ay@apache.org>
Committed: Sat Mar 21 00:51:46 2015 +0100

----------------------------------------------------------------------
 .../websocket/WebSocketDestinationFactory.java  |  18 +-
 .../websocket/atmosphere/AtmosphereUtils.java   |  41 +++++
 .../AtmosphereWebSocketJettyDestination.java    | 173 +++++++++++++++++++
 .../AtmosphereWebSocketServletDestination.java  |  26 +--
 4 files changed, 239 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/fe4deb5b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationFactory.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationFactory.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationFactory.java
index fbe658f..37b21da 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationFactory.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationFactory.java
@@ -30,6 +30,7 @@ import org.apache.cxf.transport.http.DestinationRegistry;
 import org.apache.cxf.transport.http.HTTPTransportFactory;
 import org.apache.cxf.transport.http.HttpDestinationFactory;
 import org.apache.cxf.transport.http_jetty.JettyHTTPServerEngineFactory;
+import org.apache.cxf.transport.websocket.atmosphere.AtmosphereWebSocketJettyDestination;
 import org.apache.cxf.transport.websocket.atmosphere.AtmosphereWebSocketServletDestination;
 import org.apache.cxf.transport.websocket.jetty.JettyWebSocketDestination;
 import org.apache.cxf.transport.websocket.jetty.JettyWebSocketServletDestination;
@@ -51,13 +52,18 @@ public class WebSocketDestinationFactory implements HttpDestinationFactory {
     public AbstractHTTPDestination createDestination(EndpointInfo endpointInfo, Bus bus,
                                                      DestinationRegistry registry) throws IOException {
         if (endpointInfo.getAddress().startsWith("ws")) {
-            // for the embedded mode, we stick with jetty. 
+            // for the embedded mode, we stick to jetty
             JettyHTTPServerEngineFactory serverEngineFactory = bus
                 .getExtension(JettyHTTPServerEngineFactory.class);
-            if (serverEngineFactory.isJetty8()) {
-                return new JettyWebSocketDestination(bus, registry, endpointInfo, serverEngineFactory);
+            if (ATMOSPHERE_AVAILABLE) {
+                // use atmosphere if available
+                return new AtmosphereWebSocketJettyDestination(bus, registry, endpointInfo, serverEngineFactory);
             } else {
-                return new Jetty9WebSocketDestination(bus, registry, endpointInfo, serverEngineFactory);
+                if (serverEngineFactory.isJetty8()) {
+                    return new JettyWebSocketDestination(bus, registry, endpointInfo, serverEngineFactory);
+                } else {
+                    return new Jetty9WebSocketDestination(bus, registry, endpointInfo, serverEngineFactory);
+                }
             }
         } else {
             //REVISIT other way of getting the registry of http so that the plain cxf servlet finds the destination?
@@ -65,14 +71,14 @@ public class WebSocketDestinationFactory implements HttpDestinationFactory {
             
             // choose atmosphere if available, otherwise assume jetty is available
             if (ATMOSPHERE_AVAILABLE) {
-                // use atmosphere
+                // use atmosphere if available
                 return new AtmosphereWebSocketServletDestination(bus, registry,
                                                                  endpointInfo, endpointInfo.getAddress());
             } else {
                 JettyHTTPServerEngineFactory serverEngineFactory = bus
                     .getExtension(JettyHTTPServerEngineFactory.class);
+                // use jetty-websocket
                 if (serverEngineFactory.isJetty8()) { 
-                    // use jetty-websocket
                     return new JettyWebSocketServletDestination(bus, registry,
                                                                 endpointInfo, endpointInfo.getAddress());
                 } else { 

http://git-wip-us.apache.org/repos/asf/cxf/blob/fe4deb5b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereUtils.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereUtils.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereUtils.java
new file mode 100644
index 0000000..5f35211
--- /dev/null
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereUtils.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.websocket.atmosphere;
+
+import org.apache.cxf.Bus;
+import org.atmosphere.cpr.AtmosphereInterceptor;
+
+/**
+ * 
+ */
+public final class AtmosphereUtils {
+
+    private AtmosphereUtils() {
+    }
+
+    public static AtmosphereInterceptor getInterceptor(Bus bus) {
+        AtmosphereInterceptor ai = (AtmosphereInterceptor)bus.getProperty("atmosphere.interceptor");
+        if (ai == null) {
+            ai = new DefaultProtocolInterceptor(); 
+        }
+        return ai;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/fe4deb5b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketJettyDestination.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketJettyDestination.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketJettyDestination.java
new file mode 100644
index 0000000..ebc33f6
--- /dev/null
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketJettyDestination.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.websocket.atmosphere;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.common.util.StringUtils;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.http.DestinationRegistry;
+import org.apache.cxf.transport.http_jetty.JettyHTTPDestination;
+import org.apache.cxf.transport.http_jetty.JettyHTTPHandler;
+import org.apache.cxf.transport.http_jetty.JettyHTTPServerEngineFactory;
+import org.apache.cxf.transport.websocket.WebSocketDestinationService;
+import org.apache.cxf.workqueue.WorkQueueManager;
+import org.atmosphere.cpr.ApplicationConfig;
+import org.atmosphere.cpr.AtmosphereFramework;
+import org.atmosphere.cpr.AtmosphereRequest;
+import org.atmosphere.cpr.AtmosphereResource;
+import org.atmosphere.cpr.AtmosphereResponse;
+import org.atmosphere.handler.AbstractReflectorAtmosphereHandler;
+import org.atmosphere.util.Utils;
+import org.eclipse.jetty.server.Request;
+
+
+/**
+ * 
+ */
+public class AtmosphereWebSocketJettyDestination extends JettyHTTPDestination implements
+    WebSocketDestinationService {
+    private static final Logger LOG = LogUtils.getL7dLogger(AtmosphereWebSocketJettyDestination.class);
+    private AtmosphereFramework framework;
+    private Executor executor;
+    
+    public AtmosphereWebSocketJettyDestination(Bus bus, DestinationRegistry registry, EndpointInfo ei,
+                                     JettyHTTPServerEngineFactory serverEngineFactory) throws IOException {
+        super(bus, registry, ei, serverEngineFactory);
+        framework = new AtmosphereFramework(false, true);
+        framework.setUseNativeImplementation(false);
+        framework.addInitParameter(ApplicationConfig.PROPERTY_NATIVE_COMETSUPPORT, "true");
+        framework.addInitParameter(ApplicationConfig.PROPERTY_SESSION_SUPPORT, "true");
+        framework.addInitParameter(ApplicationConfig.WEBSOCKET_SUPPORT, "true");
+        framework.interceptor(AtmosphereUtils.getInterceptor(bus));
+        framework.addAtmosphereHandler("/", new DestinationHandler());
+        framework.init();
+
+        // the executor for decoupling the service invocation from websocket's onMessage call which is
+        // synchronously blocked
+        executor = bus.getExtension(WorkQueueManager.class).getAutomaticWorkQueue();
+    }
+    
+    @Override
+    public void invokeInternal(ServletConfig config, ServletContext context, HttpServletRequest req,
+                               HttpServletResponse resp) throws IOException {
+        super.invoke(config, context, req, resp);
+    }
+
+    @Override
+    protected String getAddress(EndpointInfo endpointInfo) {
+        String address = endpointInfo.getAddress();
+        if (address.startsWith("ws")) {
+            address = "http" + address.substring(2);
+        }
+        return address;
+    }
+
+
+    @Override
+    protected String getBasePath(String contextPath) throws IOException {
+        if (StringUtils.isEmpty(endpointInfo.getAddress())) {
+            return "";
+        }
+        return new URL(getAddress(endpointInfo)).getPath();
+    }
+    
+    @Override
+    protected JettyHTTPHandler createJettyHTTPHandler(JettyHTTPDestination jhd, boolean cmExact) {
+        return new AtmosphereJettyWebSocketHandler(jhd, cmExact);
+    }
+
+    @Override
+    public void shutdown() {
+        try {
+            framework.destroy();
+        } catch (Exception e) {
+            // ignore
+        } finally {
+            super.shutdown();
+        }
+    }
+
+    private class AtmosphereJettyWebSocketHandler extends JettyHTTPHandler {
+        public AtmosphereJettyWebSocketHandler(JettyHTTPDestination jhd, boolean cmExact) {
+            super(jhd, cmExact);
+        }
+        
+        @Override
+        public void handle(String target, Request baseRequest, HttpServletRequest request,
+                           HttpServletResponse response) throws IOException, ServletException {
+            if (Utils.webSocketEnabled(request)) {
+                try {
+                    framework.doCometSupport(AtmosphereRequest.wrap(request), 
+                                             AtmosphereResponse.wrap(response));
+                    baseRequest.setHandled(true);
+                } catch (ServletException e) {
+                    throw new IOException(e);
+                }
+                return;
+            } else {
+                super.handle(target, baseRequest, request, response);
+            }
+        }
+    }
+
+    private class DestinationHandler extends AbstractReflectorAtmosphereHandler {
+
+        @Override
+        public void onRequest(final AtmosphereResource resource) throws IOException {
+            LOG.fine("onRequest");
+            executeHandlerTask(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        invokeInternal(null, 
+                            resource.getRequest().getServletContext(), resource.getRequest(), resource.getResponse());
+                    } catch (Exception e) {
+                        LOG.log(Level.WARNING, "Failed to invoke service", e);
+                    }
+                }
+            });
+        }
+    }
+    
+    private void executeHandlerTask(Runnable r) {
+        try {
+            executor.execute(r);
+        } catch (RejectedExecutionException e) {
+            LOG.warning(
+                "Executor queue is full, run the service invocation task in caller thread."
+                + "  Users can specify a larger executor queue to avoid this.");
+            r.run();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/fe4deb5b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java
index 7aa4cd3..983ed96 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java
@@ -40,7 +40,6 @@ import org.apache.cxf.transport.websocket.WebSocketDestinationService;
 import org.apache.cxf.workqueue.WorkQueueManager;
 import org.atmosphere.cpr.ApplicationConfig;
 import org.atmosphere.cpr.AtmosphereFramework;
-import org.atmosphere.cpr.AtmosphereInterceptor;
 import org.atmosphere.cpr.AtmosphereRequest;
 import org.atmosphere.cpr.AtmosphereResource;
 import org.atmosphere.cpr.AtmosphereResponse;
@@ -60,12 +59,12 @@ public class AtmosphereWebSocketServletDestination extends ServletDestination im
     public AtmosphereWebSocketServletDestination(Bus bus, DestinationRegistry registry, EndpointInfo ei, 
                                                  String path) throws IOException {
         super(bus, registry, ei, path);
-        this.framework = new AtmosphereFramework(false, true);
+        framework = new AtmosphereFramework(false, true);
         framework.setUseNativeImplementation(false);
         framework.addInitParameter(ApplicationConfig.PROPERTY_NATIVE_COMETSUPPORT, "true");
         framework.addInitParameter(ApplicationConfig.PROPERTY_SESSION_SUPPORT, "true");
         framework.addInitParameter(ApplicationConfig.WEBSOCKET_SUPPORT, "true");
-        framework.interceptor(getInterceptor(bus));
+        framework.interceptor(AtmosphereUtils.getInterceptor(bus));
         framework.addAtmosphereHandler("/", new DestinationHandler());
         framework.init();
 
@@ -99,6 +98,17 @@ public class AtmosphereWebSocketServletDestination extends ServletDestination im
         return executor;
     }
 
+    @Override
+    public void shutdown() {
+        try {
+            framework.destroy();
+        } catch (Exception e) {
+            // ignore
+        } finally {
+            super.shutdown();
+        }
+    }
+
     private class DestinationHandler extends AbstractReflectorAtmosphereHandler {
 
         @Override
@@ -128,14 +138,4 @@ public class AtmosphereWebSocketServletDestination extends ServletDestination im
             r.run();
         }
     }
-
-    //FIXME a temporary workaround until we decide how to customize atmosphere using cxf's destination configuration
-    private AtmosphereInterceptor getInterceptor(Bus bus) {
-        AtmosphereInterceptor ai = (AtmosphereInterceptor)bus.getProperty("atmosphere.interceptor");
-        if (ai == null) {
-            ai = new DefaultProtocolInterceptor(); 
-        }
-        LOG.info("AtmosphereInterceptor: " + ai);
-        return ai;
-    }
 }


Mime
View raw message