Return-Path: X-Original-To: apmail-cxf-commits-archive@www.apache.org Delivered-To: apmail-cxf-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7FFCF18AD1 for ; Mon, 18 Apr 2016 23:24:08 +0000 (UTC) Received: (qmail 72178 invoked by uid 500); 18 Apr 2016 23:24:08 -0000 Delivered-To: apmail-cxf-commits-archive@cxf.apache.org Received: (qmail 71976 invoked by uid 500); 18 Apr 2016 23:24:08 -0000 Mailing-List: contact commits-help@cxf.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cxf.apache.org Delivered-To: mailing list commits@cxf.apache.org Received: (qmail 71672 invoked by uid 99); 18 Apr 2016 23:24:08 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 18 Apr 2016 23:24:08 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D8259DFFA9; Mon, 18 Apr 2016 23:24:07 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: reta@apache.org To: commits@cxf.apache.org Date: Mon, 18 Apr 2016 23:24:09 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [3/6] cxf git commit: CXF-5855: Introduce support for Server Sent Events. 
Initial implementation based on Atmosphere http://git-wip-us.apache.org/repos/asf/cxf/blob/6eddeb19/distribution/src/main/release/samples/pom.xml ---------------------------------------------------------------------- diff --git a/distribution/src/main/release/samples/pom.xml b/distribution/src/main/release/samples/pom.xml index d7c5587..f9c5847 100644 --- a/distribution/src/main/release/samples/pom.xml +++ b/distribution/src/main/release/samples/pom.xml @@ -113,6 +113,7 @@ jax_rs/tracing_htrace clustering/failover_jaxws_osgi clustering/failover_server + jax_rs/sse + + 4.0.0 + cxf-rt-rs-sse + bundle + Apache CXF JAX-RS Server-Side Events Support + Apache CXF JAX-RS Server-Side Events Support + http://cxf.apache.org + + org.apache.cxf + cxf-parent + 3.2.0-SNAPSHOT + ../../../parent/pom.xml + + + + javax.servlet*;version="${cxf.osgi.javax.servlet.version}", + + + + + org.apache.cxf + cxf-rt-frontend-jaxrs + ${project.version} + + + junit + junit + test + + + ${cxf.servlet-api.group} + ${cxf.servlet-api.artifact} + provided + + + org.atmosphere + atmosphere-runtime + ${cxf.atmosphere.version} + + + + + + maven-checkstyle-plugin + + true + + + + + http://git-wip-us.apache.org/repos/asf/cxf/blob/6eddeb19/rt/rs/sse/src/main/java/javax/ws/rs/sse/OutboundSseEvent.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/javax/ws/rs/sse/OutboundSseEvent.java b/rt/rs/sse/src/main/java/javax/ws/rs/sse/OutboundSseEvent.java new file mode 100644 index 0000000..1274640 --- /dev/null +++ b/rt/rs/sse/src/main/java/javax/ws/rs/sse/OutboundSseEvent.java @@ -0,0 +1,286 @@ +/* + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. + * + * Copyright (c) 2012-2015 Oracle and/or its affiliates. All rights reserved. 
+ * + * The contents of this file are subject to the terms of either the GNU + * General Public License Version 2 only ("GPL") or the Common Development + * and Distribution License("CDDL") (collectively, the "License"). You + * may not use this file except in compliance with the License. You can + * obtain a copy of the License at + * http://glassfish.java.net/public/CDDL+GPL_1_1.html + * or packager/legal/LICENSE.txt. See the License for the specific + * language governing permissions and limitations under the License. + * + * When distributing the software, include this License Header Notice in each + * file and include the License file at packager/legal/LICENSE.txt. + * + * GPL Classpath Exception: + * Oracle designates this particular file as subject to the "Classpath" + * exception as provided by Oracle in the GPL Version 2 section of the License + * file that accompanied this code. + * + * Modifications: + * If applicable, add the following below the License Header, with the fields + * enclosed by brackets [] replaced by your own identifying information: + * "Portions Copyright [year] [name of copyright owner]" + * + * Contributor(s): + * If you wish your version of this file to be governed by only the CDDL or + * only the GPL Version 2, indicate your decision by adding "[Contributor] + * elects to include this software in this distribution under the [CDDL or GPL + * Version 2] license." If you don't indicate a single choice of license, a + * recipient has the option to distribute your version of this file under + * either the CDDL, the GPL Version 2 or to extend the choice of license to + * its licensees as provided above. However, if you add GPL Version 2 code + * and therefore, elected the GPL Version 2 license, then the option applies + * only if the new code is made subject to such option by the copyright + * holder. 
+ */ +package javax.ws.rs.sse; + +import java.lang.reflect.Type; + +import javax.ws.rs.core.GenericType; +import javax.ws.rs.core.MediaType; + +/** + * Representation of a single outbound SSE event. + * + * @author Marek Potociar + */ +public interface OutboundSseEvent extends SseEvent { + + /** + * A builder class used for creating {@link OutboundSseEvent} instances. + */ + interface Builder { + + /** + * Set the event id. + *

+ * Will be sent as a value of the SSE {@code "id"} field. This field is optional. + *

+ * + * @param id event id. + * @return updated builder instance. + */ + Builder id(String id); + + /** + * Set event name. + *

+ * Will be sent as a value of the SSE {@code "event"} field. This field is optional. + *

+ * + * @param name event name. + * @return updated builder instance. + */ + public Builder name(String name); + + /** + * Set reconnection delay (in milliseconds) that indicates how long the event receiver should wait + * before attempting to reconnect in case a connection to SSE event source is lost. + *

+ * Will be sent as a value of the SSE {@code "retry"} field. This field is optional. + *

+ *

+ * Absence of a value of this field in an {@link OutboundSseEvent} instance + * is indicated by {@link SseEvent#RECONNECT_NOT_SET} value returned from + * {@link #getReconnectDelay()}. + *

+ * + * @param milliseconds reconnection delay in milliseconds. Negative values un-set the reconnection delay. + * @return updated builder instance. + */ + Builder reconnectDelay(long milliseconds); + + /** + * Set the {@link MediaType media type} of the event data. + *

+ * This information is mandatory. The default value is {@link MediaType#TEXT_PLAIN}. + *

+ * + * @param mediaType {@link MediaType} of event data. Must not be {@code null}. + * @return updated builder instance. + * @throws NullPointerException in case the {@code mediaType} parameter is {@code null}. + */ + Builder mediaType(final MediaType mediaType); + + /** + * Set comment string associated with the event. + *

+ * The comment will be serialized with the event, before event data are serialized. If the event + * does not contain any data, a separate "event" that contains only the comment will be sent. + * This information is optional, provided the event data are set. + *

+ * Note that multiple invocations of this method result in a previous comment being replaced with a new one. + * To achieve multi-line comments, a multi-line comment string has to be used. + *

+ * + * @param comment comment string. + * @return updated builder instance. + */ + Builder comment(String comment); + + /** + * Set event data and java type of event data. + *

+ * Type information will be used for {@link javax.ws.rs.ext.MessageBodyWriter} lookup. + *

+ * Note that multiple invocations of this method result in the previous event data being replaced with the new data. + *

+ * + * @param type java type of supplied data. Must not be {@code null}. + * @param data event data. Must not be {@code null}. + * @return updated builder instance. + * @throws NullPointerException in case either {@code type} or {@code data} parameter is {@code null}. + */ + Builder data(Class type, Object data); + + /** + * Set event data and a generic java type of event data. + *

+ * Type information will be used for {@link javax.ws.rs.ext.MessageBodyWriter} lookup. + *

+ * Note that multiple invocations of this method result in the previous event data being replaced with the new data. + *

+ * + * @param type generic type of supplied data. Must not be {@code null}. + * @param data event data. Must not be {@code null}. + * @return updated builder instance. + * @throws NullPointerException in case either {@code type} or {@code data} parameter is {@code null}. + */ + Builder data(GenericType type, Object data); + + /** + * Set event data and java type of event data. + *

+ * This is a convenience method that derives the event data type information from the runtime type of + * the event data. The supplied event data may be represented as {@link javax.ws.rs.core.GenericEntity}. + *

+ * Note that multiple invocations of this method result in the previous event data being replaced with the new data. + *

+ * + * @param data event data. Must not be {@code null}. + * @return updated builder instance. + * @throws NullPointerException in case the {@code data} parameter is {@code null}. + */ + Builder data(Object data); + + /** + * Build {@link OutboundSseEvent}. + *

+ * There are two valid configurations: + *

    + *
  • if a {@link Builder#comment(String) comment} is set, all other parameters are optional. + * If event {@link Builder#data(Class, Object) data} and {@link Builder#mediaType(MediaType) media type} is set, + * event data will be serialized after the comment.
  • + *
  • if a {@link Builder#comment(String) comment} is not set, at least the event + * {@link Builder#data(Class, Object) data} must be set. All other parameters are optional.
  • + *
+ *

+ * + * @return new {@link OutboundSseEvent} instance. + * @throws IllegalStateException when called with invalid configuration (neither a comment nor event data are set). + */ + OutboundSseEvent build(); + } + + /** + * Get event identifier. + *

+ * This field is optional. If specified, the value is sent as a value of the SSE {@code "id"} field. + *

+ * + * @return event identifier, or {@code null} if not set. + */ + String getId(); + + /** + * Get event name. + *

+ * This field is optional. If specified, it will be sent as a value of the SSE {@code "event"} field. + *

+ * + * @return event name, or {@code null} if not set. + */ + String getName(); + + /** + * Get a comment string that accompanies the event. + *

+ * If specified, the comment value is sent with the event as one or more SSE comment lines + * (depending on line breaks in the actual data string), before any actual event data are serialized. + * If the event instance does not contain any data, a separate "event" that contains only the comment + * will be sent. Comment information is optional, provided the event data are set. + *

+ * + * @return comment associated with the event. + */ + String getComment(); + + /** + * Get connection retry time in milliseconds the event receiver should wait before attempting to + * reconnect after a connection to the SSE source is lost. + *

+ * This field is optional. If specified, the value is sent as a value of the SSE {@code "retry"} field. + *

+ * + * @return reconnection delay in milliseconds or {@link SseEvent#RECONNECT_NOT_SET} if no value has been set. + */ + long getReconnectDelay(); + + /** + * Check if the connection retry time has been set in the event. + * + * @return {@code true} if reconnection delay in milliseconds has been set in the event, {@code false} otherwise. + */ + boolean isReconnectDelaySet(); + + /** + * Get data type. + *

+ * This information is used to select a proper {@link javax.ws.rs.ext.MessageBodyWriter} to be used for + * serializing the {@link #getData() event data}. + *

+ * + * @return data type. May return {@code null}, if the event does not contain any data. + */ + Class getType(); + + /** + * Get generic data type. + *

+ * This information is used to select a proper {@link javax.ws.rs.ext.MessageBodyWriter} to be used for + * serializing the {@link #getData() event data}. + *

+ * + * @return generic data type. May return {@code null}, if the event does not contain any data. + */ + Type getGenericType(); + + /** + * Get {@link MediaType media type} of the event data. + *

+ * This information is used to select a proper {@link javax.ws.rs.ext.MessageBodyWriter} to be used for + * serializing the {@link #getData() event data}. + *

+ * + * @return data {@link MediaType}. + */ + MediaType getMediaType(); + + /** + * Get event data. + *

+ * The event data, if specified, are serialized and sent as one or more SSE event {@code "data"} fields + * (depending on the line breaks in the actual serialized data content). The data are serialized + * using an available {@link javax.ws.rs.ext.MessageBodyWriter} that is selected based on the event + * {@link #getType() type}, {@link #getGenericType() generic type} and {@link #getMediaType() media type}. + *

+ * + * @return event data. May return {@code null}, if the event does not contain any data. + */ + Object getData(); +} http://git-wip-us.apache.org/repos/asf/cxf/blob/6eddeb19/rt/rs/sse/src/main/java/javax/ws/rs/sse/SseBroadcaster.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/javax/ws/rs/sse/SseBroadcaster.java b/rt/rs/sse/src/main/java/javax/ws/rs/sse/SseBroadcaster.java new file mode 100644 index 0000000..f9d0646 --- /dev/null +++ b/rt/rs/sse/src/main/java/javax/ws/rs/sse/SseBroadcaster.java @@ -0,0 +1,111 @@ +/* + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. + * + * Copyright (c) 2012-2015 Oracle and/or its affiliates. All rights reserved. + * + * The contents of this file are subject to the terms of either the GNU + * General Public License Version 2 only ("GPL") or the Common Development + * and Distribution License("CDDL") (collectively, the "License"). You + * may not use this file except in compliance with the License. You can + * obtain a copy of the License at + * http://glassfish.java.net/public/CDDL+GPL_1_1.html + * or packager/legal/LICENSE.txt. See the License for the specific + * language governing permissions and limitations under the License. + * + * When distributing the software, include this License Header Notice in each + * file and include the License file at packager/legal/LICENSE.txt. + * + * GPL Classpath Exception: + * Oracle designates this particular file as subject to the "Classpath" + * exception as provided by Oracle in the GPL Version 2 section of the License + * file that accompanied this code. 
+ * + * Modifications: + * If applicable, add the following below the License Header, with the fields + * enclosed by brackets [] replaced by your own identifying information: + * "Portions Copyright [year] [name of copyright owner]" + * + * Contributor(s): + * If you wish your version of this file to be governed by only the CDDL or + * only the GPL Version 2, indicate your decision by adding "[Contributor] + * elects to include this software in this distribution under the [CDDL or GPL + * Version 2] license." If you don't indicate a single choice of license, a + * recipient has the option to distribute your version of this file under + * either the CDDL, the GPL Version 2 or to extend the choice of license to + * its licensees as provided above. However, if you add GPL Version 2 code + * and therefore, elected the GPL Version 2 license, then the option applies + * only if the new code is made subject to such option by the copyright + * holder. + */ + +package javax.ws.rs.sse; + +/** + * Server-Sent Events broadcasting facility. + *

+ * TODO: more javadoc. + * + * @author Marek Potociar + * @since 2.1 + */ +public interface SseBroadcaster extends AutoCloseable { + + /** + * Listener interface that can be implemented to listen to events fired by {@link SseBroadcaster} object. + *

+ * To listen to events, implementation of this interface needs to register with a particular {@link SseBroadcaster} instance + * using {@link SseBroadcaster#register(Listener)}. + */ + interface Listener { + + /** + * Called when exception was thrown by a given SSE event output when trying to write to it or close it. + * + * @param output output instance that threw exception. + * @param exception thrown exception. + */ + void onException(SseEventOutput output, Exception exception); + + /** + * Called when the SSE event output has been closed (either by client closing the connection or by calling + * {@link SseEventOutput#close()} on the server side. + * + * @param output output instance that has been closed. + */ + void onClose(SseEventOutput output); + } + + /** + * Register {@link SseBroadcaster.Listener} that will receive {@code SseBroadcaster} lifecycle events. + *

+ * This operation is potentially slow, especially if a large number of listeners get registered in the broadcaster. + * The {@code SseBroadcaster} implementation is optimized to efficiently handle small amounts of + * concurrent listener registrations and removals and large amounts of registered listener notifications. + *

+ * + * @param listener listener to be registered. + * @return {@code true} if registered, {@code false} otherwise. + */ + boolean register(Listener listener); + + /** + * Register {@link SseEventOutput} to this {@code SseBroadcaster} instance. + * + * @param output {@link SseEventOutput} to register. + * @return {@code true} if the instance was successfully registered, {@code false} otherwise. + */ + boolean register(final SseEventOutput output); + + /** + * Broadcast an SSE event to all registered {@link SseEventOutput} instances. + * + * @param event SSE event to be broadcast. + */ + void broadcast(final OutboundSseEvent event); + + /** + * Close all registered {@link SseEventOutput} instances. + */ + @Override + void close(); +} http://git-wip-us.apache.org/repos/asf/cxf/blob/6eddeb19/rt/rs/sse/src/main/java/javax/ws/rs/sse/SseContext.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/javax/ws/rs/sse/SseContext.java b/rt/rs/sse/src/main/java/javax/ws/rs/sse/SseContext.java new file mode 100644 index 0000000..2f4e1ca --- /dev/null +++ b/rt/rs/sse/src/main/java/javax/ws/rs/sse/SseContext.java @@ -0,0 +1,74 @@ +/* + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. + * + * Copyright (c) 2015 Oracle and/or its affiliates. All rights reserved. + * + * The contents of this file are subject to the terms of either the GNU + * General Public License Version 2 only ("GPL") or the Common Development + * and Distribution License("CDDL") (collectively, the "License"). You + * may not use this file except in compliance with the License. You can + * obtain a copy of the License at + * http://glassfish.java.net/public/CDDL+GPL_1_1.html + * or packager/legal/LICENSE.txt. See the License for the specific + * language governing permissions and limitations under the License. 
+ * + * When distributing the software, include this License Header Notice in each + * file and include the License file at packager/legal/LICENSE.txt. + * + * GPL Classpath Exception: + * Oracle designates this particular file as subject to the "Classpath" + * exception as provided by Oracle in the GPL Version 2 section of the License + * file that accompanied this code. + * + * Modifications: + * If applicable, add the following below the License Header, with the fields + * enclosed by brackets [] replaced by your own identifying information: + * "Portions Copyright [year] [name of copyright owner]" + * + * Contributor(s): + * If you wish your version of this file to be governed by only the CDDL or + * only the GPL Version 2, indicate your decision by adding "[Contributor] + * elects to include this software in this distribution under the [CDDL or GPL + * Version 2] license." If you don't indicate a single choice of license, a + * recipient has the option to distribute your version of this file under + * either the CDDL, the GPL Version 2 or to extend the choice of license to + * its licensees as provided above. However, if you add GPL Version 2 code + * and therefore, elected the GPL Version 2 license, then the option applies + * only if the new code is made subject to such option by the copyright + * holder. + */ +package javax.ws.rs.sse; + +/** + * Server-side injectable Server-Sent Event Context. + *

+ * TODO: more javadoc. + * + * @author Marek Potociar (marek.potociar at oracle.com) + * @since 2.1 + */ +public interface SseContext { + + /** + * Create new SSE event output stream that will represent a single client connection for a connecting client. + * + * @return new SSE event output stream. + */ + SseEventOutput newOutput(); + + /** + * Get a new SSE outbound event builder. + * + * @return SSE outbound event builder. + */ + OutboundSseEvent.Builder newEvent(); + + /** + * Get a new SSE event broadcaster. + * + * @return SSE event broadcaster. + */ + SseBroadcaster newBroadcaster(); + + +} http://git-wip-us.apache.org/repos/asf/cxf/blob/6eddeb19/rt/rs/sse/src/main/java/javax/ws/rs/sse/SseEvent.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/javax/ws/rs/sse/SseEvent.java b/rt/rs/sse/src/main/java/javax/ws/rs/sse/SseEvent.java new file mode 100644 index 0000000..d03a23a --- /dev/null +++ b/rt/rs/sse/src/main/java/javax/ws/rs/sse/SseEvent.java @@ -0,0 +1,107 @@ +/* + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. + * + * Copyright (c) 2015 Oracle and/or its affiliates. All rights reserved. + * + * The contents of this file are subject to the terms of either the GNU + * General Public License Version 2 only ("GPL") or the Common Development + * and Distribution License("CDDL") (collectively, the "License"). You + * may not use this file except in compliance with the License. You can + * obtain a copy of the License at + * http://glassfish.java.net/public/CDDL+GPL_1_1.html + * or packager/legal/LICENSE.txt. See the License for the specific + * language governing permissions and limitations under the License. + * + * When distributing the software, include this License Header Notice in each + * file and include the License file at packager/legal/LICENSE.txt. 
+ * + * GPL Classpath Exception: + * Oracle designates this particular file as subject to the "Classpath" + * exception as provided by Oracle in the GPL Version 2 section of the License + * file that accompanied this code. + * + * Modifications: + * If applicable, add the following below the License Header, with the fields + * enclosed by brackets [] replaced by your own identifying information: + * "Portions Copyright [year] [name of copyright owner]" + * + * Contributor(s): + * If you wish your version of this file to be governed by only the CDDL or + * only the GPL Version 2, indicate your decision by adding "[Contributor] + * elects to include this software in this distribution under the [CDDL or GPL + * Version 2] license." If you don't indicate a single choice of license, a + * recipient has the option to distribute your version of this file under + * either the CDDL, the GPL Version 2 or to extend the choice of license to + * its licensees as provided above. However, if you add GPL Version 2 code + * and therefore, elected the GPL Version 2 license, then the option applies + * only if the new code is made subject to such option by the copyright + * holder. + */ +package javax.ws.rs.sse; + +/** + * TODO: javadoc. + * + * @author Marek Potociar + * @since 2.1 + */ +public interface SseEvent { + + /** + * A "reconnection not set" value for the SSE reconnect delay set via SSE event {@code retry} field. + */ + long RECONNECT_NOT_SET = -1; + + /** + * Get event identifier. + *

+ * Contains value of SSE {@code "id"} field. This field is optional. Method may return {@code null}, if the event + * identifier is not specified. + *

+ * + * @return event id. + */ + String getId(); + + /** + * Get event name. + *

+ * Contains value of SSE {@code "event"} field. This field is optional. Method may return {@code null}, if the event + * name is not specified. + *

+ * + * @return event name, or {@code null} if not set. + */ + String getName(); + + /** + * Get a comment string that accompanies the event. + *

+ * Contains value of the comment associated with SSE event. This field is optional. Method may return {@code null}, + * if the event comment is not specified. + *

+ * + * @return comment associated with the event. + */ + String getComment(); + + /** + * Get new connection retry time in milliseconds the event receiver should wait before attempting to + * reconnect after a connection to the SSE event source is lost. + *

+ * Contains value of SSE {@code "retry"} field. This field is optional. Method returns {@link #RECONNECT_NOT_SET} + * if no value has been set. + *

+ * + * @return reconnection delay in milliseconds or {@link #RECONNECT_NOT_SET} if no value has been set. + */ + long getReconnectDelay(); + + /** + * Check if the connection retry time has been set in the event. + * + * @return {@code true} if new reconnection delay has been set in the event, {@code false} otherwise. + */ + boolean isReconnectDelaySet(); + +} http://git-wip-us.apache.org/repos/asf/cxf/blob/6eddeb19/rt/rs/sse/src/main/java/javax/ws/rs/sse/SseEventOutput.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/javax/ws/rs/sse/SseEventOutput.java b/rt/rs/sse/src/main/java/javax/ws/rs/sse/SseEventOutput.java new file mode 100644 index 0000000..f220dc8 --- /dev/null +++ b/rt/rs/sse/src/main/java/javax/ws/rs/sse/SseEventOutput.java @@ -0,0 +1,74 @@ +/* + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. + * + * Copyright (c) 2012-2015 Oracle and/or its affiliates. All rights reserved. + * + * The contents of this file are subject to the terms of either the GNU + * General Public License Version 2 only ("GPL") or the Common Development + * and Distribution License("CDDL") (collectively, the "License"). You + * may not use this file except in compliance with the License. You can + * obtain a copy of the License at + * http://glassfish.java.net/public/CDDL+GPL_1_1.html + * or packager/legal/LICENSE.txt. See the License for the specific + * language governing permissions and limitations under the License. + * + * When distributing the software, include this License Header Notice in each + * file and include the License file at packager/legal/LICENSE.txt. + * + * GPL Classpath Exception: + * Oracle designates this particular file as subject to the "Classpath" + * exception as provided by Oracle in the GPL Version 2 section of the License + * file that accompanied this code. 
+ * + * Modifications: + * If applicable, add the following below the License Header, with the fields + * enclosed by brackets [] replaced by your own identifying information: + * "Portions Copyright [year] [name of copyright owner]" + * + * Contributor(s): + * If you wish your version of this file to be governed by only the CDDL or + * only the GPL Version 2, indicate your decision by adding "[Contributor] + * elects to include this software in this distribution under the [CDDL or GPL + * Version 2] license." If you don't indicate a single choice of license, a + * recipient has the option to distribute your version of this file under + * either the CDDL, the GPL Version 2 or to extend the choice of license to + * its licensees as provided above. However, if you add GPL Version 2 code + * and therefore, elected the GPL Version 2 license, then the option applies + * only if the new code is made subject to such option by the copyright + * holder. + */ +package javax.ws.rs.sse; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Outbound Server-Sent Events stream. + * + * When returned from resource method, the underlying client connection is kept open and the application code + * is able to send events. + * A server-side instance implementing the interface corresponds exactly to a single client HTTP connection. + * + * @author Marek Potociar (marek.potociar at oracle.com) + * @since 2.1 + */ +public interface SseEventOutput extends Closeable { + + /** + * Write a new outbound SSE event. + * + * @param event an outbound SSE event instance to be written. + * @throws IOException if this response is closed or when encountered any problem during serializing or writing a chunk. + */ + void write(final OutboundSseEvent event) throws IOException; + + /** + * Check if the stream has been closed already. 
+ * + * Please note that the client connection represented by this {@code SseEventOutput} can be closed by the client side when + * a client decides to close connection and disconnect from the server. + * + * @return {@code true} when closed, {@code false} otherwise. + */ + boolean isClosed(); +} http://git-wip-us.apache.org/repos/asf/cxf/blob/6eddeb19/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java new file mode 100644 index 0000000..a614a9b --- /dev/null +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.cxf.jaxrs.sse; + +import java.io.IOException; +import java.io.OutputStream; +import java.lang.annotation.Annotation; +import java.lang.reflect.Type; +import java.nio.charset.StandardCharsets; + +import javax.ws.rs.InternalServerErrorException; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.ext.MessageBodyWriter; +import javax.ws.rs.ext.Provider; +import javax.ws.rs.ext.Providers; +import javax.ws.rs.sse.OutboundSseEvent; + +import org.apache.cxf.jaxrs.provider.ServerProviderFactory; +import org.apache.cxf.message.Exchange; +import org.apache.cxf.message.Message; +import org.apache.cxf.message.MessageImpl; + +@Provider +public class OutboundSseEventBodyWriter implements MessageBodyWriter { + public static final String SERVER_SENT_EVENTS = "text/event-stream"; + public static final MediaType SERVER_SENT_EVENTS_TYPE = MediaType.valueOf(SERVER_SENT_EVENTS); + + private static final byte[] COMMENT = ": ".getBytes(StandardCharsets.UTF_8); + private static final byte[] EVENT = " ".getBytes(StandardCharsets.UTF_8); + private static final byte[] ID = "id: ".getBytes(StandardCharsets.UTF_8); + private static final byte[] RETRY = "retry: ".getBytes(StandardCharsets.UTF_8); + private static final byte[] DATA = "data: ".getBytes(StandardCharsets.UTF_8); + private static final byte[] NEW_LINE = "\n".getBytes(StandardCharsets.UTF_8); + + @Context private Providers providers; + private ServerProviderFactory factory; + private Exchange exchange; + + public OutboundSseEventBodyWriter() { + } + + public OutboundSseEventBodyWriter(final ServerProviderFactory factory, final Exchange exchange) { + this.factory = factory; + this.exchange = exchange; + } + + + @Override + public boolean isWriteable(Class cls, Type type, Annotation[] anns, MediaType mt) { + return OutboundSseEvent.class.isAssignableFrom(cls) || 
SERVER_SENT_EVENTS_TYPE.isCompatible(mt); + } + + @Override + public void writeTo(OutboundSseEvent p, Class cls, Type t, Annotation[] anns, + MediaType mt, MultivaluedMap headers, OutputStream os) + throws IOException, WebApplicationException { + + if (p.getName() != null) { + os.write(EVENT); + os.write(p.getName().getBytes(StandardCharsets.UTF_8)); + os.write(NEW_LINE); + } + + if (p.getId() != null) { + os.write(ID); + os.write(p.getId().getBytes(StandardCharsets.UTF_8)); + os.write(NEW_LINE); + } + + if (p.getComment() != null) { + os.write(COMMENT); + os.write(p.getComment().getBytes(StandardCharsets.UTF_8)); + os.write(NEW_LINE); + } + + if (p.getReconnectDelay() > 0) { + os.write(RETRY); + os.write(Long.toString(p.getReconnectDelay()).getBytes(StandardCharsets.UTF_8)); + os.write(NEW_LINE); + } + + if (p.getData() != null) { + Class payloadClass = p.getType(); + Type payloadType = p.getGenericType(); + if (payloadType == null) { + payloadType = payloadClass; + } + + if (payloadType == null && payloadClass == null) { + payloadType = Object.class; + payloadClass = Object.class; + } + + os.write(DATA); + writePayloadTo(payloadClass, payloadType, anns, p.getMediaType(), headers, p.getData(), os); + os.write(NEW_LINE); + } + } + + @SuppressWarnings("unchecked") + private void writePayloadTo(Class cls, Type type, Annotation[] anns, MediaType mt, + MultivaluedMap headers, Object data, OutputStream os) + throws IOException, WebApplicationException { + + MessageBodyWriter writer = null; + + if (providers != null) { + writer = (MessageBodyWriter)providers.getMessageBodyWriter(cls, type, anns, mt); + } + + if (writer == null && factory != null) { + final Message message = new MessageImpl(); + message.setExchange(exchange); + writer = factory.createMessageBodyWriter(cls, type, anns, mt, message); + } + + if (writer == null) { + throw new InternalServerErrorException("No suitable message body writer for class: " + cls.getName()); + } + + writer.writeTo((T)data, cls, 
type, anns, mt, headers, os); + } + + @Override + public long getSize(OutboundSseEvent t, Class type, Type genericType, Annotation[] annotations, + MediaType mediaType) { + return -1; + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/6eddeb19/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventImpl.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventImpl.java new file mode 100644 index 0000000..3f6267d --- /dev/null +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventImpl.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.cxf.jaxrs.sse; + +import java.lang.reflect.Type; + +import javax.ws.rs.core.GenericType; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.sse.OutboundSseEvent; + +public class OutboundSseEventImpl implements OutboundSseEvent { + private String id; + private String name; + private String comment; + private long reconnectDelay = -1; + private Class type; + private Type genericType; + private MediaType mediaType; + private Object data; + + public static class BuilderImpl implements Builder { + private String id; + private String name; + private String comment; + private long reconnectDelay = -1; + private Class type; + private Type genericType; + private MediaType mediaType; + private Object data; + + @Override + public Builder id(String id) { + this.id = id; + return this; + } + + @Override + public Builder name(String name) { + this.name = name; + return this; + } + + @Override + public Builder reconnectDelay(long milliseconds) { + this.reconnectDelay = milliseconds; + return this; + } + + @Override + public Builder mediaType(MediaType mediaType) { + this.mediaType = mediaType; + return this; + } + + @Override + public Builder comment(String comment) { + this.comment = comment; + return this; + } + + @Override + public Builder data(Class type, Object data) { + this.type = type; + this.data= data; + return this; + } + + @Override + public Builder data(GenericType type, Object data) { + this.genericType = type.getType(); + this.data= data; + return this; + } + + @Override + public Builder data(Object data) { + this.data = data; + return this; + } + + @Override + public OutboundSseEvent build() { + return new OutboundSseEventImpl( + id, + name, + comment, + reconnectDelay, + type, + genericType, + mediaType, + data + ); + } + + } + + OutboundSseEventImpl(String id, String name, String comment, long reconnectDelay, + Class type, Type genericType, MediaType mediaType, Object data) { + this.id = id; + this.name = name; + this.comment = 
comment; + this.reconnectDelay = reconnectDelay; + this.type = type; + this.genericType = genericType; + this.mediaType = mediaType; + this.data = data; + } + + @Override + public String getId() { + return id; + } + + @Override + public String getName() { + return name; + } + + @Override + public String getComment() { + return comment; + } + + @Override + public long getReconnectDelay() { + return reconnectDelay; + } + + @Override + public boolean isReconnectDelaySet() { + return reconnectDelay != -1; + } + + @Override + public Class getType() { + return type; + } + + @Override + public Type getGenericType() { + return genericType; + } + + @Override + public MediaType getMediaType() { + return mediaType; + } + + @Override + public Object getData() { + return data; + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/6eddeb19/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventOutputProvider.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventOutputProvider.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventOutputProvider.java new file mode 100644 index 0000000..7f7963f --- /dev/null +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventOutputProvider.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.sse; + +import java.io.IOException; +import java.io.OutputStream; +import java.lang.annotation.Annotation; +import java.lang.reflect.Type; + +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.ext.MessageBodyWriter; +import javax.ws.rs.ext.Provider; +import javax.ws.rs.sse.SseEventOutput; + +@Provider +public class SseEventOutputProvider implements MessageBodyWriter { + @Override + public boolean isWriteable(Class cls, Type type, Annotation[] anns, MediaType mt) { + return SseEventOutput.class.isAssignableFrom(cls); + } + + @Override + public long getSize(final SseEventOutput output, final Class type, final Type genericType, + final Annotation[] annotations, final MediaType mediaType) { + return -1; + } + + @Override + public void writeTo(final SseEventOutput output, final Class type, final Type genericType, + final Annotation[] annotations, final MediaType mediaType, + final MultivaluedMap httpHeaders, final OutputStream entityStream) + throws IOException, WebApplicationException { + // do nothing. 
+ } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/6eddeb19/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereContextProvider.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereContextProvider.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereContextProvider.java new file mode 100644 index 0000000..041cb12 --- /dev/null +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereContextProvider.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.cxf.jaxrs.sse.atmosphere; + +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.ext.Provider; +import javax.ws.rs.sse.SseContext; + +import org.apache.cxf.jaxrs.ext.ContextProvider; +import org.apache.cxf.jaxrs.provider.ServerProviderFactory; +import org.apache.cxf.message.Message; +import org.apache.cxf.transport.http.AbstractHTTPDestination; +import org.atmosphere.cpr.AtmosphereResource; +import org.atmosphere.cpr.Broadcaster; + +@Provider +public class SseAtmosphereContextProvider implements ContextProvider { + @Override + public SseContext createContext(Message message) { + final HttpServletRequest request = (HttpServletRequest)message.get(AbstractHTTPDestination.HTTP_REQUEST); + if (request == null) { + throw new IllegalStateException("Unable to retrieve HTTP request from the context"); + } + + final AtmosphereResource resource = (AtmosphereResource)request + .getAttribute(AtmosphereResource.class.getName()); + if (resource == null) { + throw new IllegalStateException("AtmosphereResource is not present, " + + "is AtmosphereServlet configured properly?"); + } + + final Broadcaster broadcaster = resource.getAtmosphereConfig() + .getBroadcasterFactory() + .lookup(request.getRequestURI(), true); + resource.removeFromAllBroadcasters(); + resource.setBroadcaster(broadcaster); + + final ServerProviderFactory factory = ServerProviderFactory.getInstance(message); + return new SseAtmosphereResourceContext(factory, resource); + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/6eddeb19/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptor.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptor.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptor.java new file mode 100644 index 0000000..f2b2916 --- /dev/null +++ 
b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptor.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.sse.atmosphere; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintWriter; + +import org.atmosphere.cpr.Action; +import org.atmosphere.cpr.AsyncIOInterceptorAdapter; +import org.atmosphere.cpr.AsyncIOWriter; +import org.atmosphere.cpr.AtmosphereInterceptorWriter; +import org.atmosphere.cpr.AtmosphereRequest; +import org.atmosphere.cpr.AtmosphereResource; +import org.atmosphere.cpr.AtmosphereResourceEvent; +import org.atmosphere.cpr.AtmosphereResourceEventListenerAdapter.OnPreSuspend; +import org.atmosphere.cpr.AtmosphereResponse; +import org.atmosphere.interceptor.AllowInterceptor; +import org.atmosphere.interceptor.SSEAtmosphereInterceptor; +import org.atmosphere.util.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.cxf.jaxrs.sse.OutboundSseEventBodyWriter.SERVER_SENT_EVENTS; +import static org.atmosphere.cpr.ApplicationConfig.PROPERTY_USE_STREAM; +import static org.atmosphere.cpr.FrameworkConfig.CALLBACK_JAVASCRIPT_PROTOCOL; +import static 
org.atmosphere.cpr.FrameworkConfig.CONTAINER_RESPONSE; + +public class SseAtmosphereInterceptor extends SSEAtmosphereInterceptor { + private static final Logger LOGGER = LoggerFactory.getLogger(SseAtmosphereInterceptor.class); + + private static final byte[] PADDING; + private static final String PADDING_TEXT; + private static final byte[] END = "\r\n\r\n".getBytes(); + + static { + StringBuffer whitespace = new StringBuffer(); + for (int i = 0; i < 2000; i++) { + whitespace.append(" "); + } + whitespace.append("\n"); + PADDING_TEXT = whitespace.toString(); + PADDING = PADDING_TEXT.getBytes(); + } + + private boolean writePadding(AtmosphereResponse response) { + if (response.request() != null && response.request().getAttribute("paddingWritten") != null) { + return false; + } + + response.setContentType(SERVER_SENT_EVENTS); + response.setCharacterEncoding("utf-8"); + boolean isUsingStream = (Boolean) response.request().getAttribute(PROPERTY_USE_STREAM); + if (isUsingStream) { + try { + OutputStream stream = response.getResponse().getOutputStream(); + try { + stream.write(PADDING); + stream.flush(); + } catch (IOException ex) { + LOGGER.warn("SSE may not work", ex); + } + } catch (IOException e) { + LOGGER.trace("", e); + } + } else { + try { + PrintWriter w = response.getResponse().getWriter(); + w.println(PADDING_TEXT); + w.flush(); + } catch (IOException e) { + LOGGER.trace("", e); + } + } + response.resource().getRequest().setAttribute("paddingWritten", "true"); + return true; + } + + @Override + public Action inspect(final AtmosphereResource r) { + if (Utils.webSocketMessage(r)) { + return Action.CONTINUE; + } + + final AtmosphereRequest request = r.getRequest(); + final String accept = request.getHeader("Accept") == null ? 
"text/plain" : request.getHeader("Accept").trim(); + + if (r.transport().equals(AtmosphereResource.TRANSPORT.SSE) || SERVER_SENT_EVENTS.equalsIgnoreCase(accept)) { + final AtmosphereResponse response = r.getResponse(); + if (response.getAsyncIOWriter() == null) { + response.asyncIOWriter(new SseAtmosphereInterceptorWriter()); + } + + r.addEventListener(new P(response)); + + AsyncIOWriter writer = response.getAsyncIOWriter(); + if (AtmosphereInterceptorWriter.class.isAssignableFrom(writer.getClass())) { + AtmosphereInterceptorWriter.class.cast(writer).interceptor(new AsyncIOInterceptorAdapter() { + private boolean padding() { + if (!r.isSuspended()) { + return writePadding(response); + } + return false; + } + + @Override + public void prePayload(AtmosphereResponse response, byte[] data, int offset, int length) { + boolean noPadding = padding(); + // The CALLBACK_JAVASCRIPT_PROTOCOL may be called by a framework running on top of Atmosphere + // In that case, we must pad/protocol indenendently of the state of the AtmosphereResource + if (!noPadding || r.getRequest().getAttribute(CALLBACK_JAVASCRIPT_PROTOCOL) != null) { + //response.write("", true); + } + } + + @Override + public void postPayload(AtmosphereResponse response, byte[] data, int offset, int length) { + // The CALLBACK_JAVASCRIPT_PROTOCOL may be called by a framework running on top of Atmosphere + // In that case, we must pad/protocol indenendently of the state of the AtmosphereResource + if (r.isSuspended() || r.getRequest().getAttribute(CALLBACK_JAVASCRIPT_PROTOCOL) != null + || r.getRequest().getAttribute(CONTAINER_RESPONSE) != null) { + response.write(END, true); + } + + /** + * When used with https://github.com/remy/polyfills/blob/master/EventSource.js , we + * resume after every message. 
+ */ + String ua = r.getRequest().getHeader("User-Agent"); + if (ua != null && ua.contains("MSIE")) { + try { + response.flushBuffer(); + } catch (IOException e) { + LOGGER.trace("", e); + } + r.resume(); + } + } + }); + } else { + LOGGER.warn("Unable to apply {}. Your AsyncIOWriter must implement {}", + getClass().getName(), AtmosphereInterceptorWriter.class.getName()); + } + } + + return Action.CONTINUE; + } + + private final class P extends OnPreSuspend implements AllowInterceptor { + + private final AtmosphereResponse response; + + private P(AtmosphereResponse response) { + this.response = response; + } + + @Override + public void onPreSuspend(AtmosphereResourceEvent event) { + writePadding(response); + } + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/6eddeb19/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptorWriter.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptorWriter.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptorWriter.java new file mode 100644 index 0000000..24ebfd9 --- /dev/null +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptorWriter.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.sse.atmosphere; + +import java.io.IOException; + +import org.atmosphere.cpr.AtmosphereInterceptorWriter; +import org.atmosphere.cpr.AtmosphereResponse; + +public class SseAtmosphereInterceptorWriter extends AtmosphereInterceptorWriter { + @Override + public void close(AtmosphereResponse response) throws IOException { + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/6eddeb19/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereResourceContext.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereResourceContext.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereResourceContext.java new file mode 100644 index 0000000..e1774b3 --- /dev/null +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereResourceContext.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.sse.atmosphere; + +import javax.ws.rs.ext.MessageBodyWriter; +import javax.ws.rs.sse.OutboundSseEvent; +import javax.ws.rs.sse.OutboundSseEvent.Builder; +import javax.ws.rs.sse.SseBroadcaster; +import javax.ws.rs.sse.SseContext; +import javax.ws.rs.sse.SseEventOutput; + +import org.apache.cxf.jaxrs.provider.ServerProviderFactory; +import org.apache.cxf.jaxrs.sse.OutboundSseEventBodyWriter; +import org.apache.cxf.jaxrs.sse.OutboundSseEventImpl; +import org.apache.cxf.jaxrs.utils.JAXRSUtils; +import org.atmosphere.cpr.AtmosphereResource; + +public class SseAtmosphereResourceContext implements SseContext { + private final AtmosphereResource resource; + private final ServerProviderFactory factory; + + SseAtmosphereResourceContext(final ServerProviderFactory factory, final AtmosphereResource resource) { + this.factory = factory; + this.resource = resource; + } + + @Override + public SseEventOutput newOutput() { + final MessageBodyWriter writer = new OutboundSseEventBodyWriter(factory, + JAXRSUtils.getCurrentMessage().getExchange()); + return new SseAtmostphereEventOutputImpl(writer, resource); + } + + @Override + public Builder newEvent() { + return new OutboundSseEventImpl.BuilderImpl(); + } + + @Override + public SseBroadcaster newBroadcaster() { + return null; + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/6eddeb19/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmostphereEventOutputImpl.java ---------------------------------------------------------------------- diff --git 
a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmostphereEventOutputImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmostphereEventOutputImpl.java new file mode 100644 index 0000000..cdbfdd5 --- /dev/null +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmostphereEventOutputImpl.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.cxf.jaxrs.sse.atmosphere; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.lang.annotation.Annotation; +import java.nio.charset.StandardCharsets; + +import javax.ws.rs.ext.MessageBodyWriter; +import javax.ws.rs.sse.OutboundSseEvent; +import javax.ws.rs.sse.SseEventOutput; + +import org.atmosphere.cpr.AtmosphereResource; + +public class SseAtmostphereEventOutputImpl implements SseEventOutput { + private final AtmosphereResource resource; + private final MessageBodyWriter writer; + private volatile boolean closed = false; + + public SseAtmostphereEventOutputImpl(final MessageBodyWriter writer, + final AtmosphereResource resource) { + this.writer = writer; + this.resource = resource; + + if (!resource.isSuspended()) { + resource.suspend(); + } + } + + @Override + public void close() throws IOException { + if (!closed) { + closed = true; + + if (resource.isSuspended()) { + resource.resume(); + } + + resource.removeFromAllBroadcasters(); + if (!resource.getResponse().isCommitted()) { + resource.getResponse().flushBuffer(); + } + resource.close(); + } + } + + @Override + public void write(OutboundSseEvent event) throws IOException { + if (!closed && writer != null) { + try (final ByteArrayOutputStream os = new ByteArrayOutputStream()) { + writer.writeTo(event, event.getClass(), null, new Annotation [] {}, event.getMediaType(), null, os); + resource.getBroadcaster().broadcast(os.toString(StandardCharsets.UTF_8.name())); + } + } + } + + @Override + public boolean isClosed() { + return closed; + } +}