cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From serg...@apache.org
Subject [1/2] cxf git commit: [CXF-6882] Adding CXF UseNioWrite extension - supported for JAXRS only at the moment
Date Wed, 07 Dec 2016 16:00:09 GMT
Repository: cxf
Updated Branches:
  refs/heads/master 70317dafc -> 58b379bdf


[CXF-6882] Adding CXF UseNioWrite extension - supported for JAXRS only at the moment


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/59436ead
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/59436ead
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/59436ead

Branch: refs/heads/master
Commit: 59436eadca147ba937a24a4f4fc6ced93cc0bc7f
Parents: 70317da
Author: Sergey Beryozkin <sberyozkin@gmail.com>
Authored: Wed Dec 7 15:59:14 2016 +0000
Committer: Sergey Beryozkin <sberyozkin@gmail.com>
Committed: Wed Dec 7 15:59:14 2016 +0000

----------------------------------------------------------------------
 .../cxf/jaxrs/nio/NioWriteListenerImpl.java     |  7 +-
 .../cxf/jaxrs/provider/BinaryDataProvider.java  | 81 +++++++++++++++++++-
 .../cxf/systest/jaxrs/nio/NioBookStore.java     | 13 +++-
 .../cxf/systest/jaxrs/nio/NioBookStoreTest.java | 14 ++++
 4 files changed, 108 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/59436ead/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriteListenerImpl.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriteListenerImpl.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriteListenerImpl.java
index aed2cec..d35b4dd 100644
--- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriteListenerImpl.java
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriteListenerImpl.java
@@ -34,7 +34,7 @@ public final class NioWriteListenerImpl implements WriteListener {
     private final NioWriteEntity entity;
     private final DelegatingNioOutputStream out;
 
-    NioWriteListenerImpl(Continuation cont, NioWriteEntity entity, OutputStream out) {
+    public NioWriteListenerImpl(Continuation cont, NioWriteEntity entity, OutputStream out) {
         this.cont = cont;
         this.entity = entity;
         this.out = new DelegatingNioOutputStream(out);
@@ -44,6 +44,11 @@ public final class NioWriteListenerImpl implements WriteListener {
     public void onWritePossible() throws IOException {
         while (cont.isReadyForWrite()) {
             if (!entity.getWriter().write(out)) {
+                // REVISIT:
+                // Immediately closing the async context with cont.resume() works better
+                // at the moment - with cont.resume() Jetty throws NPE in its internal code
+                // which is quite possibly a Jetty bug.
+                // Do we really need to complete the out chain after the response has been written out ?
                 cont.resume();
                 return;
             }

http://git-wip-us.apache.org/repos/asf/cxf/blob/59436ead/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/BinaryDataProvider.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/BinaryDataProvider.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/BinaryDataProvider.java
index 980b076..f3e9353 100644
--- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/BinaryDataProvider.java
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/BinaryDataProvider.java
@@ -38,19 +38,31 @@ import java.security.DigestInputStream;
 import java.util.UUID;
 import java.util.logging.Logger;
 
+import javax.servlet.WriteListener;
+import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.NioOutputStream;
+import javax.ws.rs.core.NioWriterHandler;
 import javax.ws.rs.core.StreamingOutput;
 import javax.ws.rs.ext.MessageBodyReader;
 import javax.ws.rs.ext.MessageBodyWriter;
 
+import org.apache.cxf.annotations.UseNioWrite;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.common.util.MessageDigestInputStream;
+import org.apache.cxf.continuations.Continuation;
+import org.apache.cxf.continuations.ContinuationProvider;
 import org.apache.cxf.helpers.FileUtils;
 import org.apache.cxf.helpers.IOUtils;
 import org.apache.cxf.jaxrs.impl.HttpHeadersImpl;
+import org.apache.cxf.jaxrs.nio.DelegatingNioOutputStream;
+import org.apache.cxf.jaxrs.nio.NioWriteEntity;
+import org.apache.cxf.jaxrs.nio.NioWriteListenerImpl;
+import org.apache.cxf.jaxrs.utils.AnnotationUtils;
 import org.apache.cxf.jaxrs.utils.ExceptionUtils;
+import org.apache.cxf.jaxrs.utils.JAXRSUtils;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageUtils;
 import org.apache.cxf.phase.PhaseInterceptorChain;
@@ -151,12 +163,12 @@ public class BinaryDataProvider<T> extends AbstractConfigurableProvider
         throws IOException {
         
         if (InputStream.class.isAssignableFrom(o.getClass())) {
-            copyInputToOutput((InputStream)o, os, headers);
+            copyInputToOutput((InputStream)o, os, annotations, headers);
         } else if (File.class.isAssignableFrom(o.getClass())) {
             copyInputToOutput(new BufferedInputStream(
-                    new FileInputStream((File)o)), os, headers);
+                    new FileInputStream((File)o)), os, annotations, headers);
         } else if (byte[].class.isAssignableFrom(o.getClass())) {
-            copyInputToOutput(new ByteArrayInputStream((byte[])o), os, headers);
+            copyInputToOutput(new ByteArrayInputStream((byte[])o), os, annotations, headers);
         } else if (Reader.class.isAssignableFrom(o.getClass())) {
             try {
                 Writer writer = new OutputStreamWriter(os, getEncoding(type));
@@ -185,19 +197,48 @@ public class BinaryDataProvider<T> extends AbstractConfigurableProvider
     }
     
     protected void copyInputToOutput(InputStream is, OutputStream os,
-            MultivaluedMap<String, Object> outHeaders) throws IOException {
+            Annotation[] anns, MultivaluedMap<String, Object> outHeaders) throws IOException {
         if (isRangeSupported()) {
             Message inMessage = PhaseInterceptorChain.getCurrentMessage().getExchange().getInMessage();
             handleRangeRequest(is, os, new HttpHeadersImpl(inMessage), outHeaders);
         } else {
+            boolean nioWrite = AnnotationUtils.getAnnotation(anns, UseNioWrite.class) != null;
+            if (nioWrite) {
+                ContinuationProvider provider = getContinuationProvider();
+                if (provider != null) {
+                    copyUsingNio(is, os, provider.getContinuation());
+                }
+                return;
+            } 
             if (closeResponseInputStream) {
                 IOUtils.copyAndCloseInput(is, os, bufferSize);
             } else {
                 IOUtils.copy(is, os, bufferSize);
             }
+            
         }
     }
     
+    protected void copyUsingNio(InputStream is, OutputStream os, Continuation cont) {
+        NioWriteListenerImpl listener = 
+            new NioWriteListenerImpl(cont, 
+                                     new NioWriteEntity(getNioHandler(is), null), 
+                                     new DelegatingNioOutputStream(os));
+        Message m = JAXRSUtils.getCurrentMessage();
+        m.put(WriteListener.class, listener);
+        // After this MBW registers the listener, JAXRSOutInterceptor is done, and the
+        // out chain will need to be resumed from the interceptor which follows it 
+        m.put("suspend.chain.on.current.interceptor", Boolean.TRUE);
+        cont.suspend(0);
+    }
+    
+    private ContinuationProvider getContinuationProvider() {
+        return (ContinuationProvider)JAXRSUtils.getCurrentMessage().getExchange()
+            .getInMessage().get(ContinuationProvider.class.getName());
+    }
+
+
+
     protected void handleRangeRequest(InputStream is, 
                                       OutputStream os,
                                       HttpHeaders inHeaders, 
@@ -231,4 +272,36 @@ public class BinaryDataProvider<T> extends AbstractConfigurableProvider
     public void setBufferSize(int bufferSize) {
         this.bufferSize = bufferSize;
     }
+    
+    protected NioWriterHandler getNioHandler(final InputStream in) {
+        
+        return new NioWriterHandler() {
+            final byte[] buffer = new byte[bufferSize];
+            @Override
+            public boolean write(NioOutputStream out) {
+                try {
+                    final int n = in.read(buffer);
+    
+                    if (n >= 0) {
+                        out.write(buffer, 0, n);
+                        return true;
+                    }
+                    if (closeResponseInputStream) {    
+                        try { 
+                            in.close(); 
+                        } catch (IOException ex) { 
+                            /* do nothing */ 
+                        }
+                    }
+                    
+                    return false;
+                } catch (IOException ex) {
+                    throw new WebApplicationException(ex);    
+                }
+                
+            } 
+        };
+            
+        
+    }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/59436ead/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStore.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStore.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStore.java
index bd885f5..ce2507a 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStore.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStore.java
@@ -20,22 +20,23 @@ package org.apache.cxf.systest.jaxrs.nio;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
+import org.apache.cxf.annotations.UseNioWrite;
 import org.apache.cxf.helpers.IOUtils;
 
 @Path("/bookstore")
 public class NioBookStore {
     @GET
     @Produces(MediaType.TEXT_PLAIN)
-    public Response readBooks(@QueryParam("path") String path) throws IOException {
+    public Response readBooks() throws IOException {
         final ByteArrayInputStream in = new ByteArrayInputStream(
             IOUtils.readBytesFromStream(getClass().getResourceAsStream("/files/books.txt")));
         final byte[] buffer = new byte[4096];
@@ -66,4 +67,12 @@ public class NioBookStore {
             }
         ).build();
     }
+    
+    @GET
+    @Produces(MediaType.TEXT_PLAIN)
+    @Path("/is")
+    @UseNioWrite
+    public InputStream readBooksFromInputStream() throws IOException {
+        return getClass().getResourceAsStream("/files/books.txt");
+    }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/59436ead/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStoreTest.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStoreTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStoreTest.java
index 19b0830..c1743e3 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStoreTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStoreTest.java
@@ -58,6 +58,20 @@ public class NioBookStoreTest extends AbstractBusClientServerTestBase {
         }
     }
     
+    @Test
+    public void testGetAllBooksIs() throws Exception {
+        final Response response = createWebClient("/bookstore/is", MediaType.TEXT_PLAIN).get();
+        
+        try {
+            assertEquals(200, response.getStatus());
+            
+            assertThat(response.readEntity(String.class), equalTo(IOUtils.readStringFromStream(
+                getClass().getResourceAsStream("/files/books.txt"))));
+        } finally {
+            response.close();
+        }
+    }
+    
     protected WebClient createWebClient(final String url, final String mediaType) {
         final List< ? > providers = Arrays.asList(new JacksonJsonProvider());
         


Mime
View raw message