cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From serg...@apache.org
Subject [cxf] branch master updated: [CXF-7556] Removing RxJava2 specific import in Rx2Java abstract subscriber code
Date Thu, 09 Nov 2017 15:35:40 GMT
This is an automated email from the ASF dual-hosted git repository.

sergeyb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git


The following commit(s) were added to refs/heads/master by this push:
     new adc783c  [CXF-7556] Removing RxJava2 specific import in Rx2Java abstract subscriber code
adc783c is described below

commit adc783c79ca4417599458598442a26f34656d1dc
Author: Sergey Beryozkin <sberyozkin@gmail.com>
AuthorDate: Thu Nov 9 15:35:27 2017 +0000

    [CXF-7556] Removing RxJava2 specific import in Rx2Java abstract subscriber code
---
 ...syncSubscriber.java => AbstractSubscriber.java} | 41 +++++++++++++++++-----
 .../jaxrs/rx2/server/StreamingAsyncSubscriber.java |  7 ++--
 2 files changed, 37 insertions(+), 11 deletions(-)

diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/AbstractAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/AbstractSubscriber.java
similarity index 66%
rename from rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/AbstractAsyncSubscriber.java
rename to rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/AbstractSubscriber.java
index 17664c2..5c1ec6c 100644
--- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/AbstractAsyncSubscriber.java
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/AbstractSubscriber.java
@@ -23,14 +23,17 @@ import java.util.List;
 import javax.ws.rs.container.AsyncResponse;
 
 import org.apache.cxf.jaxrs.ext.StreamingResponse;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
 
 import io.reactivex.subscribers.DefaultSubscriber;
 
-public abstract class AbstractAsyncSubscriber<T> extends DefaultSubscriber<T> {
+public abstract class AbstractSubscriber<T> implements Subscriber<T> {
 
     private AsyncResponse ar;
+    private Subscription subscription;
 
-    protected AbstractAsyncSubscriber(AsyncResponse ar) {
+    protected AbstractSubscriber(AsyncResponse ar) {
         this.ar = ar;
     }
     public void resume(T response) {
@@ -50,16 +53,38 @@ public abstract class AbstractAsyncSubscriber<T> extends DefaultSubscriber<T> {
         ar.resume(t);
     }
 
+    @Override
+    public void onSubscribe(Subscription inSubscription) {
+        this.subscription = inSubscription;
+        requestAll();
+    }
+
+    @Override
+    public void onNext(T t) {
+        resume(t);
+    }
+
+    @Override
+    public void onComplete() {
+    }
+
     protected AsyncResponse getAsyncResponse() {
         return ar;
     }
 
-    @Override
-    public void onStart() {
-        requestNext();
+    protected Subscription getSubscription() {
+        return subscription;
     }
-    
+
     protected void requestNext() {
-        super.request(1);
+        request(1);
+    }
+
+    protected void requestAll() {
+        request(Long.MAX_VALUE);
+    }
+
+    protected final void request(long elements) {
+        this.subscription.request(elements);
     }
-}
+}
\ No newline at end of file
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/StreamingAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/StreamingAsyncSubscriber.java
index 3916878..128f499 100644
--- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/StreamingAsyncSubscriber.java
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/StreamingAsyncSubscriber.java
@@ -29,8 +29,9 @@ import javax.ws.rs.container.TimeoutHandler;
 
 import org.apache.cxf.common.util.StringUtils;
 import org.apache.cxf.jaxrs.ext.StreamingResponse;
+import org.reactivestreams.Subscription;
 
-public class StreamingAsyncSubscriber<T> extends AbstractAsyncSubscriber<T> {
+public class StreamingAsyncSubscriber<T> extends AbstractSubscriber<T> {
 
     private BlockingQueue<T> queue = new LinkedBlockingQueue<T>();
     private String openTag;
@@ -61,11 +62,11 @@ public class StreamingAsyncSubscriber<T> extends AbstractAsyncSubscriber<T> {
         }
     }
     @Override
-    public void onStart() {
+    public void onSubscribe(Subscription subscription) {
         if (asyncTimeout == 0) {
             resumeAsyncResponse();
         }
-        super.onStart();
+        super.onSubscribe(subscription);
     }
     private void resumeAsyncResponse() {
         super.resume(new StreamingResponseImpl());

-- 
To stop receiving notification emails like this one, please contact
['"commits@cxf.apache.org" <commits@cxf.apache.org>'].

Mime
View raw message