Return-Path: X-Original-To: apmail-cxf-commits-archive@www.apache.org Delivered-To: apmail-cxf-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8DE93CD17 for ; Wed, 12 Mar 2014 19:09:02 +0000 (UTC) Received: (qmail 47640 invoked by uid 500); 12 Mar 2014 19:08:55 -0000 Delivered-To: apmail-cxf-commits-archive@cxf.apache.org Received: (qmail 47095 invoked by uid 500); 12 Mar 2014 19:08:51 -0000 Mailing-List: contact commits-help@cxf.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cxf.apache.org Delivered-To: mailing list commits@cxf.apache.org Received: (qmail 46735 invoked by uid 99); 12 Mar 2014 19:08:49 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 12 Mar 2014 19:08:49 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 958E0942796; Wed, 12 Mar 2014 19:08:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ay@apache.org To: commits@cxf.apache.org Date: Wed, 12 Mar 2014 19:08:49 -0000 Message-Id: <8d181f390b13498eb3e548fee0452166@git.apache.org> In-Reply-To: <3c9e05ab87f14f02b89ab2a20f54034c@git.apache.org> References: <3c9e05ab87f14f02b89ab2a20f54034c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] git commit: [CXF-5604] Add atmosphere based implementation in websocket transport [CXF-5604] Add atmosphere based implementation in websocket transport Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/2d8264af Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/2d8264af Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/2d8264af Branch: refs/heads/master Commit: 2d8264aff8c1b26d60f6d4dee3bcb572927cc81a Parents: 6069d72 Author: Akitoshi Yoshida Authored: Wed Mar 12 20:05:38 2014 +0100 Committer: Akitoshi Yoshida Committed: Wed Mar 12 20:08:11 2014 +0100 ---------------------------------------------------------------------- parent/pom.xml | 1 + rt/transports/websocket/pom.xml | 7 +- .../websocket/WebSocketDestinationFactory.java | 29 +- .../websocket/WebSocketDestinationService.java | 36 + .../websocket/WebSocketServletHolder.java | 58 ++ .../cxf/transport/websocket/WebSocketUtils.java | 144 +++ .../WebSocketVirtualServletRequest.java | 532 ++++++++++ .../WebSocketVirtualServletResponse.java | 379 +++++++ .../atmosphere/AtmosphereWebSocketHandler.java | 264 +++++ .../atmosphere/AtmosphereWebSocketHandler.java~ | 293 ++++++ .../AtmosphereWebSocketServletDestination.java | 87 ++ .../AtmosphereWebSocketStreamHandler.java | 73 ++ .../websocket/jetty/JettyWebSocket.java | 980 ++----------------- .../jetty/JettyWebSocketDestination.java | 13 +- .../websocket/jetty/JettyWebSocketHandler.java | 1 - .../websocket/jetty/JettyWebSocketManager.java | 3 +- .../jetty/JettyWebSocketServletDestination.java | 10 +- .../jetty/JettyWebSocketManagerTest.java | 10 +- 18 files changed, 2008 insertions(+), 912 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/2d8264af/parent/pom.xml ---------------------------------------------------------------------- diff --git a/parent/pom.xml b/parent/pom.xml index b250214..32fb08f 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -74,6 +74,7 @@ 1.1.3 5.9.0 1.8.1 + 2.1.0 1.2.14 1.50 2.2_2 http://git-wip-us.apache.org/repos/asf/cxf/blob/2d8264af/rt/transports/websocket/pom.xml ---------------------------------------------------------------------- diff --git a/rt/transports/websocket/pom.xml b/rt/transports/websocket/pom.xml index 346c32e..616fd25 100644 --- a/rt/transports/websocket/pom.xml +++ b/rt/transports/websocket/pom.xml @@ -47,7 +47,6 @@ - org.apache.aries.blueprint.NamespaceHandler;osgi.service.blueprint.namespace="http://cxf.apache.org/transports/websocket/configuration" @@ -120,6 +119,12 @@ true + org.atmosphere + atmosphere-runtime + ${cxf.atmosphere.version} + true + + org.slf4j slf4j-api runtime http://git-wip-us.apache.org/repos/asf/cxf/blob/2d8264af/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationFactory.java ---------------------------------------------------------------------- diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationFactory.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationFactory.java index 4c6fdf1..14b5f8f 100644 --- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationFactory.java +++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationFactory.java @@ -30,24 +30,44 @@ import org.apache.cxf.transport.http.DestinationRegistry; import org.apache.cxf.transport.http.HTTPTransportFactory; import org.apache.cxf.transport.http.HttpDestinationFactory; import org.apache.cxf.transport.http_jetty.JettyHTTPServerEngineFactory; +import org.apache.cxf.transport.websocket.atmosphere.AtmosphereWebSocketServletDestination; import org.apache.cxf.transport.websocket.jetty.JettyWebSocketDestination; import org.apache.cxf.transport.websocket.jetty.JettyWebSocketServletDestination; @NoJSR250Annotations() public class WebSocketDestinationFactory implements HttpDestinationFactory { - + private static final boolean ATMOSPHERE_AVAILABLE = probeClass("org.atmosphere.cpr.ApplicationConfig"); + + private static boolean probeClass(String name) { + try { + Class.forName(name, true, WebSocketDestinationFactory.class.getClassLoader()); + return true; + } catch (Throwable t) { + return false; + } + } + public AbstractHTTPDestination createDestination(EndpointInfo endpointInfo, Bus bus, DestinationRegistry registry) throws IOException { - if (endpointInfo.getAddress().startsWith("ws")) { - //TODO for now jetty specific, + // for the embedded mode, we stick with jetty. JettyHTTPServerEngineFactory serverEngineFactory = bus .getExtension(JettyHTTPServerEngineFactory.class); return new JettyWebSocketDestination(bus, registry, endpointInfo, serverEngineFactory); } else { //REVISIT other way of getting the registry of http so that the plain cxf servlet finds the destination? registry = getDestinationRegistry(bus); - return new JettyWebSocketServletDestination(bus, registry, endpointInfo, endpointInfo.getAddress()); + + // choose atmosphere if available, otherwise assume jetty is available + if (ATMOSPHERE_AVAILABLE) { + // use atmosphere + return new AtmosphereWebSocketServletDestination(bus, registry, + endpointInfo, endpointInfo.getAddress()); + } else { + // use jetty-websocket + return new JettyWebSocketServletDestination(bus, registry, + endpointInfo, endpointInfo.getAddress()); + } } } @@ -65,5 +85,6 @@ public class WebSocketDestinationFactory implements HttpDestinationFactory { } return null; } + } http://git-wip-us.apache.org/repos/asf/cxf/blob/2d8264af/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationService.java ---------------------------------------------------------------------- diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationService.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationService.java new file mode 100644 index 0000000..701bc00 --- /dev/null +++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationService.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.transport.websocket; + +import java.io.IOException; + +import javax.servlet.ServletConfig; +import javax.servlet.ServletContext; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +/** + * + */ +public interface WebSocketDestinationService { + void invokeInternal(ServletConfig config, ServletContext context, + HttpServletRequest req, HttpServletResponse resp) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/cxf/blob/2d8264af/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketServletHolder.java ---------------------------------------------------------------------- diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketServletHolder.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketServletHolder.java new file mode 100644 index 0000000..bd4dea9 --- /dev/null +++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketServletHolder.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.transport.websocket; + +import java.io.IOException; +import java.security.Principal; +import java.util.Enumeration; +import java.util.Locale; + +import javax.servlet.DispatcherType; +import javax.servlet.ServletContext; + +/** + * + */ +public interface WebSocketServletHolder { + String getAuthType(); + String getContextPath(); + String getLocalAddr(); + String getLocalName(); + int getLocalPort(); + Locale getLocale(); + Enumeration getLocales(); + String getProtocol(); + String getRemoteAddr(); + String getRemoteHost(); + int getRemotePort(); + String getRequestURI(); + StringBuffer getRequestURL(); + DispatcherType getDispatcherType(); + boolean isSecure(); + String getPathInfo(); + String getPathTranslated(); + String getScheme(); + String getServerName(); + String getServletPath(); + ServletContext getServletContext(); + int getServerPort(); + Principal getUserPrincipal(); + void write(byte[] data, int offset, int length) throws IOException; +} http://git-wip-us.apache.org/repos/asf/cxf/blob/2d8264af/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java ---------------------------------------------------------------------- diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java new file mode 100644 index 0000000..188caf5 --- /dev/null +++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.transport.websocket; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; +import java.util.TreeMap; + +/** + * + */ +public final class WebSocketUtils { + static final String URI_KEY = "$uri"; + static final String METHOD_KEY = "$method"; + static final String SC_KEY = "$sc"; + static final String SM_KEY = "$sm"; + static final String FLUSHED_KEY = "$flushed"; + private static final String CRLF = "\r\n"; + private static final String DEFAULT_SC = "200"; + + private WebSocketUtils() { + } + + /** + * Read header properties from the specified input stream. + * + * Only a restricted syntax is allowed as the syntax is in our control. + * Not allowed are: + * - multiline or line-wrapped headers are not not + * - charset other than utf-8. (although i would have preferred iso-8859-1 ;-) + * + * @param in the input stream + * @return a map of name value pairs. + * @throws IOException + */ + public static Map readHeaders(InputStream in) throws IOException { + Map headers = new TreeMap(String.CASE_INSENSITIVE_ORDER); + // read the request line + String line = readLine(in); + int del = line.indexOf(' '); + if (del < 0) { + throw new IOException("invalid request: " + line); + } + headers.put(METHOD_KEY, line.substring(0, del).trim()); + headers.put(URI_KEY, line.substring(del + 1).trim()); + + // read headers + while ((line = readLine(in)) != null) { + if (line.length() > 0) { + del = line.indexOf(':'); + if (del < 0) { + headers.put(line.trim(), ""); + } else { + headers.put(line.substring(0, del).trim(), line.substring(del + 1).trim()); + } + } + } + + return headers; + } + + + /** + * Read a line terminated by '\n' optionally preceded by '\r' from the + * specified input stream. + * @param in the input stream + * @return + * @throws IOException + */ + // this is copied from AttachmentDeserializer with a minor change to restrict the line termination rule. + public static String readLine(InputStream in) throws IOException { + StringBuilder buffer = new StringBuilder(128); + + int c; + + while ((c = in.read()) != -1) { + // a linefeed is a terminator, always. + if (c == '\n') { + break; + } else if (c == '\r') { + //just ignore the CR. The next character SHOULD be an NL. If not, we're + //just going to discard this + continue; + } else { + // just add to the buffer + buffer.append((char)c); + } + } + + // no characters found...this was either an eof or a null line. + if (buffer.length() == 0) { + return null; + } + + return buffer.toString(); + } + + public static byte[] buildResponse(Map headers, byte[] data, int offset, int length) { + StringBuilder sb = new StringBuilder(); + String v = headers.get(SC_KEY); + sb.append(v == null ? DEFAULT_SC : v).append(CRLF); + v = headers.get("Content-Type"); + if (v != null) { + sb.append("Content-Type: ").append(v).append(CRLF); + } + sb.append(CRLF); + + byte[] hb = sb.toString().getBytes(); + byte[] longdata = new byte[hb.length + length]; + System.arraycopy(hb, 0, longdata, 0, hb.length); + if (data != null && length > 0) { + System.arraycopy(data, offset, longdata, hb.length, length); + } + return longdata; + } + + public static byte[] buildResponse(byte[] data, int offset, int length) { + byte[] longdata = new byte[length + 2]; + longdata[0] = 0x0d; + longdata[1] = 0x0a; + System.arraycopy(data, offset, longdata, 2, length); + return longdata; + } + + +} http://git-wip-us.apache.org/repos/asf/cxf/blob/2d8264af/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletRequest.java ---------------------------------------------------------------------- diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletRequest.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletRequest.java new file mode 100644 index 0000000..c9d9e94 --- /dev/null +++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletRequest.java @@ -0,0 +1,532 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.transport.websocket; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.UnsupportedEncodingException; +import java.security.Principal; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Enumeration; +import java.util.Locale; +import java.util.Map; +import java.util.TreeMap; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.servlet.AsyncContext; +import javax.servlet.DispatcherType; +import javax.servlet.RequestDispatcher; +import javax.servlet.ServletContext; +import javax.servlet.ServletException; +import javax.servlet.ServletInputStream; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; +import javax.servlet.http.Cookie; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.servlet.http.HttpSession; +import javax.servlet.http.Part; + +import org.apache.cxf.common.logging.LogUtils; + +/** + * + */ +public class WebSocketVirtualServletRequest implements HttpServletRequest { + private static final Logger LOG = LogUtils.getL7dLogger(WebSocketVirtualServletRequest.class); + + private WebSocketServletHolder webSocketHolder; + private InputStream in; + private Map requestHeaders; + private Map attributes; + + public WebSocketVirtualServletRequest(WebSocketServletHolder websocket, InputStream in) + throws IOException { + this.webSocketHolder = websocket; + this.in = in; + + this.requestHeaders = WebSocketUtils.readHeaders(in); + String path = requestHeaders.get(WebSocketUtils.URI_KEY); + String origin = websocket.getRequestURI(); + if (!path.startsWith(origin)) { + //REVISIT for now, log it here and reject the request later. + LOG.log(Level.WARNING, "invalid path: {0} not within {1}", new Object[]{path, origin}); + } + this.attributes = new TreeMap(String.CASE_INSENSITIVE_ORDER); + } + + @Override + public AsyncContext getAsyncContext() { + return null; + } + + @Override + public Object getAttribute(String name) { + if (LOG.isLoggable(Level.INFO)) { + LOG.log(Level.INFO, "getAttribute({0})", name); + } + return attributes.get(name); + } + + @Override + public Enumeration getAttributeNames() { + LOG.log(Level.INFO, "getAttributeNames()"); + return Collections.enumeration(attributes.keySet()); + } + + @Override + public String getCharacterEncoding() { + // TODO Auto-generated method stub + LOG.log(Level.INFO, "getCharacterEncoding()"); + return null; + } + + @Override + public int getContentLength() { + LOG.log(Level.INFO, "getContentLength()"); + return 0; + } + + @Override + public String getContentType() { + LOG.log(Level.INFO, "getContentType()"); + return requestHeaders.get("Content-Type"); + } + + @Override + public DispatcherType getDispatcherType() { + LOG.log(Level.INFO, "getDispatcherType()"); + return webSocketHolder.getDispatcherType(); + } + + @Override + public ServletInputStream getInputStream() throws IOException { + return new ServletInputStream() { + @Override + public int read() throws IOException { + return in.read(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return in.read(b, off, len); + } + }; + } + + @Override + public String getLocalAddr() { + LOG.log(Level.INFO, "getLocalAddr()"); + return webSocketHolder.getLocalAddr(); + } + + @Override + public String getLocalName() { + LOG.log(Level.INFO, "getLocalName()"); + return webSocketHolder.getLocalName(); + } + + @Override + public int getLocalPort() { + LOG.log(Level.INFO, "getLocalPort()"); + return webSocketHolder.getLocalPort(); + } + + @Override + public Locale getLocale() { + LOG.log(Level.INFO, "getLocale()"); + return webSocketHolder.getLocale(); + } + + @Override + public Enumeration getLocales() { + LOG.log(Level.INFO, "getLocales()"); + return webSocketHolder.getLocales(); + } + + @Override + public String getParameter(String name) { + // TODO Auto-generated method stub + if (LOG.isLoggable(Level.INFO)) { + LOG.log(Level.INFO, "getParameter({0})", name); + } + return null; + } + + @Override + public Map getParameterMap() { + // TODO Auto-generated method stub + LOG.log(Level.INFO, "getParameterMap()"); + return null; + } + + @Override + public Enumeration getParameterNames() { + // TODO Auto-generated method stub + LOG.log(Level.INFO, "getParameterNames()"); + return null; + } + + @Override + public String[] getParameterValues(String name) { + // TODO Auto-generated method stub + if (LOG.isLoggable(Level.INFO)) { + LOG.log(Level.INFO, "getParameterValues({0})", name); + } + return null; + } + + @Override + public String getProtocol() { + LOG.log(Level.INFO, "getProtocol"); + return webSocketHolder.getProtocol(); + } + + @Override + public BufferedReader getReader() throws IOException { + LOG.log(Level.INFO, "getReader"); + return new BufferedReader(new InputStreamReader(in, "utf-8")); + } + + @Override + public String getRealPath(String path) { + // TODO Auto-generated method stub + LOG.log(Level.INFO, "getRealPath"); + return null; + } + + @Override + public String getRemoteAddr() { + LOG.log(Level.INFO, "getRemoteAddr"); + return webSocketHolder.getRemoteAddr(); + } + + @Override + public String getRemoteHost() { + LOG.log(Level.INFO, "getRemoteHost"); + return webSocketHolder.getRemoteHost(); + } + + @Override + public int getRemotePort() { + LOG.log(Level.INFO, "getRemotePort"); + return webSocketHolder.getRemotePort(); + } + + @Override + public RequestDispatcher getRequestDispatcher(String path) { + // TODO Auto-generated method stub + LOG.log(Level.INFO, "getRequestDispatcher"); + return null; + } + + @Override + public String getScheme() { + LOG.log(Level.INFO, "getScheme"); + return webSocketHolder.getScheme(); + } + + @Override + public String getServerName() { + return webSocketHolder.getServerName(); + } + + @Override + public int getServerPort() { + LOG.log(Level.INFO, "getServerPort"); + return webSocketHolder.getServerPort(); + } + + @Override + public ServletContext getServletContext() { + LOG.log(Level.INFO, "getServletContext"); + return webSocketHolder.getServletContext(); + } + + @Override + public boolean isAsyncStarted() { + LOG.log(Level.INFO, "isAsyncStarted"); + return false; + } + + @Override + public boolean isAsyncSupported() { + LOG.log(Level.INFO, "isAsyncSupported"); + return false; + } + + @Override + public boolean isSecure() { + LOG.log(Level.INFO, "isSecure"); + return webSocketHolder.isSecure(); + } + + @Override + public void removeAttribute(String name) { + LOG.log(Level.INFO, "removeAttribute"); + attributes.remove(name); + } + + @Override + public void setAttribute(String name, Object o) { + LOG.log(Level.INFO, "setAttribute"); + attributes.put(name, o); + } + + @Override + public void setCharacterEncoding(String env) throws UnsupportedEncodingException { + LOG.log(Level.INFO, "setCharacterEncoding"); + // ignore as we stick to utf-8. + } + + @Override + public AsyncContext startAsync() { + LOG.log(Level.INFO, "startAsync"); + return null; + } + + @Override + public AsyncContext startAsync(ServletRequest servletRequest, ServletResponse servletResponse) { + // TODO Auto-generated method stub + LOG.log(Level.INFO, "startAsync"); + return null; + } + + @Override + public boolean authenticate(HttpServletResponse servletResponse) throws IOException, ServletException { + // TODO Auto-generated method stub + LOG.log(Level.INFO, "authenticate"); + return false; + } + + @Override + public String getAuthType() { + LOG.log(Level.INFO, "getAuthType"); + return webSocketHolder.getAuthType(); + } + + @Override + public String getContextPath() { + LOG.log(Level.INFO, "getContextPath"); + return webSocketHolder.getContextPath(); + } + + @Override + public Cookie[] getCookies() { + LOG.log(Level.INFO, "getCookies"); + return null; + } + + @Override + public long getDateHeader(String name) { + LOG.log(Level.INFO, "getDateHeader"); + return 0; + } + + @Override + public String getHeader(String name) { + LOG.log(Level.INFO, "getHeader"); + return requestHeaders.get(name); + } + + @Override + public Enumeration getHeaderNames() { + LOG.log(Level.INFO, "getHeaderNames"); + return Collections.enumeration(requestHeaders.keySet()); + } + + @Override + public Enumeration getHeaders(String name) { + LOG.log(Level.INFO, "getHeaders"); + // our protocol assumes no multiple headers + return Collections.enumeration(Arrays.asList(requestHeaders.get(name))); + } + + @Override + public int getIntHeader(String name) { + LOG.log(Level.INFO, "getIntHeader"); + String v = requestHeaders.get(name); + return v == null ? -1 : Integer.parseInt(v); + } + + @Override + public String getMethod() { + LOG.log(Level.INFO, "getMethod"); + return requestHeaders.get(WebSocketUtils.METHOD_KEY); + } + + @Override + public Part getPart(String name) throws IOException, ServletException { + // TODO Auto-generated method stub + LOG.log(Level.INFO, "getPart"); + return null; + } + + @Override + public Collection getParts() throws IOException, ServletException { + // TODO Auto-generated method stub + LOG.log(Level.INFO, "getParts"); + return null; + } + + @Override + public String getPathInfo() { + LOG.log(Level.INFO, "getPathInfo"); + String uri = requestHeaders.get(WebSocketUtils.URI_KEY); + String servletpath = webSocketHolder.getServletPath(); + //TODO remove the query string part + //REVISIT may cache this value in requstHeaders? + return uri.substring(servletpath.length()); + } + + @Override + public String getPathTranslated() { + LOG.log(Level.INFO, "getPathTranslated"); + String path = getPathInfo(); + String opathtrans = webSocketHolder.getPathTranslated(); + String opathinfo = webSocketHolder.getPathInfo(); + int pos = opathtrans.indexOf(opathinfo); + //REVISIT may cache this value in requstHeaders? + return new StringBuilder().append(opathtrans.substring(0, pos)).append(path).toString(); + } + + @Override + public String getQueryString() { + // TODO Auto-generated method stub + LOG.log(Level.INFO, "getQueryString"); + return null; + } + + @Override + public String getRemoteUser() { + // TODO Auto-generated method stub + LOG.log(Level.INFO, "getRemoteUser"); + return null; + } + + @Override + public String getRequestURI() { + LOG.log(Level.INFO, "getRequestURI"); + return requestHeaders.get(WebSocketUtils.URI_KEY); + } + + @Override + public StringBuffer getRequestURL() { + LOG.log(Level.INFO, "getRequestURL"); + StringBuffer sb = webSocketHolder.getRequestURL(); + String ouri = webSocketHolder.getRequestURI(); + String uri = getRequestURI(); + //REVISIT the way to reject the requeist uri that does not match the original request + if (!uri.startsWith(ouri)) { + sb.append("invalid").append(uri); + } else { + sb.append(uri.substring(ouri.length())); + } + + return sb; + } + + @Override + public String getRequestedSessionId() { + // TODO Auto-generated method stub + LOG.log(Level.INFO, "getRequestedSessionId"); + return null; + } + + @Override + public String getServletPath() { + LOG.log(Level.INFO, "getServletPath"); + return webSocketHolder.getServletPath(); + } + + @Override + public HttpSession getSession() { + // TODO Auto-generated method stub + LOG.log(Level.INFO, "getSession"); + return null; + } + + @Override + public HttpSession getSession(boolean create) { + // TODO Auto-generated method stub + LOG.log(Level.INFO, "getSession"); + return null; + } + + @Override + public Principal getUserPrincipal() { + LOG.log(Level.INFO, "getUserPrincipal"); + return webSocketHolder.getUserPrincipal(); + } + + @Override + public boolean isRequestedSessionIdFromCookie() { + // TODO Auto-generated method stub + LOG.log(Level.INFO, "isRequestedSessionIdFromCookie"); + return false; + } + + @Override + public boolean isRequestedSessionIdFromURL() { + // TODO Auto-generated method stub + LOG.log(Level.INFO, "isRequestedSessionIdFromURL"); + return false; + } + + @Override + public boolean isRequestedSessionIdFromUrl() { + // TODO Auto-generated method stub + LOG.log(Level.INFO, "isRequestedSessionIdFromUrl"); + return false; + } + + @Override + public boolean isRequestedSessionIdValid() { + // TODO Auto-generated method stub + LOG.log(Level.INFO, "isRequestedSessionIdValid"); + return false; + } + + @Override + public boolean isUserInRole(String role) { + // TODO Auto-generated method stub + LOG.log(Level.INFO, "isUserInRole"); + return false; + } + + @Override + public void login(String username, String password) throws ServletException { + // TODO Auto-generated method stub + LOG.log(Level.INFO, "login"); + + } + + @Override + public void logout() throws ServletException { + // TODO Auto-generated method stub + LOG.log(Level.INFO, "logout"); + + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/2d8264af/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletResponse.java ---------------------------------------------------------------------- diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletResponse.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletResponse.java new file mode 100644 index 0000000..7693e02 --- /dev/null +++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletResponse.java @@ -0,0 +1,379 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.transport.websocket; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintWriter; +import java.util.Collection; +import java.util.Locale; +import java.util.Map; +import java.util.TreeMap; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.servlet.ServletOutputStream; +import javax.servlet.http.Cookie; +import javax.servlet.http.HttpServletResponse; + +import org.apache.cxf.common.logging.LogUtils; + +/** + * + */ +public class WebSocketVirtualServletResponse implements HttpServletResponse { + private static final Logger LOG = LogUtils.getL7dLogger(WebSocketVirtualServletResponse.class); + private WebSocketServletHolder webSocketHolder; + private Map responseHeaders; + private boolean flushed; + + public WebSocketVirtualServletResponse(WebSocketServletHolder websocket) { + this.webSocketHolder = websocket; + this.responseHeaders = new TreeMap(String.CASE_INSENSITIVE_ORDER); + } + + @Override + public void flushBuffer() throws IOException { + LOG.log(Level.INFO, "flushBuffer()"); + if (!flushed) { + //REVISIT this mechanism to determine if the headers have been flushed + if (responseHeaders.get(WebSocketUtils.FLUSHED_KEY) == null) { + byte[] data = WebSocketUtils.buildResponse(responseHeaders, null, 0, 0); + webSocketHolder.write(data, 0, data.length); + responseHeaders.put(WebSocketUtils.FLUSHED_KEY, "true"); + } + flushed = true; + } + } + + @Override + public int getBufferSize() { + LOG.log(Level.INFO, "getBufferSize()"); + return 0; + } + + @Override + public String getCharacterEncoding() { + // TODO Auto-generated method stub + LOG.log(Level.INFO, "getCharacterEncoding()"); + return null; + } + + @Override + public String getContentType() { + LOG.log(Level.INFO, "getContentType()"); + return responseHeaders.get("Content-Type"); + } + + @Override + public Locale getLocale() { + // TODO Auto-generated method stub + LOG.log(Level.INFO, "getLocale"); + return null; + } + + @Override + public ServletOutputStream getOutputStream() throws IOException { + return new ServletOutputStream() { + + @Override + public void write(int b) throws IOException { + byte[] data = new byte[1]; + data[0] = (byte)b; + write(data, 0, 1); + } + + @Override + public void write(byte[] data, int offset, int length) throws IOException { + if (responseHeaders.get(WebSocketUtils.FLUSHED_KEY) == null) { + data = WebSocketUtils.buildResponse(responseHeaders, data, offset, length); + responseHeaders.put(WebSocketUtils.FLUSHED_KEY, "true"); + } else { + data = WebSocketUtils.buildResponse(data, offset, length); + } + webSocketHolder.write(data, 0, data.length); + } + + @Override + public void close() throws IOException { + if (responseHeaders.get(WebSocketUtils.FLUSHED_KEY) == null) { + byte[] data = WebSocketUtils.buildResponse(responseHeaders, null, 0, 0); + webSocketHolder.write(data, 0, data.length); + responseHeaders.put(WebSocketUtils.FLUSHED_KEY, "true"); + } + super.close(); + } + + }; + } + + @Override + public PrintWriter getWriter() throws IOException { + LOG.log(Level.INFO, "getWriter()"); + return new PrintWriter(new OutputStream() { + + @Override + public void write(int b) throws IOException { + byte[] data = new byte[1]; + data[0] = (byte)b; + write(data, 0, 1); + } + + @Override + public void write(byte[] data, int offset, int length) throws IOException { + if (responseHeaders.get(WebSocketUtils.FLUSHED_KEY) == null) { + data = WebSocketUtils.buildResponse(responseHeaders, data, offset, length); + responseHeaders.put(WebSocketUtils.FLUSHED_KEY, "true"); + } else { + data = WebSocketUtils.buildResponse(data, offset, length); + } + webSocketHolder.write(data, 0, data.length); + } + + @Override + public void close() throws IOException { + if (responseHeaders.get(WebSocketUtils.FLUSHED_KEY) == null) { + byte[] data = WebSocketUtils.buildResponse(responseHeaders, null, 0, 0); + webSocketHolder.write(data, 0, data.length); + responseHeaders.put(WebSocketUtils.FLUSHED_KEY, "true"); + } + super.close(); + } + }); + } + + @Override + public boolean isCommitted() { + return false; + } + + @Override + public void reset() { + } + + @Override + public void resetBuffer() { + LOG.log(Level.INFO, "resetBuffer()"); + } + + @Override + public void setBufferSize(int size) { + if (LOG.isLoggable(Level.INFO)) { + LOG.log(Level.INFO, "setBufferSize({0})", size); + } + } + + @Override + public void setCharacterEncoding(String charset) { + // TODO + if (LOG.isLoggable(Level.INFO)) { + LOG.log(Level.INFO, "setCharacterEncoding({0})", charset); + } + } + + @Override + public void setContentLength(int len) { + if (LOG.isLoggable(Level.INFO)) { + LOG.log(Level.INFO, "setContentLength({0})", len); + } + responseHeaders.put("Content-Length", Integer.toString(len)); + } + + @Override + public void setContentType(String type) { + if (LOG.isLoggable(Level.INFO)) { + LOG.log(Level.INFO, "setContentType({0})", type); + } + responseHeaders.put("Content-Type", type); + } + + @Override + public void setLocale(Locale loc) { + if (LOG.isLoggable(Level.INFO)) { + LOG.log(Level.INFO, "setLocale({0})", loc); + } + } + + @Override + public void addCookie(Cookie cookie) { + if (LOG.isLoggable(Level.INFO)) { + LOG.log(Level.INFO, "addCookie({0})", cookie); + } + } + + @Override + public void addDateHeader(String name, long date) { + // TODO + if (LOG.isLoggable(Level.INFO)) { + LOG.log(Level.INFO, "addDateHeader({0}, {1})", new Object[]{name, date}); + } + } + + @Override + public void addHeader(String name, String value) { + if (LOG.isLoggable(Level.INFO)) { + LOG.log(Level.INFO, "addHeader({0}, {1})", new Object[]{name, value}); + } + responseHeaders.put(name, value); + } + + @Override + public void addIntHeader(String name, int value) { + if (LOG.isLoggable(Level.INFO)) { + LOG.log(Level.INFO, "addIntHeader({0}, {1})", new Object[]{name, value}); + } + responseHeaders.put(name, Integer.toString(value)); + } + + @Override + public boolean containsHeader(String name) { + if (LOG.isLoggable(Level.INFO)) { + LOG.log(Level.INFO, "containsHeader({0})", name); + } + return responseHeaders.containsKey(name); + } + + @Override + public String encodeRedirectURL(String url) { + if (LOG.isLoggable(Level.INFO)) { + LOG.log(Level.INFO, "encodeRedirectURL({0})", url); + } + return null; + } + + @Override + public String encodeRedirectUrl(String url) { + if (LOG.isLoggable(Level.INFO)) { + LOG.log(Level.INFO, "encodeRedirectUrl({0})", url); + } + return null; + } + + @Override + public String encodeURL(String url) { + if (LOG.isLoggable(Level.INFO)) { + LOG.log(Level.INFO, "encodeURL({0})", url); + } + return null; + } + + @Override + public String encodeUrl(String url) { + if (LOG.isLoggable(Level.INFO)) { + LOG.log(Level.INFO, "encodeUrl({0})", url); + } + return null; + } + + @Override + public String getHeader(String name) { + if (LOG.isLoggable(Level.INFO)) { + LOG.log(Level.INFO, "getHeader({0})", name); + } + return null; + } + + @Override + public Collection getHeaderNames() { + LOG.log(Level.INFO, "getHeaderNames()"); + return null; + } + + @Override + public Collection getHeaders(String name) { + if (LOG.isLoggable(Level.INFO)) { + LOG.log(Level.INFO, "getHeaders({0})", name); + } + return null; + } + + @Override + public int getStatus() { + LOG.log(Level.INFO, "getStatus()"); + String v = responseHeaders.get(WebSocketUtils.SC_KEY); + return v == null ? 200 : Integer.parseInt(v); + } + + @Override + public void sendError(int sc) throws IOException { + if (LOG.isLoggable(Level.INFO)) { + LOG.log(Level.INFO, "sendError{0}", sc); + } + responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc)); + } + + @Override + public void sendError(int sc, String msg) throws IOException { + if (LOG.isLoggable(Level.INFO)) { + LOG.log(Level.INFO, "sendError({0}, {1})", new Object[]{sc, msg}); + } + responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc)); + responseHeaders.put(WebSocketUtils.SM_KEY, msg); + } + + @Override + public void sendRedirect(String location) throws IOException { + // TODO Auto-generated method stub + if (LOG.isLoggable(Level.INFO)) { + LOG.log(Level.INFO, "sendRedirect({0})", location); + } + } + + @Override + public void setDateHeader(String name, long date) { + // ignore + if (LOG.isLoggable(Level.INFO)) { + LOG.log(Level.INFO, "setDateHeader({0}, {1})", new Object[]{name, date}); + } + } + + @Override + public void setHeader(String name, String value) { + // TODO Auto-generated method stub + if (LOG.isLoggable(Level.INFO)) { + LOG.log(Level.INFO, "setHeader({0}, {1})", new Object[]{name, value}); + } + } + + @Override + public void setIntHeader(String name, int value) { + // TODO Auto-generated method stub + if (LOG.isLoggable(Level.INFO)) { + LOG.log(Level.INFO, "setIntHeader({0}, {1})", new Object[]{name, value}); + } + } + + @Override + public void setStatus(int sc) { + if (LOG.isLoggable(Level.INFO)) { + LOG.log(Level.INFO, "setStatus({0})", sc); + } + responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc)); + } + + @Override + public void setStatus(int sc, String sm) { + if (LOG.isLoggable(Level.INFO)) { + LOG.log(Level.INFO, "setStatus({0}, {1})", new Object[]{sc, sm}); + } + responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc)); + responseHeaders.put(WebSocketUtils.SM_KEY, sm); + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/2d8264af/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java ---------------------------------------------------------------------- diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java new file mode 100644 index 0000000..f2f066a --- /dev/null +++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java @@ -0,0 +1,264 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.transport.websocket.atmosphere; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.security.Principal; +import java.util.Enumeration; +import java.util.List; +import java.util.Locale; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.servlet.DispatcherType; +import javax.servlet.ServletContext; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.cxf.common.logging.LogUtils; +import org.apache.cxf.transport.websocket.WebSocketDestinationService; +import org.apache.cxf.transport.websocket.WebSocketServletHolder; +import org.apache.cxf.transport.websocket.WebSocketVirtualServletRequest; +import org.apache.cxf.transport.websocket.WebSocketVirtualServletResponse; +import org.atmosphere.cpr.AtmosphereConfig; +import org.atmosphere.cpr.AtmosphereRequest; +import org.atmosphere.websocket.WebSocket; +import org.atmosphere.websocket.WebSocketProcessor.WebSocketException; +import org.atmosphere.websocket.WebSocketProtocol; + +/** + * + */ +public class AtmosphereWebSocketHandler implements WebSocketProtocol { + private static final Logger LOG = LogUtils.getL7dLogger(AtmosphereWebSocketHandler.class); + + protected AtmosphereWebSocketServletDestination destination; + + + public AtmosphereWebSocketServletDestination getDestination() { + return destination; + } + + public void setDestination(AtmosphereWebSocketServletDestination destination) { + this.destination = destination; + } + + /** {@inheritDoc}*/ + @Override + public void configure(AtmosphereConfig config) { + // TODO Auto-generated method stub + + } + + /** {@inheritDoc}*/ + @Override + public List onMessage(WebSocket webSocket, String data) { + LOG.info("onMessage(WebSocket, String)"); + return null; + } + + /** {@inheritDoc}*/ + @Override + public List onMessage(WebSocket webSocket, byte[] data, int offset, int length) { + LOG.info("onMessage(WebSocket, byte[], int, int)"); + + try { + WebSocketServletHolder webSocketHolder = new AtmosphereWebSocketServletHolder(webSocket); + HttpServletRequest request = createServletRequest(webSocketHolder, data, offset, length); + HttpServletResponse response = createServletResponse(webSocketHolder); + if (destination != null) { + ((WebSocketDestinationService)destination).invokeInternal(null, + webSocket.resource().getRequest().getServletContext(), + request, response); + } + } catch (Exception e) { + LOG.log(Level.WARNING, "Failed to invoke service", e); + } + return null; + } + + /** {@inheritDoc}*/ + @Override + public void onOpen(WebSocket webSocket) { + LOG.info("onOpen(WebSocket)"); + } + + /** {@inheritDoc}*/ + @Override + public void onClose(WebSocket webSocket) { + LOG.info("onClose(WebSocket)"); + + } + + /** {@inheritDoc}*/ + @Override + public void onError(WebSocket webSocket, WebSocketException t) { + LOG.info("onError(WebSocket, WebSocketException)"); + } + + protected WebSocketVirtualServletRequest createServletRequest(WebSocketServletHolder webSocketHolder, + byte[] data, int offset, int length) + throws IOException { + return new WebSocketVirtualServletRequest(webSocketHolder, + new ByteArrayInputStream(data, offset, length)); + } + + protected WebSocketVirtualServletRequest createServletRequest(WebSocketServletHolder webSocketHolder, + InputStream stream) + throws IOException { + return new WebSocketVirtualServletRequest(webSocketHolder, stream); + } + + protected WebSocketVirtualServletResponse createServletResponse(WebSocketServletHolder webSocketHolder) + throws IOException { + return new WebSocketVirtualServletResponse(webSocketHolder); + } + + protected static class AtmosphereWebSocketServletHolder implements WebSocketServletHolder { + private WebSocket webSocket; + + public AtmosphereWebSocketServletHolder(WebSocket webSocket) { + this.webSocket = webSocket; + } + + @Override + public String getAuthType() { + return webSocket.resource().getRequest().getAuthType(); + } + + @Override + public String getContextPath() { + return webSocket.resource().getRequest().getContextPath(); + } + + @Override + public String getLocalAddr() { + return webSocket.resource().getRequest().getLocalAddr(); + } + + @Override + public String getLocalName() { + return webSocket.resource().getRequest().getLocalName(); + } + + @Override + public int getLocalPort() { + return webSocket.resource().getRequest().getLocalPort(); + } + + @Override + public Locale getLocale() { + return webSocket.resource().getRequest().getLocale(); + } + + @Override + public Enumeration getLocales() { + return webSocket.resource().getRequest().getLocales(); + } + + @Override + public String getProtocol() { + return webSocket.resource().getRequest().getProtocol(); + } + + @Override + public String getRemoteAddr() { + return webSocket.resource().getRequest().getRemoteAddr(); + } + + @Override + public String getRemoteHost() { + return webSocket.resource().getRequest().getRemoteHost(); + } + + @Override + public int getRemotePort() { + return webSocket.resource().getRequest().getRemotePort(); + } + + @Override + public String getRequestURI() { + return webSocket.resource().getRequest().getRequestURI(); + } + + @Override + public StringBuffer getRequestURL() { + return webSocket.resource().getRequest().getRequestURL(); + } + + @Override + public DispatcherType getDispatcherType() { + return webSocket.resource().getRequest().getDispatcherType(); + } + + @Override + public boolean isSecure() { + return webSocket.resource().getRequest().isSecure(); + } + + @Override + public String getPathInfo() { + return webSocket.resource().getRequest().getServletPath(); + } + + @Override + public String getPathTranslated() { + return webSocket.resource().getRequest().getPathTranslated(); + } + + @Override + public String getScheme() { + return webSocket.resource().getRequest().getScheme(); + } + + @Override + public String getServerName() { + return webSocket.resource().getRequest().getServerName(); + } + + @Override + public String getServletPath() { + return webSocket.resource().getRequest().getServletPath(); + } + + @Override + public int getServerPort() { + return webSocket.resource().getRequest().getServerPort(); + } + + @Override + public ServletContext getServletContext() { + return webSocket.resource().getRequest().getServletContext(); + } + + @Override + public Principal getUserPrincipal() { + return webSocket.resource().getRequest().getUserPrincipal(); + } + + @Override + public void write(byte[] data, int offset, int length) throws IOException { + webSocket.write(data, offset, length); + } + } + +} http://git-wip-us.apache.org/repos/asf/cxf/blob/2d8264af/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java~ ---------------------------------------------------------------------- diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java~ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java~ new file mode 100644 index 0000000..4ce80f1 --- /dev/null +++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java~ @@ -0,0 +1,293 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.transport.websocket.atmosphere; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.security.Principal; +import java.util.Enumeration; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.servlet.DispatcherType; +import javax.servlet.ServletOutputStream; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.cxf.common.logging.LogUtils; +import org.apache.cxf.transport.websocket.WebSocketDestinationService; +import org.apache.cxf.transport.websocket.WebSocketServletHolder; +import org.apache.cxf.transport.websocket.WebSocketVirtualServletRequest; +import org.apache.cxf.transport.websocket.WebSocketVirtualServletResponse; +import org.atmosphere.cpr.AtmosphereConfig; +import org.atmosphere.cpr.AtmosphereRequest; +import org.atmosphere.websocket.WebSocket; +import org.atmosphere.websocket.WebSocketProcessor.WebSocketException; +import org.atmosphere.websocket.WebSocketProtocol; + +/** + * + */ +public class AtmosphereWebSocketHandler implements WebSocketProtocol { + private static final Logger LOG = LogUtils.getL7dLogger(AtmosphereWebSocketHandler.class); + + protected AtmosphereWebSocketServletDestination destination; + + + public AtmosphereWebSocketServletDestination getDestination() { + return destination; + } + + public void setDestination(AtmosphereWebSocketServletDestination destination) { + this.destination = destination; + } + + /** {@inheritDoc}*/ + @Override + public void configure(AtmosphereConfig config) { + // TODO Auto-generated method stub + + } + + /** {@inheritDoc}*/ + @Override + public List onMessage(WebSocket webSocket, String data) { + LOG.info("onMessage(WebSocket, String)"); + return null; + } + + /** {@inheritDoc}*/ + @Override + public List onMessage(WebSocket webSocket, byte[] data, int offset, int length) { + LOG.info("onMessage(WebSocket, byte[], int, int)"); + + try { + WebSocketServletHolder webSocketHolder = new AtmosphereWebSocketServletHolder(webSocket); + HttpServletRequest request = createServletRequest(webSocketHolder, data, offset, length); + HttpServletResponse response = createServletResponse(webSocketHolder); + if (destination != null) { + ((WebSocketDestinationService)destination).invokeInternal(null, webSocket.resource().getRequest().getServletContext(), + request, response); + } + } catch (Exception e) { + LOG.log(Level.WARNING, "Failed to invoke service", e); + } + return null; + } + + /** {@inheritDoc}*/ + @Override + public void onOpen(WebSocket webSocket) { + LOG.info("onOpen(WebSocket)"); + } + + /** {@inheritDoc}*/ + @Override + public void onClose(WebSocket webSocket) { + LOG.info("onClose(WebSocket)"); + + } + + /** {@inheritDoc}*/ + @Override + public void onError(WebSocket webSocket, WebSocketException t) { + // TODO Auto-generated method stub + LOG.info("onError(WebSocket, WebSocketException)"); + } + + protected WebSocketVirtualServletRequest createServletRequest(WebSocketServletHolder webSocketHolder, byte[] data, int offset, int length) + throws IOException { + return new WebSocketVirtualServletRequest(null, webSocketHolder, new ByteArrayInputStream(data, offset, length)); + } + + protected WebSocketVirtualServletRequest createServletRequest(WebSocketServletHolder webSocketHolder, InputStream stream) + throws IOException { + return new WebSocketVirtualServletRequest(null, webSocketHolder, stream); + } + + protected WebSocketVirtualServletResponse createServletResponse(WebSocketServletHolder webSocketHolder) throws IOException { + return new WebSocketVirtualServletResponse(webSocketHolder); + } + + static class AtmosphereWebSocketServletHolder implements WebSocketServletHolder { + private WebSocket webSocket; + + public AtmosphereWebSocketServletHolder(WebSocket webSocket) { + this.webSocket = webSocket; + } + + @Override + public String getAuthType() { + // TODO Auto-generated method stub + return null; + } + + @Override + public String getContextPath() { + // TODO Auto-generated method stub + return null; + } + + @Override + public String getLocalAddr() { + // TODO Auto-generated method stub + return null; + } + + @Override + public String getLocalName() { + // TODO Auto-generated method stub + return null; + } + + @Override + public int getLocalPort() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public Locale getLocale() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Enumeration getLocales() { + // TODO Auto-generated method stub + return null; + } + + @Override + public String getProtocol() { + // TODO Auto-generated method stub + return null; + } + + @Override + public String getRemoteAddr() { + // TODO Auto-generated method stub + return null; + } + + @Override + public String getRemoteHost() { + // TODO Auto-generated method stub + return null; + } + + @Override + public int getRemotePort() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public String getRequestURI() { + // TODO Auto-generated method stub + return null; + } + + @Override + public StringBuffer getRequestURL() { + // TODO Auto-generated method stub + return null; + } + + @Override + public DispatcherType getDispatcherType() { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean isSecure() { + // TODO Auto-generated method stub + return false; + } + + @Override + public String getPathInfo() { + // TODO Auto-generated method stub + return null; + } + + @Override + public String getPathTranslated() { + // TODO Auto-generated method stub + return null; + } + + @Override + public String getScheme() { + // TODO Auto-generated method stub + return null; + } + + @Override + public String getServerName() { + // TODO Auto-generated method stub + return null; + } + + @Override + public String getServletPath() { + // TODO Auto-generated method stub + return null; + } + + @Override + public int getServerPort() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public Principal getUserPrincipal() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void write(byte[] data, int i, int length) throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public ServletOutputStream getServletOutputStream(Map headers) { + // TODO Auto-generated method stub + return null; + } + + @Override + public OutputStream getOutputStream(Map headers) { + // TODO Auto-generated method stub + return null; + } + + } + +} http://git-wip-us.apache.org/repos/asf/cxf/blob/2d8264af/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java ---------------------------------------------------------------------- diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java new file mode 100644 index 0000000..ffeeb5c --- /dev/null +++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.transport.websocket.atmosphere; + +import java.io.IOException; + +import javax.servlet.ServletConfig; +import javax.servlet.ServletContext; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.cxf.Bus; +import org.apache.cxf.service.model.EndpointInfo; +import org.apache.cxf.transport.http.DestinationRegistry; +import org.apache.cxf.transport.servlet.ServletDestination; +import org.apache.cxf.transport.websocket.WebSocketDestinationService; +import org.atmosphere.cpr.ApplicationConfig; +import org.atmosphere.cpr.AtmosphereFramework; +import org.atmosphere.cpr.AtmosphereRequest; +import org.atmosphere.cpr.AtmosphereResponse; +import org.atmosphere.util.Utils; +import org.atmosphere.websocket.WebSocketProtocol; + +/** + * + */ +public class AtmosphereWebSocketServletDestination extends ServletDestination implements + WebSocketDestinationService { + private AtmosphereFramework framework; + + public AtmosphereWebSocketServletDestination(Bus bus, DestinationRegistry registry, EndpointInfo ei, + String path) throws IOException { + super(bus, registry, ei, ei.toString()); + this.framework = new AtmosphereFramework(false, true); + + framework.setUseNativeImplementation(false); + framework.addInitParameter(ApplicationConfig.WEBSOCKET_SUPPORT, "true"); + //TODO provide a way to switch between the non-stream handler and the stream handler + framework.addInitParameter(ApplicationConfig.WEBSOCKET_PROTOCOL, + AtmosphereWebSocketHandler.class.getName()); + framework.init(); + + WebSocketProtocol wsp = framework.getWebSocketProtocol(); + if (wsp instanceof AtmosphereWebSocketHandler) { + ((AtmosphereWebSocketHandler)wsp).setDestination(this); + } + } + + @Override + public void invoke(ServletConfig config, ServletContext context, HttpServletRequest req, + HttpServletResponse resp) throws IOException { + if (Utils.webSocketEnabled(req)) { + try { + framework.doCometSupport(AtmosphereRequest.wrap(req), AtmosphereResponse.wrap(resp)); + } catch (ServletException e) { + throw new IOException(e); + } + return; + } + super.invoke(config, context, req, resp); + } + + @Override + public void invokeInternal(ServletConfig config, ServletContext context, HttpServletRequest req, + HttpServletResponse resp) throws IOException { + super.invoke(config, context, req, resp); + } + +} http://git-wip-us.apache.org/repos/asf/cxf/blob/2d8264af/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketStreamHandler.java ---------------------------------------------------------------------- diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketStreamHandler.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketStreamHandler.java new file mode 100644 index 0000000..8574858 --- /dev/null +++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketStreamHandler.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.transport.websocket.atmosphere; + +import java.io.InputStream; +import java.io.Reader; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.cxf.common.logging.LogUtils; +import org.apache.cxf.transport.websocket.WebSocketDestinationService; +import org.apache.cxf.transport.websocket.WebSocketServletHolder; +import org.atmosphere.cpr.AtmosphereRequest; +import org.atmosphere.websocket.WebSocket; +import org.atmosphere.websocket.WebSocketProtocolStream; + +/** + * + */ +public class AtmosphereWebSocketStreamHandler extends AtmosphereWebSocketHandler implements + WebSocketProtocolStream { + private static final Logger LOG = LogUtils.getL7dLogger(AtmosphereWebSocketStreamHandler.class); + + @Override + public List onTextStream(WebSocket webSocket, Reader r) { + LOG.info("onTextStream(WebSocket, Reader)"); + //TODO add support for Reader + throw new IllegalArgumentException("not implemented"); + } + + @Override + public List onBinaryStream(WebSocket webSocket, InputStream stream) { + LOG.info("onBinaryStream(WebSocket, InputStream)"); + + try { + WebSocketServletHolder webSocketHolder = new AtmosphereWebSocketServletHolder(webSocket); + HttpServletRequest request = createServletRequest(webSocketHolder, stream); + HttpServletResponse response = createServletResponse(webSocketHolder); + if (destination != null) { + ((WebSocketDestinationService)destination).invokeInternal(null, + webSocket.resource().getRequest().getServletContext(), + request, response); + } + } catch (Exception e) { + LOG.log(Level.WARNING, "Failed to invoke service", e); + } + return null; + } + + + +}