cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject [05/23] cxf git commit: [CXF-6833] Closer integration with AsyncResponse
Date Mon, 05 Sep 2016 01:10:12 GMT
[CXF-6833] Closer integration with AsyncResponse


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/1224d5cc
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/1224d5cc
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/1224d5cc

Branch: refs/heads/master-jaxrs-2.1
Commit: 1224d5cc9f646aa3a4924ce9cb045366d2f4cdff
Parents: b114d6c
Author: Sergey Beryozkin <sberyozkin@gmail.com>
Authored: Wed Aug 31 16:52:59 2016 +0100
Committer: Sergey Beryozkin <sberyozkin@gmail.com>
Committed: Wed Aug 31 16:52:59 2016 +0100

----------------------------------------------------------------------
 .../provider/StreamingResponseProvider.java     |   4 +-
 .../provider/rx/AbstractAsyncSubscriber.java    |  12 ++
 .../rx/JsonStreamingAsyncSubscriber.java        |  31 +++++
 .../jaxrs/provider/rx/ListAsyncSubscriber.java  |  42 +++++++
 .../provider/rx/StreamingAsyncSubscriber.java   | 125 +++++++++++++++++++
 .../systest/jaxrs/reactive/HelloWorldBean.java  |   9 +-
 .../jaxrs/reactive/JAXRSReactiveTest.java       |  10 ++
 .../systest/jaxrs/reactive/ReactiveServer.java  |   7 ++
 .../systest/jaxrs/reactive/ReactiveService.java |  33 ++++-
 9 files changed, 268 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/1224d5cc/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/StreamingResponseProvider.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/StreamingResponseProvider.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/StreamingResponseProvider.java
index 2749d7c..e33728d 100644
--- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/StreamingResponseProvider.java
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/StreamingResponseProvider.java
@@ -34,8 +34,8 @@ import javax.ws.rs.ext.Providers;
 import org.apache.cxf.jaxrs.ext.StreamingResponse;
 import org.apache.cxf.jaxrs.utils.InjectionUtils;
 
-public class StreamingResponseProvider<T> implements 
-    MessageBodyWriter<StreamingResponse<T>> {
+public class StreamingResponseProvider<T> extends AbstractConfigurableProvider 
+    implements MessageBodyWriter<StreamingResponse<T>> {
 
     @Context
     private Providers providers;

http://git-wip-us.apache.org/repos/asf/cxf/blob/1224d5cc/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/AbstractAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/AbstractAsyncSubscriber.java b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/AbstractAsyncSubscriber.java
index ae4459c..c49144f 100644
--- a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/AbstractAsyncSubscriber.java
+++ b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/AbstractAsyncSubscriber.java
@@ -18,8 +18,12 @@
  */
 package org.apache.cxf.jaxrs.provider.rx;
 
+import java.util.List;
+
 import javax.ws.rs.container.AsyncResponse;
 
+import org.apache.cxf.jaxrs.ext.StreamingResponse;
+
 import rx.Subscriber;
 
 public abstract class AbstractAsyncSubscriber<T> extends Subscriber<T> {
@@ -32,6 +36,14 @@ public abstract class AbstractAsyncSubscriber<T> extends Subscriber<T> {
     public void resume(T response) {
         ar.resume(response);
     }
+    
+    public void resume(List<T> response) {
+        ar.resume(response);
+    }
+    
+    public void resume(StreamingResponse<T> response) {
+        ar.resume(response);
+    }
 
     @Override
     public void onError(Throwable t) {

http://git-wip-us.apache.org/repos/asf/cxf/blob/1224d5cc/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/JsonStreamingAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/JsonStreamingAsyncSubscriber.java b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/JsonStreamingAsyncSubscriber.java
new file mode 100644
index 0000000..5e36e9c
--- /dev/null
+++ b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/JsonStreamingAsyncSubscriber.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.provider.rx;
+
+import javax.ws.rs.container.AsyncResponse;
+
+public class JsonStreamingAsyncSubscriber<T> extends StreamingAsyncSubscriber<T> {
+    public JsonStreamingAsyncSubscriber(AsyncResponse ar) {
+        this(ar, 1000);
+    }
+    public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout) {
+        super(ar, "[", "]", ",", pollTimeout);
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/1224d5cc/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ListAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ListAsyncSubscriber.java b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ListAsyncSubscriber.java
new file mode 100644
index 0000000..6bfb1cb
--- /dev/null
+++ b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ListAsyncSubscriber.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.provider.rx;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.ws.rs.container.AsyncResponse;
+
+public class ListAsyncSubscriber<T> extends AbstractAsyncSubscriber<T> {
+    
+    private List<T> beans = new LinkedList<T>();
+    public ListAsyncSubscriber(AsyncResponse ar) {
+        super(ar);
+    }
+    @Override
+    public void onCompleted() {
+        super.resume(beans);
+    }
+
+    @Override
+    public void onNext(T bean) {
+        beans.add(bean);
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/1224d5cc/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/StreamingAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/StreamingAsyncSubscriber.java b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/StreamingAsyncSubscriber.java
new file mode 100644
index 0000000..48a5ac4
--- /dev/null
+++ b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/StreamingAsyncSubscriber.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.provider.rx;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.TimeoutHandler;
+
+import org.apache.commons.codec.binary.StringUtils;
+import org.apache.cxf.jaxrs.ext.StreamingResponse;
+
+public class StreamingAsyncSubscriber<T> extends AbstractAsyncSubscriber<T> {
+    
+    private BlockingQueue<T> queue = new LinkedBlockingQueue<T>();
+    private String openTag;
+    private String closeTag;
+    private String separator;
+    private long pollTimeout;
+    private long asyncTimeout;
+    private volatile boolean completed;
+    private volatile boolean firstWriteDone;
+    public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep) {
+        this(ar, openTag, closeTag, "", 1000);
+    }
+    public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep, 
+                                    long pollTimeout) {
+        this(ar, openTag, closeTag, sep, pollTimeout, 0);
+    }
+    public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep, 
+                                    long pollTimeout, long asyncTimeout) {
+        super(ar);
+        this.openTag = openTag;
+        this.closeTag = closeTag;
+        this.separator = sep;
+        this.pollTimeout = pollTimeout;
+        this.asyncTimeout = 0;
+        if (asyncTimeout > 0) {
+            ar.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS);
+            ar.setTimeoutHandler(new TimeoutHandlerImpl());
+        }
+    }
+    @Override
+    public void onStart() {
+        if (asyncTimeout == 0) {
+            resumeAsyncResponse();
+        }
+    }
+    private void resumeAsyncResponse() {
+        super.resume(new StreamingResponseImpl());
+    }
+    @Override
+    public void onCompleted() {
+        completed = true;
+    }
+    
+    @Override
+    public void onNext(T bean) {
+        if (asyncTimeout > 0 && getAsyncResponse().isSuspended()) {
+            resumeAsyncResponse();
+        }
+        queue.add(bean);
+    }
+    private class StreamingResponseImpl implements StreamingResponse<T> {
+
+        @Override
+        public void writeTo(Writer<T> writer) throws IOException {
+            if (openTag != null) {
+                writer.getEntityStream().write(StringUtils.getBytesUtf8(openTag));
+            }
+            while (!completed || queue.size() > 0) {
+                try {
+                    T bean = queue.poll(pollTimeout, TimeUnit.MILLISECONDS);
+                    if (bean != null) {
+                        if (firstWriteDone) {
+                            writer.getEntityStream().write(StringUtils.getBytesUtf8(separator));
+                        }
+                        writer.write(bean);
+                        firstWriteDone = true;
+                        
+                    }
+                } catch (InterruptedException ex) {
+                    // ignore
+                }
+            }
+            if (closeTag != null) {
+                writer.getEntityStream().write(StringUtils.getBytesUtf8(closeTag));
+            }
+            
+        }
+
+    }
+    public class TimeoutHandlerImpl implements TimeoutHandler {
+
+        @Override
+        public void handleTimeout(AsyncResponse asyncResponse) {
+            if (queue.isEmpty()) {
+                asyncResponse.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS);
+            } else {
+                resumeAsyncResponse();
+            }
+
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/1224d5cc/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/HelloWorldBean.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/HelloWorldBean.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/HelloWorldBean.java
index c73a63d..79dc6f2 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/HelloWorldBean.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/HelloWorldBean.java
@@ -19,8 +19,15 @@
 package org.apache.cxf.systest.jaxrs.reactive;
 
 public class HelloWorldBean {
-    private String greeting = "Hello";
+    private String greeting;
     private String audience = "World";
+    public HelloWorldBean() {
+        this("Hello");
+    }
+    public HelloWorldBean(String greeting) {
+        this.greeting = greeting;
+    }
+    
     public String getGreeting() {
         return greeting;
     }

http://git-wip-us.apache.org/repos/asf/cxf/blob/1224d5cc/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
index f82d139..45a9468 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
@@ -104,6 +104,16 @@ public class JAXRSReactiveTest extends AbstractBusClientServerTestBase {
         String address = "http://localhost:" + PORT + "/reactive/textJsonImplicitList";
         doTestGetHelloWorldJsonList(address);
     }
+    @Test
+    public void testGetHelloWorldJsonImplicitListAsync() throws Exception {
+        String address = "http://localhost:" + PORT + "/reactive/textJsonImplicitListAsync";
+        doTestGetHelloWorldJsonList(address);
+    }
+    @Test
+    public void testGetHelloWorldJsonImplicitListAsyncStream() throws Exception {
+        String address = "http://localhost:" + PORT + "/reactive/textJsonImplicitListAsyncStream";
+        doTestGetHelloWorldJsonList(address);
+    }
     private void doTestGetHelloWorldJsonList(String address) throws Exception {
         WebClient wc = WebClient.create(address,
                                         Collections.singletonList(new JacksonJsonProvider()));

http://git-wip-us.apache.org/repos/asf/cxf/blob/1224d5cc/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java
index ced133b..d12641a 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java
@@ -19,6 +19,8 @@
 
 package org.apache.cxf.systest.jaxrs.reactive;
 
+import java.util.Collections;
+
 import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
 
 import org.apache.cxf.Bus;
@@ -26,8 +28,10 @@ import org.apache.cxf.BusFactory;
 import org.apache.cxf.interceptor.LoggingOutInterceptor;
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
 import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
+import org.apache.cxf.jaxrs.provider.StreamingResponseProvider;
 import org.apache.cxf.jaxrs.provider.rx.ObservableWriter;
 import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+
     
 public class ReactiveServer extends AbstractBusTestServerBase {
     public static final String PORT = allocatePort(ReactiveServer.class);
@@ -43,6 +47,9 @@ public class ReactiveServer extends AbstractBusTestServerBase {
         JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
         sf.setProvider(new ObservableWriter<Object>());
         sf.setProvider(new JacksonJsonProvider());
+        StreamingResponseProvider<HelloWorldBean> streamProvider = new StreamingResponseProvider<HelloWorldBean>();
+        streamProvider.setProduceMediaTypes(Collections.singletonList("application/json"));
+        sf.setProvider(streamProvider);
         sf.getOutInterceptors().add(new LoggingOutInterceptor());
         sf.setResourceClasses(ReactiveService.class);
         sf.setResourceProvider(ReactiveService.class,

http://git-wip-us.apache.org/repos/asf/cxf/blob/1224d5cc/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java
index 5d77969..6081f2b 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java
@@ -30,8 +30,11 @@ import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
 
 import org.apache.cxf.jaxrs.provider.rx.AbstractAsyncSubscriber;
+import org.apache.cxf.jaxrs.provider.rx.JsonStreamingAsyncSubscriber;
+import org.apache.cxf.jaxrs.provider.rx.ListAsyncSubscriber;
 
 import rx.Observable;
+import rx.schedulers.Schedulers;
 
 
 @Path("/reactive")
@@ -65,12 +68,38 @@ public class ReactiveService {
     @Path("textJsonImplicitList")
     public Observable<HelloWorldBean> getJsonImplicitList() {
         HelloWorldBean bean1 = new HelloWorldBean();
-        HelloWorldBean bean2 = new HelloWorldBean();
-        bean2.setGreeting("Ciao");
+        HelloWorldBean bean2 = new HelloWorldBean("Ciao");
         return Observable.just(bean1, bean2);
     }
     @GET
     @Produces("application/json")
+    @Path("textJsonImplicitListAsync")
+    public void getJsonImplicitListAsync(@Suspended AsyncResponse ar) {
+        final HelloWorldBean bean1 = new HelloWorldBean();
+        final HelloWorldBean bean2 = new HelloWorldBean("Ciao");
+        new Thread(new Runnable() {
+            public void run() {
+                try {
+                    Thread.sleep(2000);
+                } catch (InterruptedException ex) {
+                    // ignore
+                }  
+                Observable.just(bean1, bean2).subscribe(new ListAsyncSubscriber<HelloWorldBean>(ar));
+            }
+        }).start();
+        
+    }
+    @GET
+    @Produces("application/json")
+    @Path("textJsonImplicitListAsyncStream")
+    public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) {
+        Observable.just("Hello", "Ciao")
+            .map(s -> new HelloWorldBean(s))
+            .subscribeOn(Schedulers.computation())
+            .subscribe(new JsonStreamingAsyncSubscriber<HelloWorldBean>(ar));
+    }
+    @GET
+    @Produces("application/json")
     @Path("textJsonList")
     public Observable<List<HelloWorldBean>> getJsonList() {
         HelloWorldBean bean1 = new HelloWorldBean();


Mime
View raw message