cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From johndam...@apache.org
Subject [cxf] branch master updated: [CXF-7642] Introduce separate modules for RxJava & RxJava2 (#381)
Date Thu, 15 Feb 2018 11:34:47 GMT
This is an automated email from the ASF dual-hosted git repository.

johndament pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d17bea  [CXF-7642] Introduce separate modules for RxJava & RxJava2 (#381)
3d17bea is described below

commit 3d17bead5bd2c22117b3e532c5184987b529cc01
Author: John Ament <john.d.ament@gmail.com>
AuthorDate: Thu Feb 15 06:34:45 2018 -0500

    [CXF-7642] Introduce separate modules for RxJava & RxJava2 (#381)
    
    - Introduce built in customizer for RxJava2
    - Introduce built in customizer for RxJava
    - [CXF-7631] Automatically customize on project reactor discovery.
---
 .../ext/AbstractStreamingResponseExtension.java    | 39 ++++++++++++++++++++++
 .../jaxrs/reactor/server/ReactorCustomizer.java    | 36 ++++++++++++++++++++
 ...rs.ext.JAXRSServerFactoryCustomizationExtension |  1 +
 rt/rs/extensions/rx/pom.xml                        | 19 -----------
 .../cxf/jaxrs/rx/server/ObservableCustomizer.java  | 30 +++++++++++++++++
 ...rs.ext.JAXRSServerFactoryCustomizationExtension |  1 +
 rt/rs/extensions/{rx => rx2}/pom.xml               | 20 ++---------
 .../cxf/jaxrs/rx2/client/FlowableRxInvoker.java    |  0
 .../jaxrs/rx2/client/FlowableRxInvokerImpl.java    |  0
 .../rx2/client/FlowableRxInvokerProvider.java      |  0
 .../cxf/jaxrs/rx2/client/ObservableRxInvoker.java  |  0
 .../jaxrs/rx2/client/ObservableRxInvokerImpl.java  |  0
 .../rx2/client/ObservableRxInvokerProvider.java    |  0
 .../cxf/jaxrs/rx2/server/ReactiveIOCustomizer.java | 36 ++++++++++++++++++++
 .../cxf/jaxrs/rx2/server/ReactiveIOInvoker.java    |  4 +--
 ...rs.ext.JAXRSServerFactoryCustomizationExtension |  1 +
 rt/rs/pom.xml                                      |  1 +
 systests/jaxrs/pom.xml                             |  5 +++
 .../jaxrs/reactive/RxJava2FlowableServer.java      | 13 ++------
 .../jaxrs/reactive/RxJavaObservableServer.java     | 10 ++----
 .../cxf/systest/jaxrs/reactor/MonoReactorTest.java |  1 +
 .../cxf/systest/jaxrs/reactor/ReactorServer.java   | 18 +++-------
 22 files changed, 165 insertions(+), 70 deletions(-)

diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/ext/AbstractStreamingResponseExtension.java
b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/ext/AbstractStreamingResponseExtension.java
new file mode 100644
index 0000000..e7dfc8a
--- /dev/null
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/ext/AbstractStreamingResponseExtension.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.ext;
+
+import java.util.Collections;
+
+import javax.ws.rs.core.MediaType;
+
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.provider.StreamingResponseProvider;
+import org.apache.cxf.service.invoker.Invoker;
+
+public abstract class AbstractStreamingResponseExtension implements JAXRSServerFactoryCustomizationExtension
{
+    protected abstract Invoker createInvoker(JAXRSServerFactoryBean bean);
+
+    @Override
+    public final void customize(JAXRSServerFactoryBean bean) {
+        bean.setInvoker(createInvoker(bean));
+        StreamingResponseProvider<Object> streamProvider = new StreamingResponseProvider<>();
+        streamProvider.setProduceMediaTypes(Collections.singletonList(MediaType.APPLICATION_JSON));
+        bean.setProvider(streamProvider);
+    }
+}
diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorCustomizer.java
b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorCustomizer.java
new file mode 100644
index 0000000..a57b574
--- /dev/null
+++ b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorCustomizer.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.reactor.server;
+
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.ext.AbstractStreamingResponseExtension;
+import org.apache.cxf.service.invoker.Invoker;
+
+public class ReactorCustomizer extends AbstractStreamingResponseExtension {
+    @Override
+    protected Invoker createInvoker(JAXRSServerFactoryBean bean) {
+        Boolean useStreamingSubscriber = (Boolean)bean.getProperties(true)
+                .getOrDefault("useStreamingSubscriber", null);
+        ReactorInvoker invoker = new ReactorInvoker();
+        if (useStreamingSubscriber != null) {
+            invoker.setUseStreamingSubscriberIfPossible(useStreamingSubscriber);
+        }
+        return invoker;
+    }
+}
\ No newline at end of file
diff --git a/rt/rs/extensions/reactor/src/main/resources/META-INF/services/org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension
b/rt/rs/extensions/reactor/src/main/resources/META-INF/services/org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension
new file mode 100644
index 0000000..0bca2eb
--- /dev/null
+++ b/rt/rs/extensions/reactor/src/main/resources/META-INF/services/org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension
@@ -0,0 +1 @@
+org.apache.cxf.jaxrs.reactor.server.ReactorCustomizer
\ No newline at end of file
diff --git a/rt/rs/extensions/rx/pom.xml b/rt/rs/extensions/rx/pom.xml
index 2e3df9e..c016384 100644
--- a/rt/rs/extensions/rx/pom.xml
+++ b/rt/rs/extensions/rx/pom.xml
@@ -45,25 +45,6 @@
           <groupId>io.reactivex</groupId>
           <artifactId>rxjava</artifactId>
           <scope>provided</scope>
-          <optional>true</optional>
-        </dependency>
-        <dependency>
-          <groupId>io.reactivex.rxjava2</groupId>
-          <artifactId>rxjava</artifactId>
-          <scope>provided</scope>
-          <optional>true</optional>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.cxf</groupId>
-            <artifactId>cxf-rt-rs-extension-reactivestreams</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
-            <optional>true</optional>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
         </dependency>
     </dependencies>
 </project>
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/ObservableCustomizer.java
b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/ObservableCustomizer.java
new file mode 100644
index 0000000..55bf3b9
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/ObservableCustomizer.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx.server;
+
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.ext.AbstractStreamingResponseExtension;
+import org.apache.cxf.service.invoker.Invoker;
+
+public class ObservableCustomizer extends AbstractStreamingResponseExtension {
+    @Override
+    protected Invoker createInvoker(JAXRSServerFactoryBean bean) {
+        return new ObservableInvoker();
+    }
+}
diff --git a/rt/rs/extensions/rx/src/main/resources/META-INF/services/org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension
b/rt/rs/extensions/rx/src/main/resources/META-INF/services/org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension
new file mode 100644
index 0000000..bb0e3d1
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/resources/META-INF/services/org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension
@@ -0,0 +1 @@
+org.apache.cxf.jaxrs.rx.server.ObservableCustomizer
\ No newline at end of file
diff --git a/rt/rs/extensions/rx/pom.xml b/rt/rs/extensions/rx2/pom.xml
similarity index 77%
copy from rt/rs/extensions/rx/pom.xml
copy to rt/rs/extensions/rx2/pom.xml
index 2e3df9e..74f9764 100644
--- a/rt/rs/extensions/rx/pom.xml
+++ b/rt/rs/extensions/rx2/pom.xml
@@ -19,10 +19,10 @@
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
     <modelVersion>4.0.0</modelVersion>
-    <artifactId>cxf-rt-rs-extension-rx</artifactId>
+    <artifactId>cxf-rt-rs-extension-rx2</artifactId>
     <packaging>bundle</packaging>
-    <name>Apache CXF JAX-RS Extensions: RxJava</name>
-    <description>Apache CXF JAX-RS Extensions: RxJava</description>
+    <name>Apache CXF JAX-RS Extensions: RxJava2</name>
+    <description>Apache CXF JAX-RS Extensions: RxJava2</description>
     <url>http://cxf.apache.org</url>
     <parent>
         <groupId>org.apache.cxf</groupId>
@@ -42,28 +42,14 @@
             <version>${project.version}</version>
         </dependency> 
         <dependency>
-          <groupId>io.reactivex</groupId>
-          <artifactId>rxjava</artifactId>
-          <scope>provided</scope>
-          <optional>true</optional>
-        </dependency>
-        <dependency>
           <groupId>io.reactivex.rxjava2</groupId>
           <artifactId>rxjava</artifactId>
           <scope>provided</scope>
-          <optional>true</optional>
         </dependency>
         <dependency>
             <groupId>org.apache.cxf</groupId>
             <artifactId>cxf-rt-rs-extension-reactivestreams</artifactId>
             <version>${project.version}</version>
-            <scope>provided</scope>
-            <optional>true</optional>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
         </dependency>
     </dependencies>
 </project>
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvoker.java
b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvoker.java
similarity index 100%
rename from rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvoker.java
rename to rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvoker.java
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerImpl.java
b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerImpl.java
similarity index 100%
rename from rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerImpl.java
rename to rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerImpl.java
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerProvider.java
b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerProvider.java
similarity index 100%
rename from rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerProvider.java
rename to rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerProvider.java
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvoker.java
b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvoker.java
similarity index 100%
rename from rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvoker.java
rename to rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvoker.java
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerImpl.java
b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerImpl.java
similarity index 100%
rename from rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerImpl.java
rename to rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerImpl.java
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerProvider.java
b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerProvider.java
similarity index 100%
rename from rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerProvider.java
rename to rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerProvider.java
diff --git a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOCustomizer.java
b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOCustomizer.java
new file mode 100644
index 0000000..e57bb39
--- /dev/null
+++ b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOCustomizer.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx2.server;
+
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.ext.AbstractStreamingResponseExtension;
+import org.apache.cxf.service.invoker.Invoker;
+
+public class ReactiveIOCustomizer extends AbstractStreamingResponseExtension {
+    @Override
+    protected Invoker createInvoker(JAXRSServerFactoryBean bean) {
+        Boolean useStreamingSubscriber = (Boolean)bean.getProperties(true)
+                .getOrDefault("useStreamingSubscriber", null);
+        ReactiveIOInvoker invoker = new ReactiveIOInvoker();
+        if (useStreamingSubscriber != null) {
+            invoker.setUseStreamingSubscriberIfPossible(useStreamingSubscriber);
+        }
+        return invoker;
+    }
+}
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
similarity index 92%
rename from rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
rename to rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
index 13092dc..e8871f4 100644
--- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
+++ b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
@@ -41,14 +41,14 @@ public class ReactiveIOInvoker extends AbstractReactiveInvoker {
     
     protected AsyncResponseImpl handleSingle(Message inMessage, Single<?> single) {
         final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
-        single.subscribe(v -> asyncResponse.resume(v), t -> handleThrowable(asyncResponse,
t));
+        single.subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t));
         return asyncResponse;
     }
 
     protected AsyncResponseImpl handleFlowable(Message inMessage, Flowable<?> f) {
         final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
         if (!isStreamingSubscriberUsed(f, asyncResponse, inMessage)) {
-            f.subscribe(v -> asyncResponse.resume(v), t -> handleThrowable(asyncResponse,
t));
+            f.subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t));
         }
         return asyncResponse;
     }
diff --git a/rt/rs/extensions/rx2/src/main/resources/META-INF/services/org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension
b/rt/rs/extensions/rx2/src/main/resources/META-INF/services/org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension
new file mode 100644
index 0000000..7fd3d78
--- /dev/null
+++ b/rt/rs/extensions/rx2/src/main/resources/META-INF/services/org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension
@@ -0,0 +1 @@
+org.apache.cxf.jaxrs.rx2.server.ReactiveIOCustomizer
\ No newline at end of file
diff --git a/rt/rs/pom.xml b/rt/rs/pom.xml
index 4398491..debbe21 100644
--- a/rt/rs/pom.xml
+++ b/rt/rs/pom.xml
@@ -38,6 +38,7 @@
         <module>extensions/providers</module>
         <module>extensions/search</module>
         <module>extensions/rx</module>
+        <module>extensions/rx2</module>
         <module>extensions/reactor</module>
         <module>extensions/reactivestreams</module>
         <module>security</module>
diff --git a/systests/jaxrs/pom.xml b/systests/jaxrs/pom.xml
index 6106cbc..283964c 100644
--- a/systests/jaxrs/pom.xml
+++ b/systests/jaxrs/pom.xml
@@ -324,6 +324,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-rs-extension-rx2</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
             <artifactId>cxf-rt-rs-extension-reactor</artifactId>
             <version>${project.version}</version>
         </dependency>
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java
index 5063b5e..2b18c83 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java
@@ -19,8 +19,6 @@
 
 package org.apache.cxf.systest.jaxrs.reactive;
 
-import java.util.Collections;
-
 import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
 
 import org.apache.cxf.Bus;
@@ -28,8 +26,7 @@ import org.apache.cxf.BusFactory;
 import org.apache.cxf.ext.logging.LoggingOutInterceptor;
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
 import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
-import org.apache.cxf.jaxrs.provider.StreamingResponseProvider;
-import org.apache.cxf.jaxrs.rx2.server.ReactiveIOInvoker;
+import org.apache.cxf.jaxrs.rx2.server.ReactiveIOCustomizer;
 import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
 
 
@@ -52,13 +49,9 @@ public class RxJava2FlowableServer extends AbstractBusTestServerBase {
     private JAXRSServerFactoryBean createFactoryBean(Bus bus, boolean useStreamingSubscriber,
                                                      String relAddress) {
         JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
-        ReactiveIOInvoker invoker = new ReactiveIOInvoker();
-        invoker.setUseStreamingSubscriberIfPossible(useStreamingSubscriber);
-        sf.setInvoker(invoker);
+        sf.getProperties(true).put("useStreamingSubscriber", useStreamingSubscriber);
         sf.setProvider(new JacksonJsonProvider());
-        StreamingResponseProvider<HelloWorldBean> streamProvider = new StreamingResponseProvider<HelloWorldBean>();
-        streamProvider.setProduceMediaTypes(Collections.singletonList("application/json"));
-        sf.setProvider(streamProvider);
+        new ReactiveIOCustomizer().customize(sf);
         sf.getOutInterceptors().add(new LoggingOutInterceptor());
         sf.setResourceClasses(RxJava2FlowableService.class);
         sf.setResourceProvider(RxJava2FlowableService.class,
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableServer.java
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableServer.java
index 85d576d..15759b1 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableServer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableServer.java
@@ -19,8 +19,6 @@
 
 package org.apache.cxf.systest.jaxrs.reactive;
 
-import java.util.Collections;
-
 import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
 
 import org.apache.cxf.Bus;
@@ -28,8 +26,7 @@ import org.apache.cxf.BusFactory;
 import org.apache.cxf.ext.logging.LoggingOutInterceptor;
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
 import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
-import org.apache.cxf.jaxrs.provider.StreamingResponseProvider;
-import org.apache.cxf.jaxrs.rx.server.ObservableInvoker;
+import org.apache.cxf.jaxrs.rx.server.ObservableCustomizer;
 import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
 
 
@@ -45,11 +42,8 @@ public class RxJavaObservableServer extends AbstractBusTestServerBase {
         // Make sure default JSONProvider is not loaded
         bus.setProperty("skip.default.json.provider.registration", true);
         JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
-        sf.setInvoker(new ObservableInvoker());
         sf.setProvider(new JacksonJsonProvider());
-        StreamingResponseProvider<HelloWorldBean> streamProvider = new StreamingResponseProvider<HelloWorldBean>();
-        streamProvider.setProduceMediaTypes(Collections.singletonList("application/json"));
-        sf.setProvider(streamProvider);
+        new ObservableCustomizer().customize(sf);
         sf.getOutInterceptors().add(new LoggingOutInterceptor());
         sf.setResourceClasses(RxJavaObservableService.class);
         sf.setResourceProvider(RxJavaObservableService.class,
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java
index a3ea3de..d66660c 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java
@@ -77,6 +77,7 @@ public class MonoReactorTest extends AbstractBusClientServerTestBase {
                 .doOnNext(helloWorldBean -> holder.value = helloWorldBean)
                 .subscribe();
         Thread.sleep(500);
+        assertNotNull(holder.value);
         assertEquals("Hello", holder.value.getGreeting());
         assertEquals("World", holder.value.getAudience());
     }
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java
index dac4c2c..8965073 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java
@@ -19,8 +19,6 @@
 
 package org.apache.cxf.systest.jaxrs.reactor;
 
-import java.util.Collections;
-
 import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
 
 import org.apache.cxf.Bus;
@@ -28,8 +26,7 @@ import org.apache.cxf.BusFactory;
 import org.apache.cxf.ext.logging.LoggingOutInterceptor;
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
 import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
-import org.apache.cxf.jaxrs.provider.StreamingResponseProvider;
-import org.apache.cxf.jaxrs.reactor.server.ReactorInvoker;
+import org.apache.cxf.jaxrs.reactor.server.ReactorCustomizer;
 import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
 
 public class ReactorServer extends AbstractBusTestServerBase {
@@ -44,13 +41,9 @@ public class ReactorServer extends AbstractBusTestServerBase {
         // Make sure default JSONProvider is not loaded
         bus.setProperty("skip.default.json.provider.registration", true);
         JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
-        ReactorInvoker invoker = new ReactorInvoker();
-        invoker.setUseStreamingSubscriberIfPossible(false);
-        sf.setInvoker(invoker);
+        sf.getProperties(true).put("useStreamingSubscriber", false);
         sf.setProvider(new JacksonJsonProvider());
-        StreamingResponseProvider<HelloWorldBean> streamProvider = new StreamingResponseProvider<HelloWorldBean>();
-        streamProvider.setProduceMediaTypes(Collections.singletonList("application/json"));
-        sf.setProvider(streamProvider);
+        new ReactorCustomizer().customize(sf);
         sf.getOutInterceptors().add(new LoggingOutInterceptor());
         sf.setResourceClasses(FluxService.class, MonoService.class);
         sf.setResourceProvider(FluxService.class,
@@ -61,11 +54,8 @@ public class ReactorServer extends AbstractBusTestServerBase {
         server1 = sf.create();
         
         JAXRSServerFactoryBean sf2 = new JAXRSServerFactoryBean();
-        sf2.setInvoker(new ReactorInvoker());
-        StreamingResponseProvider<HelloWorldBean> streamProvider2 = new StreamingResponseProvider<HelloWorldBean>();
-        streamProvider2.setProduceMediaTypes(Collections.singletonList("application/json"));
-        sf2.setProvider(streamProvider2);
         sf2.setProvider(new JacksonJsonProvider());
+        new ReactorCustomizer().customize(sf2);
         sf2.getOutInterceptors().add(new LoggingOutInterceptor());
         sf2.setResourceClasses(FluxService.class);
         sf2.setResourceProvider(FluxService.class,

-- 
To stop receiving notification emails like this one, please contact
johndament@apache.org.

Mime
View raw message