Return-Path: X-Original-To: apmail-cxf-commits-archive@www.apache.org Delivered-To: apmail-cxf-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 32C5617893 for ; Fri, 20 Mar 2015 23:52:03 +0000 (UTC) Received: (qmail 33406 invoked by uid 500); 20 Mar 2015 23:52:03 -0000 Delivered-To: apmail-cxf-commits-archive@cxf.apache.org Received: (qmail 33339 invoked by uid 500); 20 Mar 2015 23:52:03 -0000 Mailing-List: contact commits-help@cxf.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cxf.apache.org Delivered-To: mailing list commits@cxf.apache.org Received: (qmail 33330 invoked by uid 99); 20 Mar 2015 23:52:03 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Mar 2015 23:52:03 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DEC29E112F; Fri, 20 Mar 2015 23:52:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ay@apache.org To: commits@cxf.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: cxf git commit: [CXF-6308] Make WebSocket transport's embedded jetty mode to use atmosphere if available Date: Fri, 20 Mar 2015 23:52:02 +0000 (UTC) Repository: cxf Updated Branches: refs/heads/master 5988e70d4 -> fe4deb5b6 [CXF-6308] Make WebSocket transport's embedded jetty mode to use atmosphere if available Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/fe4deb5b Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/fe4deb5b Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/fe4deb5b Branch: refs/heads/master Commit: fe4deb5b67a1b3b595bc43e4176c9c311a08ed2f Parents: 5988e70 Author: Akitoshi Yoshida Authored: Sat Mar 21 00:51:31 2015 +0100 Committer: Akitoshi Yoshida Committed: Sat Mar 21 00:51:46 2015 +0100 ---------------------------------------------------------------------- .../websocket/WebSocketDestinationFactory.java | 18 +- .../websocket/atmosphere/AtmosphereUtils.java | 41 +++++ .../AtmosphereWebSocketJettyDestination.java | 173 +++++++++++++++++++ .../AtmosphereWebSocketServletDestination.java | 26 +-- 4 files changed, 239 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/fe4deb5b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationFactory.java ---------------------------------------------------------------------- diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationFactory.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationFactory.java index fbe658f..37b21da 100644 --- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationFactory.java +++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationFactory.java @@ -30,6 +30,7 @@ import org.apache.cxf.transport.http.DestinationRegistry; import org.apache.cxf.transport.http.HTTPTransportFactory; import org.apache.cxf.transport.http.HttpDestinationFactory; import org.apache.cxf.transport.http_jetty.JettyHTTPServerEngineFactory; +import org.apache.cxf.transport.websocket.atmosphere.AtmosphereWebSocketJettyDestination; import org.apache.cxf.transport.websocket.atmosphere.AtmosphereWebSocketServletDestination; import org.apache.cxf.transport.websocket.jetty.JettyWebSocketDestination; import org.apache.cxf.transport.websocket.jetty.JettyWebSocketServletDestination; @@ -51,13 +52,18 @@ public class WebSocketDestinationFactory implements HttpDestinationFactory { public AbstractHTTPDestination createDestination(EndpointInfo endpointInfo, Bus bus, DestinationRegistry registry) throws IOException { if (endpointInfo.getAddress().startsWith("ws")) { - // for the embedded mode, we stick with jetty. + // for the embedded mode, we stick to jetty JettyHTTPServerEngineFactory serverEngineFactory = bus .getExtension(JettyHTTPServerEngineFactory.class); - if (serverEngineFactory.isJetty8()) { - return new JettyWebSocketDestination(bus, registry, endpointInfo, serverEngineFactory); + if (ATMOSPHERE_AVAILABLE) { + // use atmosphere if available + return new AtmosphereWebSocketJettyDestination(bus, registry, endpointInfo, serverEngineFactory); } else { - return new Jetty9WebSocketDestination(bus, registry, endpointInfo, serverEngineFactory); + if (serverEngineFactory.isJetty8()) { + return new JettyWebSocketDestination(bus, registry, endpointInfo, serverEngineFactory); + } else { + return new Jetty9WebSocketDestination(bus, registry, endpointInfo, serverEngineFactory); + } } } else { //REVISIT other way of getting the registry of http so that the plain cxf servlet finds the destination? @@ -65,14 +71,14 @@ public class WebSocketDestinationFactory implements HttpDestinationFactory { // choose atmosphere if available, otherwise assume jetty is available if (ATMOSPHERE_AVAILABLE) { - // use atmosphere + // use atmosphere if available return new AtmosphereWebSocketServletDestination(bus, registry, endpointInfo, endpointInfo.getAddress()); } else { JettyHTTPServerEngineFactory serverEngineFactory = bus .getExtension(JettyHTTPServerEngineFactory.class); + // use jetty-websocket if (serverEngineFactory.isJetty8()) { - // use jetty-websocket return new JettyWebSocketServletDestination(bus, registry, endpointInfo, endpointInfo.getAddress()); } else { http://git-wip-us.apache.org/repos/asf/cxf/blob/fe4deb5b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereUtils.java ---------------------------------------------------------------------- diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereUtils.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereUtils.java new file mode 100644 index 0000000..5f35211 --- /dev/null +++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereUtils.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.transport.websocket.atmosphere; + +import org.apache.cxf.Bus; +import org.atmosphere.cpr.AtmosphereInterceptor; + +/** + * + */ +public final class AtmosphereUtils { + + private AtmosphereUtils() { + } + + public static AtmosphereInterceptor getInterceptor(Bus bus) { + AtmosphereInterceptor ai = (AtmosphereInterceptor)bus.getProperty("atmosphere.interceptor"); + if (ai == null) { + ai = new DefaultProtocolInterceptor(); + } + return ai; + } + +} http://git-wip-us.apache.org/repos/asf/cxf/blob/fe4deb5b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketJettyDestination.java ---------------------------------------------------------------------- diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketJettyDestination.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketJettyDestination.java new file mode 100644 index 0000000..ebc33f6 --- /dev/null +++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketJettyDestination.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.transport.websocket.atmosphere; + +import java.io.IOException; +import java.net.URL; +import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.servlet.ServletConfig; +import javax.servlet.ServletContext; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.cxf.Bus; +import org.apache.cxf.common.logging.LogUtils; +import org.apache.cxf.common.util.StringUtils; +import org.apache.cxf.service.model.EndpointInfo; +import org.apache.cxf.transport.http.DestinationRegistry; +import org.apache.cxf.transport.http_jetty.JettyHTTPDestination; +import org.apache.cxf.transport.http_jetty.JettyHTTPHandler; +import org.apache.cxf.transport.http_jetty.JettyHTTPServerEngineFactory; +import org.apache.cxf.transport.websocket.WebSocketDestinationService; +import org.apache.cxf.workqueue.WorkQueueManager; +import org.atmosphere.cpr.ApplicationConfig; +import org.atmosphere.cpr.AtmosphereFramework; +import org.atmosphere.cpr.AtmosphereRequest; +import org.atmosphere.cpr.AtmosphereResource; +import org.atmosphere.cpr.AtmosphereResponse; +import org.atmosphere.handler.AbstractReflectorAtmosphereHandler; +import org.atmosphere.util.Utils; +import org.eclipse.jetty.server.Request; + + +/** + * + */ +public class AtmosphereWebSocketJettyDestination extends JettyHTTPDestination implements + WebSocketDestinationService { + private static final Logger LOG = LogUtils.getL7dLogger(AtmosphereWebSocketJettyDestination.class); + private AtmosphereFramework framework; + private Executor executor; + + public AtmosphereWebSocketJettyDestination(Bus bus, DestinationRegistry registry, EndpointInfo ei, + JettyHTTPServerEngineFactory serverEngineFactory) throws IOException { + super(bus, registry, ei, serverEngineFactory); + framework = new AtmosphereFramework(false, true); + framework.setUseNativeImplementation(false); + framework.addInitParameter(ApplicationConfig.PROPERTY_NATIVE_COMETSUPPORT, "true"); + framework.addInitParameter(ApplicationConfig.PROPERTY_SESSION_SUPPORT, "true"); + framework.addInitParameter(ApplicationConfig.WEBSOCKET_SUPPORT, "true"); + framework.interceptor(AtmosphereUtils.getInterceptor(bus)); + framework.addAtmosphereHandler("/", new DestinationHandler()); + framework.init(); + + // the executor for decoupling the service invocation from websocket's onMessage call which is + // synchronously blocked + executor = bus.getExtension(WorkQueueManager.class).getAutomaticWorkQueue(); + } + + @Override + public void invokeInternal(ServletConfig config, ServletContext context, HttpServletRequest req, + HttpServletResponse resp) throws IOException { + super.invoke(config, context, req, resp); + } + + @Override + protected String getAddress(EndpointInfo endpointInfo) { + String address = endpointInfo.getAddress(); + if (address.startsWith("ws")) { + address = "http" + address.substring(2); + } + return address; + } + + + @Override + protected String getBasePath(String contextPath) throws IOException { + if (StringUtils.isEmpty(endpointInfo.getAddress())) { + return ""; + } + return new URL(getAddress(endpointInfo)).getPath(); + } + + @Override + protected JettyHTTPHandler createJettyHTTPHandler(JettyHTTPDestination jhd, boolean cmExact) { + return new AtmosphereJettyWebSocketHandler(jhd, cmExact); + } + + @Override + public void shutdown() { + try { + framework.destroy(); + } catch (Exception e) { + // ignore + } finally { + super.shutdown(); + } + } + + private class AtmosphereJettyWebSocketHandler extends JettyHTTPHandler { + public AtmosphereJettyWebSocketHandler(JettyHTTPDestination jhd, boolean cmExact) { + super(jhd, cmExact); + } + + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, + HttpServletResponse response) throws IOException, ServletException { + if (Utils.webSocketEnabled(request)) { + try { + framework.doCometSupport(AtmosphereRequest.wrap(request), + AtmosphereResponse.wrap(response)); + baseRequest.setHandled(true); + } catch (ServletException e) { + throw new IOException(e); + } + return; + } else { + super.handle(target, baseRequest, request, response); + } + } + } + + private class DestinationHandler extends AbstractReflectorAtmosphereHandler { + + @Override + public void onRequest(final AtmosphereResource resource) throws IOException { + LOG.fine("onRequest"); + executeHandlerTask(new Runnable() { + @Override + public void run() { + try { + invokeInternal(null, + resource.getRequest().getServletContext(), resource.getRequest(), resource.getResponse()); + } catch (Exception e) { + LOG.log(Level.WARNING, "Failed to invoke service", e); + } + } + }); + } + } + + private void executeHandlerTask(Runnable r) { + try { + executor.execute(r); + } catch (RejectedExecutionException e) { + LOG.warning( + "Executor queue is full, run the service invocation task in caller thread." + + " Users can specify a larger executor queue to avoid this."); + r.run(); + } + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/fe4deb5b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java ---------------------------------------------------------------------- diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java index 7aa4cd3..983ed96 100644 --- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java +++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java @@ -40,7 +40,6 @@ import org.apache.cxf.transport.websocket.WebSocketDestinationService; import org.apache.cxf.workqueue.WorkQueueManager; import org.atmosphere.cpr.ApplicationConfig; import org.atmosphere.cpr.AtmosphereFramework; -import org.atmosphere.cpr.AtmosphereInterceptor; import org.atmosphere.cpr.AtmosphereRequest; import org.atmosphere.cpr.AtmosphereResource; import org.atmosphere.cpr.AtmosphereResponse; @@ -60,12 +59,12 @@ public class AtmosphereWebSocketServletDestination extends ServletDestination im public AtmosphereWebSocketServletDestination(Bus bus, DestinationRegistry registry, EndpointInfo ei, String path) throws IOException { super(bus, registry, ei, path); - this.framework = new AtmosphereFramework(false, true); + framework = new AtmosphereFramework(false, true); framework.setUseNativeImplementation(false); framework.addInitParameter(ApplicationConfig.PROPERTY_NATIVE_COMETSUPPORT, "true"); framework.addInitParameter(ApplicationConfig.PROPERTY_SESSION_SUPPORT, "true"); framework.addInitParameter(ApplicationConfig.WEBSOCKET_SUPPORT, "true"); - framework.interceptor(getInterceptor(bus)); + framework.interceptor(AtmosphereUtils.getInterceptor(bus)); framework.addAtmosphereHandler("/", new DestinationHandler()); framework.init(); @@ -99,6 +98,17 @@ public class AtmosphereWebSocketServletDestination extends ServletDestination im return executor; } + @Override + public void shutdown() { + try { + framework.destroy(); + } catch (Exception e) { + // ignore + } finally { + super.shutdown(); + } + } + private class DestinationHandler extends AbstractReflectorAtmosphereHandler { @Override @@ -128,14 +138,4 @@ public class AtmosphereWebSocketServletDestination extends ServletDestination im r.run(); } } - - //FIXME a temporary workaround until we decide how to customize atmosphere using cxf's destination configuration - private AtmosphereInterceptor getInterceptor(Bus bus) { - AtmosphereInterceptor ai = (AtmosphereInterceptor)bus.getProperty("atmosphere.interceptor"); - if (ai == null) { - ai = new DefaultProtocolInterceptor(); - } - LOG.info("AtmosphereInterceptor: " + ai); - return ai; - } }