cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject cxf git commit: Upgrading to JAX-RS 2.1-m04, updating SSE server-side implementation to accomodate API changes
Date Mon, 13 Feb 2017 01:40:10 GMT
Repository: cxf
Updated Branches:
  refs/heads/master e1d950605 -> 90a74fc75


Upgrading to JAX-RS 2.1-m04, updating SSE server-side implementation to accomodate API changes


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/90a74fc7
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/90a74fc7
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/90a74fc7

Branch: refs/heads/master
Commit: 90a74fc75af41898ee50359c13f298eb483a3d01
Parents: e1d9506
Author: reta <drreta@gmail.com>
Authored: Sun Feb 12 20:39:39 2017 -0500
Committer: reta <drreta@gmail.com>
Committed: Sun Feb 12 20:39:39 2017 -0500

----------------------------------------------------------------------
 .../demo/jaxrs/sse/StatsRestServiceImpl.java    |  51 +++----
 .../demo/jaxrs/sse/StatsRestServiceImpl.java    |  50 +++----
 .../demo/jaxrs/sse/StatsRestServiceImpl.java    |  51 +++----
 parent/pom.xml                                  |   4 +-
 .../cxf/jaxrs/sse/SseBroadcasterImpl.java       |  70 ++++++---
 .../cxf/jaxrs/sse/SseEventOutputProvider.java   |  53 -------
 .../org/apache/cxf/jaxrs/sse/SseFactory.java    |  27 ++++
 .../org/apache/cxf/jaxrs/sse/SseFeature.java    |   5 +-
 .../java/org/apache/cxf/jaxrs/sse/SseImpl.java  |  38 +++++
 .../SseAtmosphereContextProvider.java           |  57 --------
 .../SseAtmosphereEventOutputImpl.java           | 121 ----------------
 .../SseAtmosphereEventSinkContextProvider.java  |  61 ++++++++
 .../atmosphere/SseAtmosphereEventSinkImpl.java  | 144 +++++++++++++++++++
 .../SseAtmosphereResourceContext.java           |  60 --------
 .../AtmosphereSseServletDestination.java        |  36 +++++
 .../jaxrs/sse/AbstractBroadcasterSseTest.java   |   2 +-
 .../apache/cxf/systest/jaxrs/sse/BookStore.java |  69 +++++----
 systests/rs-sse/src/test/resources/logback.xml  |   4 +-
 18 files changed, 471 insertions(+), 432 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/90a74fc7/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java b/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
index b8f8608..a156125 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
+++ b/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
@@ -29,52 +29,53 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.sse.OutboundSseEvent;
-import javax.ws.rs.sse.SseContext;
-import javax.ws.rs.sse.SseEventOutput;
+import javax.ws.rs.sse.OutboundSseEvent.Builder;
+import javax.ws.rs.sse.Sse;
+import javax.ws.rs.sse.SseEventSink;
+
+import org.apache.cxf.jaxrs.sse.SseFactory;
 
 @Path("/stats")
 public class StatsRestServiceImpl {
     private static final Random RANDOM = new Random();
+    private final Sse sse = SseFactory.create();
 
-    private static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final int eventId) {
-        return builder
-            .id("" + eventId)
-            .data(Stats.class, new Stats(new Date().getTime(), RANDOM.nextInt(100)))
-            .mediaType(MediaType.APPLICATION_JSON_TYPE)
-            .build();
-    }
-    
     @GET
     @Path("sse/{id}")
-    @Produces("text/event-stream")
-    public SseEventOutput stats(@Context SseContext sseContext, @PathParam("id") final String id) {
-        final SseEventOutput output = sseContext.newOutput();
-        
+    @Produces(MediaType.SERVER_SENT_EVENTS)
+    public void stats(@Context SseEventSink sink, @PathParam("id") final String id) {
         new Thread() {
             public void run() {
                 try {
-                    output.write(createStatsEvent(sseContext.newEvent().name("stats"), 1));
+                    final Builder builder = sse.newEventBuilder();
+                    sink.onNext(createStatsEvent(builder.name("stats"), 1));
                     Thread.sleep(1000);
-                    output.write(createStatsEvent(sseContext.newEvent().name("stats"), 2));
+                    sink.onNext(createStatsEvent(builder.name("stats"), 2));
                     Thread.sleep(1000);
-                    output.write(createStatsEvent(sseContext.newEvent().name("stats"), 3));
+                    sink.onNext(createStatsEvent(builder.name("stats"), 3));
                     Thread.sleep(1000);
-                    output.write(createStatsEvent(sseContext.newEvent().name("stats"), 4));
+                    sink.onNext(createStatsEvent(builder.name("stats"), 4));
                     Thread.sleep(1000);
-                    output.write(createStatsEvent(sseContext.newEvent().name("stats"), 5));
+                    sink.onNext(createStatsEvent(builder.name("stats"), 5));
                     Thread.sleep(1000);
-                    output.write(createStatsEvent(sseContext.newEvent().name("stats"), 6));
+                    sink.onNext(createStatsEvent(builder.name("stats"), 6));
                     Thread.sleep(1000);
-                    output.write(createStatsEvent(sseContext.newEvent().name("stats"), 7));
+                    sink.onNext(createStatsEvent(builder.name("stats"), 7));
                     Thread.sleep(1000);
-                    output.write(createStatsEvent(sseContext.newEvent().name("stats"), 8));
-                    output.close();
+                    sink.onNext(createStatsEvent(builder.name("stats"), 8));
+                    sink.close();
                 } catch (final InterruptedException | IOException e) {
                     e.printStackTrace();
                 }
             }
         }.start();
-
-        return output;
+    }
+    
+    private static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final int eventId) {
+        return builder
+            .id("" + eventId)
+            .data(Stats.class, new Stats(new Date().getTime(), RANDOM.nextInt(100)))
+            .mediaType(MediaType.APPLICATION_JSON_TYPE)
+            .build();
     }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/90a74fc7/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java b/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
index 7112228..005c2bf 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
+++ b/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
@@ -29,55 +29,55 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.sse.OutboundSseEvent;
-import javax.ws.rs.sse.SseContext;
-import javax.ws.rs.sse.SseEventOutput;
+import javax.ws.rs.sse.OutboundSseEvent.Builder;
+import javax.ws.rs.sse.Sse;
+import javax.ws.rs.sse.SseEventSink;
 
+import org.apache.cxf.jaxrs.sse.SseFactory;
 import org.springframework.stereotype.Component;
 
 @Path("/stats")
 @Component
 public class StatsRestServiceImpl {
     private static final Random RANDOM = new Random();
-
-    private static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final int eventId) {
-        return builder
-            .id("" + eventId)
-            .data(Stats.class, new Stats(new Date().getTime(), RANDOM.nextInt(100)))
-            .mediaType(MediaType.APPLICATION_JSON_TYPE)
-            .build();
-    }
+    private final Sse sse = SseFactory.create();
     
     @GET
     @Path("sse/{id}")
-    @Produces("text/event-stream")
-    public SseEventOutput stats(@Context SseContext sseContext, @PathParam("id") final String id) {
-        final SseEventOutput output = sseContext.newOutput();
-        
+    @Produces(MediaType.SERVER_SENT_EVENTS)
+    public void stats(@Context SseEventSink sink, @PathParam("id") final String id) {
         new Thread() {
             public void run() {
                 try {
-                    output.write(createStatsEvent(sseContext.newEvent().name("stats"), 1));
+                    final Builder builder = sse.newEventBuilder();
+                    sink.onNext(createStatsEvent(builder.name("stats"), 1));
                     Thread.sleep(1000);
-                    output.write(createStatsEvent(sseContext.newEvent().name("stats"), 2));
+                    sink.onNext(createStatsEvent(builder.name("stats"), 2));
                     Thread.sleep(1000);
-                    output.write(createStatsEvent(sseContext.newEvent().name("stats"), 3));
+                    sink.onNext(createStatsEvent(builder.name("stats"), 3));
                     Thread.sleep(1000);
-                    output.write(createStatsEvent(sseContext.newEvent().name("stats"), 4));
+                    sink.onNext(createStatsEvent(builder.name("stats"), 4));
                     Thread.sleep(1000);
-                    output.write(createStatsEvent(sseContext.newEvent().name("stats"), 5));
+                    sink.onNext(createStatsEvent(builder.name("stats"), 5));
                     Thread.sleep(1000);
-                    output.write(createStatsEvent(sseContext.newEvent().name("stats"), 6));
+                    sink.onNext(createStatsEvent(builder.name("stats"), 6));
                     Thread.sleep(1000);
-                    output.write(createStatsEvent(sseContext.newEvent().name("stats"), 7));
+                    sink.onNext(createStatsEvent(builder.name("stats"), 7));
                     Thread.sleep(1000);
-                    output.write(createStatsEvent(sseContext.newEvent().name("stats"), 8));
-                    output.close();
+                    sink.onNext(createStatsEvent(builder.name("stats"), 8));
+                    sink.close();
                 } catch (final InterruptedException | IOException e) {
                     e.printStackTrace();
                 }
             }
         }.start();
-
-        return output;
+    }
+    
+    private static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final int eventId) {
+        return builder
+            .id("" + eventId)
+            .data(Stats.class, new Stats(new Date().getTime(), RANDOM.nextInt(100)))
+            .mediaType(MediaType.APPLICATION_JSON_TYPE)
+            .build();
     }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/90a74fc7/distribution/src/main/release/samples/jax_rs/sse_tomcat/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/sse_tomcat/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java b/distribution/src/main/release/samples/jax_rs/sse_tomcat/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
index b8f8608..f937702 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_tomcat/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
+++ b/distribution/src/main/release/samples/jax_rs/sse_tomcat/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
@@ -29,52 +29,53 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.sse.OutboundSseEvent;
-import javax.ws.rs.sse.SseContext;
-import javax.ws.rs.sse.SseEventOutput;
+import javax.ws.rs.sse.OutboundSseEvent.Builder;
+import javax.ws.rs.sse.Sse;
+import javax.ws.rs.sse.SseEventSink;
+
+import org.apache.cxf.jaxrs.sse.SseFactory;
 
 @Path("/stats")
 public class StatsRestServiceImpl {
     private static final Random RANDOM = new Random();
-
-    private static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final int eventId) {
-        return builder
-            .id("" + eventId)
-            .data(Stats.class, new Stats(new Date().getTime(), RANDOM.nextInt(100)))
-            .mediaType(MediaType.APPLICATION_JSON_TYPE)
-            .build();
-    }
+    private final Sse sse = SseFactory.create();
     
     @GET
     @Path("sse/{id}")
-    @Produces("text/event-stream")
-    public SseEventOutput stats(@Context SseContext sseContext, @PathParam("id") final String id) {
-        final SseEventOutput output = sseContext.newOutput();
-        
+    @Produces(MediaType.SERVER_SENT_EVENTS)
+    public void stats(@Context SseEventSink sink, @PathParam("id") final String id) {
         new Thread() {
             public void run() {
                 try {
-                    output.write(createStatsEvent(sseContext.newEvent().name("stats"), 1));
+                    final Builder builder = sse.newEventBuilder();
+                    sink.onNext(createStatsEvent(builder.name("stats"), 1));
                     Thread.sleep(1000);
-                    output.write(createStatsEvent(sseContext.newEvent().name("stats"), 2));
+                    sink.onNext(createStatsEvent(builder.name("stats"), 2));
                     Thread.sleep(1000);
-                    output.write(createStatsEvent(sseContext.newEvent().name("stats"), 3));
+                    sink.onNext(createStatsEvent(builder.name("stats"), 3));
                     Thread.sleep(1000);
-                    output.write(createStatsEvent(sseContext.newEvent().name("stats"), 4));
+                    sink.onNext(createStatsEvent(builder.name("stats"), 4));
                     Thread.sleep(1000);
-                    output.write(createStatsEvent(sseContext.newEvent().name("stats"), 5));
+                    sink.onNext(createStatsEvent(builder.name("stats"), 5));
                     Thread.sleep(1000);
-                    output.write(createStatsEvent(sseContext.newEvent().name("stats"), 6));
+                    sink.onNext(createStatsEvent(builder.name("stats"), 6));
                     Thread.sleep(1000);
-                    output.write(createStatsEvent(sseContext.newEvent().name("stats"), 7));
+                    sink.onNext(createStatsEvent(builder.name("stats"), 7));
                     Thread.sleep(1000);
-                    output.write(createStatsEvent(sseContext.newEvent().name("stats"), 8));
-                    output.close();
+                    sink.onNext(createStatsEvent(builder.name("stats"), 8));
+                    sink.close();
                 } catch (final InterruptedException | IOException e) {
                     e.printStackTrace();
                 }
             }
         }.start();
-
-        return output;
+    }
+    
+    private static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final int eventId) {
+        return builder
+            .id("" + eventId)
+            .data(Stats.class, new Stats(new Date().getTime(), RANDOM.nextInt(100)))
+            .mediaType(MediaType.APPLICATION_JSON_TYPE)
+            .build();
     }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/90a74fc7/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index fadbd8f..0885fc1 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -70,7 +70,7 @@
         <cxf.activemq.version>5.14.2</cxf.activemq.version>
         <cxf.ahc.version>1.9.8</cxf.ahc.version>
         <cxf.apacheds.version>2.0.0-M23</cxf.apacheds.version>
-        <cxf.atmosphere.version>2.4.7</cxf.atmosphere.version>
+        <cxf.atmosphere.version>2.4.9</cxf.atmosphere.version>
         <cxf.atmosphere.version.range>[2.4,3.0)</cxf.atmosphere.version.range>
         <cxf.axiom.version>1.2.14</cxf.axiom.version>
         <cxf.bcprov.version>1.55</cxf.bcprov.version>
@@ -109,7 +109,7 @@
         <cxf.geronimo.transaction.version>1.1.1</cxf.geronimo.transaction.version>
         <cxf.jasypt.bundle.version>1.9.0_1</cxf.jasypt.bundle.version>
         <cxf.javassist.version>3.19.0-GA</cxf.javassist.version>
-        <cxf.javax.ws.rs.version>2.1-m03</cxf.javax.ws.rs.version>
+        <cxf.javax.ws.rs.version>2.1-m04</cxf.javax.ws.rs.version>
         <cxf.jaxb.version>2.2.11</cxf.jaxb.version>
         <cxf.jaxb.impl.version>${cxf.jaxb.version}</cxf.jaxb.impl.version>
         <cxf.jaxb.core.version>${cxf.jaxb.version}</cxf.jaxb.core.version>

http://git-wip-us.apache.org/repos/asf/cxf/blob/90a74fc7/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
index 977a6b2..075ae56 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
@@ -18,48 +18,74 @@
  */
 package org.apache.cxf.jaxrs.sse;
 
-import java.io.IOException;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 
+import javax.ws.rs.Flow;
+import javax.ws.rs.Flow.Subscriber;
+import javax.ws.rs.Flow.Subscription;
 import javax.ws.rs.sse.OutboundSseEvent;
 import javax.ws.rs.sse.SseBroadcaster;
-import javax.ws.rs.sse.SseEventOutput;
 
 public class SseBroadcasterImpl implements SseBroadcaster {
-    private final Set<SseEventOutput> outputs = new CopyOnWriteArraySet<>();
-    private final Set<Listener> listeners = new CopyOnWriteArraySet<>();
+    private final Map<Flow.Subscriber<? super OutboundSseEvent>, Subscription> subscribers = 
+            new ConcurrentHashMap<>();
+    
+    private final Set<Consumer<Subscriber<? super OutboundSseEvent>>> closers = 
+            new CopyOnWriteArraySet<>();
+    
+    private final Set<BiConsumer<Subscriber<? super OutboundSseEvent>, Exception>> exceptioners = 
+            new CopyOnWriteArraySet<>();
             
     @Override
-    public boolean register(Listener listener) {
-        return listeners.add(listener);
-    }
+    public void subscribe(Flow.Subscriber<? super OutboundSseEvent> subscriber) {
+        final Subscription subscription =  new Subscription() {
+            public void request(long n) {
+            }
+            
+            @Override
+            public void cancel() {
+            }
+        };
 
-    @Override
-    public boolean register(SseEventOutput output) {
-        return outputs.add(output);
+        try {
+            subscriber.onSubscribe(subscription);
+            subscribers.put(subscriber, subscription);
+        } catch (final Exception ex) {
+            subscriber.onError(ex); 
+        }
     }
 
     @Override
     public void broadcast(OutboundSseEvent event) {
-        for (final SseEventOutput output: outputs) {
+        for (final Flow.Subscriber<? super OutboundSseEvent> subscriber: subscribers.keySet()) {
             try {
-                output.write(event);
-            } catch (final IOException ex) {
-                listeners.forEach(listener -> listener.onException(output, ex));
+                subscriber.onNext(event);
+            } catch (final Exception ex) {
+                exceptioners.forEach(exceptioner -> exceptioner.accept(subscriber, ex));
             }
         }
     }
+    
+    @Override
+    public void onClose(Consumer<Subscriber<? super OutboundSseEvent>> subscriber) {
+        closers.add(subscriber);
+    }
+    
+    @Override
+    public void onException(BiConsumer<Subscriber<? super OutboundSseEvent>, Exception> exceptioner) {
+        exceptioners.add(exceptioner);
+    }
 
     @Override
     public void close() {
-        for (final SseEventOutput output: outputs) {
-            try {
-                output.close();
-                listeners.forEach(listener -> listener.onClose(output));
-            } catch (final IOException ex) {
-                listeners.forEach(listener -> listener.onException(output, ex));
-            }
-        }
+        subscribers.keySet().forEach(subscriber -> {
+            subscriber.onComplete();
+            closers.forEach(closer -> closer.accept(subscriber));
+        });
     }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/90a74fc7/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventOutputProvider.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventOutputProvider.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventOutputProvider.java
deleted file mode 100644
index 7f7963f..0000000
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventOutputProvider.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.sse;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Type;
-
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.ext.MessageBodyWriter;
-import javax.ws.rs.ext.Provider;
-import javax.ws.rs.sse.SseEventOutput;
-
-@Provider
-public class SseEventOutputProvider implements MessageBodyWriter<SseEventOutput> {
-    @Override
-    public boolean isWriteable(Class<?> cls, Type type, Annotation[] anns, MediaType mt) {
-        return SseEventOutput.class.isAssignableFrom(cls);
-    }
-    
-    @Override
-    public long getSize(final SseEventOutput output, final Class<?> type, final Type genericType,
-                        final Annotation[] annotations, final MediaType mediaType) {
-        return -1;
-    }
-
-    @Override
-    public void writeTo(final SseEventOutput output, final Class<?> type, final Type genericType,
-                        final Annotation[] annotations, final MediaType mediaType,
-                        final MultivaluedMap<String, Object> httpHeaders, final OutputStream entityStream)
-            throws IOException, WebApplicationException {
-        // do nothing.
-    }
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/90a74fc7/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFactory.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFactory.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFactory.java
new file mode 100644
index 0000000..36a0e8e
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFactory.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse;
+
+import javax.ws.rs.sse.Sse;
+
+public interface SseFactory {
+    static Sse create() {
+        return new SseImpl();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/90a74fc7/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java
index da682a0..2a381ea 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java
@@ -25,15 +25,14 @@ import org.apache.cxf.Bus;
 import org.apache.cxf.endpoint.Server;
 import org.apache.cxf.feature.AbstractFeature;
 import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
-import org.apache.cxf.jaxrs.sse.atmosphere.SseAtmosphereContextProvider;
+import org.apache.cxf.jaxrs.sse.atmosphere.SseAtmosphereEventSinkContextProvider;
 
 public class SseFeature extends AbstractFeature {
     @Override
     public void initialize(Server server, Bus bus) {
         final List<Object> providers = new ArrayList<>();
 
-        providers.add(new SseAtmosphereContextProvider());
-        providers.add(new SseEventOutputProvider());
+        providers.add(new SseAtmosphereEventSinkContextProvider());
 
         ((ServerProviderFactory) server.getEndpoint().get(
             ServerProviderFactory.class.getName())).setUserProviders(providers);

http://git-wip-us.apache.org/repos/asf/cxf/blob/90a74fc7/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseImpl.java
new file mode 100644
index 0000000..9327c3f
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseImpl.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse;
+
+import javax.ws.rs.sse.OutboundSseEvent.Builder;
+import javax.ws.rs.sse.SseBroadcaster;
+import javax.ws.rs.sse.Sse;
+
+class SseImpl implements Sse {
+    SseImpl() {
+    }
+    
+    @Override
+    public Builder newEventBuilder() {
+        return new OutboundSseEventImpl.BuilderImpl();
+    }
+
+    @Override
+    public SseBroadcaster newBroadcaster() {
+        return new SseBroadcasterImpl();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/90a74fc7/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereContextProvider.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereContextProvider.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereContextProvider.java
deleted file mode 100644
index de2c3a9..0000000
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereContextProvider.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.sse.atmosphere;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.ext.Provider;
-import javax.ws.rs.sse.SseContext;
-
-import org.apache.cxf.jaxrs.ext.ContextProvider;
-import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
-import org.apache.cxf.message.Message;
-import org.apache.cxf.transport.http.AbstractHTTPDestination;
-import org.atmosphere.cpr.AtmosphereResource;
-import org.atmosphere.cpr.Broadcaster;
-
-@Provider
-public class SseAtmosphereContextProvider implements ContextProvider<SseContext> {
-    @Override
-    public SseContext createContext(Message message) {
-        final HttpServletRequest request = (HttpServletRequest)message.get(AbstractHTTPDestination.HTTP_REQUEST);
-        if (request == null) {
-            throw new IllegalStateException("Unable to retrieve HTTP request from the context");
-        }
-        
-        final AtmosphereResource resource = (AtmosphereResource)request
-            .getAttribute(AtmosphereResource.class.getName());
-        if (resource == null) {
-            throw new IllegalStateException("AtmosphereResource is not present, "
-                    + "is AtmosphereServlet configured properly?");
-        }
-        
-        final Broadcaster broadcaster = resource.getAtmosphereConfig()
-            .getBroadcasterFactory()
-            .lookup(resource.uuid(), true);
-        
-        resource.removeFromAllBroadcasters();
-        resource.setBroadcaster(broadcaster);
-        
-        return new SseAtmosphereResourceContext(ServerProviderFactory.getInstance(message), resource);
-    }
-} 

http://git-wip-us.apache.org/repos/asf/cxf/blob/90a74fc7/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventOutputImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventOutputImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventOutputImpl.java
deleted file mode 100644
index 439c96d..0000000
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventOutputImpl.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.sse.atmosphere;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.lang.annotation.Annotation;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.logging.Logger;
-
-import javax.ws.rs.ext.MessageBodyWriter;
-import javax.ws.rs.sse.OutboundSseEvent;
-import javax.ws.rs.sse.SseEventOutput;
-
-import org.apache.cxf.common.logging.LogUtils;
-import org.atmosphere.cpr.AtmosphereResource;
-import org.atmosphere.cpr.AtmosphereResponse;
-import org.atmosphere.cpr.Broadcaster;
-
-public class SseAtmosphereEventOutputImpl implements SseEventOutput {
-    private static final Logger LOG = LogUtils.getL7dLogger(SseAtmosphereEventOutputImpl.class);
-    
-    private final AtmosphereResource resource;
-    private final MessageBodyWriter<OutboundSseEvent> writer;
-    private volatile boolean closed;
-    
-    public SseAtmosphereEventOutputImpl(final MessageBodyWriter<OutboundSseEvent> writer, 
-            final AtmosphereResource resource) {
-        this.writer = writer;
-        this.resource = resource;
-        
-        if (!resource.isSuspended()) {
-            LOG.fine("Atmosphere resource is not suspended, suspending");
-            resource.suspend();
-        }
-    }
-    
-    @Override
-    public void close() throws IOException {
-        if (!closed) {
-            closed = true;
-
-            LOG.fine("Closing Atmosphere SSE event output");
-            if (resource.isSuspended()) {
-                LOG.fine("Atmosphere resource is suspended, resuming");
-                resource.resume();
-            }
-
-            final Broadcaster broadcaster = resource.getBroadcaster();
-            resource.removeFromAllBroadcasters();
-            
-            try {
-                final AtmosphereResponse response = resource.getResponse();
-                if (!response.isCommitted()) {
-                    LOG.fine("Response is not committed, flushing buffer");
-                    response.flushBuffer();
-                }
-                
-                response.closeStreamOrWriter();
-            } finally {
-                resource.close();
-                broadcaster.destroy();
-                LOG.fine("Atmosphere SSE event output is closed");
-            }
-        }
-    }
-
-    @Override
-    public void write(OutboundSseEvent event) throws IOException {
-        if (!closed && writer != null) {
-            try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
-                writer.writeTo(event, event.getClass(), null, new Annotation [] {}, event.getMediaType(), null, os);
-                
-                // Atmosphere broadcasts asynchronously which is acceptable in most cases.
-                // Unfortunately, calling close() may lead to response stream being closed
-                // while there are still some SSE delivery scheduled.
-                final Future<Object> future = resource
-                    .getBroadcaster()
-                    .broadcast(os.toString(StandardCharsets.UTF_8.name()));
-                
-                try {
-                    if (!future.isDone()) {
-                        // Let us wait at least 200 milliseconds before returning to ensure 
-                        // that SSE had the opportunity to be delivered.
-                        LOG.info("Waiting 200ms to ensure SSE Atmosphere response is delivered");
-                        future.get(200, TimeUnit.MILLISECONDS);
-                    }
-                } catch (final ExecutionException | InterruptedException ex) {
-                    throw new IOException(ex);
-                } catch (final TimeoutException ex) {
-                    LOG.warning("SSE Atmosphere response was not delivered within default timeout");
-                }
-            }
-        }
-    }
-
-    @Override
-    public boolean isClosed() {
-        return closed;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/90a74fc7/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkContextProvider.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkContextProvider.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkContextProvider.java
new file mode 100644
index 0000000..bbbd754
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkContextProvider.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse.atmosphere;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.ext.MessageBodyWriter;
+import javax.ws.rs.sse.OutboundSseEvent;
+import javax.ws.rs.sse.SseEventSink;
+
+import org.apache.cxf.jaxrs.ext.ContextProvider;
+import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
+import org.apache.cxf.jaxrs.sse.OutboundSseEventBodyWriter;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.transport.http.AbstractHTTPDestination;
+import org.atmosphere.cpr.AtmosphereResource;
+import org.atmosphere.cpr.Broadcaster;
+
+public class SseAtmosphereEventSinkContextProvider implements ContextProvider<SseEventSink> {
+    @Override
+    public SseEventSink createContext(Message message) {
+        final HttpServletRequest request = (HttpServletRequest)message.get(AbstractHTTPDestination.HTTP_REQUEST);
+        if (request == null) {
+            throw new IllegalStateException("Unable to retrieve HTTP request from the context");
+        }
+        
+        final AtmosphereResource resource = (AtmosphereResource)request
+            .getAttribute(AtmosphereResource.class.getName());
+        if (resource == null) {
+            throw new IllegalStateException("AtmosphereResource is not present, "
+                    + "is AtmosphereServlet configured properly?");
+        }
+        
+        final Broadcaster broadcaster = resource.getAtmosphereConfig()
+            .getBroadcasterFactory()
+            .lookup(resource.uuid(), true);
+        
+        resource.removeFromAllBroadcasters();
+        resource.setBroadcaster(broadcaster);
+        
+        final MessageBodyWriter<OutboundSseEvent> writer = new OutboundSseEventBodyWriter(
+            ServerProviderFactory.getInstance(message), message.getExchange());
+
+        return new SseAtmosphereEventSinkImpl(writer, resource);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/90a74fc7/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java
new file mode 100644
index 0000000..c930589
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse.atmosphere;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.lang.annotation.Annotation;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Logger;
+
+import javax.ws.rs.Flow.Subscription;
+import javax.ws.rs.ext.MessageBodyWriter;
+import javax.ws.rs.sse.OutboundSseEvent;
+import javax.ws.rs.sse.SseEventSink;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.atmosphere.cpr.AtmosphereResource;
+import org.atmosphere.cpr.AtmosphereResponse;
+import org.atmosphere.cpr.Broadcaster;
+
+public class SseAtmosphereEventSinkImpl implements SseEventSink {
+    private static final Logger LOG = LogUtils.getL7dLogger(SseAtmosphereEventSinkImpl.class);
+    
+    private final AtmosphereResource resource;
+    private final MessageBodyWriter<OutboundSseEvent> writer;
+    
+    private volatile boolean closed;
+    
+    public SseAtmosphereEventSinkImpl(final MessageBodyWriter<OutboundSseEvent> writer, 
+            final AtmosphereResource resource) {
+        this.writer = writer;
+        this.resource = resource;
+        
+        if (!resource.isSuspended()) {
+            LOG.fine("Atmosphere resource is not suspended, suspending");
+            resource.suspend();
+        }
+    }
+    
+    @Override
+    public void close() throws IOException {
+        if (!closed) {
+            closed = true;
+
+            LOG.fine("Closing Atmosphere SSE event output");
+            if (resource.isSuspended()) {
+                LOG.fine("Atmosphere resource is suspended, resuming");
+                resource.resume();
+            }
+
+            final Broadcaster broadcaster = resource.getBroadcaster();
+            resource.removeFromAllBroadcasters();
+            
+            try {
+                final AtmosphereResponse response = resource.getResponse();
+                if (!response.isCommitted()) {
+                    LOG.fine("Response is not committed, flushing buffer");
+                    response.flushBuffer();
+                }
+                
+                response.closeStreamOrWriter();
+            } finally {
+                resource.close();
+                broadcaster.destroy();
+                LOG.fine("Atmosphere SSE event output is closed");
+            }
+        }
+    }
+    
+    @Override
+    public void onNext(OutboundSseEvent event) {
+        if (!closed && writer != null) {
+            try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
+                writer.writeTo(event, event.getClass(), null, new Annotation [] {}, event.getMediaType(), null, os);
+                
+                // Atmosphere broadcasts asynchronously which is acceptable in most cases.
+                // Unfortunately, calling close() may lead to response stream being closed
+                // while there are still some SSE delivery scheduled.
+                final Future<Object> future = resource
+                    .getBroadcaster()
+                    .broadcast(os.toString(StandardCharsets.UTF_8.name()));
+                
+                try {
+                    if (!future.isDone()) {
+                        // Let us wait at least 200 milliseconds before returning to ensure 
+                        // that SSE had the opportunity to be delivered.
+                        LOG.info("Waiting 200ms to ensure SSE Atmosphere response is delivered");
+                        future.get(200, TimeUnit.MILLISECONDS);
+                    }
+                } catch (final ExecutionException | InterruptedException ex) {
+                    throw new IOException(ex);
+                } catch (final TimeoutException ex) {
+                    LOG.warning("SSE Atmosphere response was not delivered within default timeout");
+                }
+            } catch(final IOException ex) {
+                LOG.warning("While writing the SSE event, an exception was raised: " + ex);
+            }
+        }
+    }
+    
+    @Override
+    public void onError(Throwable throwable) {
+        // TODO: Should we close the response?
+    }
+    
+    @Override
+    public void onComplete() {
+        try {
+            close();
+        } catch (final IOException ex) {
+            LOG.warning("While closing the SSE connection, an exception was raised: " + ex);
+        }
+    }
+    
+    @Override
+    public void onSubscribe(Subscription subscription) {
+        subscription.request(Long.MAX_VALUE);
+    }
+
+    @Override
+    public boolean isClosed() {
+        return closed;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/90a74fc7/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereResourceContext.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereResourceContext.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereResourceContext.java
deleted file mode 100644
index c330d6c..0000000
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereResourceContext.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.sse.atmosphere;
-
-import javax.ws.rs.ext.MessageBodyWriter;
-import javax.ws.rs.sse.OutboundSseEvent;
-import javax.ws.rs.sse.OutboundSseEvent.Builder;
-import javax.ws.rs.sse.SseBroadcaster;
-import javax.ws.rs.sse.SseContext;
-import javax.ws.rs.sse.SseEventOutput;
-
-import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
-import org.apache.cxf.jaxrs.sse.OutboundSseEventBodyWriter;
-import org.apache.cxf.jaxrs.sse.OutboundSseEventImpl;
-import org.apache.cxf.jaxrs.sse.SseBroadcasterImpl;
-import org.apache.cxf.jaxrs.utils.JAXRSUtils;
-import org.atmosphere.cpr.AtmosphereResource;
-
-public class SseAtmosphereResourceContext implements SseContext {
-    private final AtmosphereResource resource;
-    private final ServerProviderFactory factory;
-
-    SseAtmosphereResourceContext(final ServerProviderFactory factory, final AtmosphereResource resource) {
-        this.factory = factory;
-        this.resource = resource;
-    }
-    
-    @Override
-    public SseEventOutput newOutput() {
-        final MessageBodyWriter<OutboundSseEvent> writer = new OutboundSseEventBodyWriter(factory, 
-            JAXRSUtils.getCurrentMessage().getExchange());
-        return new SseAtmosphereEventOutputImpl(writer, resource);
-    }
-
-    @Override
-    public Builder newEvent() {
-        return new OutboundSseEventImpl.BuilderImpl();
-    }
-
-    @Override
-    public SseBroadcaster newBroadcaster() {
-        return new SseBroadcasterImpl();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/90a74fc7/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java
index 37cfc5e..673051a 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java
@@ -20,6 +20,10 @@
 package org.apache.cxf.transport.sse.atmosphere;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -28,13 +32,17 @@ import javax.servlet.ServletContext;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
 
 import org.apache.cxf.Bus;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.jaxrs.sse.SseFeature;
 import org.apache.cxf.jaxrs.sse.atmosphere.SseAtmosphereInterceptor;
+import org.apache.cxf.message.Message;
 import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.transport.http.DestinationRegistry;
+import org.apache.cxf.transport.http.Headers;
 import org.apache.cxf.transport.servlet.ServletDestination;
 import org.atmosphere.cache.UUIDBroadcasterCache;
 import org.atmosphere.cpr.ApplicationConfig;
@@ -99,4 +107,32 @@ public class AtmosphereSseServletDestination extends ServletDestination {
             }
         }
     }
+    
+    @Override
+    protected OutputStream flushHeaders(Message outMessage, boolean getStream) throws IOException {
+        adjustContentLength(outMessage);
+        return super.flushHeaders(outMessage, getStream);
+    }
+
+    @Override
+    protected OutputStream flushHeaders(Message outMessage) throws IOException {
+        adjustContentLength(outMessage);
+        return super.flushHeaders(outMessage);
+    }
+    
+    /**
+     * It has been noticed that Jetty checks the "Content-Length" header and completes the 
+     * response if its value is 0 (or matches the number of bytes written). However, in case
+     * of SSE the content length is unknown so we are setting it to -1 before flushing the 
+     * response. Otherwise, only the first event is going to be sent and response is going to
+     * be closed.
+     */
+    private void adjustContentLength(Message outMessage) {
+        final String contentType = (String)outMessage.get(Message.CONTENT_TYPE);
+        
+        if (MediaType.SERVER_SENT_EVENTS.equalsIgnoreCase(contentType)) {
+            final Map<String, List<String>> headers = Headers.getSetProtocolHeaders(outMessage);
+            headers.put(HttpHeaders.CONTENT_LENGTH, Collections.singletonList("-1"));
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/90a74fc7/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractBroadcasterSseTest.java
----------------------------------------------------------------------
diff --git a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractBroadcasterSseTest.java b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractBroadcasterSseTest.java
index e4fb617..3e2eb96 100644
--- a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractBroadcasterSseTest.java
+++ b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractBroadcasterSseTest.java
@@ -49,7 +49,7 @@ public abstract class AbstractBroadcasterSseTest extends AbstractSseBaseTest {
         createWebClient("/rest/api/bookstore/broadcast/close")
             .async()
             .post(null)
-            .get(5, TimeUnit.SECONDS)
+            .get(10, TimeUnit.SECONDS)
             .close();
 
         for (final Future<Response> result: results) {

http://git-wip-us.apache.org/repos/asf/cxf/blob/90a74fc7/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
----------------------------------------------------------------------
diff --git a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
index 1d5baff..6aa815d 100644
--- a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
+++ b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
@@ -36,10 +35,12 @@ import javax.ws.rs.core.Context;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.sse.OutboundSseEvent;
+import javax.ws.rs.sse.OutboundSseEvent.Builder;
+import javax.ws.rs.sse.Sse;
 import javax.ws.rs.sse.SseBroadcaster;
-import javax.ws.rs.sse.SseContext;
-import javax.ws.rs.sse.SseEventOutput;
+import javax.ws.rs.sse.SseEventSink;
 
+import org.apache.cxf.jaxrs.sse.SseFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,16 +48,12 @@ import org.slf4j.LoggerFactory;
 public class BookStore {
     private static final Logger LOG = LoggerFactory.getLogger(BookStore.class);
 
+    private final Sse sse = SseFactory.create();
     private final CountDownLatch latch = new CountDownLatch(2);
-    private final AtomicReference<SseBroadcaster> broadcaster = 
-        new AtomicReference<SseBroadcaster>();
+    private final SseBroadcaster broadcaster;
     
-    private static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final int eventId) {
-        return builder
-            .id(Integer.toString(eventId))
-            .data(Book.class, new Book("New Book #" + eventId, eventId))
-            .mediaType(MediaType.APPLICATION_JSON_TYPE)
-            .build();
+    public BookStore() {
+        broadcaster = sse.newBroadcaster();
     }
     
     @GET
@@ -71,50 +68,37 @@ public class BookStore {
     @GET
     @Path("sse/{id}")
     @Produces(MediaType.SERVER_SENT_EVENTS)
-    public SseEventOutput forBook(@Context SseContext sseContext, @PathParam("id") final String id, 
+    public void forBook(@Context SseEventSink sink, @PathParam("id") final String id, 
             @HeaderParam(HttpHeaders.LAST_EVENT_ID_HEADER) @DefaultValue("0") final String lastEventId) {
-        final SseEventOutput output = sseContext.newOutput();
         
         new Thread() {
             public void run() {
                 try {
                     final Integer id = Integer.valueOf(lastEventId);
+                    final Builder builder = sse.newEventBuilder();
 
-                    output.write(createStatsEvent(sseContext.newEvent().name("book"), id + 1));
+                    sink.onNext(createStatsEvent(builder.name("book"), id + 1));
                     Thread.sleep(200);
-                    output.write(createStatsEvent(sseContext.newEvent().name("book"), id + 2));
+                    sink.onNext(createStatsEvent(builder.name("book"), id + 2));
                     Thread.sleep(200);
-                    output.write(createStatsEvent(sseContext.newEvent().name("book"), id + 3));
+                    sink.onNext(createStatsEvent(builder.name("book"), id + 3));
                     Thread.sleep(200);
-                    output.write(createStatsEvent(sseContext.newEvent().name("book"), id + 4));
+                    sink.onNext(createStatsEvent(builder.name("book"), id + 4));
                     Thread.sleep(200);
-                    output.close();
+                    sink.close();
                 } catch (final InterruptedException | IOException ex) {
                     LOG.error("Communication error", ex);
                 }
             }
         }.start();
-
-        return output;
     }
 
     @GET
     @Path("broadcast/sse")
     @Produces(MediaType.SERVER_SENT_EVENTS)
-    public SseEventOutput broadcast(@Context SseContext sseContext) {
-        final SseEventOutput output = sseContext.newOutput();
-        
-        if (broadcaster.get() == null) {
-            broadcaster.compareAndSet(null, sseContext.newBroadcaster());
-        }
-        
+    public void broadcast(@Context SseEventSink sink) {
         try {
-            broadcaster.get().register(output);
-            
-            broadcaster.get().broadcast(createStatsEvent(sseContext.newEvent().name("book"), 1000));
-            broadcaster.get().broadcast(createStatsEvent(sseContext.newEvent().name("book"), 2000));
-            
-            return output;
+            broadcaster.subscribe(sink);
         } finally {
             latch.countDown();
         }
@@ -125,15 +109,28 @@ public class BookStore {
     public void stop() {
         try {
             // Await a least 2 clients to be broadcasted over 
-            if (!latch.await(4, TimeUnit.SECONDS)) {
+            if (!latch.await(10, TimeUnit.SECONDS)) {
                 LOG.warn("Not enough clients have been connected, closing broadcaster anyway");
             }
+
+            final Builder builder = sse.newEventBuilder();
+            broadcaster.broadcast(createStatsEvent(builder.name("book"), 1000));
+            broadcaster.broadcast(createStatsEvent(builder.name("book"), 2000));
+            
         } catch (final InterruptedException ex) {
             LOG.error("Wait has been interrupted", ex);
         }
         
-        if (broadcaster.get() != null) {
-            broadcaster.get().close();
+        if (broadcaster != null) {
+            broadcaster.close();
         }
     }
+    
+    private static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final int eventId) {
+        return builder
+            .id(Integer.toString(eventId))
+            .data(Book.class, new Book("New Book #" + eventId, eventId))
+            .mediaType(MediaType.APPLICATION_JSON_TYPE)
+            .build();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/90a74fc7/systests/rs-sse/src/test/resources/logback.xml
----------------------------------------------------------------------
diff --git a/systests/rs-sse/src/test/resources/logback.xml b/systests/rs-sse/src/test/resources/logback.xml
index 73cf1ea..430aa64 100644
--- a/systests/rs-sse/src/test/resources/logback.xml
+++ b/systests/rs-sse/src/test/resources/logback.xml
@@ -6,7 +6,7 @@
 		</encoder>
 	</appender>
 
-	<!--<root level="DEBUG">
+	<root level="INFO">
 		<appender-ref ref="STDOUT" />
-	</root>-->
+	</root>
 </configuration>


Mime
View raw message