cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject cxf git commit: CXF-7085: Introduce support for Server Sent Events (Client). Implemented reconnect support (with basic test cases added)
Date Sun, 09 Jul 2017 17:47:58 GMT
Repository: cxf
Updated Branches:
  refs/heads/master a055d7bd3 -> dffa0df6c


CXF-7085: Introduce support for Server Sent Events (Client). Implemented reconnect support (with basic test cases added)


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/dffa0df6
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/dffa0df6
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/dffa0df6

Branch: refs/heads/master
Commit: dffa0df6ce47291364baecbfe78cb131d20be6b7
Parents: a055d7b
Author: reta <drreta@gmail.com>
Authored: Sun Jul 9 13:47:15 2017 -0400
Committer: reta <drreta@gmail.com>
Committed: Sun Jul 9 13:47:15 2017 -0400

----------------------------------------------------------------------
 .../jaxrs/sse/client/InboundSseEventImpl.java   |   2 +-
 .../sse/client/InboundSseEventProcessor.java    |  35 +++--
 .../jaxrs/sse/client/SseEventSourceImpl.java    | 150 ++++++++++++++-----
 .../cxf/systest/jaxrs/sse/AbstractSseTest.java  |  32 ++++
 4 files changed, 161 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/dffa0df6/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventImpl.java
index 4d4eab4..9479e1c 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventImpl.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventImpl.java
@@ -86,7 +86,7 @@ public class InboundSseEventImpl implements InboundSseEvent {
         }
 
         InboundSseEvent build(ClientProviderFactory factory, Message message) {
-            return new InboundSseEventImpl(id, name, comment, reconnectDelay.orElse(0), 
+            return new InboundSseEventImpl(id, name, comment, reconnectDelay.orElse(RECONNECT_NOT_SET),
                 reconnectDelay.isPresent(), data, factory, message);
         }
     }

http://git-wip-us.apache.org/repos/asf/cxf/blob/dffa0df6/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
index cf2346a..e5659f5 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
@@ -50,6 +50,7 @@ public class InboundSseEventProcessor {
     private final Endpoint endpoint;
     private final InboundSseEventListener listener;
     private final ExecutorService executor;
+    
     private volatile boolean closed = false;
     
     protected InboundSseEventProcessor(Endpoint endpoint, InboundSseEventListener listener) {
@@ -59,6 +60,10 @@ public class InboundSseEventProcessor {
     }
     
     void run(final Response response) {
+        if (closed) {
+            throw new IllegalStateException("The SSE Event Processor is already closed");
+        }
+        
         final InputStream is = response.readEntity(InputStream.class);
         final ClientProviderFactory factory = ClientProviderFactory.getInstance(endpoint);
         
@@ -83,10 +88,7 @@ public class InboundSseEventProcessor {
                         } else {
                             final InboundSseEvent event = builder.build(factory, message);
                             builder = new InboundSseEventImpl.Builder(line.substring(EVENT.length()));
-                            
-                            if (listener != null) {
-                                listener.onNext(event);
-                            }
+                            listener.onNext(event);
                         }
                     } else if (builder != null) {
                         if (line.startsWith(ID)) {
@@ -101,18 +103,14 @@ public class InboundSseEventProcessor {
                     }
                 }
                 
-                if (listener != null) {
-                    if (builder != null) {
-                        listener.onNext(builder.build(factory, message));
-                    }
-
-                    // complete the stream
-                    listener.onComplete();
+                if (builder != null) {
+                    listener.onNext(builder.build(factory, message));
                 }
+
+                // complete the stream
+                listener.onComplete();
             } catch (final Exception ex) {
-                if (listener != null) {
-                    listener.onError(ex);
-                }
+                listener.onError(ex);
             }
 
             if (response != null) {
@@ -125,12 +123,13 @@ public class InboundSseEventProcessor {
     }
     
     boolean close(long timeout, TimeUnit unit) {
-        if (closed) {
-            return true;
-        }
-        
         try {
             closed = true;
+            
+            if (executor.isShutdown()) {
+                return true;
+            }
+            
             executor.shutdown();
             return executor.awaitTermination(timeout, unit);
         } catch (final InterruptedException ex) {

http://git-wip-us.apache.org/repos/asf/cxf/blob/dffa0df6/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
index a8385a9..22f365e 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
@@ -20,6 +20,8 @@ package org.apache.cxf.jaxrs.sse.client;
 
 import java.util.Collection;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
@@ -45,12 +47,46 @@ public class SseEventSourceImpl implements SseEventSource {
     private static final Logger LOG = LogUtils.getL7dLogger(SseEventSourceImpl.class);
     
     private final WebTarget target;
-    private final long delay;
-    private final TimeUnit unit;
     private final Collection<InboundSseEventListener> listeners = new CopyOnWriteArrayList<>();
+    private final AtomicReference<SseSourceState> state = new AtomicReference<>(SseSourceState.CLOSED);
     
     // It may happen that open() and close() could be called on separate threads
+    private volatile ScheduledExecutorService executor;
     private volatile InboundSseEventProcessor processor; 
+    private volatile TimeUnit unit;
+    private volatile long delay;
+
+    private class InboundSseEventListenerDelegate implements InboundSseEventListener {
+        private String lastEventId;
+        
+        @Override
+        public void onNext(InboundSseEvent event) {
+            lastEventId = event.getId();
+            listeners.forEach(listener -> listener.onNext(event));
+            
+            // Reconnect delay is set in milliseconds
+            if (event.isReconnectDelaySet()) {
+                unit = TimeUnit.MILLISECONDS;
+                delay = event.getReconnectDelay();
+            }
+        }
+
+        @Override
+        public void onError(Throwable ex) {
+            listeners.forEach(listener -> listener.onError(ex));
+            if (delay > 0 && unit != null) {
+                scheduleReconnect(delay, unit, lastEventId);
+            }
+        }
+
+        @Override
+        public void onComplete() {
+            listeners.forEach(InboundSseEventListener::onComplete);
+            if (delay > 0 && unit != null) {
+                scheduleReconnect(delay, unit, lastEventId);
+            }
+        }
+    }
     
     private class InboundSseEventListenerImpl implements InboundSseEventListener {
         private final Consumer<InboundSseEvent> onEvent;
@@ -86,13 +122,13 @@ public class SseEventSourceImpl implements SseEventSource {
             onComplete.run();
         }
     }
-
-    private final AtomicReference<SseSourceState> state = 
-        new AtomicReference<>(SseSourceState.CLOSED);
     
+    /**
+     * https://www.w3.org/TR/2012/WD-eventsource-20120426/#dom-eventsource-connecting
+     */
     private enum SseSourceState {
-        OPENING,
-        OPENED,
+        CONNECTING,
+        OPEN,
         CLOSED
     }
     
@@ -119,14 +155,23 @@ public class SseEventSourceImpl implements SseEventSource {
 
     @Override
     public void open() {
-        if (!state.compareAndSet(SseSourceState.CLOSED, SseSourceState.OPENING)) {
+        if (!state.compareAndSet(SseSourceState.CLOSED, SseSourceState.CONNECTING)) {
             throw new IllegalStateException("The SseEventSource is already in " + state.get() + " state");
         }
 
-        Response response = null; 
+        // Create the executor for scheduling the reconnect tasks 
+        executor = Executors.newSingleThreadScheduledExecutor();
+        
+        final Object lastEventId = target.getConfiguration().getProperty(HttpHeaders.LAST_EVENT_ID_HEADER);
+        connect(lastEventId != null ? lastEventId.toString() : null);
+    }
+
+    private void connect(String lastEventId) {
+        final InboundSseEventListenerDelegate delegate = new InboundSseEventListenerDelegate();
+        Response response = null;
+        
         try {
             final MultivaluedMap<String, Object> headers = new MultivaluedHashMap<>();
-            final Object lastEventId = target.getConfiguration().getProperty(HttpHeaders.LAST_EVENT_ID_HEADER);
             if (lastEventId != null) {
                 headers.putSingle(HttpHeaders.LAST_EVENT_ID_HEADER, lastEventId);
             }
@@ -135,49 +180,41 @@ public class SseEventSourceImpl implements SseEventSource {
                 .request(MediaType.SERVER_SENT_EVENTS)
                 .headers(headers)
                 .get();
+            
+            // A client can be told to stop reconnecting using the HTTP 204 No Content 
+            // response code. In this case, we should stop here.
+            if (response.getStatus() == 204) {
+                LOG.fine("SSE endpoint " + target.getUri() + " returns no data, disconnecting");
+                state.compareAndSet(SseSourceState.CONNECTING, SseSourceState.CLOSED);
+                response.close();
+                return;
+            }
 
             final Endpoint endpoint = WebClient.getConfig(target).getEndpoint();
-            processor = new InboundSseEventProcessor(endpoint,
-                new InboundSseEventListener() {
-                    @Override
-                    public void onNext(InboundSseEvent event) {
-                        listeners.forEach(listener -> listener.onNext(event));
-                    }
-        
-                    @Override
-                    public void onError(Throwable ex) {
-                        listeners.forEach(listener -> listener.onError(ex));
-                        if (delay > 0 && unit != null) {
-                            // TODO: Schedule reconnect here
-                        }
-                    }
-        
-                    @Override
-                    public void onComplete() {
-                        listeners.forEach(InboundSseEventListener::onComplete);
-                    }
-                }
-            );
-
+            processor = new InboundSseEventProcessor(endpoint, delegate);
             processor.run(response);
-            state.compareAndSet(SseSourceState.OPENING, SseSourceState.OPENED);
             
+            state.compareAndSet(SseSourceState.CONNECTING, SseSourceState.OPEN);
             LOG.fine("Opened SSE connection to " + target.getUri());
         } catch (final Exception ex) {
-            state.compareAndSet(SseSourceState.OPENING, SseSourceState.CLOSED);
-            LOG.fine("Failed to open SSE connection to " + target.getUri() + ". " + ex.getMessage());
+            if (processor != null) {
+                processor.close(1, TimeUnit.SECONDS);
+                processor = null;
+            }
             
             if (response != null) {
                 response.close();
             }
-            
-            listeners.forEach(listener -> listener.onError(ex));
+
+            // We don't change the state here as the reconnection will be scheduled (if configured)
+            LOG.fine("Failed to open SSE connection to " + target.getUri() + ". " + ex.getMessage());
+            delegate.onError(ex);
         }
     }
 
     @Override
     public boolean isOpen() {
-        return state.get() == SseSourceState.OPENED;
+        return state.get() == SseSourceState.OPEN;
     }
 
     @Override
@@ -186,9 +223,15 @@ public class SseEventSourceImpl implements SseEventSource {
             return true;
         }
         
-        if (!state.compareAndSet(SseSourceState.OPENED, SseSourceState.CLOSED)) {
+        if (state.compareAndSet(SseSourceState.CONNECTING, SseSourceState.CLOSED)) {
+            LOG.fine("The SseEventSource was not connected, closing anyway");
+        } else if (!state.compareAndSet(SseSourceState.OPEN, SseSourceState.CLOSED)) {
             throw new IllegalStateException("The SseEventSource is not opened, but in " + state.get() + " state");
         }
+        
+        if (executor != null) {
+            executor.shutdown();
+        }
 
         // Should never happen
         if (processor == null) {
@@ -197,4 +240,33 @@ public class SseEventSourceImpl implements SseEventSource {
         
         return processor.close(timeout, unit); 
     }
+    
+    private void scheduleReconnect(long delay, TimeUnit unit, String lastEventId) {
+        // If delay == RECONNECT_NOT_SET, no reconnection attempt should be performed
+        if (delay <= 0 || executor == null) {
+            return;
+        }
+        
+        // If the event source is already closed, do nothing
+        if (state.get() == SseSourceState.CLOSED) {
+            return;
+        }
+        
+        // If the connection was still on connecting state, just try to reconnect
+        if (state.get() != SseSourceState.CONNECTING && !state.compareAndSet(SseSourceState.OPEN, SseSourceState.CONNECTING)) {
+            throw new IllegalStateException("The SseEventSource is not opened, but in " + state.get() +
+                " state, unable to reconnect");
+        }
+                
+        executor.schedule(() -> {
+            // If we are still in connecting state (not closed/open), let's try to reconnect
+            if (state.get() == SseSourceState.CONNECTING) {
+                LOG.fine("Reestablishing SSE connection to " + target.getUri());
+                connect(lastEventId);
+            }
+        }, delay, unit);
+        
+        LOG.fine("The reconnection attempt to " + target.getUri() + " is scheduled in " +
+            unit.toMillis(delay) + "ms");
+    }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/dffa0df6/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
----------------------------------------------------------------------
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
index bf48362..bfd5e26 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
@@ -33,6 +33,7 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.sse.InboundSseEvent;
 import javax.ws.rs.sse.SseEventSource;
+import javax.ws.rs.sse.SseEventSource.Builder;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 
@@ -104,6 +105,37 @@ public abstract class AbstractSseTest extends AbstractSseBaseTest {
     }
     
     @Test
+    public void testBooksStreamIsReconnectedFromInboundSseEvents() throws InterruptedException {
+        final WebTarget target = createWebTarget("/rest/api/bookstore/sse/0");
+        final Collection<Book> books = new ArrayList<>();
+        
+        final Builder builder = SseEventSource.target(target).reconnectingEvery(1, TimeUnit.SECONDS);
+        try (final SseEventSource eventSource = builder.build()) {
+            eventSource.register(collect(books), System.out::println);
+            eventSource.open();
+            // Give the SSE stream some time to collect all events
+            awaitEvents(5000, books, 12);
+        }
+
+        assertThat(books, 
+            hasItems(
+                new Book("New Book #1", 1), 
+                new Book("New Book #2", 2), 
+                new Book("New Book #3", 3), 
+                new Book("New Book #4", 4),
+                new Book("New Book #5", 5), 
+                new Book("New Book #6", 6), 
+                new Book("New Book #7", 7), 
+                new Book("New Book #8", 8),
+                new Book("New Book #9", 9), 
+                new Book("New Book #10", 10), 
+                new Book("New Book #11", 11), 
+                new Book("New Book #12", 12)
+            )
+        );
+    }
+    
+    @Test
     public void testBooksStreamIsBroadcasted() throws Exception {
         final Collection<Future<Response>> results = new ArrayList<>();
 


Mime
View raw message