cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject [13/22] cxf git commit: CXF-5855: Introduce support for Server Sent Events. Initial implementation based on Atmosphere
Date Fri, 16 Sep 2016 01:37:04 GMT
http://git-wip-us.apache.org/repos/asf/cxf/blob/41733786/distribution/src/main/release/samples/pom.xml
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/pom.xml b/distribution/src/main/release/samples/pom.xml
index ca874f3..156413c 100644
--- a/distribution/src/main/release/samples/pom.xml
+++ b/distribution/src/main/release/samples/pom.xml
@@ -115,6 +115,9 @@
         <module>jax_rs/tracing_htrace</module>
         <module>clustering/failover_jaxws_osgi</module>
         <module>clustering/failover_server</module>
+        <module>jax_rs/sse_cdi</module>
+        <module>jax_rs/sse_tomcat</module>
+        <module>jax_rs/sse_spring</module>
         
         <!--
          These are removed from the build as they currently don't inherit the parent from 

http://git-wip-us.apache.org/repos/asf/cxf/blob/41733786/integration/cdi/src/main/java/org/apache/cxf/cdi/JAXRSCdiResourceExtension.java
----------------------------------------------------------------------
diff --git a/integration/cdi/src/main/java/org/apache/cxf/cdi/JAXRSCdiResourceExtension.java b/integration/cdi/src/main/java/org/apache/cxf/cdi/JAXRSCdiResourceExtension.java
index e674f45..fa81160 100644
--- a/integration/cdi/src/main/java/org/apache/cxf/cdi/JAXRSCdiResourceExtension.java
+++ b/integration/cdi/src/main/java/org/apache/cxf/cdi/JAXRSCdiResourceExtension.java
@@ -45,6 +45,7 @@ import javax.ws.rs.ext.Provider;
 
 import org.apache.cxf.Bus;
 import org.apache.cxf.bus.extension.ExtensionManagerBus;
+import org.apache.cxf.cdi.extension.JAXRSServerFactoryCustomizationExtension;
 import org.apache.cxf.feature.Feature;
 import org.apache.cxf.helpers.CastUtils;
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
@@ -99,11 +100,13 @@ public class JAXRSCdiResourceExtension implements Extension {
                     loadServices(beanManager, Collections.<Class<?>>emptySet()),
                     loadProviders(beanManager, Collections.<Class<?>>emptySet()),
                     loadFeatures(beanManager, Collections.<Class<?>>emptySet()));
+                customize(beanManager, factory);
                 factory.init();
             } else {
                 // If there is an application with any singletons or classes defined, we will
                 // create a server factory bean with only application singletons and classes.
                 final JAXRSServerFactoryBean factory = createFactoryInstance(instance, beanManager);
+                customize(beanManager, factory);
                 factory.init();
             }
         }
@@ -273,4 +276,24 @@ public class JAXRSCdiResourceExtension implements Extension {
         
         return services;
     }
+    
+    /**
+     * Look and apply the available JAXRSServerFactoryBean extensions to customize its
+     * creation (f.e. add features, providers, assign transport, ...)
+     * @param beanManager bean manager
+     * @param bean JAX-RS server factory bean about to be created
+     */
+    private void customize(final BeanManager beanManager, final JAXRSServerFactoryBean bean) {
+        final Collection<Bean<?>> extensionBeans = beanManager.getBeans(JAXRSServerFactoryCustomizationExtension.class);
+        
+        for (final Bean<?> extensionBean: extensionBeans) {
+            final JAXRSServerFactoryCustomizationExtension extension =
+                (JAXRSServerFactoryCustomizationExtension)beanManager.getReference(
+                    extensionBean, 
+                    extensionBean.getBeanClass(), 
+                    beanManager.createCreationalContext(extensionBean) 
+                );
+            extension.customize(bean);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/41733786/integration/cdi/src/main/java/org/apache/cxf/cdi/extension/JAXRSServerFactoryCustomizationExtension.java
----------------------------------------------------------------------
diff --git a/integration/cdi/src/main/java/org/apache/cxf/cdi/extension/JAXRSServerFactoryCustomizationExtension.java b/integration/cdi/src/main/java/org/apache/cxf/cdi/extension/JAXRSServerFactoryCustomizationExtension.java
new file mode 100644
index 0000000..3e311b9
--- /dev/null
+++ b/integration/cdi/src/main/java/org/apache/cxf/cdi/extension/JAXRSServerFactoryCustomizationExtension.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.cdi.extension;
+
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+
+/**
+ * Serves as an extension point in order to allow to customize JAX-RS server 
+ * factory bean creation (f.e. add features, providers, assign transport, ...) 
+ * during the CDI beans discovery and initialization.
+ */
+public interface JAXRSServerFactoryCustomizationExtension {
+    /**
+     * Customize JAX-RS server factory bean before it is being initialized
+     * @param bean JAX-RS server factory bean
+     */
+    void customize(final JAXRSServerFactoryBean bean);
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/41733786/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index afd3a59..c19e9f5 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -110,7 +110,7 @@
         <cxf.geronimo.transaction.version>1.1.1</cxf.geronimo.transaction.version>
         <cxf.jasypt.bundle.version>1.9.0_1</cxf.jasypt.bundle.version>
         <cxf.javassist.version>3.19.0-GA</cxf.javassist.version>
-        <cxf.javax.ws.rs.version>2.0.1</cxf.javax.ws.rs.version>
+        <cxf.javax.ws.rs.version>2.1-m01</cxf.javax.ws.rs.version>
         <cxf.jaxb.version>2.2.11</cxf.jaxb.version>
         <cxf.jaxb.impl.version>${cxf.jaxb.version}</cxf.jaxb.impl.version>
         <cxf.jaxb.core.version>${cxf.jaxb.version}</cxf.jaxb.core.version>
@@ -2185,4 +2185,19 @@
             </build>
         </profile>
     </profiles>
+
+    <!-- Temporarily only till JAX-RS 2.1 artifacts become available -->
+    <repositories>
+        <repository>
+            <id>maven.java.net</id>
+            <name>java.net snapshots</name>
+            <url>https://maven.java.net/content/repositories/snapshots/</url>
+            <releases>
+                <enabled>false</enabled>
+            </releases>
+            <snapshots>
+                <enabled>true</enabled>
+            </snapshots>
+        </repository>
+    </repositories>
 </project>

http://git-wip-us.apache.org/repos/asf/cxf/blob/41733786/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSServerFactoryBean.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSServerFactoryBean.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSServerFactoryBean.java
index d4e53ac..0f481a3 100644
--- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSServerFactoryBean.java
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSServerFactoryBean.java
@@ -193,6 +193,7 @@ public class JAXRSServerFactoryBean extends AbstractJAXRSFactoryBean {
             checkPrivateEndpoint(ep);
             
             factory.applyDynamicFeatures(getServiceFactory().getClassResourceInfo());
+            applyBusFeatures(getBus());
             applyFeatures();
 
             getServiceFactory().sendEvent(FactoryBeanListener.Event.SERVER_CREATED,
@@ -246,6 +247,14 @@ public class JAXRSServerFactoryBean extends AbstractJAXRSFactoryBean {
         
     }
     
+    protected void applyBusFeatures(final Bus bus) {
+        if (bus.getFeatures() != null) {
+            for (Feature feature : bus.getFeatures()) {
+                feature.initialize(server, bus);
+            }
+        }
+    }
+    
     protected void applyFeatures() {
         if (getFeatures() != null) {
             for (Feature feature : getFeatures()) {

http://git-wip-us.apache.org/repos/asf/cxf/blob/41733786/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/RequestImpl.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/RequestImpl.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/RequestImpl.java
index 0914854..e57f3e0 100644
--- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/RequestImpl.java
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/RequestImpl.java
@@ -32,6 +32,9 @@ import javax.ws.rs.HttpMethod;
 import javax.ws.rs.core.EntityTag;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.NioCompletionHandler;
+import javax.ws.rs.core.NioErrorHandler;
+import javax.ws.rs.core.NioReaderHandler;
 import javax.ws.rs.core.Request;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.ResponseBuilder;
@@ -382,4 +385,24 @@ public class RequestImpl implements Request {
             return 0;
         }
     }
+
+    @Override
+    public void entity(NioReaderHandler arg0) {
+        // TODO: Implementation required (JAX-RS 2.1)
+    }
+
+    @Override
+    public void entity(NioReaderHandler arg0, NioCompletionHandler arg1) {
+        // TODO: Implementation required (JAX-RS 2.1)
+    }
+
+    @Override
+    public void entity(NioReaderHandler arg0, NioErrorHandler arg1) {
+        // TODO: Implementation required (JAX-RS 2.1)
+    }
+
+    @Override
+    public void entity(NioReaderHandler arg0, NioCompletionHandler arg1, NioErrorHandler arg2) {
+        // TODO: Implementation required (JAX-RS 2.1)
+    }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/41733786/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseBuilderImpl.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseBuilderImpl.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseBuilderImpl.java
index cee5d36..29c5c42 100644
--- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseBuilderImpl.java
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseBuilderImpl.java
@@ -34,6 +34,8 @@ import javax.ws.rs.core.Link;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.MultivaluedMap;
 import javax.ws.rs.core.NewCookie;
+import javax.ws.rs.core.NioErrorHandler;
+import javax.ws.rs.core.NioWriterHandler;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.ResponseBuilder;
 import javax.ws.rs.core.UriInfo;
@@ -314,4 +316,16 @@ public class ResponseBuilderImpl extends ResponseBuilder implements Cloneable {
         }
         return variants(Arrays.asList(variants));
     }
+
+    @Override
+    public ResponseBuilder entity(NioWriterHandler arg0) {
+        // TODO: Not Implemented
+        return this;
+    }
+
+    @Override
+    public ResponseBuilder entity(NioWriterHandler arg0, NioErrorHandler arg1) {
+        // TODO: Not Implemented
+        return this;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/41733786/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/tl/ThreadLocalRequest.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/tl/ThreadLocalRequest.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/tl/ThreadLocalRequest.java
index 8bcdbb6..44a26f9 100644
--- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/tl/ThreadLocalRequest.java
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/tl/ThreadLocalRequest.java
@@ -23,6 +23,9 @@ import java.util.Date;
 import java.util.List;
 
 import javax.ws.rs.core.EntityTag;
+import javax.ws.rs.core.NioCompletionHandler;
+import javax.ws.rs.core.NioErrorHandler;
+import javax.ws.rs.core.NioReaderHandler;
 import javax.ws.rs.core.Request;
 import javax.ws.rs.core.Response.ResponseBuilder;
 import javax.ws.rs.core.Variant;
@@ -54,4 +57,24 @@ public class ThreadLocalRequest extends AbstractThreadLocalProxy<Request>
         return get().evaluatePreconditions();
     }
 
+    @Override
+    public void entity(NioReaderHandler reader) {
+        get().entity(reader);
+    }
+
+    @Override
+    public void entity(NioReaderHandler reader, NioCompletionHandler completion) {
+        get().entity(reader, completion);
+    }
+
+    @Override
+    public void entity(NioReaderHandler reader, NioErrorHandler error) {
+        get().entity(reader, error);
+    }
+
+    @Override
+    public void entity(NioReaderHandler reader, NioCompletionHandler completion, NioErrorHandler error) {
+        get().entity(reader, completion, error);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/41733786/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java
index c5d4318..10f5d9b 100644
--- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java
@@ -22,14 +22,18 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
 import javax.ws.rs.HttpMethod;
 import javax.ws.rs.client.AsyncInvoker;
+import javax.ws.rs.client.CompletionStageRxInvoker;
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.client.Invocation;
 import javax.ws.rs.client.Invocation.Builder;
 import javax.ws.rs.client.InvocationCallback;
+import javax.ws.rs.client.NioInvoker;
+import javax.ws.rs.client.RxInvoker;
 import javax.ws.rs.client.SyncInvoker;
 import javax.ws.rs.core.CacheControl;
 import javax.ws.rs.core.Cookie;
@@ -369,6 +373,35 @@ public class InvocationBuilderImpl implements Invocation.Builder {
         public <T> Future<T> submit(InvocationCallback<T> callback) {
             return invBuilder.async().method(httpMethod, entity, callback);
         }
+    }
+
+    @Override
+    public CompletionStageRxInvoker rx() {
+        // TODO: Implementation required (JAX-RS 2.1) 
+        return null;
+    }
+
+    @Override
+    public CompletionStageRxInvoker rx(ExecutorService executorService) {
+        // TODO: Implementation required (JAX-RS 2.1)
+        return null;
+    }
 
+    @Override
+    public <T extends RxInvoker> T rx(Class<T> clazz) {
+        // TODO: Implementation required (JAX-RS 2.1)
+        return null;
+    }
+
+    @Override
+    public <T extends RxInvoker> T rx(Class<T> clazz, ExecutorService executorService) {
+        // TODO: Implementation required (JAX-RS 2.1)
+        return null;
+    }
+
+    @Override
+    public NioInvoker nio() {
+        // TODO: Implementation required (JAX-RS 2.1)
+        return null;
     }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/41733786/rt/rs/pom.xml
----------------------------------------------------------------------
diff --git a/rt/rs/pom.xml b/rt/rs/pom.xml
index 0348e4b..cae173e 100644
--- a/rt/rs/pom.xml
+++ b/rt/rs/pom.xml
@@ -39,5 +39,6 @@
         <module>extensions/search</module>
         <module>extensions/rx</module>
         <module>security</module>
+        <module>sse</module>
     </modules>
 </project>

http://git-wip-us.apache.org/repos/asf/cxf/blob/41733786/rt/rs/sse/pom.xml
----------------------------------------------------------------------
diff --git a/rt/rs/sse/pom.xml b/rt/rs/sse/pom.xml
new file mode 100644
index 0000000..d4c9f17
--- /dev/null
+++ b/rt/rs/sse/pom.xml
@@ -0,0 +1,76 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>cxf-rt-rs-sse</artifactId>
+    <packaging>bundle</packaging>
+    <name>Apache CXF JAX-RS Server-Side Events Support</name>
+    <description>Apache CXF JAX-RS Server-Side Events Support</description>
+    <url>http://cxf.apache.org</url>
+    <parent>
+        <groupId>org.apache.cxf</groupId>
+        <artifactId>cxf-parent</artifactId>
+        <version>3.2.0-SNAPSHOT</version>
+        <relativePath>../../../parent/pom.xml</relativePath>
+    </parent>
+    <properties>
+        <cxf.osgi.import>
+            javax.servlet*;version="${cxf.osgi.javax.servlet.version}",
+        </cxf.osgi.import>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-frontend-jaxrs</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-integration-cdi</artifactId>
+            <version>${project.version}</version>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${cxf.servlet-api.group}</groupId>
+            <artifactId>${cxf.servlet-api.artifact}</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.atmosphere</groupId>
+            <artifactId>atmosphere-runtime</artifactId>
+            <version>${cxf.atmosphere.version}</version>
+        </dependency>
+    </dependencies>
+    <build>
+    <plugins>
+        <plugin>
+            <artifactId>maven-checkstyle-plugin</artifactId>
+            <configuration>
+                <skip>true</skip>
+            </configuration>
+        </plugin>
+    </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/cxf/blob/41733786/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java
new file mode 100644
index 0000000..4a9b3aa
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Type;
+import java.nio.charset.StandardCharsets;
+
+import javax.ws.rs.InternalServerErrorException;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.ext.MessageBodyWriter;
+import javax.ws.rs.ext.Provider;
+import javax.ws.rs.sse.OutboundSseEvent;
+
+import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+
+@Provider
+public class OutboundSseEventBodyWriter implements MessageBodyWriter<OutboundSseEvent> {
+    public static final String SERVER_SENT_EVENTS = "text/event-stream";
+    public static final MediaType SERVER_SENT_EVENTS_TYPE = MediaType.valueOf(SERVER_SENT_EVENTS);
+
+    private static final byte[] COMMENT = ": ".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] EVENT = "    ".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] ID = "id: ".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] RETRY = "retry: ".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] DATA = "data: ".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] NEW_LINE = "\n".getBytes(StandardCharsets.UTF_8);
+
+    private ServerProviderFactory factory;
+    private Message message;
+    
+    protected OutboundSseEventBodyWriter() {
+    }
+
+    public OutboundSseEventBodyWriter(final ServerProviderFactory factory, final Exchange exchange) {
+        this.factory = factory;
+        this.message = new MessageImpl();
+        this.message.setExchange(exchange);
+    }
+
+    
+    @Override
+    public boolean isWriteable(Class<?> cls, Type type, Annotation[] anns, MediaType mt) {
+        return OutboundSseEvent.class.isAssignableFrom(cls) || SERVER_SENT_EVENTS_TYPE.isCompatible(mt);
+    }
+    
+    @Override
+    public void writeTo(OutboundSseEvent p, Class<?> cls, Type t, Annotation[] anns,
+            MediaType mt, MultivaluedMap<String, Object> headers, OutputStream os) 
+                throws IOException, WebApplicationException {
+        
+        if (p.getName() != null) {
+            os.write(EVENT);
+            os.write(p.getName().getBytes(StandardCharsets.UTF_8));
+            os.write(NEW_LINE);
+        }
+        
+        if (p.getId() != null) {
+            os.write(ID);
+            os.write(p.getId().getBytes(StandardCharsets.UTF_8));
+            os.write(NEW_LINE);
+        }
+        
+        if (p.getComment() != null) {
+            os.write(COMMENT);
+            os.write(p.getComment().getBytes(StandardCharsets.UTF_8));
+            os.write(NEW_LINE);
+        }
+        
+        if (p.getReconnectDelay() > 0) {
+            os.write(RETRY);
+            os.write(Long.toString(p.getReconnectDelay()).getBytes(StandardCharsets.UTF_8));
+            os.write(NEW_LINE);
+        }
+
+        if (p.getData() != null) {
+            Class<?> payloadClass = p.getType();
+            Type payloadType = p.getGenericType();
+            if (payloadType == null) {
+                payloadType = payloadClass;
+            }
+            
+            if (payloadType == null && payloadClass == null) {
+                payloadType = Object.class;
+                payloadClass = Object.class;
+            }
+            
+            os.write(DATA);
+            writePayloadTo(payloadClass, payloadType, anns, p.getMediaType(), headers, p.getData(), os);
+            os.write(NEW_LINE);
+        }
+    }
+    
+    @SuppressWarnings("unchecked")
+    private<T> void writePayloadTo(Class<T> cls, Type type, Annotation[] anns, MediaType mt, 
+            MultivaluedMap<String, Object> headers, Object data, OutputStream os) 
+                throws IOException, WebApplicationException {
+        
+        MessageBodyWriter<T> writer = null;
+        if (message != null && factory != null) {
+            writer = factory.createMessageBodyWriter(cls, type, anns, mt, message);
+        }
+        
+        if (writer == null) {
+            throw new InternalServerErrorException("No suitable message body writer for class: " + cls.getName());
+        }
+        
+        writer.writeTo((T)data, cls, type, anns, mt, headers, os);
+    }
+    
+    @Override
+    public long getSize(OutboundSseEvent t, Class<?> type, Type genericType, Annotation[] annotations, 
+            MediaType mediaType) {
+        return -1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/41733786/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventImpl.java
new file mode 100644
index 0000000..f852637
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventImpl.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse;
+
+import java.lang.reflect.Type;
+
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.sse.OutboundSseEvent;
+
+public class OutboundSseEventImpl implements OutboundSseEvent {
+    private String id;
+    private String name;
+    private String comment;
+    private long reconnectDelay = -1;
+    private Class<?> type;
+    private Type genericType;
+    private MediaType mediaType;
+    private Object data;
+
+    public static class BuilderImpl implements Builder {
+        private String id;
+        private String name;
+        private String comment;
+        private long reconnectDelay = -1;
+        private Class<?> type;
+        private Type genericType;
+        private MediaType mediaType;
+        private Object data;
+
+        @Override
+        public Builder id(String id) {
+            this.id = id;
+            return this;
+        }
+
+        @Override
+        public Builder name(String name) {
+            this.name = name;
+            return this;
+        }
+
+        @Override
+        public Builder reconnectDelay(long milliseconds) {
+            this.reconnectDelay = milliseconds;
+            return this;
+        }
+
+        @Override
+        public Builder mediaType(MediaType mediaType) {
+            this.mediaType = mediaType;
+            return this;
+        }
+
+        @Override
+        public Builder comment(String comment) {
+            this.comment = comment;
+            return this;
+        }
+
+        @Override
+        @SuppressWarnings("rawtypes")
+        public Builder data(Class type, Object data) {
+            this.type = type;
+            this.data= data;
+            return this;
+        }
+
+        @Override
+        @SuppressWarnings("rawtypes")
+        public Builder data(GenericType type, Object data) {
+            this.genericType = type.getType();
+            this.data= data;
+            return this;
+        }
+
+        @Override
+        public Builder data(Object data) {
+            this.data = data;
+            return this;
+        }
+
+        @Override
+        public OutboundSseEvent build() {
+            return new OutboundSseEventImpl(
+                id,
+                name,
+                comment,
+                reconnectDelay,
+                type,
+                genericType,
+                mediaType,
+                data
+            );
+        }
+        
+    }
+    
+    OutboundSseEventImpl(String id, String name, String comment, long reconnectDelay, 
+            Class<?> type, Type genericType, MediaType mediaType, Object data) {
+        this.id = id;
+        this.name = name;
+        this.comment = comment;
+        this.reconnectDelay = reconnectDelay;
+        this.type = type;
+        this.genericType = genericType;
+        this.mediaType = mediaType;
+        this.data = data;
+    }
+    
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public String getComment() {
+        return comment;
+    }
+
+    @Override
+    public long getReconnectDelay() {
+        return reconnectDelay;
+    }
+
+    @Override
+    public boolean isReconnectDelaySet() {
+        return reconnectDelay != -1;
+    }
+
+    @Override
+    public Class<?> getType() {
+        return type;
+    }
+
+    @Override
+    public Type getGenericType() {
+        return genericType;
+    }
+
+    @Override
+    public MediaType getMediaType() {
+        return mediaType;
+    }
+
+    @Override
+    public Object getData() {
+        return data;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/41733786/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
new file mode 100644
index 0000000..977a6b2
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import javax.ws.rs.sse.OutboundSseEvent;
+import javax.ws.rs.sse.SseBroadcaster;
+import javax.ws.rs.sse.SseEventOutput;
+
+public class SseBroadcasterImpl implements SseBroadcaster {
+    private final Set<SseEventOutput> outputs = new CopyOnWriteArraySet<>();
+    private final Set<Listener> listeners = new CopyOnWriteArraySet<>();
+            
+    @Override
+    public boolean register(Listener listener) {
+        return listeners.add(listener);
+    }
+
+    @Override
+    public boolean register(SseEventOutput output) {
+        return outputs.add(output);
+    }
+
+    @Override
+    public void broadcast(OutboundSseEvent event) {
+        for (final SseEventOutput output: outputs) {
+            try {
+                output.write(event);
+            } catch (final IOException ex) {
+                listeners.forEach(listener -> listener.onException(output, ex));
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        for (final SseEventOutput output: outputs) {
+            try {
+                output.close();
+                listeners.forEach(listener -> listener.onClose(output));
+            } catch (final IOException ex) {
+                listeners.forEach(listener -> listener.onException(output, ex));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/41733786/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventOutputProvider.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventOutputProvider.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventOutputProvider.java
new file mode 100644
index 0000000..7f7963f
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventOutputProvider.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Type;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.ext.MessageBodyWriter;
+import javax.ws.rs.ext.Provider;
+import javax.ws.rs.sse.SseEventOutput;
+
+@Provider
+public class SseEventOutputProvider implements MessageBodyWriter<SseEventOutput> {
+    @Override
+    public boolean isWriteable(Class<?> cls, Type type, Annotation[] anns, MediaType mt) {
+        return SseEventOutput.class.isAssignableFrom(cls);
+    }
+    
+    @Override
+    public long getSize(final SseEventOutput output, final Class<?> type, final Type genericType,
+                        final Annotation[] annotations, final MediaType mediaType) {
+        return -1;
+    }
+
+    @Override
+    public void writeTo(final SseEventOutput output, final Class<?> type, final Type genericType,
+                        final Annotation[] annotations, final MediaType mediaType,
+                        final MultivaluedMap<String, Object> httpHeaders, final OutputStream entityStream)
+            throws IOException, WebApplicationException {
+        // do nothing.
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/41733786/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java
new file mode 100644
index 0000000..da682a0
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.endpoint.Server;
+import org.apache.cxf.feature.AbstractFeature;
+import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
+import org.apache.cxf.jaxrs.sse.atmosphere.SseAtmosphereContextProvider;
+
+public class SseFeature extends AbstractFeature {
+    @Override
+    public void initialize(Server server, Bus bus) {
+        final List<Object> providers = new ArrayList<>();
+
+        providers.add(new SseAtmosphereContextProvider());
+        providers.add(new SseEventOutputProvider());
+
+        ((ServerProviderFactory) server.getEndpoint().get(
+            ServerProviderFactory.class.getName())).setUserProviders(providers);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/41733786/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereContextProvider.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereContextProvider.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereContextProvider.java
new file mode 100644
index 0000000..de2c3a9
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereContextProvider.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse.atmosphere;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.ext.Provider;
+import javax.ws.rs.sse.SseContext;
+
+import org.apache.cxf.jaxrs.ext.ContextProvider;
+import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.transport.http.AbstractHTTPDestination;
+import org.atmosphere.cpr.AtmosphereResource;
+import org.atmosphere.cpr.Broadcaster;
+
+@Provider
+public class SseAtmosphereContextProvider implements ContextProvider<SseContext> {
+    @Override
+    public SseContext createContext(Message message) {
+        final HttpServletRequest request = (HttpServletRequest)message.get(AbstractHTTPDestination.HTTP_REQUEST);
+        if (request == null) {
+            throw new IllegalStateException("Unable to retrieve HTTP request from the context");
+        }
+        
+        final AtmosphereResource resource = (AtmosphereResource)request
+            .getAttribute(AtmosphereResource.class.getName());
+        if (resource == null) {
+            throw new IllegalStateException("AtmosphereResource is not present, "
+                    + "is AtmosphereServlet configured properly?");
+        }
+        
+        final Broadcaster broadcaster = resource.getAtmosphereConfig()
+            .getBroadcasterFactory()
+            .lookup(resource.uuid(), true);
+        
+        resource.removeFromAllBroadcasters();
+        resource.setBroadcaster(broadcaster);
+        
+        return new SseAtmosphereResourceContext(ServerProviderFactory.getInstance(message), resource);
+    }
+} 

http://git-wip-us.apache.org/repos/asf/cxf/blob/41733786/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventOutputImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventOutputImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventOutputImpl.java
new file mode 100644
index 0000000..cbf1a26
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventOutputImpl.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse.atmosphere;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.lang.annotation.Annotation;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Logger;
+
+import javax.ws.rs.ext.MessageBodyWriter;
+import javax.ws.rs.sse.OutboundSseEvent;
+import javax.ws.rs.sse.SseEventOutput;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.atmosphere.cpr.AtmosphereResource;
+import org.atmosphere.cpr.Broadcaster;
+
+public class SseAtmosphereEventOutputImpl implements SseEventOutput {
+    private static final Logger LOG = LogUtils.getL7dLogger(SseAtmosphereEventOutputImpl.class);
+    
+    private final AtmosphereResource resource;
+    private final MessageBodyWriter<OutboundSseEvent> writer;
+    private volatile boolean closed = false;
+    
+    public SseAtmosphereEventOutputImpl(final MessageBodyWriter<OutboundSseEvent> writer, 
+            final AtmosphereResource resource) {
+        this.writer = writer;
+        this.resource = resource;
+        
+        if (!resource.isSuspended()) {
+            LOG.fine("Atmosphere resource is not suspended, suspending");
+            resource.suspend();
+        }
+    }
+    
+    @Override
+    public void close() throws IOException {
+        if (!closed) {
+            closed = true;
+
+            LOG.fine("Closing Atmosphere SSE event output");
+            if (resource.isSuspended()) {
+                LOG.fine("Atmosphere resource is suspended, resuming");
+                resource.resume();
+            }
+
+            final Broadcaster broadcaster = resource.getBroadcaster();
+            resource.removeFromAllBroadcasters();
+            
+            try {
+                if (!resource.getResponse().isCommitted()) {
+                    LOG.fine("Response is not committed, flushing buffer");
+                    resource.getResponse().flushBuffer();
+                }
+            } finally {
+                resource.close();
+                broadcaster.destroy();
+                LOG.fine("Atmosphere SSE event output is closed");
+            }
+        }
+    }
+
+    @Override
+    public void write(OutboundSseEvent event) throws IOException {
+        if (!closed && writer != null) {
+            try (final ByteArrayOutputStream os = new ByteArrayOutputStream()) {
+                writer.writeTo(event, event.getClass(), null, new Annotation [] {}, event.getMediaType(), null, os);
+                
+                // Atmosphere broadcasts asynchronously which is acceptable in most cases.
+                // Unfortunately, calling close() may lead to response stream being closed
+                // while there are still some SSE delivery scheduled.
+                final Future<Object> future = resource
+                    .getBroadcaster()
+                    .broadcast(os.toString(StandardCharsets.UTF_8.name()));
+                
+                try {
+                    if (!future.isDone()) {
+                        // Let us wait at least 200 milliseconds before returning to ensure 
+                        // that SSE had the opportunity to be delivered.
+                        LOG.info("Waiting 200ms to ensure SSE Atmosphere response is delivered");
+                        future.get(200, TimeUnit.MILLISECONDS);
+                    }
+                } catch (final ExecutionException | InterruptedException ex) {
+                    throw new IOException(ex);
+                } catch (final TimeoutException ex) {
+                    LOG.warning("SSE Atmosphere response was not delivered within default timeout");
+                }
+            }
+        }
+    }
+
+    @Override
+    public boolean isClosed() {
+        return closed;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/41733786/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptor.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptor.java
new file mode 100644
index 0000000..d8bbabc
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptor.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse.atmosphere;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.atmosphere.cpr.Action;
+import org.atmosphere.cpr.AsyncIOInterceptorAdapter;
+import org.atmosphere.cpr.AsyncIOWriter;
+import org.atmosphere.cpr.AtmosphereInterceptorWriter;
+import org.atmosphere.cpr.AtmosphereRequest;
+import org.atmosphere.cpr.AtmosphereResource;
+import org.atmosphere.cpr.AtmosphereResourceEvent;
+import org.atmosphere.cpr.AtmosphereResourceEventListenerAdapter.OnPreSuspend;
+import org.atmosphere.cpr.AtmosphereResponse;
+import org.atmosphere.interceptor.AllowInterceptor;
+import org.atmosphere.interceptor.SSEAtmosphereInterceptor;
+import org.atmosphere.util.Utils;
+
+import static org.apache.cxf.jaxrs.sse.OutboundSseEventBodyWriter.SERVER_SENT_EVENTS;
+import static org.atmosphere.cpr.ApplicationConfig.PROPERTY_USE_STREAM;
+import static org.atmosphere.cpr.FrameworkConfig.CALLBACK_JAVASCRIPT_PROTOCOL;
+import static org.atmosphere.cpr.FrameworkConfig.CONTAINER_RESPONSE;
+
+/**
+ * Most of this class implementation is borrowed from SSEAtmosphereInterceptor. The original
+ * implementation does two things which do not fit well into SSE support:
+ *  - closes the response stream (overridden by SseAtmosphereInterceptorWriter)
+ *  - wraps the whatever object is being written to SSE payload (overridden using 
+ *    the complete SSE protocol) 
+ */
+public class SseAtmosphereInterceptor extends SSEAtmosphereInterceptor {
+    private static final Logger LOG = LogUtils.getL7dLogger(SseAtmosphereInterceptor.class);
+
+    private static final byte[] PADDING;
+    private static final String PADDING_TEXT;
+    private static final byte[] END = "\r\n\r\n".getBytes();
+    
+    static {
+        StringBuffer whitespace = new StringBuffer();
+        for (int i = 0; i < 2000; i++) {
+            whitespace.append(" ");
+        }
+        whitespace.append("\n");
+        PADDING_TEXT = whitespace.toString();
+        PADDING = PADDING_TEXT.getBytes();
+    }
+    
+    private boolean writePadding(AtmosphereResponse response) {
+        if (response.request() != null && response.request().getAttribute("paddingWritten") != null) {
+            return false;
+        }
+
+        response.setContentType(SERVER_SENT_EVENTS);
+        response.setCharacterEncoding("utf-8");
+        boolean isUsingStream = (Boolean) response.request().getAttribute(PROPERTY_USE_STREAM);
+        if (isUsingStream) {
+            try {
+                OutputStream stream = response.getResponse().getOutputStream();
+                try {
+                    stream.write(PADDING);
+                    stream.flush();
+                } catch (IOException ex) {
+                    LOG.log(Level.WARNING, "SSE may not work", ex);
+                }
+            } catch (IOException e) {
+                LOG.log(Level.FINEST, "", e);
+            }
+        } else {
+            try {
+                PrintWriter w = response.getResponse().getWriter();
+                w.println(PADDING_TEXT);
+                w.flush();
+            } catch (IOException e) {
+                LOG.log(Level.FINEST, "", e);
+            }
+        }
+        response.resource().getRequest().setAttribute("paddingWritten", "true");
+        return true;
+    }
+    
+    @Override
+    public Action inspect(final AtmosphereResource r) {
+        if (Utils.webSocketMessage(r)) {
+            return Action.CONTINUE;
+        }
+
+        final AtmosphereRequest request = r.getRequest();
+        final String accept = request.getHeader("Accept") == null ? "text/plain" : request.getHeader("Accept").trim();
+
+        if (r.transport().equals(AtmosphereResource.TRANSPORT.SSE) || SERVER_SENT_EVENTS.equalsIgnoreCase(accept)) {
+            final AtmosphereResponse response = r.getResponse();
+            if (response.getAsyncIOWriter() == null) {
+                response.asyncIOWriter(new SseAtmosphereInterceptorWriter());
+            }
+            
+            r.addEventListener(new P(response));
+
+            AsyncIOWriter writer = response.getAsyncIOWriter();
+            if (AtmosphereInterceptorWriter.class.isAssignableFrom(writer.getClass())) {
+                AtmosphereInterceptorWriter.class.cast(writer).interceptor(new AsyncIOInterceptorAdapter() {
+                    private boolean padding() {
+                        if (!r.isSuspended()) {
+                            return writePadding(response);
+                        }
+                        return false;
+                    }
+
+                    @Override
+                    public void prePayload(AtmosphereResponse response, byte[] data, int offset, int length) {
+                        padding();
+                    }
+
+                    @Override
+                    public void postPayload(AtmosphereResponse response, byte[] data, int offset, int length) {
+                        // The CALLBACK_JAVASCRIPT_PROTOCOL may be called by a framework running on top of Atmosphere
+                        // In that case, we must pad/protocol indenendently of the state of the AtmosphereResource
+                        if (r.isSuspended() || r.getRequest().getAttribute(CALLBACK_JAVASCRIPT_PROTOCOL) != null
+                                || r.getRequest().getAttribute(CONTAINER_RESPONSE) != null) {
+                            response.write(END, true);
+                        }
+
+                        /**
+                         * When used with https://github.com/remy/polyfills/blob/master/EventSource.js , we
+                         * resume after every message.
+                         */
+                        String ua = r.getRequest().getHeader("User-Agent");
+                        if (ua != null && ua.contains("MSIE")) {
+                            try {
+                                response.flushBuffer();
+                            } catch (IOException e) {
+                                LOG.log(Level.FINEST, "", e);
+                            }
+                            r.resume();
+                        }
+                    }
+                });
+            } else {
+                LOG.warning(String.format("Unable to apply %s. Your AsyncIOWriter must implement %s", 
+                    getClass().getName(), AtmosphereInterceptorWriter.class.getName()));
+            }
+        }
+        
+        return Action.CONTINUE;
+    }
+    
+    private final class P extends OnPreSuspend implements AllowInterceptor {
+
+        private final AtmosphereResponse response;
+
+        private P(AtmosphereResponse response) {
+            this.response = response;
+        }
+
+        @Override
+        public void onPreSuspend(AtmosphereResourceEvent event) {
+            writePadding(response);
+        }
+    }    
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/41733786/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptorWriter.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptorWriter.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptorWriter.java
new file mode 100644
index 0000000..cfafe87
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptorWriter.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse.atmosphere;
+
+import java.io.IOException;
+
+import org.atmosphere.cpr.AtmosphereInterceptorWriter;
+import org.atmosphere.cpr.AtmosphereResponse;
+
+public class SseAtmosphereInterceptorWriter extends AtmosphereInterceptorWriter {
+    @Override
+    public void close(AtmosphereResponse response) throws IOException {
+        // Do not close the response, keep output stream open
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/41733786/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereResourceContext.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereResourceContext.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereResourceContext.java
new file mode 100644
index 0000000..c330d6c
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereResourceContext.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse.atmosphere;
+
+import javax.ws.rs.ext.MessageBodyWriter;
+import javax.ws.rs.sse.OutboundSseEvent;
+import javax.ws.rs.sse.OutboundSseEvent.Builder;
+import javax.ws.rs.sse.SseBroadcaster;
+import javax.ws.rs.sse.SseContext;
+import javax.ws.rs.sse.SseEventOutput;
+
+import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
+import org.apache.cxf.jaxrs.sse.OutboundSseEventBodyWriter;
+import org.apache.cxf.jaxrs.sse.OutboundSseEventImpl;
+import org.apache.cxf.jaxrs.sse.SseBroadcasterImpl;
+import org.apache.cxf.jaxrs.utils.JAXRSUtils;
+import org.atmosphere.cpr.AtmosphereResource;
+
+public class SseAtmosphereResourceContext implements SseContext {
+    private final AtmosphereResource resource;
+    private final ServerProviderFactory factory;
+
+    SseAtmosphereResourceContext(final ServerProviderFactory factory, final AtmosphereResource resource) {
+        this.factory = factory;
+        this.resource = resource;
+    }
+    
+    @Override
+    public SseEventOutput newOutput() {
+        final MessageBodyWriter<OutboundSseEvent> writer = new OutboundSseEventBodyWriter(factory, 
+            JAXRSUtils.getCurrentMessage().getExchange());
+        return new SseAtmosphereEventOutputImpl(writer, resource);
+    }
+
+    @Override
+    public Builder newEvent() {
+        return new OutboundSseEventImpl.BuilderImpl();
+    }
+
+    @Override
+    public SseBroadcaster newBroadcaster() {
+        return new SseBroadcasterImpl();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/41733786/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/cdi/SseTransportCustomizationExtension.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/cdi/SseTransportCustomizationExtension.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/cdi/SseTransportCustomizationExtension.java
new file mode 100644
index 0000000..f4ea862
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/cdi/SseTransportCustomizationExtension.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse.cdi;
+
+import org.apache.cxf.cdi.extension.JAXRSServerFactoryCustomizationExtension;
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.transport.sse.SseHttpTransportFactory;
+
+public class SseTransportCustomizationExtension implements JAXRSServerFactoryCustomizationExtension{
+    @Override
+    public void customize(final JAXRSServerFactoryBean bean) {
+        bean.setTransportId(SseHttpTransportFactory.TRANSPORT_ID);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/41733786/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseHttpTransportFactory.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseHttpTransportFactory.java b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseHttpTransportFactory.java
new file mode 100644
index 0000000..af59327
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseHttpTransportFactory.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.transport.sse;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.common.injection.NoJSR250Annotations;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.ConduitInitiator;
+import org.apache.cxf.transport.Destination;
+import org.apache.cxf.transport.DestinationFactory;
+import org.apache.cxf.transport.http.DestinationRegistry;
+import org.apache.cxf.transport.http.HTTPTransportFactory;
+import org.apache.cxf.transport.sse.atmosphere.AtmosphereSseServletDestination;
+
+@NoJSR250Annotations
+public class SseHttpTransportFactory extends HTTPTransportFactory  
+        implements ConduitInitiator, DestinationFactory {
+    
+    public static final String TRANSPORT_ID = "http://cxf.apache.org/transports/http/sse";
+    public static final List<String> DEFAULT_NAMESPACES = Arrays.asList(
+        TRANSPORT_ID,
+        "http://cxf.apache.org/transports/http/sse/configuration"
+    );
+    
+    public SseHttpTransportFactory() {
+        this(null);
+    }
+    
+    public SseHttpTransportFactory(DestinationRegistry registry) {
+        super(DEFAULT_NAMESPACES, registry);
+    }
+    
+    @Override
+    public Destination getDestination(EndpointInfo endpointInfo, Bus bus) throws IOException {
+        return new AtmosphereSseServletDestination(bus, getRegistry(), endpointInfo, endpointInfo.getAddress());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cxf/blob/41733786/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java
new file mode 100644
index 0000000..15e5c9f
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.sse.atmosphere;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.jaxrs.sse.SseFeature;
+import org.apache.cxf.jaxrs.sse.atmosphere.SseAtmosphereInterceptor;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.http.DestinationRegistry;
+import org.apache.cxf.transport.servlet.ServletDestination;
+import org.atmosphere.cpr.ApplicationConfig;
+import org.atmosphere.cpr.AtmosphereFramework;
+import org.atmosphere.cpr.AtmosphereRequestImpl;
+import org.atmosphere.cpr.AtmosphereResource;
+import org.atmosphere.cpr.AtmosphereResponseImpl;
+import org.atmosphere.handler.AbstractReflectorAtmosphereHandler;
+
+public class AtmosphereSseServletDestination extends ServletDestination {
+    private static final Logger LOG = LogUtils.getL7dLogger(AtmosphereSseServletDestination.class);
+
+    private AtmosphereFramework framework;
+
+    public AtmosphereSseServletDestination(Bus bus, DestinationRegistry registry, 
+            EndpointInfo ei, String path) throws IOException {
+        super(bus, registry, ei, path);
+        
+        framework = new AtmosphereFramework(true, false);
+        framework.interceptor(new SseAtmosphereInterceptor());
+        framework.addInitParameter(ApplicationConfig.PROPERTY_NATIVE_COMETSUPPORT, "true");
+        framework.addInitParameter(ApplicationConfig.WEBSOCKET_SUPPORT, "true");
+        framework.addInitParameter(ApplicationConfig.DISABLE_ATMOSPHEREINTERCEPTOR, "true");
+        framework.addInitParameter(ApplicationConfig.CLOSE_STREAM_ON_CANCEL, "true");
+        framework.addAtmosphereHandler("/", new DestinationHandler());
+        framework.init();
+        
+        bus.getFeatures().add(new SseFeature());
+    }
+
+    @Override
+    public void invoke(ServletConfig config, ServletContext context, HttpServletRequest req,
+            HttpServletResponse resp) throws IOException {
+        try {
+            framework.doCometSupport(AtmosphereRequestImpl.wrap(req), AtmosphereResponseImpl.wrap(resp));
+        } catch (ServletException e) {
+            throw new IOException(e);
+        }
+    }
+    
+    @Override
+    public void shutdown() {
+        try {
+            framework.destroy();
+        } catch (Exception ex) {
+            LOG.warning("Graceful shutdown was not successful: " + ex.getMessage());
+        } finally {
+            super.shutdown();
+        }
+    }
+
+    private class DestinationHandler extends AbstractReflectorAtmosphereHandler {
+        @Override
+        public void onRequest(final AtmosphereResource resource) throws IOException {
+            LOG.fine("onRequest");
+            try {
+                AtmosphereSseServletDestination.super.invoke(null, resource.getRequest().getServletContext(),
+                    resource.getRequest(), resource.getResponse());
+            } catch (Exception e) {
+                LOG.log(Level.WARNING, "Failed to invoke service", e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/41733786/rt/rs/sse/src/main/resources/META-INF/beans.xml
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/resources/META-INF/beans.xml b/rt/rs/sse/src/main/resources/META-INF/beans.xml
new file mode 100644
index 0000000..aff28d9
--- /dev/null
+++ b/rt/rs/sse/src/main/resources/META-INF/beans.xml
@@ -0,0 +1,5 @@
+<?xml version="1.0"?>
+<beans xmlns="http://java.sun.com/xml/ns/javaee"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
+	xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://jboss.org/schema/cdi/beans_1_1.xsd">
+</beans>

http://git-wip-us.apache.org/repos/asf/cxf/blob/41733786/rt/rs/sse/src/main/resources/META-INF/cxf/bus-extensions.txt
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/resources/META-INF/cxf/bus-extensions.txt b/rt/rs/sse/src/main/resources/META-INF/cxf/bus-extensions.txt
new file mode 100644
index 0000000..643b51c
--- /dev/null
+++ b/rt/rs/sse/src/main/resources/META-INF/cxf/bus-extensions.txt
@@ -0,0 +1 @@
+org.apache.cxf.transport.sse.SseHttpTransportFactory::true
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cxf/blob/41733786/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPTransportFactory.java
----------------------------------------------------------------------
diff --git a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPTransportFactory.java b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPTransportFactory.java
index 706c8c1..e09d549 100644
--- a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPTransportFactory.java
+++ b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPTransportFactory.java
@@ -87,13 +87,19 @@ public class HTTPTransportFactory
     public HTTPTransportFactory() {
         this(new DestinationRegistryImpl());
     }
+    
     public HTTPTransportFactory(DestinationRegistry registry) {
-        super(DEFAULT_NAMESPACES);
+        this(DEFAULT_NAMESPACES, registry);
+    }
+    
+    protected HTTPTransportFactory(List<String> transportIds, DestinationRegistry registry) {
+        super(transportIds);
         if (registry == null) {
             registry = new DestinationRegistryImpl();
         }
         this.registry = registry;
     }
+    
     public DestinationRegistry getRegistry() {
         return registry;
     }

http://git-wip-us.apache.org/repos/asf/cxf/blob/41733786/rt/transports/http/src/main/java/org/apache/cxf/transport/servlet/CXFNonSpringServlet.java
----------------------------------------------------------------------
diff --git a/rt/transports/http/src/main/java/org/apache/cxf/transport/servlet/CXFNonSpringServlet.java b/rt/transports/http/src/main/java/org/apache/cxf/transport/servlet/CXFNonSpringServlet.java
index fe46f4f..37d56b8 100644
--- a/rt/transports/http/src/main/java/org/apache/cxf/transport/servlet/CXFNonSpringServlet.java
+++ b/rt/transports/http/src/main/java/org/apache/cxf/transport/servlet/CXFNonSpringServlet.java
@@ -38,6 +38,7 @@ import org.apache.cxf.BusException;
 import org.apache.cxf.BusFactory;
 import org.apache.cxf.common.classloader.ClassLoaderUtils;
 import org.apache.cxf.common.classloader.ClassLoaderUtils.ClassLoaderHolder;
+import org.apache.cxf.common.util.StringUtils;
 import org.apache.cxf.helpers.CastUtils;
 import org.apache.cxf.resource.ResourceManager;
 import org.apache.cxf.transport.DestinationFactory;
@@ -48,6 +49,8 @@ import org.apache.cxf.transport.http.HTTPTransportFactory;
 import org.apache.cxf.transport.servlet.servicelist.ServiceListGeneratorServlet;
 
 public class CXFNonSpringServlet extends AbstractHTTPServlet {
+    public static final String TRANSPORT_ID = "transportId";
+    
     private static final long serialVersionUID = -2437897227486327166L;
     private static final String IGNORE_SERVLET_CONTEXT_RESOLVER = "ignore.servlet.context.resolver";
     
@@ -80,7 +83,7 @@ public class CXFNonSpringServlet extends AbstractHTTPServlet {
             loader = initClassLoader();
             registerServletContextResolver(sc);
             if (destinationRegistry == null) {
-                this.destinationRegistry = getDestinationRegistryFromBus();
+                this.destinationRegistry = getDestinationRegistryFromBusOrDefault(sc.getInitParameter(TRANSPORT_ID));
             }
         }
 
@@ -101,11 +104,12 @@ public class CXFNonSpringServlet extends AbstractHTTPServlet {
         return bus.getExtension(ClassLoader.class);
     }
     
-    protected DestinationRegistry getDestinationRegistryFromBus() {
+    protected DestinationRegistry getDestinationRegistryFromBusOrDefault(final String transportId) {
         DestinationFactoryManager dfm = bus.getExtension(DestinationFactoryManager.class);
         try {
-            DestinationFactory df = dfm
-                .getDestinationFactory("http://cxf.apache.org/transports/http/configuration");
+            DestinationFactory df = StringUtils.isEmpty(transportId)
+                ? dfm.getDestinationFactory("http://cxf.apache.org/transports/http/configuration")
+                    : dfm.getDestinationFactory(transportId);
             if (df instanceof HTTPTransportFactory) {
                 HTTPTransportFactory transportFactory = (HTTPTransportFactory)df;
                 return transportFactory.getRegistry();

http://git-wip-us.apache.org/repos/asf/cxf/blob/41733786/systests/pom.xml
----------------------------------------------------------------------
diff --git a/systests/pom.xml b/systests/pom.xml
index 3a5ba50..66163b2 100644
--- a/systests/pom.xml
+++ b/systests/pom.xml
@@ -52,5 +52,6 @@
         <module>tracing</module>
         <module>jibx</module>
         <module>ws-transfer</module>
+        <module>rs-sse</module>
     </modules>
 </project>

http://git-wip-us.apache.org/repos/asf/cxf/blob/41733786/systests/rs-sse/pom.xml
----------------------------------------------------------------------
diff --git a/systests/rs-sse/pom.xml b/systests/rs-sse/pom.xml
new file mode 100644
index 0000000..dc3da1a
--- /dev/null
+++ b/systests/rs-sse/pom.xml
@@ -0,0 +1,166 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <parent>
+        <artifactId>cxf-parent</artifactId>
+        <groupId>org.apache.cxf</groupId>
+        <version>3.2.0-SNAPSHOT</version>
+        <relativePath>../../parent/pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.apache.cxf.systests</groupId>
+    <artifactId>cxf-systests-rs-sse</artifactId>
+    <name>Apache CXF SSE Integration System Tests</name>
+    <description>Apache CXF SSE Integration System Tests</description>
+    <url>http://cxf.apache.org</url>
+    <properties>
+        <cxf.surefire.fork.vmargs>-XX:MaxPermSize=192m</cxf.surefire.fork.vmargs>
+        <cxf.server.launcher.vmargs>-XX:MaxPermSize=192m</cxf.server.launcher.vmargs>
+        <cxf.jetty.version>${cxf.jetty9.version}</cxf.jetty.version>
+        <cxf.tomcat.version>8.0.32</cxf.tomcat.version>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-annotations</artifactId>
+            <version>${cxf.jetty.version}</version>
+        </dependency>    
+        <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-server</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-plus</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.jetty.websocket</groupId>
+            <artifactId>websocket-server</artifactId>
+            <version>${cxf.jetty.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-webapp</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-jdk14</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-transports-http-jetty</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-frontend-jaxrs</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-rs-client</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-rs-sse</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.tomcat.embed</groupId>
+            <artifactId>tomcat-embed-core</artifactId>
+            <version>${cxf.tomcat.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.tomcat.embed</groupId>
+            <artifactId>tomcat-embed-logging-juli</artifactId>
+            <version>${cxf.tomcat.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.tomcat</groupId>
+            <artifactId>tomcat-jasper</artifactId>
+            <version>${cxf.tomcat.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-testutils</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>javax.annotation</groupId>
+            <artifactId>jsr250-api</artifactId>
+            <version>1.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.jaxrs</groupId>
+            <artifactId>jackson-jaxrs-json-provider</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-lang</groupId>
+            <artifactId>commons-lang</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.ning</groupId>
+            <artifactId>async-http-client</artifactId>
+            <version>${cxf.ahc.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>io.netty</groupId>
+                    <artifactId>netty</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/cxf/blob/41733786/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
----------------------------------------------------------------------
diff --git a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
new file mode 100644
index 0000000..0cf3087
--- /dev/null
+++ b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.systest.jaxrs.sse;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.jaxrs.client.WebClient;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.hasItems;
+
+public abstract class AbstractSseTest extends AbstractBusClientServerTestBase {
+    private final ObjectMapper mapper = new ObjectMapper();
+    
+    @Test
+    public void testBooksStreamIsReturned() throws JsonProcessingException {
+        Response r = createWebClient("/rest/api/bookstore/sse/100").get();
+        assertEquals(Status.OK.getStatusCode(), r.getStatus());
+        
+        final String response = r.readEntity(String.class);
+        assertThat(response, containsString("id: 1"));
+        assertThat(response, containsString("data: " + toJson("New Book #1", 1)));
+        
+        assertThat(response, containsString("id: 2"));
+        assertThat(response, containsString("data: " + toJson("New Book #2", 2)));
+        
+        assertThat(response, containsString("id: 3"));
+        assertThat(response, containsString("data: " + toJson("New Book #3", 3)));
+        
+        assertThat(response, containsString("id: 4"));
+        assertThat(response, containsString("data: " + toJson("New Book #4", 4)));
+    }
+    
+    @Test
+    public void testBooksStreamIsReturnedFromLastEventId() throws JsonProcessingException {
+        Response r = createWebClient("/rest/api/bookstore/sse/100")
+            .header(HttpHeaders.LAST_EVENT_ID_HEADER, 150)
+            .get();
+        assertEquals(Status.OK.getStatusCode(), r.getStatus());
+        
+        final String response = r.readEntity(String.class);
+        assertThat(response, containsString("id: 151"));
+        assertThat(response, containsString("data: " + toJson("New Book #151", 151)));
+        
+        assertThat(response, containsString("id: 152"));
+        assertThat(response, containsString("data: " + toJson("New Book #152", 152)));
+        
+        assertThat(response, containsString("id: 152"));
+        assertThat(response, containsString("data: " + toJson("New Book #153", 153)));
+        
+        assertThat(response, containsString("id: 152"));
+        assertThat(response, containsString("data: " + toJson("New Book #154", 154)));
+    }
+    
+    @Test
+    public void testBooksAreReturned() throws JsonProcessingException {
+        Response r = createWebClient("/rest/api/bookstore", MediaType.APPLICATION_JSON).get();
+        assertEquals(Status.OK.getStatusCode(), r.getStatus());
+        
+        final Book[] books = r.readEntity(Book[].class);
+        assertThat(Arrays.asList(books), hasItems(new Book("New Book #1", 1), new Book("New Book #2", 2)));
+    }
+    
+    @Test
+    public void testBooksStreamIsBroadcasted() throws Exception {
+        final Collection<Future<Response>> results = new ArrayList<>();
+        
+        for (int i = 0; i < 2; ++i) {
+            results.add(
+                createWebClient("/rest/api/bookstore/broadcast/sse").async().get()
+            );
+        }
+
+        createWebClient("/rest/api/bookstore/broadcast/close")
+            .async()
+            .post(null)
+            .get(4, TimeUnit.SECONDS)
+            .close();
+
+        for (final Future<Response> result: results) {
+            final Response r = result.get(1, TimeUnit.SECONDS);
+            assertEquals(Status.OK.getStatusCode(), r.getStatus());
+    
+            final String response = r.readEntity(String.class);
+            assertThat(response, containsString("id: 1000"));
+            assertThat(response, containsString("data: " + toJson("New Book #1000", 1000)));
+            
+            assertThat(response, containsString("id: 2000"));
+            assertThat(response, containsString("data: " + toJson("New Book #2000", 2000)));
+            
+            r.close();
+        }
+    }
+
+    private String toJson(final String name, final Integer id) throws JsonProcessingException {
+        return mapper.writeValueAsString(new Book(name, id));
+    }
+    
+    protected WebClient createWebClient(final String url, final String media) {
+        final List< ? > providers = Arrays.asList(new JacksonJsonProvider());
+        
+        final WebClient wc = WebClient
+            .create("http://localhost:" + getPort() + url, providers)
+            .accept(media);
+        
+        WebClient.getConfig(wc).getHttpConduit().getClient().setReceiveTimeout(5000L);
+        return wc;
+    }
+    
+    protected WebClient createWebClient(final String url) {
+        return createWebClient(url, MediaType.SERVER_SENT_EVENTS);
+    }
+    
+    protected abstract int getPort();
+}


Mime
View raw message