cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject [cxf] branch master updated: CXF-8022: Thread hangs using Reactor Flux when Exception is Thrown (#542)
Date Sat, 20 Apr 2019 14:35:28 GMT
This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git


The following commit(s) were added to refs/heads/master by this push:
     new 57f8c55  CXF-8022: Thread hangs using Reactor Flux when Exception is Thrown (#542)
57f8c55 is described below

commit 57f8c554b04af4e06b4e97bb349cbb8adeeb7acc
Author: Andriy Redko <drreta@gmail.com>
AuthorDate: Sat Apr 20 10:35:21 2019 -0400

    CXF-8022: Thread hangs using Reactor Flux when Exception is Thrown (#542)
    
    CXF-8022: Thread hangs using Reactor Flux when Exception is Thrown
---
 .../server/StreamingAsyncSubscriber.java           |  43 +++++++-
 .../jaxrs/reactor/client/ReactorInvokerImpl.java   |   7 +-
 systests/jaxrs/pom.xml                             |   6 ++
 .../cxf/systest/jaxrs/reactor/FluxReactorTest.java | 108 +++++++++++++++++++++
 .../cxf/systest/jaxrs/reactor/FluxService.java     |  48 +++++++++
 .../reactor/IllegalArgumentExceptionMapper.java    |  32 ++++++
 .../cxf/systest/jaxrs/reactor/ReactorServer.java   |   1 +
 7 files changed, 239 insertions(+), 6 deletions(-)

diff --git a/rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/StreamingAsyncSubscriber.java b/rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/StreamingAsyncSubscriber.java
index c6b2ee3..b95c4b8 100644
--- a/rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/StreamingAsyncSubscriber.java
+++ b/rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/StreamingAsyncSubscriber.java
@@ -40,6 +40,9 @@ public class StreamingAsyncSubscriber<T> extends AbstractSubscriber<T> {
     private long pollTimeout;
     private long asyncTimeout;
     private volatile boolean completed;
+    private volatile Throwable throwable;
+    
+    private AtomicBoolean tagsWriteDone = new AtomicBoolean();
     private AtomicBoolean firstWriteDone = new AtomicBoolean();
     public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep) {
         this(ar, openTag, closeTag, sep, 1000);
@@ -75,6 +78,16 @@ public class StreamingAsyncSubscriber<T> extends AbstractSubscriber<T> {
     public void onComplete() {
         completed = true;
     }
+    
+    @Override
+    public void onError(Throwable t) {
+        throwable = t;
+        completed = true;
+        // The AsyncResponse implementation is resumable only once. If the
+        // onNext() has been called, the throwable will be propagated using
+        // StreamingResponseImpl, otherwise using normal resume operation.
+        super.onError(t);
+    }
 
     @Override
     public void onNext(T bean) {
@@ -88,10 +101,12 @@ public class StreamingAsyncSubscriber<T> extends AbstractSubscriber<T> {
 
         @Override
         public void writeTo(Writer<T> writer) throws IOException {
-            if (openTag != null) {
-                writer.getEntityStream().write(StringUtils.toBytesUTF8(openTag));
-            }
+
             while (!completed || !queue.isEmpty()) {
+                if (tagsWriteDone.compareAndSet(false, true) && openTag != null) {
+                    writer.getEntityStream().write(StringUtils.toBytesUTF8(openTag));
+                }
+                
                 try {
                     T bean = queue.poll(pollTimeout, TimeUnit.MILLISECONDS);
                     if (bean != null) {
@@ -104,10 +119,30 @@ public class StreamingAsyncSubscriber<T> extends AbstractSubscriber<T> {
                     // ignore
                 }
             }
-            if (closeTag != null) {
+            if (closeTag != null && tagsWriteDone.get()) {
                 writer.getEntityStream().write(StringUtils.toBytesUTF8(closeTag));
             }
 
+            if (throwable != null) {
+                if (throwable instanceof RuntimeException) {
+                    throw (RuntimeException)throwable;
+                } else if (throwable instanceof IOException) {
+                    throw (IOException)throwable;
+                } else {
+                    throw new IOException(throwable);
+                }
+            }
+            
+            // empty stream
+            if (!tagsWriteDone.get()) {
+                if (openTag != null) {
+                    writer.getEntityStream().write(StringUtils.toBytesUTF8(openTag));
+                }
+                
+                if (closeTag != null) {
+                    writer.getEntityStream().write(StringUtils.toBytesUTF8(closeTag));
+                }
+            }
         }
 
     }
diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorInvokerImpl.java b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorInvokerImpl.java
index 8fd7b86..85c811d 100644
--- a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorInvokerImpl.java
+++ b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/client/ReactorInvokerImpl.java
@@ -25,6 +25,7 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.stream.StreamSupport;
 
 import javax.ws.rs.HttpMethod;
 import javax.ws.rs.client.Entity;
@@ -201,13 +202,15 @@ public class ReactorInvokerImpl implements ReactorInvoker {
     @Override
     public <T> Flux<T> flux(String name, Entity<?> entity, Class<T> responseType) {
         Future<Response> futureResponse = webClient.async().method(name, entity);
-        return Flux.fromIterable(toIterable(futureResponse, responseType));
+        return Flux.fromStream(() -> 
+            StreamSupport.stream(toIterable(futureResponse, responseType).spliterator(), false));
     }
 
     @Override
     public <T> Flux<T> flux(String name, Class<T> responseType) {
         Future<Response> futureResponse = webClient.async().method(name);
-        return Flux.fromIterable(toIterable(futureResponse, responseType));
+        return Flux.fromStream(() -> 
+            StreamSupport.stream(toIterable(futureResponse, responseType).spliterator(), false));
     }
 
     @Override
diff --git a/systests/jaxrs/pom.xml b/systests/jaxrs/pom.xml
index f9f4236..f2eea86 100644
--- a/systests/jaxrs/pom.xml
+++ b/systests/jaxrs/pom.xml
@@ -531,6 +531,12 @@
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-test</artifactId>
+            <version>${cxf.reactor.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxReactorTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxReactorTest.java
index 1fe4e3c..23788c1 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxReactorTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxReactorTest.java
@@ -32,6 +32,7 @@ import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
 import org.apache.cxf.jaxrs.reactor.client.ReactorInvoker;
 import org.apache.cxf.jaxrs.reactor.client.ReactorInvokerProvider;
 import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import reactor.test.StepVerifier;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -78,6 +79,113 @@ public class FluxReactorTest extends AbstractBusClientServerTestBase {
         String address = "http://localhost:" + PORT + "/reactor2/flux/textJsonImplicitListAsyncStream2";
         doTestTextJsonImplicitListAsyncStream(address);
     }
+    
+    @Test
+    public void testFluxEmpty() throws Exception {
+        String address = "http://localhost:" + PORT + "/reactor2/flux/empty";
+        
+        StepVerifier
+            .create(ClientBuilder
+                .newClient()
+                .register(new JacksonJsonProvider())
+                .register(new ReactorInvokerProvider())
+                .target(address)
+                .request(MediaType.APPLICATION_JSON)
+                .rx(ReactorInvoker.class)
+                .getFlux(HelloWorldBean.class))
+            .expectComplete()
+            .verify();
+    }
+    
+    @Test
+    public void testFluxErrors() throws Exception {
+        String address = "http://localhost:" + PORT + "/reactor2/flux/errors";
+        
+        StepVerifier
+            .create(ClientBuilder
+                .newClient()
+                .register(new JacksonJsonProvider())
+                .register(new ReactorInvokerProvider())
+                .target(address)
+                .request(MediaType.APPLICATION_JSON)
+                .rx(ReactorInvoker.class)
+                .getFlux(HelloWorldBean.class))
+            .expectNextMatches(b -> b.getGreeting().equalsIgnoreCase("Person 1"))
+            .expectComplete()
+            .verify();
+    }
+    
+    @Test
+    public void testFluxErrorsResponse() throws Exception {
+        String address = "http://localhost:" + PORT + "/reactor2/flux/errors";
+        
+        StepVerifier
+            .create(ClientBuilder
+                .newClient()
+                .register(new JacksonJsonProvider())
+                .register(new ReactorInvokerProvider())
+                .target(address)
+                .request(MediaType.APPLICATION_JSON)
+                .rx(ReactorInvoker.class)
+                .get())
+            .expectNextMatches(r -> r.getStatus() == 500)
+            .expectComplete()
+            .verify();
+    }
+    
+    @Test
+    public void testFluxErrorsResponseWithMapper() throws Exception {
+        String address = "http://localhost:" + PORT + "/reactor2/flux/mapper/errors";
+        
+        StepVerifier
+            .create(ClientBuilder
+                .newClient()
+                .register(new JacksonJsonProvider())
+                .register(new ReactorInvokerProvider())
+                .target(address)
+                .request(MediaType.APPLICATION_JSON)
+                .rx(ReactorInvoker.class)
+                .get())
+            .expectNextMatches(r -> r.getStatus() == 400)
+            .expectComplete()
+            .verify();
+    }
+    
+    @Test
+    public void testFluxImmediateErrors() throws Exception {
+        String address = "http://localhost:" + PORT + "/reactor2/flux/immediate/errors";
+        
+        StepVerifier
+            .create(ClientBuilder
+                .newClient()
+                .register(new JacksonJsonProvider())
+                .register(new ReactorInvokerProvider())
+                .target(address)
+                .request(MediaType.APPLICATION_JSON)
+                .rx(ReactorInvoker.class)
+                .getFlux(HelloWorldBean.class))
+            .expectError()
+            .verify();
+    }
+
+    @Test
+    public void testFluxImmediateErrorsResponse() throws Exception {
+        String address = "http://localhost:" + PORT + "/reactor2/flux/immediate/errors";
+        
+        StepVerifier
+            .create(ClientBuilder
+                .newClient()
+                .register(new JacksonJsonProvider())
+                .register(new ReactorInvokerProvider())
+                .target(address)
+                .request(MediaType.APPLICATION_JSON)
+                .rx(ReactorInvoker.class)
+                .get())
+            .expectNextMatches(r -> r.getStatus() == 500)
+            .expectComplete()
+            .verify();
+    }
+
     private void doTestTextJsonImplicitListAsyncStream(String address) throws Exception {
         final BlockingQueue<HelloWorldBean> holder = new LinkedBlockingQueue<>();
         ClientBuilder.newClient()
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java
index 6e8bfe4..e8f4fd9 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java
@@ -24,9 +24,11 @@ import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.MediaType;
 
 import org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber;
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
 @Path("/flux")
@@ -57,4 +59,50 @@ public class FluxService {
                 .map(HelloWorldBean::new)
                 .subscribeOn(Schedulers.parallel());
     }
+
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("errors")
+    public Flux<HelloWorldBean> errors() { 
+        return Flux 
+            .range(1, 2) 
+            .flatMap(item -> { 
+                if (item < 2) { 
+                    return Mono.just(new HelloWorldBean("Person " + item)); 
+                } else { 
+                    return Mono.error(new RuntimeException("Oops")); 
+                } 
+            }); 
+    }
+    
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/mapper/errors")
+    public Flux<HelloWorldBean> mapperErrors() { 
+        return Flux 
+            .range(1, 3) 
+            .flatMap(item -> { 
+                if (item < 3) { 
+                    return Mono.just(new HelloWorldBean("Person " + item)); 
+                } else { 
+                    return Mono.error(new IllegalArgumentException("Oops")); 
+                } 
+            }); 
+    }
+    
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/immediate/errors")
+    public Flux<HelloWorldBean> immediateErrors() { 
+        return Flux 
+            .range(1, 2) 
+            .flatMap(item -> Mono.error(new RuntimeException("Oops"))); 
+    }
+    
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/empty")
+    public Flux<HelloWorldBean> empty() { 
+        return Flux.empty(); 
+    }
 }
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/IllegalArgumentExceptionMapper.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/IllegalArgumentExceptionMapper.java
new file mode 100644
index 0000000..a31d1ef
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/IllegalArgumentExceptionMapper.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactor;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ExceptionMapper;
+import javax.ws.rs.ext.Provider;
+
+@Provider
+public class IllegalArgumentExceptionMapper implements ExceptionMapper<IllegalArgumentException> {
+    @Override
+    public Response toResponse(IllegalArgumentException exception) {
+        return Response.status(400).build();
+    }
+}
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java
index 8965073..0d7fe27 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java
@@ -55,6 +55,7 @@ public class ReactorServer extends AbstractBusTestServerBase {
         
         JAXRSServerFactoryBean sf2 = new JAXRSServerFactoryBean();
         sf2.setProvider(new JacksonJsonProvider());
+        sf2.setProvider(new IllegalArgumentExceptionMapper());
         new ReactorCustomizer().customize(sf2);
         sf2.getOutInterceptors().add(new LoggingOutInterceptor());
         sf2.setResourceClasses(FluxService.class);


Mime
View raw message