cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject cxf git commit: CXF-7085: Introduce support for Server Sent Events (Client). Initial implementation.
Date Fri, 23 Jun 2017 21:37:43 GMT
Repository: cxf
Updated Branches:
  refs/heads/master cf11aa95e -> feba80454


CXF-7085: Introduce support for Server Sent Events (Client). Initial implementation.


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/feba8045
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/feba8045
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/feba8045

Branch: refs/heads/master
Commit: feba8045486cd910b238065cbb3ee4a741198d76
Parents: cf11aa9
Author: reta <drreta@gmail.com>
Authored: Fri Jun 23 17:37:23 2017 -0400
Committer: reta <drreta@gmail.com>
Committed: Fri Jun 23 17:37:23 2017 -0400

----------------------------------------------------------------------
 parent/pom.xml                                  |   2 +-
 rt/rs/sse/pom.xml                               |   7 +-
 .../jaxrs/sse/client/InboundSseEventImpl.java   | 184 ++++++++++++++++++
 .../sse/client/InboundSseEventListener.java     |  31 +++
 .../sse/client/InboundSseEventProcessor.java    | 140 ++++++++++++++
 .../sse/client/SseEventSourceBuilderImpl.java   |  51 +++++
 .../jaxrs/sse/client/SseEventSourceImpl.java    | 190 +++++++++++++++++++
 .../javax.ws.rs.sse.SseEventSource$Builder      |   1 +
 .../systest/jaxrs/sse/AbstractSseBaseTest.java  |  23 +++
 .../cxf/systest/jaxrs/sse/AbstractSseTest.java  |  29 +++
 10 files changed, 656 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/feba8045/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index bd95eab..e26fee6 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -111,7 +111,7 @@
         <cxf.geronimo.transaction.version>3.1.4</cxf.geronimo.transaction.version>
         <cxf.jasypt.bundle.version>1.9.0_1</cxf.jasypt.bundle.version>
         <cxf.javassist.version>3.19.0-GA</cxf.javassist.version>
-        <cxf.javax.ws.rs.version>2.1-m08</cxf.javax.ws.rs.version>
+        <cxf.javax.ws.rs.version>2.1-m09</cxf.javax.ws.rs.version>
         <cxf.jaxb.version>2.2.11</cxf.jaxb.version>
         <cxf.jaxb.impl.version>${cxf.jaxb.version}</cxf.jaxb.impl.version>
         <cxf.jaxb.core.version>${cxf.jaxb.version}</cxf.jaxb.core.version>

http://git-wip-us.apache.org/repos/asf/cxf/blob/feba8045/rt/rs/sse/pom.xml
----------------------------------------------------------------------
diff --git a/rt/rs/sse/pom.xml b/rt/rs/sse/pom.xml
index d4c9f17..745a493 100644
--- a/rt/rs/sse/pom.xml
+++ b/rt/rs/sse/pom.xml
@@ -43,6 +43,12 @@
         </dependency>
         <dependency>
             <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-rs-client</artifactId>
+            <version>${project.version}</version>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
             <artifactId>cxf-integration-cdi</artifactId>
             <version>${project.version}</version>
             <optional>true</optional>
@@ -60,7 +66,6 @@
         <dependency>
             <groupId>org.atmosphere</groupId>
             <artifactId>atmosphere-runtime</artifactId>
-            <version>${cxf.atmosphere.version}</version>
         </dependency>
     </dependencies>
     <build>

http://git-wip-us.apache.org/repos/asf/cxf/blob/feba8045/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventImpl.java
b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventImpl.java
new file mode 100644
index 0000000..4d4eab4
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventImpl.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse.client;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Type;
+import java.nio.charset.StandardCharsets;
+import java.util.OptionalLong;
+import java.util.logging.Logger;
+
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedHashMap;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.ext.MessageBodyReader;
+import javax.ws.rs.sse.InboundSseEvent;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.jaxrs.client.ClientProviderFactory;
+import org.apache.cxf.message.Message;
+
+public class InboundSseEventImpl implements InboundSseEvent {
+    private final String id;
+    private final String name;
+    private final String comment;
+    private final long reconnectDelay;
+    private final boolean reconnectDelaySet;
+    private final String data;
+    private final ClientProviderFactory factory;
+    private final Message message;
+    
+    static class Builder {
+        private static final Logger LOG = LogUtils.getL7dLogger(Builder.class);
+
+        private final String name;
+        private String id;
+        private String comment;
+        private OptionalLong reconnectDelay = OptionalLong.empty();
+        private String data;
+
+        Builder(String name) {
+            this.name = name;
+        }
+
+        Builder id(String id) {
+            this.id = id;
+            return this;
+        }
+
+        Builder comment(String comment) {
+            this.comment = comment;
+            return this;
+        }
+
+        Builder reconnectDelay(String reconnectDelay) {
+            try {
+                this.reconnectDelay = OptionalLong.of(Long.parseLong(reconnectDelay));
+            } catch (final NumberFormatException ex) {
+                LOG.warning("Unable to parse reconnectDelay, long number expected: " + ex.getMessage());
+            }
+            
+            return this;
+        }
+        
+        Builder data(String data) {
+            this.data = data;
+            return this;
+        }
+
+        InboundSseEvent build(ClientProviderFactory factory, Message message) {
+            return new InboundSseEventImpl(id, name, comment, reconnectDelay.orElse(0), 
+                reconnectDelay.isPresent(), data, factory, message);
+        }
+    }
+    
+    InboundSseEventImpl(String id, String name, String comment, long reconnectDelay, boolean
reconnectDelaySet, 
+            String data, ClientProviderFactory factory, Message message) {
+        this.id = id;
+        this.name = name;
+        this.comment = comment;
+        this.reconnectDelay = reconnectDelay;
+        this.reconnectDelaySet = reconnectDelaySet;
+        this.data = data;
+        this.factory = factory;
+        this.message = message;
+    }
+
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public String getComment() {
+        return comment;
+    }
+
+    @Override
+    public long getReconnectDelay() {
+        return reconnectDelay;
+    }
+
+    @Override
+    public boolean isReconnectDelaySet() {
+        return reconnectDelaySet;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return false;
+    }
+
+    @Override
+    public String readData() {
+        return data;
+    }
+
+    @Override
+    public <T> T readData(Class<T> type) {
+        return read(type, type, MediaType.WILDCARD_TYPE);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> T readData(GenericType<T> type) {
+        return read((Class<T>)type.getRawType(), type.getType(), MediaType.WILDCARD_TYPE);
+    }
+
+    @Override
+    public <T> T readData(Class<T> messageType, MediaType mediaType) {
+        return read(messageType, messageType, mediaType);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> T readData(GenericType<T> type, MediaType mediaType) {
+        return read((Class<T>)type.getRawType(), type.getType(), mediaType);
+    }
+    
+    private <T> T read(Class<T> messageType, Type type, MediaType mediaType)
{
+        if (data == null) {
+            return null;
+        }
+
+        final Annotation[] annotations = new Annotation[0];
+        final MultivaluedMap<String, String> headers = new MultivaluedHashMap<>(0);
+        
+        final MessageBodyReader<T> reader = factory.createMessageBodyReader(messageType,
type, 
+            annotations, mediaType, message);
+            
+        if (reader == null) {
+            throw new RuntimeException("No suitable message body reader for class: " + messageType.getName());
+        }
+
+        try (final ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)))
{
+            return reader.readFrom(messageType, type, annotations, mediaType, headers, is);
+        } catch (final IOException ex) {
+            throw new RuntimeException("Unable to read data of type " + messageType.getName(),
ex);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/feba8045/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventListener.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventListener.java
b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventListener.java
new file mode 100644
index 0000000..b61d51a
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventListener.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.jaxrs.sse.client;
+
+import javax.ws.rs.sse.InboundSseEvent;
+
+/**
+ * Flow/RxJava like listener for processing SSE events 
+ */
+interface InboundSseEventListener {
+    void onNext(InboundSseEvent event);
+    void onError(Throwable ex);
+    void onComplete();
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/feba8045/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
new file mode 100644
index 0000000..cf2346a
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse.client;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.sse.InboundSseEvent;
+
+import org.apache.cxf.common.util.StringUtils;
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.jaxrs.client.ClientProviderFactory;
+import org.apache.cxf.jaxrs.impl.ResponseImpl;
+import org.apache.cxf.message.Message;
+
+public class InboundSseEventProcessor {
+    public static final String SERVER_SENT_EVENTS = "text/event-stream";
+    public static final MediaType SERVER_SENT_EVENTS_TYPE = MediaType.valueOf(SERVER_SENT_EVENTS);
+
+    private static final String COMMENT = ": ";
+    private static final String EVENT = "    ";
+    private static final String ID = "id: ";
+    private static final String RETRY = "retry: ";
+    private static final String DATA = "data: ";
+
+    private final Endpoint endpoint;
+    private final InboundSseEventListener listener;
+    private final ExecutorService executor;
+    private volatile boolean closed = false;
+    
+    protected InboundSseEventProcessor(Endpoint endpoint, InboundSseEventListener listener)
{
+        this.endpoint = endpoint;
+        this.listener = listener;
+        this.executor = Executors.newSingleThreadExecutor();
+    }
+    
+    void run(final Response response) {
+        final InputStream is = response.readEntity(InputStream.class);
+        final ClientProviderFactory factory = ClientProviderFactory.getInstance(endpoint);
+        
+        Message message = null;
+        if (response instanceof ResponseImpl) {
+            message = ((ResponseImpl)response).getOutMessage();
+        }
+        
+        executor.submit(process(response, is, factory, message));
+    }
+    
+    private Callable<?> process(Response response, InputStream is, ClientProviderFactory
factory, Message message) {
+        return () -> {
+            try (final BufferedReader reader = new BufferedReader(new InputStreamReader(is,
StandardCharsets.UTF_8))) {
+                String line = null;
+                InboundSseEventImpl.Builder builder = null;
+
+                while ((line = reader.readLine()) != null && !Thread.interrupted()
&& !closed) {
+                    if (!StringUtils.isEmpty(line) && line.startsWith(EVENT)) {
+                        if (builder == null) {
+                            builder = new InboundSseEventImpl.Builder(line.substring(EVENT.length()));
+                        } else {
+                            final InboundSseEvent event = builder.build(factory, message);
+                            builder = new InboundSseEventImpl.Builder(line.substring(EVENT.length()));
+                            
+                            if (listener != null) {
+                                listener.onNext(event);
+                            }
+                        }
+                    } else if (builder != null) {
+                        if (line.startsWith(ID)) {
+                            builder.id(line.substring(ID.length()));
+                        } else if (line.startsWith(COMMENT)) {
+                            builder.id(line.substring(COMMENT.length()));
+                        } else if (line.startsWith(RETRY)) {
+                            builder.reconnectDelay(line.substring(RETRY.length()));
+                        } else if (line.startsWith(DATA)) {
+                            builder.data(line.substring(DATA.length()));
+                        }
+                    }
+                }
+                
+                if (listener != null) {
+                    if (builder != null) {
+                        listener.onNext(builder.build(factory, message));
+                    }
+
+                    // complete the stream
+                    listener.onComplete();
+                }
+            } catch (final Exception ex) {
+                if (listener != null) {
+                    listener.onError(ex);
+                }
+            }
+
+            if (response != null) {
+                response.close();
+            }
+
+            closed = true;
+            return null;
+        };
+    }
+    
+    boolean close(long timeout, TimeUnit unit) {
+        if (closed) {
+            return true;
+        }
+        
+        try {
+            closed = true;
+            executor.shutdown();
+            return executor.awaitTermination(timeout, unit);
+        } catch (final InterruptedException ex) {
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/feba8045/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceBuilderImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceBuilderImpl.java
b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceBuilderImpl.java
new file mode 100644
index 0000000..b3fa0dc
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceBuilderImpl.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse.client;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.sse.SseEventSource;
+import javax.ws.rs.sse.SseEventSource.Builder;
+
+public class SseEventSourceBuilderImpl extends SseEventSource.Builder {
+    private static final long DEFAULT_RECONNECT_DELAY_IN_MS = 500;
+    
+    private long delay = DEFAULT_RECONNECT_DELAY_IN_MS;
+    private TimeUnit unit = TimeUnit.MILLISECONDS;
+    private WebTarget target;
+    
+    @Override
+    public SseEventSource build() {
+        return new SseEventSourceImpl(target, delay, unit);
+    }
+
+    @Override
+    public Builder reconnectingEvery(long delay, TimeUnit unit) {
+        this.delay = delay;
+        this.unit = unit;
+        return this;
+    }
+
+    @Override
+    protected Builder target(WebTarget target) {
+        this.target = target;
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/feba8045/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
new file mode 100644
index 0000000..e9e5c1d
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse.client;
+
+import java.util.Collection;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.logging.Logger;
+
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.sse.InboundSseEvent;
+import javax.ws.rs.sse.SseEventSource;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.jaxrs.client.WebClient;
+
+/**
+ * SSE Event Source implementation 
+ */
+public class SseEventSourceImpl implements SseEventSource {
+    private static final Logger LOG = LogUtils.getL7dLogger(SseEventSourceImpl.class);
+    
+    private final WebTarget target;
+    private final long delay;
+    private final TimeUnit unit;
+    private final Collection<InboundSseEventListener> listeners = new CopyOnWriteArrayList<>();
+    
+    // It may happen that open() and close() could be called on separate threads
+    private volatile InboundSseEventProcessor processor; 
+    
+    private class InboundSseEventListenerImpl implements InboundSseEventListener {
+        private final Consumer<InboundSseEvent> onEvent;
+        private final Consumer<Throwable> onError;
+        private final Runnable onComplete;
+        
+        InboundSseEventListenerImpl(Consumer<InboundSseEvent> e) {
+            this(e, ex -> {}, () -> {});
+        }
+        
+        InboundSseEventListenerImpl(Consumer<InboundSseEvent> e, Consumer<Throwable>
t) {
+            this(e, t, () -> {});
+        }
+
+        InboundSseEventListenerImpl(Consumer<InboundSseEvent> e, Consumer<Throwable>
t, Runnable c) {
+            this.onEvent = e;
+            this.onError = t;
+            this.onComplete = c;
+        }
+
+        @Override
+        public void onNext(InboundSseEvent event) {
+            onEvent.accept(event);
+        }
+
+        @Override
+        public void onError(Throwable ex) {
+            onError.accept(ex);
+        }
+
+        @Override
+        public void onComplete() {
+            onComplete.run();
+        }
+    }
+
+    private final AtomicReference<SseSourceState> state = 
+        new AtomicReference<>(SseSourceState.CLOSED);
+    
+    private enum SseSourceState {
+        OPENING,
+        OPENED,
+        CLOSED
+    }
+    
+    SseEventSourceImpl(WebTarget target, long delay, TimeUnit unit) {
+        this.target = target;
+        this.delay = delay;
+        this.unit = unit;
+    }
+
+    @Override
+    public void register(Consumer<InboundSseEvent> onEvent) {
+        listeners.add(new InboundSseEventListenerImpl(onEvent));
+    }
+
+    @Override
+    public void register(Consumer<InboundSseEvent> onEvent, Consumer<Throwable>
onError) {
+        listeners.add(new InboundSseEventListenerImpl(onEvent, onError));
+    }
+
+    @Override
+    public void register(Consumer<InboundSseEvent> onEvent, Consumer<Throwable>
onError, Runnable onComplete) {
+        listeners.add(new InboundSseEventListenerImpl(onEvent, onError, onComplete));
+    }
+
+    @Override
+    public void open() {
+        if (!state.compareAndSet(SseSourceState.CLOSED, SseSourceState.OPENING)) {
+            throw new IllegalStateException("The SseEventSource is already in " + state.get()
+ " state");
+        }
+
+        Response response = null; 
+        try {
+            response = target
+                .request(MediaType.SERVER_SENT_EVENTS)
+                .get();
+
+            final Endpoint endpoint = WebClient.getConfig(target).getEndpoint();
+            processor = new InboundSseEventProcessor(endpoint,
+                new InboundSseEventListener() {
+                    @Override
+                    public void onNext(InboundSseEvent event) {
+                        listeners.forEach(listener -> listener.onNext(event));
+                    }
+        
+                    @Override
+                    public void onError(Throwable ex) {
+                        listeners.forEach(listener -> listener.onError(ex));
+                        if (delay > 0 && unit != null) {
+                            // TODO: Schedule reconnect here
+                        }
+                    }
+        
+                    @Override
+                    public void onComplete() {
+                        listeners.forEach(InboundSseEventListener::onComplete);
+                    }
+                }
+            );
+
+            processor.run(response);
+            state.compareAndSet(SseSourceState.OPENING, SseSourceState.OPENED);
+            
+            LOG.fine("Opened SSE connection to " + target.getUri());
+        } catch (final Exception ex) {
+            state.compareAndSet(SseSourceState.OPENING, SseSourceState.CLOSED);
+            LOG.fine("Failed to open SSE connection to " + target.getUri() + ". " + ex.getMessage());
+            
+            if (response != null) {
+                response.close();
+            }
+            
+            listeners.forEach(listener -> listener.onError(ex));
+        }
+    }
+
+    @Override
+    public boolean isOpen() {
+        return state.get() == SseSourceState.OPENED;
+    }
+
+    @Override
+    public boolean close(long timeout, TimeUnit unit) {
+        if (state.get() == SseSourceState.CLOSED) {
+            return true;
+        }
+        
+        if (!state.compareAndSet(SseSourceState.OPENED, SseSourceState.CLOSED)) {
+            throw new IllegalStateException("The SseEventSource is not opened, but in " +
state.get() + " state");
+        }
+
+        // Should never happen
+        if (processor == null) {
+            return true;
+        }
+        
+        return processor.close(timeout, unit); 
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/feba8045/rt/rs/sse/src/main/resources/META-INF/services/javax.ws.rs.sse.SseEventSource$Builder
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/resources/META-INF/services/javax.ws.rs.sse.SseEventSource$Builder
b/rt/rs/sse/src/main/resources/META-INF/services/javax.ws.rs.sse.SseEventSource$Builder
new file mode 100644
index 0000000..ad2d85a
--- /dev/null
+++ b/rt/rs/sse/src/main/resources/META-INF/services/javax.ws.rs.sse.SseEventSource$Builder
@@ -0,0 +1 @@
+org.apache.cxf.jaxrs.sse.client.SseEventSourceBuilderImpl
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cxf/blob/feba8045/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseBaseTest.java
----------------------------------------------------------------------
diff --git a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseBaseTest.java
b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseBaseTest.java
index aa80351..87425b5 100644
--- a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseBaseTest.java
+++ b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseBaseTest.java
@@ -19,8 +19,11 @@
 package org.apache.cxf.systest.jaxrs.sse;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.MediaType;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -52,5 +55,25 @@ public abstract class AbstractSseBaseTest extends AbstractBusClientServerTestBas
         return createWebClient(url, MediaType.SERVER_SENT_EVENTS);
     }
 
+    protected WebTarget createWebTarget(final String url) {
+        return ClientBuilder
+            .newClient()
+            .property("http.receive.timeout", 8000)
+            .register(JacksonJsonProvider.class)
+            .target("http://localhost:" + getPort() + url);
+    }
+    
+    protected void awaitEvents(long timeout, final Collection<?> events, int size)
throws InterruptedException {
+        final long sleep = timeout / 10;
+        
+        for (int i = 0; i < timeout; i += sleep) {
+            if (events.size() == size) {
+                break;
+            } else {
+                Thread.sleep(sleep);
+            }
+        }
+    }
+
     protected abstract int getPort();
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/feba8045/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
----------------------------------------------------------------------
diff --git a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
index 0e2e723..98c530b 100644
--- a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
+++ b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
@@ -18,15 +18,24 @@
  */
 package org.apache.cxf.systest.jaxrs.sse;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.function.Consumer;
+
+import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.sse.InboundSseEvent;
+import javax.ws.rs.sse.SseEventSource;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 
 import org.junit.Test;
 
 import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.hasItems;
 
 public abstract class AbstractSseTest extends AbstractSseBaseTest {
     @Test
@@ -51,4 +60,24 @@ public abstract class AbstractSseTest extends AbstractSseBaseTest {
 
         r.close();
     }
+    
+    @Test
+    public void testBooksStreamIsReturnedFromInboundSseEvents() throws JsonProcessingException,
InterruptedException {
+        final WebTarget target = createWebTarget("/rest/api/bookstore/sse/0");
+        final Collection<Book> books = new ArrayList<>();
+        
+        try (final SseEventSource eventSource = SseEventSource.target(target).build()) {
+            eventSource.register(collect(books), System.out::println);
+            eventSource.open();
+            // Give the SSE stream some time to collect all events
+            awaitEvents(3000, books, 4);
+        }
+
+        assertThat(books, hasItems(new Book("New Book #1", 1), new Book("New Book #2", 2),

+            new Book("New Book #3", 3), new Book("New Book #4", 4)));
+    }
+
+    private static Consumer<InboundSseEvent> collect(final Collection< Book >
books) {
+        return event -> books.add(event.readData(Book.class, MediaType.APPLICATION_JSON_TYPE));
+    }
 }


Mime
View raw message