cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From serg...@apache.org
Subject git commit: [CXF-5639] Introducing StreamingResponse, can be used with/without WebSocket
Date Tue, 25 Mar 2014 10:59:46 GMT
Repository: cxf
Updated Branches:
  refs/heads/master ed0ab4cb9 -> 3f1542f09


[CXF-5639] Introducing StreamingResponse, can be used with/without WebSocket


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/3f1542f0
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/3f1542f0
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/3f1542f0

Branch: refs/heads/master
Commit: 3f1542f09664fc185546e24ffb4699233da0eec2
Parents: ed0ab4c
Author: Sergey Beryozkin <sberyozkin@talend.com>
Authored: Tue Mar 25 10:59:23 2014 +0000
Committer: Sergey Beryozkin <sberyozkin@talend.com>
Committed: Tue Mar 25 10:59:23 2014 +0000

----------------------------------------------------------------------
 .../apache/cxf/jaxrs/ext/StreamingResponse.java |  30 ++++++
 .../provider/StreamingResponseProvider.java     | 104 +++++++++++++++++++
 .../jaxrs/websocket/BookServerWebSocket.java    |   3 +
 .../jaxrs/websocket/BookStoreWebSocket.java     |  24 +++++
 .../JAXRSClientServerWebSocketTest.java         |  33 ++++++
 .../resources/jaxrs_websocket/WEB-INF/beans.xml |   3 +
 .../jaxrs_websocket/beans-embedded.xml          |   3 +
 7 files changed, 200 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/3f1542f0/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/ext/StreamingResponse.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/ext/StreamingResponse.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/ext/StreamingResponse.java
new file mode 100644
index 0000000..e16a4ef
--- /dev/null
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/ext/StreamingResponse.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.ext;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public interface StreamingResponse<T> {
+    interface Writer<T> {
+        void write(T data) throws IOException;
+        OutputStream getEntityStream();
+    }
+    void writeTo(Writer<T> writer) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/3f1542f0/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/StreamingResponseProvider.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/StreamingResponseProvider.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/StreamingResponseProvider.java
new file mode 100644
index 0000000..1154df8
--- /dev/null
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/StreamingResponseProvider.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.provider;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Type;
+
+import javax.ws.rs.InternalServerErrorException;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.ext.MessageBodyWriter;
+import javax.ws.rs.ext.Providers;
+
+import org.apache.cxf.jaxrs.ext.StreamingResponse;
+import org.apache.cxf.jaxrs.utils.InjectionUtils;
+
+public class StreamingResponseProvider<T> implements 
+    MessageBodyWriter<StreamingResponse<T>> {
+
+    @Context
+    private Providers providers;
+    
+    @Override
+    public boolean isWriteable(Class<?> cls, Type type, Annotation[] anns, MediaType mt) {
+        return StreamingResponse.class.isAssignableFrom(cls);
+    }
+
+    @Override
+    public void writeTo(StreamingResponse<T> p, Class<?> cls, Type t, Annotation[] anns,
+                        MediaType mt, MultivaluedMap<String, Object> headers, OutputStream os)
+        throws IOException, WebApplicationException {
+        Class<?> actualCls = InjectionUtils.getActualType(t);
+        @SuppressWarnings("unchecked")
+        MessageBodyWriter<T> writer = 
+            (MessageBodyWriter<T>)providers.getMessageBodyWriter(actualCls, actualCls, anns, mt);
+        if (writer == null) {
+            throw new InternalServerErrorException();
+        }
+        //TODO: review the possibility of caching the providers
+        StreamingProviderWriterImpl thewriter = 
+            new StreamingProviderWriterImpl(writer, actualCls, anns, mt, headers, os);
+        p.writeTo(thewriter);
+    }
+
+    @Override
+    public long getSize(StreamingResponse<T> arg0, Class<?> arg1, Type arg2, Annotation[] arg3, MediaType arg4) {
+        return -1;
+    }
+    
+    private class StreamingProviderWriterImpl implements StreamingResponse.Writer<T> {
+        private MessageBodyWriter<T> writer;
+        private Class<?> cls;
+        private MediaType mt;
+        private Annotation[] anns;
+        private MultivaluedMap<String, Object> headers; 
+        private OutputStream os;
+                
+        public StreamingProviderWriterImpl(MessageBodyWriter<T> writer, 
+                                           Class<?> cls,
+                                           Annotation[] anns,
+                                           MediaType mt,
+                                           MultivaluedMap<String, Object> headers,
+                                           OutputStream os) {
+            this.writer = writer;
+            this.cls = cls;
+            this.anns = anns;
+            this.mt = mt;
+            this.headers = headers;
+            this.os = os;
+        }
+        
+        @Override
+        public void write(T data) throws IOException {
+            writer.writeTo(data, cls, cls, anns, mt, headers, os);
+            
+        }
+
+        @Override
+        public OutputStream getEntityStream() {
+            return os;
+        }
+        
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/3f1542f0/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookServerWebSocket.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookServerWebSocket.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookServerWebSocket.java
index 5938f09..33c1b52 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookServerWebSocket.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookServerWebSocket.java
@@ -23,6 +23,8 @@ import org.apache.cxf.Bus;
 import org.apache.cxf.BusFactory;
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
 import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
+import org.apache.cxf.jaxrs.provider.StreamingResponseProvider;
+import org.apache.cxf.systest.jaxrs.Book;
 import org.apache.cxf.systest.jaxrs.BookStorePerRequest;
 import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
     
@@ -42,6 +44,7 @@ public class BookServerWebSocket extends AbstractBusTestServerBase {
         JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
         sf.setBus(bus);
         sf.setResourceClasses(BookStoreWebSocket.class, BookStorePerRequest.class);
+        sf.setProvider(new StreamingResponseProvider<Book>());
         sf.setResourceProvider(BookStoreWebSocket.class,
                                new SingletonResourceProvider(new BookStoreWebSocket(), true));
         sf.setAddress("ws://localhost:" + PORT + "/websocket");

http://git-wip-us.apache.org/repos/asf/cxf/blob/3f1542f0/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java
index e658ca6..d2c70e1 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java
@@ -36,6 +36,7 @@ import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.StreamingOutput;
 
+import org.apache.cxf.jaxrs.ext.StreamingResponse;
 import org.apache.cxf.systest.jaxrs.Book;
 
 @Path("/web/bookstore")
@@ -99,6 +100,29 @@ public class BookStoreWebSocket {
         };
     }
     
+    @GET
+    @Path("/bookstream")
+    @Produces("application/json")
+    public StreamingResponse<Book> getBookStream() {
+        return new StreamingResponse<Book>() {
+            public void writeTo(final StreamingResponse.Writer<Book> out) throws IOException {
+                out.write(new Book("WebSocket1", 1));
+                executor.execute(new Runnable() {
+                    public void run() {
+                        try {
+                            for (int i = 2; i <= 5; i++) {
+                                Thread.sleep(500);
+                                out.write(new Book("WebSocket" + i, i));
+                            }
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                    }
+                });
+            }
+        };
+    }
+    
 }
 
 

http://git-wip-us.apache.org/repos/asf/cxf/blob/3f1542f0/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientServerWebSocketTest.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientServerWebSocketTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientServerWebSocketTest.java
index df0b5aa..1c32f85 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientServerWebSocketTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientServerWebSocketTest.java
@@ -128,6 +128,39 @@ public class JAXRSClientServerWebSocketTest extends AbstractBusClientServerTestB
     }
     
     @Test
+    public void testGetBookStream() throws Exception {
+        String address = "ws://localhost:" + getPort() + "/websocket/web/bookstore";
+
+        WebSocketTestClient wsclient = new WebSocketTestClient(address);
+        wsclient.connect();
+        try {
+            wsclient.reset(5);
+            wsclient.sendMessage(
+                "GET /websocket/web/bookstore/bookstream\r\nAccept: application/json\r\n\r\n".getBytes());
+            assertTrue("response expected", wsclient.await(5));
+            List<WebSocketTestClient.Response> received = wsclient.getReceivedResponses();
+            assertEquals(5, received.size());
+            WebSocketTestClient.Response resp = received.get(0);
+            assertEquals(200, resp.getStatusCode());
+            assertEquals("application/json", resp.getContentType());
+            String value = resp.getTextEntity();
+            assertEquals(value, getBookJson(1));
+            for (int i = 2; i <= 5; i++) {
+                // subsequent data should not carry the headers nor the status.
+                resp = received.get(i - 1);
+                assertEquals(0, resp.getStatusCode());
+                assertEquals(resp.getTextEntity(), getBookJson(i));
+            }
+        } finally {
+            wsclient.close();
+        }
+    }
+    
+    private String getBookJson(int index) {
+        return "{\"Book\":{\"id\":" + index + ",\"name\":\"WebSocket" + index + "\"}}";
+    }
+    
+    @Test
     public void testBookWithWebSocketAndHTTP() throws Exception {
         String address = "ws://localhost:" + getPort() + "/websocket/web/bookstore";
 

http://git-wip-us.apache.org/repos/asf/cxf/blob/3f1542f0/systests/jaxrs/src/test/resources/jaxrs_websocket/WEB-INF/beans.xml
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/resources/jaxrs_websocket/WEB-INF/beans.xml b/systests/jaxrs/src/test/resources/jaxrs_websocket/WEB-INF/beans.xml
index 3d79ea0..9f4d006 100644
--- a/systests/jaxrs/src/test/resources/jaxrs_websocket/WEB-INF/beans.xml
+++ b/systests/jaxrs/src/test/resources/jaxrs_websocket/WEB-INF/beans.xml
@@ -40,6 +40,9 @@
         <jaxrs:serviceBeans>
             <ref bean="serviceBean"/>
         </jaxrs:serviceBeans>
+        <jaxrs:providers>
+            <bean class="org.apache.cxf.jaxrs.provider.StreamingResponseProvider"/>
+        </jaxrs:providers>
     </jaxrs:server>
     <jaxrs:server id="bookserviceHTTP" address="/http">
         <jaxrs:serviceBeans>

http://git-wip-us.apache.org/repos/asf/cxf/blob/3f1542f0/systests/jaxrs/src/test/resources/jaxrs_websocket/beans-embedded.xml
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/resources/jaxrs_websocket/beans-embedded.xml b/systests/jaxrs/src/test/resources/jaxrs_websocket/beans-embedded.xml
index fb4206b..b722ea1 100644
--- a/systests/jaxrs/src/test/resources/jaxrs_websocket/beans-embedded.xml
+++ b/systests/jaxrs/src/test/resources/jaxrs_websocket/beans-embedded.xml
@@ -40,6 +40,9 @@
         <jaxrs:serviceBeans>
             <ref bean="serviceBean"/>
         </jaxrs:serviceBeans>
+        <jaxrs:providers>
+            <bean class="org.apache.cxf.jaxrs.provider.StreamingResponseProvider"/>
+        </jaxrs:providers>
     </jaxrs:server>
 </beans>
 <!-- END SNIPPET: beans -->


Mime
View raw message