Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3E164200C6C for ; Fri, 21 Apr 2017 04:04:46 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 3C809160B9F; Fri, 21 Apr 2017 02:04:46 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E632A160BB5 for ; Fri, 21 Apr 2017 04:04:43 +0200 (CEST) Received: (qmail 33257 invoked by uid 500); 21 Apr 2017 02:04:43 -0000 Mailing-List: contact commits-help@cxf.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cxf.apache.org Delivered-To: mailing list commits@cxf.apache.org Received: (qmail 32821 invoked by uid 99); 21 Apr 2017 02:04:42 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Apr 2017 02:04:42 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 95539F4A29; Fri, 21 Apr 2017 02:04:42 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ffang@apache.org To: commits@cxf.apache.org Date: Fri, 21 Apr 2017 02:04:43 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/8] cxf git commit: add websocket undertow transport archived-at: Fri, 21 Apr 2017 02:04:46 -0000 add websocket undertow transport Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/6755b06c Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/6755b06c Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/6755b06c Branch: refs/heads/master Commit: 6755b06ca3eede0132d17633783800f3e3f07484 Parents: 156fd30 Author: Freeman Fang Authored: Thu Apr 13 14:04:55 2017 +0800 Committer: Freeman Fang Committed: Thu Apr 13 14:04:55 2017 +0800 ---------------------------------------------------------------------- .../http_undertow/UndertowHTTPDestination.java | 7 +- .../http_undertow/UndertowHTTPHandler.java | 6 + rt/transports/websocket/pom.xml | 6 + .../websocket/WebSocketDestinationFactory.java | 87 ++- .../AtmosphereWebSocketUndertowDestination.java | 302 ++++++++++ .../undertow/ByteBufferInputStream.java | 51 ++ .../WebSocketUndertowServletRequest.java | 589 +++++++++++++++++++ .../WebSocketUndertowServletResponse.java | 392 ++++++++++++ systests/transport-undertow/pom.xml | 61 ++ .../systest/http_undertow/websocket/Book.java | 123 ++++ .../websocket/BookNotFoundDetails.java | 36 ++ .../websocket/BookNotFoundFault.java | 41 ++ .../websocket/BookServerWebSocket.java | 83 +++ .../websocket/BookStorePerRequest.java | 129 ++++ .../websocket/BookStoreWebSocket.java | 186 ++++++ .../http_undertow/websocket/Chapter.java | 106 ++++ .../JAXRSClientServerWebSocketTest.java | 482 +++++++++++++++ .../JAXRSClientServerWebSocketTest.java.bak | 438 ++++++++++++++ .../http_undertow/websocket/SuperBook.java | 45 ++ .../websocket/SuperBookInterface.java | 23 + .../websocket/WebSocketTestClient.java | 329 +++++++++++ 21 files changed, 3498 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/6755b06c/rt/transports/http-undertow/src/main/java/org/apache/cxf/transport/http_undertow/UndertowHTTPDestination.java ---------------------------------------------------------------------- diff --git a/rt/transports/http-undertow/src/main/java/org/apache/cxf/transport/http_undertow/UndertowHTTPDestination.java b/rt/transports/http-undertow/src/main/java/org/apache/cxf/transport/http_undertow/UndertowHTTPDestination.java index 81a4e59..b96c1b9 100644 --- a/rt/transports/http-undertow/src/main/java/org/apache/cxf/transport/http_undertow/UndertowHTTPDestination.java +++ b/rt/transports/http-undertow/src/main/java/org/apache/cxf/transport/http_undertow/UndertowHTTPDestination.java @@ -80,9 +80,6 @@ public class UndertowHTTPDestination extends ServletDestination { //Add the default port if the address is missing it super(bus, registry, ei, getAddressValue(ei, true).getAddress(), true); this.serverEngineFactory = serverEngineFactory; - if (serverEngineFactory != null) { - nurl = new URL(getAddress(endpointInfo)); - } loader = bus.getExtension(ClassLoader.class); } @@ -101,6 +98,8 @@ public class UndertowHTTPDestination extends ServletDestination { IOException { if (serverEngineFactory == null) { return; + } else { + nurl = new URL(getAddress(endpointInfo)); } engine = serverEngineFactory.retrieveUndertowHTTPServerEngine(nurl.getPort()); @@ -239,7 +238,7 @@ public class UndertowHTTPDestination extends ServletDestination { } - protected final String getAddress(EndpointInfo endpointInfo) { + protected String getAddress(EndpointInfo endpointInfo) { return endpointInfo.getAddress(); } http://git-wip-us.apache.org/repos/asf/cxf/blob/6755b06c/rt/transports/http-undertow/src/main/java/org/apache/cxf/transport/http_undertow/UndertowHTTPHandler.java ---------------------------------------------------------------------- diff --git a/rt/transports/http-undertow/src/main/java/org/apache/cxf/transport/http_undertow/UndertowHTTPHandler.java b/rt/transports/http-undertow/src/main/java/org/apache/cxf/transport/http_undertow/UndertowHTTPHandler.java index 2c1914a..ff05c97 100644 --- a/rt/transports/http-undertow/src/main/java/org/apache/cxf/transport/http_undertow/UndertowHTTPHandler.java +++ b/rt/transports/http-undertow/src/main/java/org/apache/cxf/transport/http_undertow/UndertowHTTPHandler.java @@ -66,6 +66,10 @@ public class UndertowHTTPHandler implements HttpHandler { } } + public ServletContext getServletContext() { + return this.servletContext; + } + public void setName(String name) { urlName = name; } @@ -87,6 +91,8 @@ public class UndertowHTTPHandler implements HttpHandler { undertowExchange.dispatch(this); return; } + + HttpServletResponseImpl response = new HttpServletResponseImpl(undertowExchange, (ServletContextImpl)servletContext); HttpServletRequestImpl request = new HttpServletRequestImpl(undertowExchange, http://git-wip-us.apache.org/repos/asf/cxf/blob/6755b06c/rt/transports/websocket/pom.xml ---------------------------------------------------------------------- diff --git a/rt/transports/websocket/pom.xml b/rt/transports/websocket/pom.xml index e14afb1..e10afe7 100644 --- a/rt/transports/websocket/pom.xml +++ b/rt/transports/websocket/pom.xml @@ -94,6 +94,12 @@ true + org.apache.cxf + cxf-rt-transports-http-undertow + ${project.version} + true + + org.springframework spring-core true http://git-wip-us.apache.org/repos/asf/cxf/blob/6755b06c/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationFactory.java ---------------------------------------------------------------------- diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationFactory.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationFactory.java index 686b89a..bbd6f5a 100644 --- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationFactory.java +++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationFactory.java @@ -32,19 +32,25 @@ import org.apache.cxf.transport.http.DestinationRegistry; import org.apache.cxf.transport.http.HTTPTransportFactory; import org.apache.cxf.transport.http.HttpDestinationFactory; import org.apache.cxf.transport.http_jetty.JettyHTTPServerEngineFactory; +import org.apache.cxf.transport.http_undertow.UndertowHTTPServerEngineFactory; import org.apache.cxf.transport.websocket.atmosphere.AtmosphereWebSocketServletDestination; //import org.apache.cxf.transport.websocket.jetty.JettyWebSocketServletDestination; @NoJSR250Annotations() public class WebSocketDestinationFactory implements HttpDestinationFactory { private static final boolean ATMOSPHERE_AVAILABLE = probeClass("org.atmosphere.cpr.ApplicationConfig"); - private static final Constructor JETTY9_WEBSOCKET_DESTINATION_CTR = + private static final boolean JETTY_AVAILABLE = probeClass("org.eclipse.jetty.server.Server"); + private static final boolean UNDERTOW_AVAILABLE = probeClass("io.undertow.websockets.core.WebSockets"); + private static final Constructor JETTY9_WEBSOCKET_DESTINATION_CTR = probeConstructor("org.apache.cxf.transport.websocket.jetty9.Jetty9WebSocketDestination"); - private static final Constructor ATMOSPHERE_WEBSOCKET_JETTY_DESTINATION_CTR = + private static final Constructor ATMOSPHERE_WEBSOCKET_JETTY_DESTINATION_CTR = probeConstructor("org.apache.cxf.transport.websocket.atmosphere.AtmosphereWebSocketJettyDestination"); + private static final Constructor ATMOSPHERE_WEBSOCKET_UNDERTOW_DESTINATION_CTR = + probeUndertowConstructor( + "org.apache.cxf.transport.websocket.atmosphere.AtmosphereWebSocketUndertowDestination"); - private final boolean atmosphereDisabled = Boolean.valueOf( - SystemPropertyAction.getPropertyOrNull("org.apache.cxf.transport.websocket.atmosphere.disabled")); + private final boolean atmosphereDisabled = Boolean.valueOf(SystemPropertyAction + .getPropertyOrNull("org.apache.cxf.transport.websocket.atmosphere.disabled")); private static boolean probeClass(String name) { try { @@ -58,8 +64,18 @@ public class WebSocketDestinationFactory implements HttpDestinationFactory { private static Constructor probeConstructor(String name) { try { Class clz = Class.forName(name, true, WebSocketDestinationFactory.class.getClassLoader()); - return clz.getConstructor(Bus.class, DestinationRegistry.class, - EndpointInfo.class, JettyHTTPServerEngineFactory.class); + return clz.getConstructor(Bus.class, DestinationRegistry.class, EndpointInfo.class, + JettyHTTPServerEngineFactory.class); + } catch (Throwable t) { + return null; + } + } + + private static Constructor probeUndertowConstructor(String name) { + try { + Class clz = Class.forName(name, true, WebSocketDestinationFactory.class.getClassLoader()); + return clz.getConstructor(Bus.class, DestinationRegistry.class, EndpointInfo.class, + UndertowHTTPServerEngineFactory.class); } catch (Throwable t) { return null; } @@ -68,30 +84,44 @@ public class WebSocketDestinationFactory implements HttpDestinationFactory { public AbstractHTTPDestination createDestination(EndpointInfo endpointInfo, Bus bus, DestinationRegistry registry) throws IOException { if (endpointInfo.getAddress().startsWith("ws")) { - // for the embedded mode, we stick to jetty - JettyHTTPServerEngineFactory serverEngineFactory = bus - .getExtension(JettyHTTPServerEngineFactory.class); + if (ATMOSPHERE_AVAILABLE && !atmosphereDisabled) { // use atmosphere if available - return createJettyHTTPDestination(ATMOSPHERE_WEBSOCKET_JETTY_DESTINATION_CTR, - bus, registry, endpointInfo, serverEngineFactory); + if (JETTY_AVAILABLE) { + // for the embedded mode, we stick to jetty + JettyHTTPServerEngineFactory serverEngineFactory = bus + .getExtension(JettyHTTPServerEngineFactory.class); + return createJettyHTTPDestination(ATMOSPHERE_WEBSOCKET_JETTY_DESTINATION_CTR, bus, + registry, endpointInfo, serverEngineFactory); + } else if (UNDERTOW_AVAILABLE) { + // use AtmosphereWebSocketUndertowDestination + UndertowHTTPServerEngineFactory undertowServerEngineFactory = bus + .getExtension(UndertowHTTPServerEngineFactory.class); + return createUndertowHTTPDestination(ATMOSPHERE_WEBSOCKET_UNDERTOW_DESTINATION_CTR, bus, + registry, endpointInfo, undertowServerEngineFactory); + } + return null; } else { - return createJettyHTTPDestination(JETTY9_WEBSOCKET_DESTINATION_CTR, - bus, registry, endpointInfo, serverEngineFactory); + // for the embedded mode, we stick to jetty + JettyHTTPServerEngineFactory serverEngineFactory = bus + .getExtension(JettyHTTPServerEngineFactory.class); + return createJettyHTTPDestination(JETTY9_WEBSOCKET_DESTINATION_CTR, bus, registry, + endpointInfo, serverEngineFactory); } } else { - //REVISIT other way of getting the registry of http so that the plain cxf servlet finds the destination? + // REVISIT other way of getting the registry of http so that the plain cxf servlet finds the + // destination? registry = getDestinationRegistry(bus); // choose atmosphere if available, otherwise assume jetty is available if (ATMOSPHERE_AVAILABLE && !atmosphereDisabled) { // use atmosphere if available - return new AtmosphereWebSocketServletDestination(bus, registry, - endpointInfo, endpointInfo.getAddress()); + return new AtmosphereWebSocketServletDestination(bus, registry, endpointInfo, + endpointInfo.getAddress()); } else { // use jetty-websocket - return createJettyHTTPDestination(JETTY9_WEBSOCKET_DESTINATION_CTR, - bus, registry, endpointInfo, null); + return createJettyHTTPDestination(JETTY9_WEBSOCKET_DESTINATION_CTR, bus, registry, + endpointInfo, null); } } } @@ -113,10 +143,27 @@ public class WebSocketDestinationFactory implements HttpDestinationFactory { private AbstractHTTPDestination createJettyHTTPDestination(Constructor ctr, Bus bus, DestinationRegistry registry, EndpointInfo ei, - JettyHTTPServerEngineFactory jhsef) throws IOException { + JettyHTTPServerEngineFactory jhsef) + throws IOException { + if (ctr != null) { + try { + return (AbstractHTTPDestination)ctr.newInstance(bus, registry, ei, jhsef); + } catch (Throwable t) { + // log + t.printStackTrace(); + } + } + return null; + } + + private AbstractHTTPDestination createUndertowHTTPDestination(Constructor ctr, Bus bus, + DestinationRegistry registry, + EndpointInfo ei, + UndertowHTTPServerEngineFactory jhsef) + throws IOException { if (ctr != null) { try { - return (AbstractHTTPDestination) ctr.newInstance(bus, registry, ei, jhsef); + return (AbstractHTTPDestination)ctr.newInstance(bus, registry, ei, jhsef); } catch (Throwable t) { // log t.printStackTrace(); http://git-wip-us.apache.org/repos/asf/cxf/blob/6755b06c/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketUndertowDestination.java ---------------------------------------------------------------------- diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketUndertowDestination.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketUndertowDestination.java new file mode 100644 index 0000000..4d1f427 --- /dev/null +++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketUndertowDestination.java @@ -0,0 +1,302 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.transport.websocket.atmosphere; + +import java.io.IOException; +import java.net.URL; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.servlet.ServletConfig; +import javax.servlet.ServletContext; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.cxf.Bus; +import org.apache.cxf.common.logging.LogUtils; +import org.apache.cxf.common.util.StringUtils; +import org.apache.cxf.service.model.EndpointInfo; +import org.apache.cxf.transport.http.DestinationRegistry; +import org.apache.cxf.transport.http_undertow.UndertowHTTPDestination; +import org.apache.cxf.transport.http_undertow.UndertowHTTPHandler; +import org.apache.cxf.transport.http_undertow.UndertowHTTPServerEngineFactory; +import org.apache.cxf.transport.websocket.WebSocketConstants; +import org.apache.cxf.transport.websocket.WebSocketDestinationService; +import org.apache.cxf.transport.websocket.undertow.WebSocketUndertowServletRequest; +import org.apache.cxf.transport.websocket.undertow.WebSocketUndertowServletResponse; +import org.apache.cxf.workqueue.WorkQueueManager; +import org.atmosphere.cpr.ApplicationConfig; +import org.atmosphere.cpr.AtmosphereFramework; +import org.atmosphere.cpr.AtmosphereRequestImpl; +import org.atmosphere.cpr.AtmosphereResource; +import org.atmosphere.cpr.AtmosphereResponseImpl; +import org.atmosphere.handler.AbstractReflectorAtmosphereHandler; +import org.xnio.StreamConnection; + +import io.undertow.server.HttpServerExchange; +import io.undertow.server.HttpUpgradeListener; +import io.undertow.servlet.handlers.ServletRequestContext; +import io.undertow.servlet.spec.HttpServletRequestImpl; +import io.undertow.servlet.spec.HttpServletResponseImpl; +import io.undertow.servlet.spec.ServletContextImpl; +import io.undertow.util.Methods; +import io.undertow.websockets.core.AbstractReceiveListener; +import io.undertow.websockets.core.BufferedBinaryMessage; +import io.undertow.websockets.core.BufferedTextMessage; +import io.undertow.websockets.core.WebSocketChannel; +import io.undertow.websockets.core.protocol.Handshake; +import io.undertow.websockets.core.protocol.version07.Hybi07Handshake; +import io.undertow.websockets.core.protocol.version08.Hybi08Handshake; +import io.undertow.websockets.core.protocol.version13.Hybi13Handshake; +import io.undertow.websockets.spi.AsyncWebSocketHttpServerExchange; + +/** + * + */ +public class AtmosphereWebSocketUndertowDestination extends UndertowHTTPDestination + implements WebSocketDestinationService { + private static final Logger LOG = LogUtils.getL7dLogger(AtmosphereWebSocketUndertowDestination.class); + private final Executor executor; + private AtmosphereFramework framework; + + public AtmosphereWebSocketUndertowDestination(Bus bus, DestinationRegistry registry, EndpointInfo ei, + UndertowHTTPServerEngineFactory serverEngineFactory) + throws IOException { + super(bus, registry, ei, serverEngineFactory); + framework = new AtmosphereFramework(false, true); + framework.setUseNativeImplementation(false); + framework.addInitParameter(ApplicationConfig.PROPERTY_NATIVE_COMETSUPPORT, "true"); + framework.addInitParameter(ApplicationConfig.PROPERTY_SESSION_SUPPORT, "true"); + framework.addInitParameter(ApplicationConfig.WEBSOCKET_SUPPORT, "true"); + framework.addInitParameter(ApplicationConfig.WEBSOCKET_PROTOCOL_EXECUTION, "true"); + // workaround for atmosphere's jsr356 initialization requiring servletConfig + framework.addInitParameter(ApplicationConfig.WEBSOCKET_SUPPRESS_JSR356, "true"); + AtmosphereUtils.addInterceptors(framework, bus); + framework.addAtmosphereHandler("/", new DestinationHandler()); + framework.init(); + executor = bus.getExtension(WorkQueueManager.class).getAutomaticWorkQueue(); + } + + @Override + public void invokeInternal(ServletConfig config, ServletContext context, HttpServletRequest req, + HttpServletResponse resp) throws IOException { + super.invoke(config, context, req, resp); + } + + private static String getNonWSAddress(EndpointInfo endpointInfo) { + String address = endpointInfo.getAddress(); + if (address.startsWith("ws")) { + address = "http" + address.substring(2); + } + return address; + } + + @Override + protected String getAddress(EndpointInfo endpointInfo) { + return getNonWSAddress(endpointInfo); + } + + @Override + protected String getBasePath(String contextPath) throws IOException { + if (StringUtils.isEmpty(endpointInfo.getAddress())) { + return ""; + } + return new URL(getAddress(endpointInfo)).getPath(); + } + + @Override + protected UndertowHTTPHandler createUndertowHTTPHandler(UndertowHTTPDestination jhd, boolean cmExact) { + return new AtmosphereUndertowWebSocketHandler(jhd, cmExact); + } + + @Override + public void shutdown() { + try { + framework.destroy(); + } catch (Exception e) { + // ignore + } finally { + super.shutdown(); + } + } + + private class AtmosphereUndertowWebSocketHandler extends UndertowHTTPHandler { + private final Set handshakes; + private final Set peerConnections = Collections + .newSetFromMap(new ConcurrentHashMap()); + + AtmosphereUndertowWebSocketHandler(UndertowHTTPDestination jhd, boolean cmExact) { + super(jhd, cmExact); + handshakes = new HashSet<>(); + handshakes.add(new Hybi13Handshake()); + handshakes.add(new Hybi08Handshake()); + handshakes.add(new Hybi07Handshake()); + } + + @Override + public void handleRequest(HttpServerExchange undertowExchange) throws Exception { + if (undertowExchange.isInIoThread()) { + undertowExchange.dispatch(this); + return; + } + if (!undertowExchange.getRequestMethod().equals(Methods.GET)) { + // Only GET is supported to start the handshake + handleNormalRequest(undertowExchange); + return; + } + final AsyncWebSocketHttpServerExchange facade = new AsyncWebSocketHttpServerExchange(undertowExchange, + peerConnections); + Handshake handshaker = null; + for (Handshake method : handshakes) { + if (method.matches(facade)) { + handshaker = method; + break; + } + } + + if (handshaker == null) { + handleNormalRequest(undertowExchange); + } else { + final Handshake selected = handshaker; + undertowExchange.upgradeChannel(new HttpUpgradeListener() { + @Override + public void handleUpgrade(StreamConnection streamConnection, + HttpServerExchange exchange) { + try { + + WebSocketChannel channel = selected.createChannel(facade, streamConnection, + facade.getBufferPool()); + peerConnections.add(channel); + channel.getReceiveSetter().set(new AbstractReceiveListener() { + @Override + protected void onFullTextMessage(WebSocketChannel channel, + BufferedTextMessage message) { + handleReceivedMessage(channel, message); + + } + + protected void onFullBinaryMessage(WebSocketChannel channel, + BufferedBinaryMessage message) + throws IOException { + + handleReceivedMessage(channel, message); + + } + }); + channel.resumeReceives(); + // handleNormalRequest(undertowExchange); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + }); + handshaker.handshake(facade); + } + + } + + public void handleNormalRequest(HttpServerExchange undertowExchange) throws Exception { + HttpServletResponseImpl response = new HttpServletResponseImpl(undertowExchange, + (ServletContextImpl)servletContext); + HttpServletRequestImpl request = new HttpServletRequestImpl(undertowExchange, + (ServletContextImpl)servletContext); + ServletRequestContext servletRequestContext = new ServletRequestContext(((ServletContextImpl)servletContext) + .getDeployment(), request, response, null); + + undertowExchange.putAttachment(ServletRequestContext.ATTACHMENT_KEY, servletRequestContext); + /* + * if (AtmosphereUtils.useAtmosphere(request)) { try { + * framework.doCometSupport(AtmosphereRequestImpl.wrap(request), + * AtmosphereResponseImpl.wrap(response)); } catch (ServletException e) { throw new + * IOException(e); } return; } else { super.handleRequest(undertowExchange); } + */ + try { + framework.doCometSupport(AtmosphereRequestImpl.wrap(request), + AtmosphereResponseImpl.wrap(response)); + + } catch (ServletException e) { + throw new IOException(e); + } + } + + public void handleNormalRequest(HttpServletRequest request, HttpServletResponse response) + throws Exception { + + // if (AtmosphereUtils.useAtmosphere(request)) { + try { + framework.doCometSupport(AtmosphereRequestImpl.wrap(request), + AtmosphereResponseImpl.wrap(response)); + + } catch (ServletException e) { + throw new IOException(e); + } + // } + } + + private void handleReceivedMessage(WebSocketChannel channel, Object message) { + executor.execute(new Runnable() { + + @Override + public void run() { + try { + HttpServletRequest request = new WebSocketUndertowServletRequest(channel, message); + HttpServletResponse response = new WebSocketUndertowServletResponse(channel); + if (request.getHeader(WebSocketConstants.DEFAULT_REQUEST_ID_KEY) != null) { + response.setHeader(WebSocketConstants.DEFAULT_RESPONSE_ID_KEY, + request.getHeader(WebSocketConstants.DEFAULT_REQUEST_ID_KEY)); + } + handleNormalRequest(request, response); + } catch (Exception ex) { + ex.printStackTrace(); + } + + } + + }); + + } + } + + private class DestinationHandler extends AbstractReflectorAtmosphereHandler { + + @Override + public void onRequest(final AtmosphereResource resource) throws IOException { + LOG.fine("onRequest"); + try { + invokeInternal(null, resource.getRequest().getServletContext(), resource.getRequest(), + resource.getResponse()); + } catch (Exception e) { + LOG.log(Level.WARNING, "Failed to invoke service", e); + } + } + } + + // used for internal tests + AtmosphereFramework getAtmosphereFramework() { + return framework; + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/6755b06c/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/undertow/ByteBufferInputStream.java ---------------------------------------------------------------------- diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/undertow/ByteBufferInputStream.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/undertow/ByteBufferInputStream.java new file mode 100644 index 0000000..f32326f --- /dev/null +++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/undertow/ByteBufferInputStream.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.transport.websocket.undertow; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +public class ByteBufferInputStream extends InputStream { + + ByteBuffer buf; + + public ByteBufferInputStream(ByteBuffer buf) { + this.buf = buf; + } + + public int read() throws IOException { + if (!buf.hasRemaining()) { + return -1; + } + return buf.get() & 0xFF; + } + + public int read(byte[] bytes, int off, int len) + throws IOException { + if (!buf.hasRemaining()) { + return -1; + } + + len = Math.min(len, buf.remaining()); + buf.get(bytes, off, len); + return len; + } + +} http://git-wip-us.apache.org/repos/asf/cxf/blob/6755b06c/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/undertow/WebSocketUndertowServletRequest.java ---------------------------------------------------------------------- diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/undertow/WebSocketUndertowServletRequest.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/undertow/WebSocketUndertowServletRequest.java new file mode 100644 index 0000000..0b1d884 --- /dev/null +++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/undertow/WebSocketUndertowServletRequest.java @@ -0,0 +1,589 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.transport.websocket.undertow; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.UnsupportedEncodingException; +import java.net.MalformedURLException; +import java.net.URL; +import java.security.Principal; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Enumeration; +import java.util.Locale; +import java.util.Map; +import java.util.TreeMap; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.servlet.AsyncContext; +import javax.servlet.DispatcherType; +import javax.servlet.ReadListener; +import javax.servlet.RequestDispatcher; +import javax.servlet.ServletContext; +import javax.servlet.ServletException; +import javax.servlet.ServletInputStream; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; +import javax.servlet.http.Cookie; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.servlet.http.HttpSession; +import javax.servlet.http.HttpUpgradeHandler; +import javax.servlet.http.Part; + +import org.apache.cxf.common.logging.LogUtils; +import org.apache.cxf.transport.websocket.WebSocketUtils; + +import io.undertow.websockets.core.BufferedBinaryMessage; +import io.undertow.websockets.core.BufferedTextMessage; +import io.undertow.websockets.core.WebSocketChannel; + +/** + * + */ +public class WebSocketUndertowServletRequest implements HttpServletRequest { + private static final Logger LOG = LogUtils.getL7dLogger(WebSocketUndertowServletRequest.class); + + private WebSocketChannel channel; + private Map requestHeaders; + private Map attributes; + private InputStream in; + + public WebSocketUndertowServletRequest(WebSocketChannel channel, Object message) + throws IOException { + this.channel = channel; + if (message instanceof BufferedBinaryMessage) { + in = new ByteBufferInputStream(((BufferedBinaryMessage)message).getData().getResource()[0]); + } else if (message instanceof BufferedTextMessage) { + in = new ByteArrayInputStream(((BufferedTextMessage)message).getData().getBytes()); + } + this.requestHeaders = WebSocketUtils.readHeaders(in); + /*String path = requestHeaders.get(WebSocketUtils.URI_KEY); + String origin = channel.getUrl(); + path = path.substring(0, path.length() - 10); + if (!path.startsWith(origin)) { + throw new InvalidPathException(); + }*/ + this.attributes = new TreeMap(String.CASE_INSENSITIVE_ORDER); + Object v = channel.getAttribute("org.apache.cxf.transport.endpoint.address"); + if (v != null) { + attributes.put("org.apache.cxf.transport.endpoint.address", v); + } + } + + @Override + public AsyncContext getAsyncContext() { + return null; + } + + @Override + public Object getAttribute(String name) { + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, "getAttribute({0}) -> {1}", new Object[] {name, attributes.get(name)}); + } + return attributes.get(name); + } + + @Override + public Enumeration getAttributeNames() { + LOG.log(Level.FINE, "getAttributeNames()"); + return Collections.enumeration(attributes.keySet()); + } + + @Override + public String getCharacterEncoding() { + // TODO Auto-generated method stub + LOG.log(Level.FINE, "getCharacterEncoding()"); + return null; + } + + @Override + public int getContentLength() { + LOG.log(Level.FINE, "getContentLength()"); + return 0; + } + + @Override + public String getContentType() { + LOG.log(Level.FINE, "getContentType()"); + return requestHeaders.get("Content-Type"); + } + + @Override + public DispatcherType getDispatcherType() { + LOG.log(Level.FINE, "getDispatcherType()"); + return null; + } + + @Override + public ServletInputStream getInputStream() throws IOException { + return new ServletInputStream() { + @Override + public int read() throws IOException { + return in.read(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return in.read(b, off, len); + } + + @Override + public boolean isFinished() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isReady() { + throw new UnsupportedOperationException(); + } + + @Override + public void setReadListener(ReadListener arg0) { + throw new UnsupportedOperationException(); + } + }; + } + + @Override + public String getLocalAddr() { + LOG.log(Level.FINE, "getLocalAddr()"); + try { + return new URL(channel.getUrl()).getHost(); + } catch (MalformedURLException e) { + e.printStackTrace(); + return null; + } + } + + @Override + public String getLocalName() { + LOG.log(Level.FINE, "getLocalName()"); + try { + return new URL(channel.getUrl()).getHost(); + } catch (MalformedURLException e) { + e.printStackTrace(); + return null; + } + } + + @Override + public int getLocalPort() { + LOG.log(Level.FINE, "getLocalPort()"); + try { + return new URL(channel.getUrl()).getPort(); + } catch (MalformedURLException e) { + e.printStackTrace(); + return 0; + } + } + + @Override + public Locale getLocale() { + LOG.log(Level.FINE, "getLocale()"); + return null; + } + + @Override + public Enumeration getLocales() { + LOG.log(Level.FINE, "getLocales()"); + return null; + } + + @Override + public String getParameter(String name) { + // TODO Auto-generated method stub + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, "getParameter({0})", name); + } + return null; + } + + @Override + public Map getParameterMap() { + // TODO Auto-generated method stub + LOG.log(Level.FINE, "getParameterMap()"); + return null; + } + + @Override + public Enumeration getParameterNames() { + // TODO Auto-generated method stub + LOG.log(Level.FINE, "getParameterNames()"); + return null; + } + + @Override + public String[] getParameterValues(String name) { + // TODO Auto-generated method stub + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, "getParameterValues({0})", name); + } + return null; + } + + @Override + public String getProtocol() { + LOG.log(Level.FINE, "getProtocol"); + try { + return new URL(channel.getUrl()).getProtocol(); + } catch (MalformedURLException e) { + e.printStackTrace(); + return null; + } + } + + @Override + public BufferedReader getReader() throws IOException { + LOG.log(Level.FINE, "getReader"); + return new BufferedReader(new InputStreamReader(in, "utf-8")); + } + + @Override + public String getRealPath(String path) { + // TODO Auto-generated method stub + LOG.log(Level.FINE, "getRealPath"); + return null; + } + + @Override + public String getRemoteAddr() { + LOG.log(Level.FINE, "getRemoteAddr"); + try { + return new URL(channel.getPeerAddress().toString()).getHost(); + } catch (MalformedURLException e) { + e.printStackTrace(); + return null; + } + } + + @Override + public String getRemoteHost() { + LOG.log(Level.FINE, "getRemoteHost"); + try { + return new URL(channel.getPeerAddress().toString()).getHost(); + } catch (MalformedURLException e) { + e.printStackTrace(); + return null; + } + } + + @Override + public int getRemotePort() { + LOG.log(Level.FINE, "getRemotePort"); + try { + return new URL(channel.getPeerAddress().toString()).getPort(); + } catch (MalformedURLException e) { + e.printStackTrace(); + return 0; + } + } + + @Override + public RequestDispatcher getRequestDispatcher(String path) { + // TODO Auto-generated method stub + LOG.log(Level.FINE, "getRequestDispatcher"); + return null; + } + + @Override + public String getScheme() { + LOG.log(Level.FINE, "getScheme"); + try { + return new URL(channel.getUrl()).getProtocol(); + } catch (MalformedURLException e) { + e.printStackTrace(); + return null; + } + } + + @Override + public String getServerName() { + return getLocalName(); + } + + @Override + public int getServerPort() { + LOG.log(Level.FINE, "getServerPort"); + return getLocalPort(); + } + + @Override + public ServletContext getServletContext() { + LOG.log(Level.FINE, "getServletContext"); + return null; + } + + @Override + public boolean isAsyncStarted() { + LOG.log(Level.FINE, "isAsyncStarted"); + return false; + } + + @Override + public boolean isAsyncSupported() { + LOG.log(Level.FINE, "isAsyncSupported"); + return false; + } + + @Override + public boolean isSecure() { + LOG.log(Level.FINE, "isSecure"); + return false; + } + + @Override + public void removeAttribute(String name) { + LOG.log(Level.FINE, "removeAttribute"); + attributes.remove(name); + } + + @Override + public void setAttribute(String name, Object o) { + LOG.log(Level.FINE, "setAttribute"); + attributes.put(name, o); + } + + @Override + public void setCharacterEncoding(String env) throws UnsupportedEncodingException { + LOG.log(Level.FINE, "setCharacterEncoding"); + // ignore as we stick to utf-8. + } + + @Override + public AsyncContext startAsync() { + LOG.log(Level.FINE, "startAsync"); + return null; + } + + @Override + public AsyncContext startAsync(ServletRequest servletRequest, ServletResponse servletResponse) { + // TODO Auto-generated method stub + LOG.log(Level.FINE, "startAsync"); + return null; + } + + @Override + public boolean authenticate(HttpServletResponse servletResponse) throws IOException, ServletException { + // TODO Auto-generated method stub + LOG.log(Level.FINE, "authenticate"); + return false; + } + + @Override + public String getAuthType() { + LOG.log(Level.FINE, "getAuthType"); + return "null"; + } + + @Override + public String getContextPath() { + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, "getContextPath -> " + null); + } + return null; + } + + @Override + public Cookie[] getCookies() { + LOG.log(Level.FINE, "getCookies"); + return null; + } + + @Override + public long getDateHeader(String name) { + LOG.log(Level.FINE, "getDateHeader"); + return 0; + } + + @Override + public String getHeader(String name) { + LOG.log(Level.FINE, "getHeader"); + return requestHeaders.get(name); + } + + @Override + public Enumeration getHeaderNames() { + LOG.log(Level.FINE, "getHeaderNames"); + return Collections.enumeration(requestHeaders.keySet()); + } + + @Override + public Enumeration getHeaders(String name) { + LOG.log(Level.FINE, "getHeaders"); + // our protocol assumes no multiple headers + if (requestHeaders.get(name) != null) { + return Collections.enumeration(Arrays.asList(requestHeaders.get(name))); + } else { + return Collections.enumeration(Arrays.asList()); + } + } + + @Override + public int getIntHeader(String name) { + LOG.log(Level.FINE, "getIntHeader"); + String v = requestHeaders.get(name); + return v == null ? -1 : Integer.parseInt(v); + } + + @Override + public String getMethod() { + LOG.log(Level.FINE, "getMethod"); + return requestHeaders.get(WebSocketUtils.METHOD_KEY); + } + + @Override + public Part getPart(String name) throws IOException, ServletException { + LOG.log(Level.FINE, "getPart"); + return null; + } + + @Override + public Collection getParts() throws IOException, ServletException { + LOG.log(Level.FINE, "getParts"); + return null; + } + + @Override + public String getPathInfo() { + return null; + } + + @Override + public String getPathTranslated() { + return null; + } + + @Override + public String getQueryString() { + LOG.log(Level.FINE, "getQueryString"); + return null; + } + + @Override + public String getRemoteUser() { + LOG.log(Level.FINE, "getRemoteUser"); + return null; + } + + @Override + public String getRequestURI() { + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, "getRequestURI " + requestHeaders.get(WebSocketUtils.URI_KEY)); + } + return requestHeaders.get(WebSocketUtils.URI_KEY); + } + + @Override + public StringBuffer getRequestURL() { + return new StringBuffer(getRequestURI()); + } + + @Override + public String getRequestedSessionId() { + LOG.log(Level.FINE, "getRequestedSessionId"); + return null; + } + + @Override + public String getServletPath() { + return null; + } + + @Override + public HttpSession getSession() { + LOG.log(Level.FINE, "getSession"); + return null; + } + + @Override + public HttpSession getSession(boolean create) { + LOG.log(Level.FINE, "getSession"); + return null; + } + + @Override + public Principal getUserPrincipal() { + LOG.log(Level.FINE, "getUserPrincipal"); + return null; + } + + @Override + public boolean isRequestedSessionIdFromCookie() { + LOG.log(Level.FINE, "isRequestedSessionIdFromCookie"); + return false; + } + + @Override + public boolean isRequestedSessionIdFromURL() { + LOG.log(Level.FINE, "isRequestedSessionIdFromURL"); + return false; + } + + @Override + public boolean isRequestedSessionIdFromUrl() { + LOG.log(Level.FINE, "isRequestedSessionIdFromUrl"); + return false; + } + + @Override + public boolean isRequestedSessionIdValid() { + LOG.log(Level.FINE, "isRequestedSessionIdValid"); + return false; + } + + @Override + public boolean isUserInRole(String role) { + LOG.log(Level.FINE, "isUserInRole"); + return false; + } + + @Override + public void login(String username, String password) throws ServletException { + LOG.log(Level.FINE, "login"); + + } + + @Override + public void logout() throws ServletException { + LOG.log(Level.FINE, "logout"); + } + + @Override + public long getContentLengthLong() { + throw new UnsupportedOperationException(); + } + + @Override + public String changeSessionId() { + throw new UnsupportedOperationException(); + } + + @Override + public T upgrade(Class arg0) throws IOException, ServletException { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/6755b06c/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/undertow/WebSocketUndertowServletResponse.java ---------------------------------------------------------------------- diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/undertow/WebSocketUndertowServletResponse.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/undertow/WebSocketUndertowServletResponse.java new file mode 100644 index 0000000..5404fc6 --- /dev/null +++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/undertow/WebSocketUndertowServletResponse.java @@ -0,0 +1,392 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.transport.websocket.undertow; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintWriter; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Locale; +import java.util.Map; +import java.util.TreeMap; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.servlet.ServletOutputStream; +import javax.servlet.WriteListener; +import javax.servlet.http.Cookie; +import javax.servlet.http.HttpServletResponse; + +import org.apache.cxf.common.logging.LogUtils; +import org.apache.cxf.transport.websocket.WebSocketConstants; +import org.apache.cxf.transport.websocket.WebSocketUtils; + +import io.undertow.websockets.core.WebSocketChannel; +import io.undertow.websockets.core.WebSockets; + +/** + * + */ +public class WebSocketUndertowServletResponse implements HttpServletResponse { + private static final Logger LOG = LogUtils.getL7dLogger(WebSocketUndertowServletResponse.class); + private WebSocketChannel channel; + private Map responseHeaders; + private ServletOutputStream outputStream; + + public WebSocketUndertowServletResponse(WebSocketChannel channel) { + this.channel = channel; + this.responseHeaders = new TreeMap(String.CASE_INSENSITIVE_ORDER); + this.outputStream = createOutputStream(); + } + + @Override + public void flushBuffer() throws IOException { + LOG.log(Level.FINE, "flushBuffer()"); + outputStream.flush(); + } + + @Override + public int getBufferSize() { + LOG.log(Level.FINE, "getBufferSize()"); + return 0; + } + + @Override + public String getCharacterEncoding() { + LOG.log(Level.FINE, "getCharacterEncoding()"); + return null; + } + + @Override + public String getContentType() { + LOG.log(Level.FINE, "getContentType()"); + return responseHeaders.get("Content-Type"); + } + + @Override + public Locale getLocale() { + LOG.log(Level.FINE, "getLocale"); + return null; + } + + @Override + public ServletOutputStream getOutputStream() throws IOException { + return outputStream; + } + + @Override + public PrintWriter getWriter() throws IOException { + LOG.log(Level.FINE, "getWriter()"); + return new PrintWriter(getOutputStream()); + } + + @Override + public boolean isCommitted() { + return false; + } + + @Override + public void reset() { + } + + @Override + public void resetBuffer() { + LOG.log(Level.FINE, "resetBuffer()"); + } + + @Override + public void setBufferSize(int size) { + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, "setBufferSize({0})", size); + } + } + + @Override + public void setCharacterEncoding(String charset) { + // TODO + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, "setCharacterEncoding({0})", charset); + } + } + + @Override + public void setContentLength(int len) { + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, "setContentLength({0})", len); + } + responseHeaders.put("Content-Length", Integer.toString(len)); + } + + @Override + public void setContentType(String type) { + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, "setContentType({0})", type); + } + responseHeaders.put("Content-Type", type); + } + + @Override + public void setLocale(Locale loc) { + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, "setLocale({0})", loc); + } + } + + @Override + public void addCookie(Cookie cookie) { + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, "addCookie({0})", cookie); + } + } + + @Override + public void addDateHeader(String name, long date) { + // TODO + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, "addDateHeader({0}, {1})", new Object[]{name, date}); + } + } + + @Override + public void addHeader(String name, String value) { + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, "addHeader({0}, {1})", new Object[]{name, value}); + } + responseHeaders.put(name, value); + } + + @Override + public void addIntHeader(String name, int value) { + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, "addIntHeader({0}, {1})", new Object[]{name, value}); + } + responseHeaders.put(name, Integer.toString(value)); + } + + @Override + public boolean containsHeader(String name) { + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, "containsHeader({0})", name); + } + return responseHeaders.containsKey(name); + } + + @Override + public String encodeRedirectURL(String url) { + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, "encodeRedirectURL({0})", url); + } + return null; + } + + @Override + public String encodeRedirectUrl(String url) { + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, "encodeRedirectUrl({0})", url); + } + return null; + } + + @Override + public String encodeURL(String url) { + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, "encodeURL({0})", url); + } + return null; + } + + @Override + public String encodeUrl(String url) { + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, "encodeUrl({0})", url); + } + return null; + } + + @Override + public String getHeader(String name) { + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, "getHeader({0})", name); + } + return null; + } + + @Override + public Collection getHeaderNames() { + LOG.log(Level.FINE, "getHeaderNames()"); + return null; + } + + @Override + public Collection getHeaders(String name) { + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, "getHeaders({0})", name); + } + return null; + } + + @Override + public int getStatus() { + LOG.log(Level.FINE, "getStatus()"); + String v = responseHeaders.get(WebSocketUtils.SC_KEY); + return v == null ? 200 : Integer.parseInt(v); + } + + @Override + public void sendError(int sc) throws IOException { + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, "sendError{0}", sc); + } + responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc)); + byte[] data = WebSocketUtils.buildResponse(responseHeaders, null, 0, 0); + WebSockets.sendText(ByteBuffer.wrap(data), channel, null); + } + + @Override + public void sendError(int sc, String msg) throws IOException { + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, "sendError({0}, {1})", new Object[]{sc, msg}); + } + responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc)); + byte[] data = WebSocketUtils.buildResponse(responseHeaders, null, 0, 0); + WebSockets.sendText(ByteBuffer.wrap(data), channel, null); + } + + @Override + public void sendRedirect(String location) throws IOException { + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, "sendRedirect({0})", location); + } + } + + @Override + public void setDateHeader(String name, long date) { + // ignore + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, "setDateHeader({0}, {1})", new Object[]{name, date}); + } + } + + @Override + public void setHeader(String name, String value) { + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, "setHeader({0}, {1})", new Object[]{name, value}); + } + responseHeaders.put(name, value); + } + + @Override + public void setIntHeader(String name, int value) { + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, "setIntHeader({0}, {1})", new Object[]{name, value}); + } + } + + @Override + public void setStatus(int sc) { + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, "setStatus({0})", sc); + } + responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc)); + } + + @Override + public void setStatus(int sc, String sm) { + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, "setStatus({0}, {1})", new Object[]{sc, sm}); + } + responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc)); + } + + private ServletOutputStream createOutputStream() { + //REVISIT + // This output buffering is needed as the server side websocket does + // not support the fragment transmission mode when sending back a large data. + // And this buffering is only used for the response for the initial service innovation. + // For the subsequently pushed data to the socket are sent back + // unbuffered as individual websocket messages. + // the things to consider : + // - provide a size limit if we are use this buffering + // - add a chunking mode in the cxf websocket's binding. + //CHECKSTYLE:OFF + return new ServletOutputStream() { + private InternalByteArrayOutputStream buffer = new InternalByteArrayOutputStream(); + + @Override + public void write(int b) throws IOException { + byte[] data = new byte[1]; + data[0] = (byte)b; + write(data, 0, 1); + } + + @Override + public void write(byte[] data) throws IOException { + write(data, 0, data.length); + } + + @Override + public void write(byte[] data, int offset, int length) throws IOException { + if (responseHeaders.get(WebSocketUtils.FLUSHED_KEY) == null) { + // buffer the data until it gets flushed + buffer.write(data, offset, length); + } else { + // unbuffered write to the socket + String respid = responseHeaders.get(WebSocketConstants.DEFAULT_RESPONSE_ID_KEY); + byte[] headers = respid != null + ? WebSocketUtils.buildHeaderLine(WebSocketConstants.DEFAULT_RESPONSE_ID_KEY, respid) : null; + data = WebSocketUtils.buildResponse(headers, data, offset, length); + WebSockets.sendText(ByteBuffer.wrap(data), channel, null); + } + } + public void close() throws IOException { + if (responseHeaders.get(WebSocketUtils.FLUSHED_KEY) == null) { + byte[] data = WebSocketUtils.buildResponse(responseHeaders, buffer.getBytes(), 0, buffer.size()); + WebSockets.sendText(ByteBuffer.wrap(data), channel, null); + responseHeaders.put(WebSocketUtils.FLUSHED_KEY, "true"); + } + super.close(); + } + + @Override + public boolean isReady() { + throw new UnsupportedOperationException(); + } + + @Override + public void setWriteListener(WriteListener arg0) { + throw new UnsupportedOperationException(); + } + }; + //CHECKSTYLE:ON + } + + private static class InternalByteArrayOutputStream extends ByteArrayOutputStream { + public byte[] getBytes() { + return buf; + } + } + + @Override + public void setContentLengthLong(long arg0) { + throw new UnsupportedOperationException(); + + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/6755b06c/systests/transport-undertow/pom.xml ---------------------------------------------------------------------- diff --git a/systests/transport-undertow/pom.xml b/systests/transport-undertow/pom.xml index 84f86e3..a1eb3ea 100644 --- a/systests/transport-undertow/pom.xml +++ b/systests/transport-undertow/pom.xml @@ -107,6 +107,67 @@ org.apache.cxf + cxf-rt-transports-websocket + ${project.version} + + + org.apache.cxf + cxf-rt-frontend-jaxrs + ${project.version} + + + org.apache.cxf + cxf-rt-rs-client + ${project.version} + + + com.ning + async-http-client + ${cxf.ahc.version} + test + + + io.netty + netty + + + + + io.netty + netty + ${cxf.netty3.version} + true + + + org.codehaus.jettison + jettison + test + + + com.fasterxml.jackson.jaxrs + jackson-jaxrs-json-provider + + + com.fasterxml.jackson.core + jackson-annotations + + + org.apache.cxf + cxf-rt-rs-extension-providers + ${project.version} + + + org.atmosphere + atmosphere-runtime + ${cxf.atmosphere.version} + + + javax.websocket + javax.websocket-api + 1.1 + + + org.apache.cxf cxf-rt-transports-http-hc ${project.version} http://git-wip-us.apache.org/repos/asf/cxf/blob/6755b06c/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/Book.java ---------------------------------------------------------------------- diff --git a/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/Book.java b/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/Book.java new file mode 100644 index 0000000..0c9e332 --- /dev/null +++ b/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/Book.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.systest.http_undertow.websocket; + +import java.util.HashMap; +import java.util.Map; + +import javax.ws.rs.GET; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.xml.bind.annotation.XmlRootElement; +import javax.xml.bind.annotation.XmlSeeAlso; + +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.annotation.JsonTypeInfo.As; +import com.fasterxml.jackson.annotation.JsonTypeInfo.Id; + +@JsonTypeInfo(use = Id.CLASS, include = As.PROPERTY, property = "class") +@XmlRootElement(name = "Book") +@XmlSeeAlso(SuperBook.class) +public class Book { + private String name; + private long id; + private Map chapters = new HashMap(); + + public Book() { + init(); + } + + public Book(String name, long id) { + this.name = name; + this.id = id; + } + + public void setName(String n) { + name = n; + } + + public String getName() { + return name; + } + + public void setId(long i) { + id = i; + } + public long getId() { + return id; + } + + @PUT + public void cloneState(Book book) { + id = book.getId(); + name = book.getName(); + } + + @GET + public Book retrieveState() { + return this; + } + + @GET + @Path("chapters/{chapterid}/") + @Produces("application/xml;charset=ISO-8859-1") + public Chapter getChapter(@PathParam("chapterid")int chapterid) { + return chapters.get(new Long(chapterid)); + } + + @GET + @Path("chapters/acceptencoding/{chapterid}/") + @Produces("application/xml") + public Chapter getChapterAcceptEncoding(@PathParam("chapterid")int chapterid) { + return chapters.get(new Long(chapterid)); + } + + @GET + @Path("chapters/badencoding/{chapterid}/") + @Produces("application/xml;charset=UTF-48") + public Chapter getChapterBadEncoding(@PathParam("chapterid")int chapterid) { + return chapters.get(new Long(chapterid)); + } + + @Path("chapters/sub/{chapterid}/") + public Chapter getSubChapter(@PathParam("chapterid")int chapterid) { + return chapters.get(new Long(chapterid)); + } + + @Path("chaptersobject/sub/{chapterid}/") + public Object getSubChapterObject(@PathParam("chapterid")int chapterid) { + return getSubChapter(chapterid); + } + + + final void init() { + Chapter c1 = new Chapter(); + c1.setId(1); + c1.setTitle("chapter 1"); + chapters.put(c1.getId(), c1); + Chapter c2 = new Chapter(); + c2.setId(2); + c2.setTitle("chapter 2"); + chapters.put(c2.getId(), c2); + } + +} http://git-wip-us.apache.org/repos/asf/cxf/blob/6755b06c/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/BookNotFoundDetails.java ---------------------------------------------------------------------- diff --git a/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/BookNotFoundDetails.java b/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/BookNotFoundDetails.java new file mode 100644 index 0000000..826cefd --- /dev/null +++ b/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/BookNotFoundDetails.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.systest.http_undertow.websocket; + +import javax.xml.bind.annotation.XmlRootElement; + +@XmlRootElement +public class BookNotFoundDetails { + private long id; + + public long getId() { + return id; + } + + public void setId(long id) { + this.id = id; + } + +} http://git-wip-us.apache.org/repos/asf/cxf/blob/6755b06c/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/BookNotFoundFault.java ---------------------------------------------------------------------- diff --git a/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/BookNotFoundFault.java b/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/BookNotFoundFault.java new file mode 100644 index 0000000..e901d86 --- /dev/null +++ b/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/BookNotFoundFault.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.systest.http_undertow.websocket; + +import javax.xml.ws.WebFault; + +@WebFault +public class BookNotFoundFault extends Exception { + private static final long serialVersionUID = 4833573020359208072L; + private BookNotFoundDetails details; + + public BookNotFoundFault(String errorMessage) { + super(errorMessage); + } + + public BookNotFoundFault(BookNotFoundDetails details) { + super(); + this.details = details; + } + + public BookNotFoundDetails getFaultInfo() { + return details; + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/6755b06c/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/BookServerWebSocket.java ---------------------------------------------------------------------- diff --git a/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/BookServerWebSocket.java b/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/BookServerWebSocket.java new file mode 100644 index 0000000..4b8cb00 --- /dev/null +++ b/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/BookServerWebSocket.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.systest.http_undertow.websocket; + +import org.apache.cxf.Bus; +import org.apache.cxf.BusFactory; +import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; +import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider; +import org.apache.cxf.jaxrs.provider.StreamingResponseProvider; +import org.apache.cxf.testutil.common.AbstractBusTestServerBase; + +public class BookServerWebSocket extends AbstractBusTestServerBase { + public static final String PORT = allocatePort(BookServerWebSocket.class, 1); + public static final String PORT_SPRING = allocatePort(BookServerWebSocket.class, 2); + public static final String PORT_WAR = allocatePort(BookServerWebSocket.class, 3); + public static final String PORT2 = allocatePort(BookServerWebSocket.class, 4); + public static final String PORT2_SPRING = allocatePort(BookServerWebSocket.class, 5); + public static final String PORT2_WAR = allocatePort(BookServerWebSocket.class, 6); + + org.apache.cxf.endpoint.Server server; + + private String port; + + public BookServerWebSocket() { + this(PORT); + } + + public BookServerWebSocket(String port) { + this.port = port; + } + + protected void run() { + Bus bus = BusFactory.getDefaultBus(); + setBus(bus); + JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean(); + sf.setBus(bus); + sf.setResourceClasses(BookStoreWebSocket.class, BookStorePerRequest.class); + sf.setProvider(new StreamingResponseProvider()); + sf.setResourceProvider(BookStoreWebSocket.class, + new SingletonResourceProvider(new BookStoreWebSocket(), true)); + sf.setAddress("ws://localhost:" + port + "/websocket"); + server = sf.create(); + + BusFactory.setDefaultBus(null); + BusFactory.setThreadDefaultBus(null); + } + + public void tearDown() throws Exception { + server.stop(); + server.destroy(); + server = null; + } + + public static void main(String[] args) { + try { + BookServerWebSocket s = new BookServerWebSocket(); + s.start(); + } catch (Exception ex) { + ex.printStackTrace(); + System.exit(-1); + } finally { + System.out.println("done!"); + } + } + +} http://git-wip-us.apache.org/repos/asf/cxf/blob/6755b06c/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/BookStorePerRequest.java ---------------------------------------------------------------------- diff --git a/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/BookStorePerRequest.java b/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/BookStorePerRequest.java new file mode 100644 index 0000000..67b4d55 --- /dev/null +++ b/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/BookStorePerRequest.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.systest.http_undertow.websocket; + + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.ws.rs.ClientErrorException; +import javax.ws.rs.GET; +import javax.ws.rs.HeaderParam; +import javax.ws.rs.Path; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.Response; + +@Path("/bookstore2") +public class BookStorePerRequest { + + private HttpHeaders httpHeaders; + private Map books = new HashMap(); + private List bookIds; + private List setterBookIds; + + public BookStorePerRequest() { + throw new RuntimeException(); + } + + public BookStorePerRequest(@Context HttpHeaders headers) { + throw new RuntimeException(); + } + + public BookStorePerRequest(@Context HttpHeaders headers, Long bar) { + throw new RuntimeException(); + } + + public BookStorePerRequest(@Context HttpHeaders headers, + @HeaderParam("BOOK") List bookIds) { + if (!bookIds.contains("3")) { + throw new ClientErrorException(Response.status(400).type("text/plain") + .entity("Constructor: Header value 3 is required").build()); + } + httpHeaders = headers; + this.bookIds = bookIds; + init(); + } + + @HeaderParam("Book") + public void setBook(List ids) { + if (!ids.equals(bookIds) || ids.size() != 3) { + throw new ClientErrorException(Response.status(400).type("text/plain") + .entity("Param setter: 3 header values are required").build()); + } + setterBookIds = ids; + } + + @Context + public void setHttpHeaders(HttpHeaders headers) { + List ids = httpHeaders.getRequestHeader("BOOK"); + if (ids.contains("4")) { + throw new ClientErrorException(Response.status(400).type("text/plain") + .entity("Context setter: unexpected header value").build()); + } + } + + @GET + @Path("/book%20headers/") + public Book getBookByHeader2() throws Exception { + return getBookByHeader(); + } + + @GET + @Path("/bookheaders/") + public Book getBookByHeader() throws Exception { + + List ids = httpHeaders.getRequestHeader("BOOK"); + if (!ids.equals(bookIds)) { + throw new RuntimeException(); + } + return doGetBook(ids.get(0) + ids.get(1) + ids.get(2)); + } + + @GET + @Path("/bookheaders/injected") + public Book getBookByHeaderInjected() throws Exception { + + return doGetBook(setterBookIds.get(0) + setterBookIds.get(1) + setterBookIds.get(2)); + } + + private Book doGetBook(String id) throws BookNotFoundFault { + Book book = books.get(Long.parseLong(id)); + if (book != null) { + return book; + } else { + BookNotFoundDetails details = new BookNotFoundDetails(); + details.setId(Long.parseLong(id)); + throw new BookNotFoundFault(details); + } + } + + + final void init() { + Book book = new Book(); + book.setId(123); + book.setName("CXF in Action"); + books.put(book.getId(), book); + } + +} + +