cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From serg...@apache.org
Subject cxf git commit: [CXF-7487] Basic RxJava2 Observable support, Flowable to follow later
Date Mon, 28 Aug 2017 16:36:51 GMT
Repository: cxf
Updated Branches:
  refs/heads/master a55635031 -> 0c5d4032d


[CXF-7487] Basic RxJava2 Observable support, Flowable to follow later


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/0c5d4032
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/0c5d4032
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/0c5d4032

Branch: refs/heads/master
Commit: 0c5d4032d3ba7837e662abc192a27ae2814b21c9
Parents: a556350
Author: Sergey Beryozkin <sberyozkin@gmail.com>
Authored: Mon Aug 28 17:36:37 2017 +0100
Committer: Sergey Beryozkin <sberyozkin@gmail.com>
Committed: Mon Aug 28 17:36:37 2017 +0100

----------------------------------------------------------------------
 parent/pom.xml                                  |  10 +-
 rt/rs/extensions/rx/pom.xml                     |   6 +
 .../jaxrs/rx2/client/ObservableRxInvoker.java   | 107 +++++++++++
 .../rx2/client/ObservableRxInvokerImpl.java     | 180 +++++++++++++++++++
 .../rx2/client/ObservableRxInvokerProvider.java |  45 +++++
 .../cxf/jaxrs/rx2/server/ObservableInvoker.java |  43 +++++
 systests/jaxrs/pom.xml                          |   7 +-
 .../jaxrs/reactive/JAXRSObservableTest.java     | 141 ---------------
 .../jaxrs/reactive/JAXRSRxJava2Test.java        |  68 +++++++
 .../systest/jaxrs/reactive/JAXRSRxJavaTest.java | 141 +++++++++++++++
 .../jaxrs/reactive/ObservableServer.java        |  79 --------
 .../jaxrs/reactive/ObservableService.java       | 122 -------------
 .../reactive/RxJava2ObservableService.java      |  43 +++++
 .../systest/jaxrs/reactive/RxJava2Server.java   |  73 ++++++++
 .../jaxrs/reactive/RxJavaObservableService.java | 122 +++++++++++++
 .../systest/jaxrs/reactive/RxJavaServer.java    |  79 ++++++++
 16 files changed, 921 insertions(+), 345 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 424e28a..8d8e77b 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -101,7 +101,8 @@
         <cxf.log4j.version>1.2.17</cxf.log4j.version>
         <cxf.lucene.version>4.9.0</cxf.lucene.version>
         <cxf.mina.version>2.0.14</cxf.mina.version>
-        <cxf.rx.java.version>1.2.7</cxf.rx.java.version>
+        <cxf.rxjava.version>1.3.0</cxf.rxjava.version>
+        <cxf.rxjava2.version>2.1.3</cxf.rxjava2.version>
         <cxf.javax.annotation-api.version>1.3</cxf.javax.annotation-api.version>
         <cxf.jcache.version>1.0.0</cxf.jcache.version>
         <cxf.geronimo.jms.version>1.1.1</cxf.geronimo.jms.version>
@@ -819,7 +820,12 @@
             <dependency>
                 <groupId>io.reactivex</groupId>
                 <artifactId>rxjava</artifactId>
-                <version>${cxf.rx.java.version}</version>
+                <version>${cxf.rxjava.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.reactivex.rxjava2</groupId>
+                <artifactId>rxjava</artifactId>
+                <version>${cxf.rxjava2.version}</version>
             </dependency>
             <dependency>
                 <groupId>io.netty</groupId>

http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/rt/rs/extensions/rx/pom.xml
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/pom.xml b/rt/rs/extensions/rx/pom.xml
index 13bf166..d6b0360 100644
--- a/rt/rs/extensions/rx/pom.xml
+++ b/rt/rs/extensions/rx/pom.xml
@@ -48,6 +48,12 @@
           <optional>true</optional>
         </dependency>
         <dependency>
+          <groupId>io.reactivex.rxjava2</groupId>
+          <artifactId>rxjava</artifactId>
+          <scope>provided</scope>
+          <optional>true</optional>
+        </dependency>
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvoker.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvoker.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvoker.java
new file mode 100644
index 0000000..41d1ec9
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvoker.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx2.client;
+
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.RxInvoker;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.Response;
+
+import io.reactivex.Observable;
+
+
+@SuppressWarnings("rawtypes")
+public interface ObservableRxInvoker extends RxInvoker<Observable> {
+
+    @Override
+    Observable<Response> get();
+
+    @Override
+    <T> Observable<T> get(Class<T> responseType);
+
+    @Override
+    <T> Observable<T> get(GenericType<T> responseType);
+
+    @Override
+    Observable<Response> put(Entity<?> entity);
+
+    @Override
+    <T> Observable<T> put(Entity<?> entity, Class<T> clazz);
+
+    @Override
+    <T> Observable<T> put(Entity<?> entity, GenericType<T> type);
+
+    @Override
+    Observable<Response> post(Entity<?> entity);
+
+    @Override
+    <T> Observable<T> post(Entity<?> entity, Class<T> clazz);
+
+    @Override
+    <T> Observable<T> post(Entity<?> entity, GenericType<T> type);
+
+    @Override
+    Observable<Response> delete();
+
+    @Override
+    <T> Observable<T> delete(Class<T> responseType);
+
+    @Override
+    <T> Observable<T> delete(GenericType<T> responseType);
+
+    @Override
+    Observable<Response> head();
+
+    @Override
+    Observable<Response> options();
+
+    @Override
+    <T> Observable<T> options(Class<T> responseType);
+
+    @Override
+    <T> Observable<T> options(GenericType<T> responseType);
+
+    @Override
+    Observable<Response> trace();
+
+    @Override
+    <T> Observable<T> trace(Class<T> responseType);
+
+    @Override
+    <T> Observable<T> trace(GenericType<T> responseType);
+
+    @Override
+    Observable<Response> method(String name);
+
+    @Override
+    <T> Observable<T> method(String name, Class<T> responseType);
+
+    @Override
+    <T> Observable<T> method(String name, GenericType<T> responseType);
+
+    @Override
+    Observable<Response> method(String name, Entity<?> entity);
+
+    @Override
+    <T> Observable<T> method(String name, Entity<?> entity, Class<T> responseType);
+
+    @Override
+    <T> Observable<T> method(String name, Entity<?> entity, GenericType<T> responseType);
+}
+

http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerImpl.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerImpl.java
new file mode 100644
index 0000000..2c1f966
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerImpl.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx2.client;
+
+import java.util.concurrent.ExecutorService;
+
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.Response;
+
+import org.apache.cxf.jaxrs.client.WebClient;
+
+import io.reactivex.Observable;
+import io.reactivex.Scheduler;
+import io.reactivex.schedulers.Schedulers;
+
+
+public class ObservableRxInvokerImpl implements ObservableRxInvoker {
+    private Scheduler sc;
+    private WebClient wc;
+    public ObservableRxInvokerImpl(WebClient wc, ExecutorService ex) {
+        this.wc = wc;
+        this.sc = ex == null ? null : Schedulers.from(ex);
+    }
+
+    @Override
+    public Observable<Response> get() {
+        return get(Response.class);
+    }
+
+    @Override
+    public <T> Observable<T> get(Class<T> responseType) {
+        return method(HttpMethod.GET, responseType);
+    }
+
+    @Override
+    public <T> Observable<T> get(GenericType<T> responseType) {
+        return method(HttpMethod.GET, responseType);
+    }
+
+    @Override
+    public Observable<Response> put(Entity<?> entity) {
+        return put(entity, Response.class);
+    }
+
+    @Override
+    public <T> Observable<T> put(Entity<?> entity, Class<T> responseType) {
+        return method(HttpMethod.PUT, entity, responseType);
+    }
+
+    @Override
+    public <T> Observable<T> put(Entity<?> entity, GenericType<T> responseType) {
+        return method(HttpMethod.PUT, entity, responseType);
+    }
+
+    @Override
+    public Observable<Response> post(Entity<?> entity) {
+        return post(entity, Response.class);
+    }
+
+    @Override
+    public <T> Observable<T> post(Entity<?> entity, Class<T> responseType) {
+        return method(HttpMethod.POST, entity, responseType);
+    }
+
+    @Override
+    public <T> Observable<T> post(Entity<?> entity, GenericType<T> responseType) {
+        return method(HttpMethod.POST, entity, responseType);
+    }
+
+    @Override
+    public Observable<Response> delete() {
+        return delete(Response.class);
+    }
+
+    @Override
+    public <T> Observable<T> delete(Class<T> responseType) {
+        return method(HttpMethod.DELETE, responseType);
+    }
+
+    @Override
+    public <T> Observable<T> delete(GenericType<T> responseType) {
+        return method(HttpMethod.DELETE, responseType);
+    }
+
+    @Override
+    public Observable<Response> head() {
+        return method(HttpMethod.HEAD);
+    }
+
+    @Override
+    public Observable<Response> options() {
+        return options(Response.class);
+    }
+
+    @Override
+    public <T> Observable<T> options(Class<T> responseType) {
+        return method(HttpMethod.OPTIONS, responseType);
+    }
+
+    @Override
+    public <T> Observable<T> options(GenericType<T> responseType) {
+        return method(HttpMethod.OPTIONS, responseType);
+    }
+
+    @Override
+    public Observable<Response> trace() {
+        return trace(Response.class);
+    }
+
+    @Override
+    public <T> Observable<T> trace(Class<T> responseType) {
+        return method("TRACE", responseType);
+    }
+
+    @Override
+    public <T> Observable<T> trace(GenericType<T> responseType) {
+        return method("TRACE", responseType);
+    }
+
+    @Override
+    public Observable<Response> method(String name) {
+        return method(name, Response.class);
+    }
+
+    @Override
+    public Observable<Response> method(String name, Entity<?> entity) {
+        return method(name, entity, Response.class);
+    }
+
+    @Override
+    public <T> Observable<T> method(String name, Entity<?> entity, Class<T> responseType) {
+        if (sc == null) {
+            return Observable.fromFuture(wc.async().method(name, entity, responseType));
+        }
+        return Observable.fromFuture(wc.async().method(name, entity, responseType), sc);
+    }
+
+    @Override
+    public <T> Observable<T> method(String name, Entity<?> entity, GenericType<T> responseType) {
+        if (sc == null) {
+            return Observable.fromFuture(wc.async().method(name, entity, responseType));
+        }
+        return Observable.fromFuture(wc.async().method(name, entity, responseType), sc);
+    }
+
+    @Override
+    public <T> Observable<T> method(String name, Class<T> responseType) {
+        if (sc == null) {
+            return Observable.fromFuture(wc.async().method(name, responseType));
+        }
+        return Observable.fromFuture(wc.async().method(name, responseType), sc);
+    }
+
+    @Override
+    public <T> Observable<T> method(String name, GenericType<T> responseType) {
+        if (sc == null) {
+            return Observable.fromFuture(wc.async().method(name, responseType));
+        }
+        return Observable.fromFuture(wc.async().method(name, responseType), sc);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerProvider.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerProvider.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerProvider.java
new file mode 100644
index 0000000..221bc48
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerProvider.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx2.client;
+
+import java.util.concurrent.ExecutorService;
+
+import javax.ws.rs.client.RxInvokerProvider;
+import javax.ws.rs.client.SyncInvoker;
+import javax.ws.rs.ext.Provider;
+
+import org.apache.cxf.jaxrs.client.SyncInvokerImpl;
+
+@Provider
+public class ObservableRxInvokerProvider implements RxInvokerProvider<ObservableRxInvoker> {
+
+    @Override
+    public ObservableRxInvoker getRxInvoker(SyncInvoker syncInvoker, ExecutorService executorService) {
+        // TODO: At the moment we still delegate if possible to the async HTTP conduit.
+        // Investigate if letting the RxJava thread pool deal with the sync invocation
+        // is indeed more effective
+        return new ObservableRxInvokerImpl(((SyncInvokerImpl)syncInvoker).getWebClient(), executorService);
+    }
+
+    @Override
+    public boolean isProviderFor(Class<?> rxCls) {
+        return ObservableRxInvoker.class == rxCls;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ObservableInvoker.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ObservableInvoker.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ObservableInvoker.java
new file mode 100644
index 0000000..8047c6a
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ObservableInvoker.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx2.server;
+
+import org.apache.cxf.jaxrs.JAXRSInvoker;
+import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
+import org.apache.cxf.message.Message;
+
+import io.reactivex.Observable;
+
+public class ObservableInvoker extends JAXRSInvoker {
+    protected AsyncResponseImpl checkFutureResponse(Message inMessage, Object result) {
+        if (result instanceof Observable) {
+            final Observable<?> obs = (Observable<?>)result;
+            final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
+            obs.subscribe(v -> asyncResponse.resume(v), t -> handleThrowable(asyncResponse, t));
+            return asyncResponse;
+        }
+        return null;
+    }
+
+    private Object handleThrowable(AsyncResponseImpl asyncResponse, Throwable t) {
+        //TODO: if it is a Cancelation exception => asyncResponse.cancel(); 
+        asyncResponse.resume(t);
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/systests/jaxrs/pom.xml
----------------------------------------------------------------------
diff --git a/systests/jaxrs/pom.xml b/systests/jaxrs/pom.xml
index b515011..542201a 100644
--- a/systests/jaxrs/pom.xml
+++ b/systests/jaxrs/pom.xml
@@ -60,7 +60,12 @@
         <dependency>
           <groupId>io.reactivex</groupId>
           <artifactId>rxjava</artifactId>
-          <version>${cxf.rx.java.version}</version>
+          <version>${cxf.rxjava.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>io.reactivex.rxjava2</groupId>
+          <artifactId>rxjava</artifactId>
+          <version>${cxf.rxjava2.version}</version>
         </dependency>
         <!--
           <dependency>

http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSObservableTest.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSObservableTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSObservableTest.java
deleted file mode 100644
index 39d8fd5..0000000
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSObservableTest.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.systest.jaxrs.reactive;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-
-import javax.ws.rs.NotFoundException;
-import javax.ws.rs.client.ClientBuilder;
-import javax.ws.rs.client.Invocation;
-import javax.ws.rs.core.GenericType;
-
-import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
-
-import org.apache.cxf.jaxrs.client.WebClient;
-import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
-import org.apache.cxf.jaxrs.rx.client.ObservableRxInvoker;
-import org.apache.cxf.jaxrs.rx.client.ObservableRxInvokerProvider;
-import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
-
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import rx.Observable;
-
-public class JAXRSObservableTest extends AbstractBusClientServerTestBase {
-    public static final String PORT = ObservableServer.PORT;
-    @BeforeClass
-    public static void startServers() throws Exception {
-        AbstractResourceInfo.clearAllMaps();
-        assertTrue("server did not launch correctly",
-                   launchServer(ObservableServer.class, true));
-        createStaticBus();
-    }
-    @Test
-    public void testGetHelloWorldText() throws Exception {
-        String address = "http://localhost:" + PORT + "/observable/text";
-        WebClient wc = WebClient.create(address);
-        String text = wc.accept("text/plain").get(String.class);
-        assertEquals("Hello, world!", text);
-    }
-    @Test
-    public void testGetHelloWorldAsyncText() throws Exception {
-        String address = "http://localhost:" + PORT + "/observable/textAsync";
-        WebClient wc = WebClient.create(address);
-        String text = wc.accept("text/plain").get(String.class);
-        assertEquals("Hello, world!", text);
-    }
-
-    @Test
-    public void testGetHelloWorldJson() throws Exception {
-        String address = "http://localhost:" + PORT + "/observable/textJson";
-        WebClient wc = WebClient.create(address,
-                                        Collections.singletonList(new JacksonJsonProvider()));
-        HelloWorldBean bean = wc.accept("application/json").get(HelloWorldBean.class);
-        assertEquals("Hello", bean.getGreeting());
-        assertEquals("World", bean.getAudience());
-    }
-    @Test
-    public void testGetHelloWorldJsonList() throws Exception {
-        String address = "http://localhost:" + PORT + "/observable/textJsonList";
-        doTestGetHelloWorldJsonList(address);
-    }
-    @Test
-    public void testGetHelloWorldJsonImplicitListAsync() throws Exception {
-        String address = "http://localhost:" + PORT + "/observable/textJsonImplicitListAsync";
-        doTestGetHelloWorldJsonList(address);
-    }
-    @Test
-    public void testGetHelloWorldJsonImplicitListAsyncStream() throws Exception {
-        String address = "http://localhost:" + PORT + "/observable/textJsonImplicitListAsyncStream";
-        doTestGetHelloWorldJsonList(address);
-    }
-    private void doTestGetHelloWorldJsonList(String address) throws Exception {
-        WebClient wc = WebClient.create(address,
-                                        Collections.singletonList(new JacksonJsonProvider()));
-        WebClient.getConfig(wc).getHttpConduit().getClient().setReceiveTimeout(10000000);
-        GenericType<List<HelloWorldBean>> genericResponseType = new GenericType<List<HelloWorldBean>>() {
-        };
-
-        List<HelloWorldBean> beans = wc.accept("application/json").get(genericResponseType);
-        assertEquals(2, beans.size());
-        assertEquals("Hello", beans.get(0).getGreeting());
-        assertEquals("World", beans.get(0).getAudience());
-        assertEquals("Ciao", beans.get(1).getGreeting());
-        assertEquals("World", beans.get(1).getAudience());
-    }
-
-    @Test
-    public void testGetHelloWorldAsyncObservable() throws Exception {
-        String address = "http://localhost:" + PORT + "/observable/textAsync";
-        WebClient wc = WebClient.create(address,
-                                        Collections.singletonList(new ObservableRxInvokerProvider()));
-        Observable<String> obs = wc.accept("text/plain")
-            .rx(ObservableRxInvoker.class)
-            .get(String.class);
-        obs.map(s -> {
-            return s + s;
-        });
-
-        Thread.sleep(3000);
-
-        obs.subscribe(s -> assertDuplicateResponse(s));
-    }
-    @Test
-    public void testGetHelloWorldAsyncObservable404() throws Exception {
-        String address = "http://localhost:" + PORT + "/observable/textAsync404";
-        Invocation.Builder b = ClientBuilder.newClient().register(new ObservableRxInvokerProvider())
-            .target(address).request();
-        b.rx(ObservableRxInvoker.class).get(String.class).subscribe(
-            s -> {
-                fail("Exception expected");
-            },
-            t -> validateT((ExecutionException)t));
-    }
-
-    private void validateT(ExecutionException t) {
-        assertTrue(t.getCause() instanceof NotFoundException);
-    }
-    private void assertDuplicateResponse(String s) {
-        assertEquals("Hello, world!Hello, world!", s);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2Test.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2Test.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2Test.java
new file mode 100644
index 0000000..ded2799
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2Test.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.xml.ws.Holder;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.jaxrs.client.WebClient;
+import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
+import org.apache.cxf.jaxrs.rx2.client.ObservableRxInvoker;
+import org.apache.cxf.jaxrs.rx2.client.ObservableRxInvokerProvider;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.reactivex.Observable;
+
+public class JAXRSRxJava2Test extends AbstractBusClientServerTestBase {
+    public static final String PORT = RxJava2Server.PORT;
+    @BeforeClass
+    public static void startServers() throws Exception {
+        AbstractResourceInfo.clearAllMaps();
+        assertTrue("server did not launch correctly",
+                   launchServer(RxJava2Server.class, true));
+        createStaticBus();
+    }
+    @Test
+    public void testGetHelloWorldJson() throws Exception {
+        String address = "http://localhost:" + PORT + "/observable2/textJson";
+        List<Object> providers = new LinkedList<>();
+        providers.add(new JacksonJsonProvider());
+        providers.add(new ObservableRxInvokerProvider());
+        WebClient wc = WebClient.create(address, providers);
+        Observable<HelloWorldBean> obs = wc.accept("application/json")
+            .rx(ObservableRxInvoker.class)
+            .get(HelloWorldBean.class);
+        
+        Holder<HelloWorldBean> holder = new Holder<HelloWorldBean>();
+        obs.subscribe(v -> {
+            holder.value = v;
+        });
+        Thread.sleep(3000);
+        assertEquals("Hello", holder.value.getGreeting());
+        assertEquals("World", holder.value.getAudience());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJavaTest.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJavaTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJavaTest.java
new file mode 100644
index 0000000..9f197d8
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJavaTest.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import javax.ws.rs.NotFoundException;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Invocation;
+import javax.ws.rs.core.GenericType;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.jaxrs.client.WebClient;
+import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
+import org.apache.cxf.jaxrs.rx.client.ObservableRxInvoker;
+import org.apache.cxf.jaxrs.rx.client.ObservableRxInvokerProvider;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import rx.Observable;
+
+public class JAXRSRxJavaTest extends AbstractBusClientServerTestBase {
+    public static final String PORT = RxJavaServer.PORT;
+    @BeforeClass
+    public static void startServers() throws Exception {
+        AbstractResourceInfo.clearAllMaps();
+        assertTrue("server did not launch correctly",
+                   launchServer(RxJavaServer.class, true));
+        createStaticBus();
+    }
+    @Test
+    public void testGetHelloWorldText() throws Exception {
+        String address = "http://localhost:" + PORT + "/observable/text";
+        WebClient wc = WebClient.create(address);
+        String text = wc.accept("text/plain").get(String.class);
+        assertEquals("Hello, world!", text);
+    }
+    @Test
+    public void testGetHelloWorldAsyncText() throws Exception {
+        String address = "http://localhost:" + PORT + "/observable/textAsync";
+        WebClient wc = WebClient.create(address);
+        String text = wc.accept("text/plain").get(String.class);
+        assertEquals("Hello, world!", text);
+    }
+
+    @Test
+    public void testGetHelloWorldJson() throws Exception {
+        String address = "http://localhost:" + PORT + "/observable/textJson";
+        WebClient wc = WebClient.create(address,
+                                        Collections.singletonList(new JacksonJsonProvider()));
+        HelloWorldBean bean = wc.accept("application/json").get(HelloWorldBean.class);
+        assertEquals("Hello", bean.getGreeting());
+        assertEquals("World", bean.getAudience());
+    }
+    @Test
+    public void testGetHelloWorldJsonList() throws Exception {
+        String address = "http://localhost:" + PORT + "/observable/textJsonList";
+        doTestGetHelloWorldJsonList(address);
+    }
+    @Test
+    public void testGetHelloWorldJsonImplicitListAsync() throws Exception {
+        String address = "http://localhost:" + PORT + "/observable/textJsonImplicitListAsync";
+        doTestGetHelloWorldJsonList(address);
+    }
+    @Test
+    public void testGetHelloWorldJsonImplicitListAsyncStream() throws Exception {
+        String address = "http://localhost:" + PORT + "/observable/textJsonImplicitListAsyncStream";
+        doTestGetHelloWorldJsonList(address);
+    }
+    private void doTestGetHelloWorldJsonList(String address) throws Exception {
+        WebClient wc = WebClient.create(address,
+                                        Collections.singletonList(new JacksonJsonProvider()));
+        WebClient.getConfig(wc).getHttpConduit().getClient().setReceiveTimeout(10000000);
+        GenericType<List<HelloWorldBean>> genericResponseType = new GenericType<List<HelloWorldBean>>() {
+        };
+
+        List<HelloWorldBean> beans = wc.accept("application/json").get(genericResponseType);
+        assertEquals(2, beans.size());
+        assertEquals("Hello", beans.get(0).getGreeting());
+        assertEquals("World", beans.get(0).getAudience());
+        assertEquals("Ciao", beans.get(1).getGreeting());
+        assertEquals("World", beans.get(1).getAudience());
+    }
+
+    @Test
+    public void testGetHelloWorldAsyncObservable() throws Exception {
+        String address = "http://localhost:" + PORT + "/observable/textAsync";
+        WebClient wc = WebClient.create(address,
+                                        Collections.singletonList(new ObservableRxInvokerProvider()));
+        Observable<String> obs = wc.accept("text/plain")
+            .rx(ObservableRxInvoker.class)
+            .get(String.class);
+        obs.map(s -> {
+            return s + s;
+        });
+
+        Thread.sleep(3000);
+
+        obs.subscribe(s -> assertDuplicateResponse(s));
+    }
+    @Test
+    public void testGetHelloWorldAsyncObservable404() throws Exception {
+        String address = "http://localhost:" + PORT + "/observable/textAsync404";
+        Invocation.Builder b = ClientBuilder.newClient().register(new ObservableRxInvokerProvider())
+            .target(address).request();
+        b.rx(ObservableRxInvoker.class).get(String.class).subscribe(
+            s -> {
+                fail("Exception expected");
+            },
+            t -> validateT((ExecutionException)t));
+    }
+
+    private void validateT(ExecutionException t) {
+        assertTrue(t.getCause() instanceof NotFoundException);
+    }
+    private void assertDuplicateResponse(String s) {
+        assertEquals("Hello, world!Hello, world!", s);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableServer.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableServer.java
deleted file mode 100644
index 03f89ef..0000000
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableServer.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.systest.jaxrs.reactive;
-
-import java.util.Collections;
-
-import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
-
-import org.apache.cxf.Bus;
-import org.apache.cxf.BusFactory;
-import org.apache.cxf.ext.logging.LoggingOutInterceptor;
-import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
-import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
-import org.apache.cxf.jaxrs.provider.StreamingResponseProvider;
-import org.apache.cxf.jaxrs.rx.server.ObservableInvoker;
-import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
-
-
-public class ObservableServer extends AbstractBusTestServerBase {
-    public static final String PORT = allocatePort(ObservableServer.class);
-
-    org.apache.cxf.endpoint.Server server;
-    public ObservableServer() {
-    }
-
-    protected void run() {
-        Bus bus = BusFactory.getDefaultBus();
-        // Make sure default JSONProvider is not loaded
-        bus.setProperty("skip.default.json.provider.registration", true);
-        JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
-        sf.setInvoker(new ObservableInvoker());
-        sf.setProvider(new JacksonJsonProvider());
-        StreamingResponseProvider<HelloWorldBean> streamProvider = new StreamingResponseProvider<HelloWorldBean>();
-        streamProvider.setProduceMediaTypes(Collections.singletonList("application/json"));
-        sf.setProvider(streamProvider);
-        sf.getOutInterceptors().add(new LoggingOutInterceptor());
-        sf.setResourceClasses(ObservableService.class);
-        sf.setResourceProvider(ObservableService.class,
-                               new SingletonResourceProvider(new ObservableService(), true));
-        sf.setAddress("http://localhost:" + PORT + "/");
-        server = sf.create();
-    }
-
-    public void tearDown() throws Exception {
-        server.stop();
-        server.destroy();
-        server = null;
-    }
-
-    public static void main(String[] args) {
-        try {
-            ObservableServer s = new ObservableServer();
-            s.start();
-        } catch (Exception ex) {
-            ex.printStackTrace();
-            System.exit(-1);
-        } finally {
-            System.out.println("done!");
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableService.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableService.java
deleted file mode 100644
index 00783fd..0000000
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableService.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.systest.jaxrs.reactive;
-
-
-import java.util.Arrays;
-import java.util.List;
-
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.container.AsyncResponse;
-import javax.ws.rs.container.Suspended;
-
-import org.apache.cxf.jaxrs.rx.server.AbstractAsyncSubscriber;
-import org.apache.cxf.jaxrs.rx.server.JsonStreamingAsyncSubscriber;
-import org.apache.cxf.jaxrs.rx.server.ListAsyncSubscriber;
-
-import rx.Observable;
-import rx.schedulers.Schedulers;
-
-
-@Path("/observable")
-public class ObservableService {
-
-    @GET
-    @Produces("text/plain")
-    @Path("text")
-    public Observable<String> getText() {
-        return Observable.just("Hello, world!");
-    }
-
-    @GET
-    @Produces("text/plain")
-    @Path("textAsync")
-    public void getTextAsync(@Suspended final AsyncResponse ar) {
-        Observable.just("Hello, ").map(s -> s + "world!")
-            .subscribe(new StringAsyncSubscriber(ar));
-
-    }
-
-    @GET
-    @Produces("application/json")
-    @Path("textJson")
-    public Observable<HelloWorldBean> getJson() {
-        return Observable.just(new HelloWorldBean());
-    }
-
-    @GET
-    @Produces("application/json")
-    @Path("textJsonImplicitListAsync")
-    public void getJsonImplicitListAsync(@Suspended AsyncResponse ar) {
-        final HelloWorldBean bean1 = new HelloWorldBean();
-        final HelloWorldBean bean2 = new HelloWorldBean("Ciao");
-        new Thread(new Runnable() {
-            public void run() {
-                try {
-                    Thread.sleep(2000);
-                } catch (InterruptedException ex) {
-                    // ignore
-                }
-                Observable.just(bean1, bean2).subscribe(new ListAsyncSubscriber<HelloWorldBean>(ar));
-            }
-        }).start();
-
-    }
-    @GET
-    @Produces("application/json")
-    @Path("textJsonImplicitListAsyncStream")
-    public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) {
-        Observable.just("Hello", "Ciao")
-            .map(s -> new HelloWorldBean(s))
-            .subscribeOn(Schedulers.computation())
-            .subscribe(new JsonStreamingAsyncSubscriber<HelloWorldBean>(ar));
-    }
-    @GET
-    @Produces("application/json")
-    @Path("textJsonList")
-    public Observable<List<HelloWorldBean>> getJsonList() {
-        HelloWorldBean bean1 = new HelloWorldBean();
-        HelloWorldBean bean2 = new HelloWorldBean();
-        bean2.setGreeting("Ciao");
-        return Observable.just(Arrays.asList(bean1, bean2));
-    }
-
-    private class StringAsyncSubscriber extends AbstractAsyncSubscriber<String> {
-
-        private StringBuilder sb = new StringBuilder();
-        StringAsyncSubscriber(AsyncResponse ar) {
-            super(ar);
-        }
-        @Override
-        public void onCompleted() {
-            super.resume(sb.toString());
-        }
-
-        @Override
-        public void onNext(String s) {
-            sb.append(s);
-        }
-
-    }
-}
-
-

http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
new file mode 100644
index 0000000..28d053e
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+
+import io.reactivex.Observable;
+
+
+@Path("/observable2")
+public class RxJava2ObservableService {
+
+    
+    @GET
+    @Produces("application/json")
+    @Path("textJson")
+    public Observable<HelloWorldBean> getJson() {
+        return Observable.just(new HelloWorldBean());
+    }
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2Server.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2Server.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2Server.java
new file mode 100644
index 0000000..f9ab3ae
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2Server.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.ext.logging.LoggingOutInterceptor;
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
+import org.apache.cxf.jaxrs.rx2.server.ObservableInvoker;
+import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+
+
+public class RxJava2Server extends AbstractBusTestServerBase {
+    public static final String PORT = allocatePort(RxJava2Server.class);
+
+    org.apache.cxf.endpoint.Server server;
+    public RxJava2Server() {
+    }
+
+    protected void run() {
+        Bus bus = BusFactory.getDefaultBus();
+        // Make sure default JSONProvider is not loaded
+        bus.setProperty("skip.default.json.provider.registration", true);
+        JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
+        sf.setInvoker(new ObservableInvoker());
+        sf.setProvider(new JacksonJsonProvider());
+        sf.getOutInterceptors().add(new LoggingOutInterceptor());
+        sf.setResourceClasses(RxJava2ObservableService.class);
+        sf.setResourceProvider(RxJava2ObservableService.class,
+                               new SingletonResourceProvider(new RxJava2ObservableService(), true));
+        sf.setAddress("http://localhost:" + PORT + "/");
+        server = sf.create();
+    }
+
+    public void tearDown() throws Exception {
+        server.stop();
+        server.destroy();
+        server = null;
+    }
+
+    public static void main(String[] args) {
+        try {
+            RxJava2Server s = new RxJava2Server();
+            s.start();
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            System.exit(-1);
+        } finally {
+            System.out.println("done!");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableService.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableService.java
new file mode 100644
index 0000000..de0f91f
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableService.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+
+import java.util.Arrays;
+import java.util.List;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+
+import org.apache.cxf.jaxrs.rx.server.AbstractAsyncSubscriber;
+import org.apache.cxf.jaxrs.rx.server.JsonStreamingAsyncSubscriber;
+import org.apache.cxf.jaxrs.rx.server.ListAsyncSubscriber;
+
+import rx.Observable;
+import rx.schedulers.Schedulers;
+
+
+@Path("/observable")
+public class RxJavaObservableService {
+
+    @GET
+    @Produces("text/plain")
+    @Path("text")
+    public Observable<String> getText() {
+        return Observable.just("Hello, world!");
+    }
+
+    @GET
+    @Produces("text/plain")
+    @Path("textAsync")
+    public void getTextAsync(@Suspended final AsyncResponse ar) {
+        Observable.just("Hello, ").map(s -> s + "world!")
+            .subscribe(new StringAsyncSubscriber(ar));
+
+    }
+
+    @GET
+    @Produces("application/json")
+    @Path("textJson")
+    public Observable<HelloWorldBean> getJson() {
+        return Observable.just(new HelloWorldBean());
+    }
+
+    @GET
+    @Produces("application/json")
+    @Path("textJsonImplicitListAsync")
+    public void getJsonImplicitListAsync(@Suspended AsyncResponse ar) {
+        final HelloWorldBean bean1 = new HelloWorldBean();
+        final HelloWorldBean bean2 = new HelloWorldBean("Ciao");
+        new Thread(new Runnable() {
+            public void run() {
+                try {
+                    Thread.sleep(2000);
+                } catch (InterruptedException ex) {
+                    // ignore
+                }
+                Observable.just(bean1, bean2).subscribe(new ListAsyncSubscriber<HelloWorldBean>(ar));
+            }
+        }).start();
+
+    }
+    @GET
+    @Produces("application/json")
+    @Path("textJsonImplicitListAsyncStream")
+    public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) {
+        Observable.just("Hello", "Ciao")
+            .map(s -> new HelloWorldBean(s))
+            .subscribeOn(Schedulers.computation())
+            .subscribe(new JsonStreamingAsyncSubscriber<HelloWorldBean>(ar));
+    }
+    @GET
+    @Produces("application/json")
+    @Path("textJsonList")
+    public Observable<List<HelloWorldBean>> getJsonList() {
+        HelloWorldBean bean1 = new HelloWorldBean();
+        HelloWorldBean bean2 = new HelloWorldBean();
+        bean2.setGreeting("Ciao");
+        return Observable.just(Arrays.asList(bean1, bean2));
+    }
+
+    private class StringAsyncSubscriber extends AbstractAsyncSubscriber<String> {
+
+        private StringBuilder sb = new StringBuilder();
+        StringAsyncSubscriber(AsyncResponse ar) {
+            super(ar);
+        }
+        @Override
+        public void onCompleted() {
+            super.resume(sb.toString());
+        }
+
+        @Override
+        public void onNext(String s) {
+            sb.append(s);
+        }
+
+    }
+}
+
+

http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaServer.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaServer.java
new file mode 100644
index 0000000..70f58b3
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaServer.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import java.util.Collections;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.ext.logging.LoggingOutInterceptor;
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
+import org.apache.cxf.jaxrs.provider.StreamingResponseProvider;
+import org.apache.cxf.jaxrs.rx.server.ObservableInvoker;
+import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+
+
+public class RxJavaServer extends AbstractBusTestServerBase {
+    public static final String PORT = allocatePort(RxJavaServer.class);
+
+    org.apache.cxf.endpoint.Server server;
+    public RxJavaServer() {
+    }
+
+    protected void run() {
+        Bus bus = BusFactory.getDefaultBus();
+        // Make sure default JSONProvider is not loaded
+        bus.setProperty("skip.default.json.provider.registration", true);
+        JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
+        sf.setInvoker(new ObservableInvoker());
+        sf.setProvider(new JacksonJsonProvider());
+        StreamingResponseProvider<HelloWorldBean> streamProvider = new StreamingResponseProvider<HelloWorldBean>();
+        streamProvider.setProduceMediaTypes(Collections.singletonList("application/json"));
+        sf.setProvider(streamProvider);
+        sf.getOutInterceptors().add(new LoggingOutInterceptor());
+        sf.setResourceClasses(RxJavaObservableService.class);
+        sf.setResourceProvider(RxJavaObservableService.class,
+                               new SingletonResourceProvider(new RxJavaObservableService(), true));
+        sf.setAddress("http://localhost:" + PORT + "/");
+        server = sf.create();
+    }
+
+    public void tearDown() throws Exception {
+        server.stop();
+        server.destroy();
+        server = null;
+    }
+
+    public static void main(String[] args) {
+        try {
+            RxJavaServer s = new RxJavaServer();
+            s.start();
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            System.exit(-1);
+        } finally {
+            System.out.println("done!");
+        }
+    }
+
+}


Mime
View raw message