cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From serg...@apache.org
Subject [cxf] branch master updated: [CXF-7556] Minor optimizations
Date Wed, 27 Dec 2017 11:28:46 GMT
This is an automated email from the ASF dual-hosted git repository.

sergeyb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git


The following commit(s) were added to refs/heads/master by this push:
     new 07e1a17  [CXF-7556] Minor optimizations
07e1a17 is described below

commit 07e1a175818695b048975e81f4db2b2756a9a209
Author: Sergey Beryozkin <sberyozkin@gmail.com>
AuthorDate: Wed Dec 27 11:28:32 2017 +0000

    [CXF-7556] Minor optimizations
---
 .../reactivestreams/server/AbstractReactiveInvoker.java    | 14 +++++++++++++-
 .../apache/cxf/jaxrs/reactor/server/ReactorInvoker.java    |  5 +----
 .../org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java |  5 +----
 3 files changed, 15 insertions(+), 9 deletions(-)

diff --git a/rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/AbstractReactiveInvoker.java b/rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/AbstractReactiveInvoker.java
index 237a4dc..e82166b 100644
--- a/rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/AbstractReactiveInvoker.java
+++ b/rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/AbstractReactiveInvoker.java
@@ -20,11 +20,13 @@ package org.apache.cxf.jaxrs.reactivestreams.server;
 
 import java.util.concurrent.CancellationException;
 
+import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.core.MediaType;
 
 import org.apache.cxf.jaxrs.JAXRSInvoker;
 import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
 import org.apache.cxf.message.Message;
+import org.reactivestreams.Publisher;
 
 
 public abstract class AbstractReactiveInvoker extends JAXRSInvoker {
@@ -44,11 +46,21 @@ public abstract class AbstractReactiveInvoker extends JAXRSInvoker {
         return MediaType.APPLICATION_JSON.equals(inMessage.getExchange().get(Message.CONTENT_TYPE));
     }
 
-    
     public boolean isUseStreamingSubscriberIfPossible() {
         return useStreamingSubscriberIfPossible;
     }
 
+    protected boolean isStreamingSubscriberUsed(Publisher<?> publisher,
+                                                AsyncResponse asyncResponse, 
+                                                Message inMessage) {
+        if (isUseStreamingSubscriberIfPossible() && isJsonResponse(inMessage)) {
+            publisher.subscribe(new JsonStreamingAsyncSubscriber<>(asyncResponse));
+            return true;
+        } else {
+            return false;
+        }
+    }
+    
     public void setUseStreamingSubscriberIfPossible(boolean useStreamingSubscriberIfPossible) {
         this.useStreamingSubscriberIfPossible = useStreamingSubscriberIfPossible;
     }
diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java
index 9e204a0..3d529f5 100644
--- a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java
+++ b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java
@@ -20,7 +20,6 @@ package org.apache.cxf.jaxrs.reactor.server;
 
 import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
 import org.apache.cxf.jaxrs.reactivestreams.server.AbstractReactiveInvoker;
-import org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber;
 import org.apache.cxf.message.Message;
 
 import reactor.core.publisher.Flux;
@@ -33,9 +32,7 @@ public class ReactorInvoker extends AbstractReactiveInvoker {
         if (result instanceof Flux) {
             final Flux<?> flux = (Flux<?>) result;
             final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
-            if (isUseStreamingSubscriberIfPossible() && isJsonResponse(inMessage)) {
-                flux.subscribe(new JsonStreamingAsyncSubscriber<>(asyncResponse));
-            } else {
+            if (!isStreamingSubscriberUsed(flux, asyncResponse, inMessage)) {
                 flux.doOnNext(asyncResponse::resume)
                     .doOnError(t -> handleThrowable(asyncResponse, t))
                     .subscribe();
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
index e113d40..13092dc 100644
--- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
@@ -20,7 +20,6 @@ package org.apache.cxf.jaxrs.rx2.server;
 
 import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
 import org.apache.cxf.jaxrs.reactivestreams.server.AbstractReactiveInvoker;
-import org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber;
 import org.apache.cxf.message.Message;
 
 import io.reactivex.Flowable;
@@ -48,9 +47,7 @@ public class ReactiveIOInvoker extends AbstractReactiveInvoker {
 
     protected AsyncResponseImpl handleFlowable(Message inMessage, Flowable<?> f) {
         final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
-        if (isUseStreamingSubscriberIfPossible() && isJsonResponse(inMessage)) {
-            f.subscribe(new JsonStreamingAsyncSubscriber<>(asyncResponse));
-        } else {
+        if (!isStreamingSubscriberUsed(f, asyncResponse, inMessage)) {
             f.subscribe(v -> asyncResponse.resume(v), t -> handleThrowable(asyncResponse, t));
         }
         return asyncResponse;

-- 
To stop receiving notification emails like this one, please contact
['"commits@cxf.apache.org" <commits@cxf.apache.org>'].

Mime
View raw message