cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dk...@apache.org
Subject [8/9] cxf git commit: More updates to start getting the atmosphere based websocket stuff to work with Jetty 9.3
Date Thu, 12 Nov 2015 15:23:34 GMT
More updates to start getting the atmosphere based websocket stuff to work with Jetty 9.3


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/6c67f878
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/6c67f878
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/6c67f878

Branch: refs/heads/3.1.x-fixes
Commit: 6c67f878e2c210e7f03c4aea52b2ba2eec7d611a
Parents: 2ec8759
Author: Daniel Kulp <dkulp@apache.org>
Authored: Wed Nov 11 13:57:08 2015 -0500
Committer: Daniel Kulp <dkulp@apache.org>
Committed: Thu Nov 12 10:23:13 2015 -0500

----------------------------------------------------------------------
 .../atmosphere/DefaultProtocolInterceptor.java  | 109 ++++++++++++-------
 .../jaxrs/websocket/BookStoreWebSocket.java     |   2 +
 2 files changed, 70 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/6c67f878/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java
index b70a467..54431ce 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java
@@ -21,6 +21,7 @@ package org.apache.cxf.transport.websocket.atmosphere;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.Map;
@@ -32,6 +33,7 @@ import javax.servlet.ServletOutputStream;
 import javax.servlet.http.HttpServletResponse;
 
 import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.io.CachedOutputStream;
 import org.apache.cxf.transport.websocket.InvalidPathException;
 import org.apache.cxf.transport.websocket.WebSocketConstants;
 import org.apache.cxf.transport.websocket.WebSocketUtils;
@@ -107,7 +109,7 @@ public class DefaultProtocolInterceptor extends AtmosphereInterceptorAdapter {
         AtmosphereRequest request = r.getRequest();
 
         if (request.getAttribute(REQUEST_DISPATCHED) == null) {
-            AtmosphereResponse response = new WrappedAtmosphereResponse(r.getResponse(), request);
+            AtmosphereResponse response = null;
 
             AtmosphereFramework framework = r.getAtmosphereConfig().framework();
             try {
@@ -121,6 +123,7 @@ public class DefaultProtocolInterceptor extends AtmosphereInterceptorAdapter {
                 }
                 try {
                     AtmosphereRequest ar = createAtmosphereRequest(request, data);
+                    response = new WrappedAtmosphereResponse(r.getResponse(), ar);
                     ar.attributes().put(REQUEST_DISPATCHED, "true");
                     String refid = ar.getHeader(WebSocketConstants.DEFAULT_REQUEST_ID_KEY);
                     if (refid != null) {
@@ -138,12 +141,17 @@ public class DefaultProtocolInterceptor extends AtmosphereInterceptorAdapter {
                     }
                 } catch (Exception e) {
                     LOG.log(Level.WARNING, "Error during request dispatching", e);
+                    if (response == null) {
+                        response = new WrappedAtmosphereResponse(r.getResponse(), request);
+                    }
                     if (e instanceof InvalidPathException) {
-                        response.setStatus(400);
+                        response.setIntHeader(WebSocketUtils.SC_KEY, 400);
                     } else {
-                        response.setStatus(500);
+                        response.setIntHeader(WebSocketUtils.SC_KEY, 500);
                     }
-                    response.getOutputStream().write(createResponse(response, null, true));
+                    OutputStream out = response.getOutputStream();
+                    out.write(createResponse(response, null, true));
+                    out.close();
                 }
                 return Action.CANCELLED;
             } catch (IOException e) {
@@ -228,7 +236,11 @@ public class DefaultProtocolInterceptor extends AtmosphereInterceptorAdapter {
         }
         if (parent) {
             // include the status code and content-type and those matched headers
-            headers.put(WebSocketUtils.SC_KEY, Integer.toString(response.getStatus()));
+            String sc = response.getHeader(WebSocketUtils.SC_KEY); 
+            if (sc == null) {
+                sc = Integer.toString(response.getStatus());
+            }
+            headers.put(WebSocketUtils.SC_KEY, sc);            
             if (payload != null && payload.length > 0) {
                 headers.put("Content-Type",  response.getContentType());
             }
@@ -273,47 +285,62 @@ public class DefaultProtocolInterceptor extends AtmosphereInterceptorAdapter {
 
     // a workaround to flush the header data upon close when no write operation occurs  
     private class WrappedAtmosphereResponse extends AtmosphereResponse {
-        WrappedAtmosphereResponse(AtmosphereResponse resp, AtmosphereRequest req) {
-            super((HttpServletResponse)resp.getResponse(), resp.getAsyncIOWriter(), req, resp.isDestroyable());
+        final AtmosphereResponse response;
+        final ServletOutputStream delegate;
+        ServletOutputStream sout;
+        WrappedAtmosphereResponse(AtmosphereResponse resp, AtmosphereRequest req) throws IOException {
+            super((HttpServletResponse)resp.getResponse(), null, req, resp.isDestroyable());
+            response = resp;
+            response.request(req);
+            delegate = super.getOutputStream();
         }
 
         @Override
         public ServletOutputStream getOutputStream() throws IOException {
-            final ServletOutputStream delegate = super.getOutputStream();
-            return new ServletOutputStream() {
-                private boolean written;
-
-                @Override
-                public void write(int i) throws IOException {
-                    written = true;
-                    delegate.write(i);
-                }
-
-                @Override
-                public void close() throws IOException {
-                    if (!written) {
-                        delegate.write(createResponse(WrappedAtmosphereResponse.this, null, true));
+            if (sout == null) {
+                sout = new ServletOutputStream() {
+                    CachedOutputStream out = new CachedOutputStream();
+                    OutputStream getOut() {
+                        if (out == null) {
+                            out = new CachedOutputStream();
+                        }
+                        return out;
+                    }                
+                    void send(boolean complete) throws IOException {
+                        if (out == null) {
+                            return;
+                        }
+                        if (response.getStatus() >= 400) {
+                            int i = response.getStatus();
+                            response.setStatus(200);
+                            response.addIntHeader(WebSocketUtils.SC_KEY, i);
+                        }
+                        out.flush();
+                        out.lockOutputStream();
+                        out.writeCacheTo(delegate);
+                        delegate.flush();
+                        out.close();
+                        out = null;
                     }
-                    delegate.close();
-                }
-
-                @Override
-                public void flush() throws IOException {
-                    delegate.flush();
-                }
-
-                @Override
-                public void write(byte[] b, int off, int len) throws IOException {
-                    written = true;
-                    delegate.write(b, off, len);
-                }
-
-                @Override
-                public void write(byte[] b) throws IOException {
-                    written = true;
-                    delegate.write(b);
-                }
-            };
+                    public void write(int i) throws IOException {
+                        getOut().write(i);
+                    }
+                    public void close() throws IOException {
+                        send(true);
+                        delegate.close();
+                    }
+                    public void flush() throws IOException {
+                        send(false);
+                    }
+                    public void write(byte[] b, int off, int len) throws IOException {
+                        getOut().write(b, off, len);
+                    }
+                    public void write(byte[] b) throws IOException {
+                        getOut().write(b);
+                    }
+                };
+            }
+            return sout;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/6c67f878/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java
index 39e64fc..888ada0 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java
@@ -121,6 +121,7 @@ public class BookStoreWebSocket {
                             for (int i = 2; i <= 5; i++) {
                                 Thread.sleep(500);
                                 out.write(new Book("WebSocket" + i, i));
+                                out.getEntityStream().flush();
                             }
                         } catch (Exception e) {
                             e.printStackTrace();
@@ -166,6 +167,7 @@ public class BookStoreWebSocket {
             OutputStream out = it.next();
             try {
                 out.write(("News: event " + name + " created").getBytes());
+                out.flush();
             } catch (IOException e) {
                 it.remove();
                 e.printStackTrace();


Mime
View raw message