Return-Path: X-Original-To: apmail-cxf-commits-archive@www.apache.org Delivered-To: apmail-cxf-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 736ED185F1 for ; Wed, 11 Nov 2015 18:57:47 +0000 (UTC) Received: (qmail 86547 invoked by uid 500); 11 Nov 2015 18:57:47 -0000 Delivered-To: apmail-cxf-commits-archive@cxf.apache.org Received: (qmail 86487 invoked by uid 500); 11 Nov 2015 18:57:47 -0000 Mailing-List: contact commits-help@cxf.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cxf.apache.org Delivered-To: mailing list commits@cxf.apache.org Received: (qmail 86478 invoked by uid 99); 11 Nov 2015 18:57:47 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 Nov 2015 18:57:47 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 16909E00DE; Wed, 11 Nov 2015 18:57:47 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: dkulp@apache.org To: commits@cxf.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: cxf git commit: More updates to start getting the atmosphere based websocket stuff to work with Jetty 9.3 Date: Wed, 11 Nov 2015 18:57:47 +0000 (UTC) Repository: cxf Updated Branches: refs/heads/master 4f92f75aa -> 531f9a9f3 More updates to start getting the atmosphere based websocket stuff to work with Jetty 9.3 Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/531f9a9f Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/531f9a9f Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/531f9a9f Branch: refs/heads/master Commit: 531f9a9f336634edd223c25815077026b1ccf3d3 Parents: 4f92f75 Author: Daniel Kulp Authored: Wed Nov 11 13:57:08 2015 -0500 Committer: Daniel Kulp Committed: Wed Nov 11 13:57:38 2015 -0500 ---------------------------------------------------------------------- .../atmosphere/DefaultProtocolInterceptor.java | 109 ++++++++++++------- .../jaxrs/websocket/BookStoreWebSocket.java | 2 + 2 files changed, 70 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/531f9a9f/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java ---------------------------------------------------------------------- diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java index b70a467..54431ce 100644 --- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java +++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java @@ -21,6 +21,7 @@ package org.apache.cxf.transport.websocket.atmosphere; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.OutputStream; import java.util.Deque; import java.util.HashMap; import java.util.Map; @@ -32,6 +33,7 @@ import javax.servlet.ServletOutputStream; import javax.servlet.http.HttpServletResponse; import org.apache.cxf.common.logging.LogUtils; +import org.apache.cxf.io.CachedOutputStream; import org.apache.cxf.transport.websocket.InvalidPathException; import org.apache.cxf.transport.websocket.WebSocketConstants; import org.apache.cxf.transport.websocket.WebSocketUtils; @@ -107,7 +109,7 @@ public class DefaultProtocolInterceptor extends AtmosphereInterceptorAdapter { AtmosphereRequest request = r.getRequest(); if (request.getAttribute(REQUEST_DISPATCHED) == null) { - AtmosphereResponse response = new WrappedAtmosphereResponse(r.getResponse(), request); + AtmosphereResponse response = null; AtmosphereFramework framework = r.getAtmosphereConfig().framework(); try { @@ -121,6 +123,7 @@ public class DefaultProtocolInterceptor extends AtmosphereInterceptorAdapter { } try { AtmosphereRequest ar = createAtmosphereRequest(request, data); + response = new WrappedAtmosphereResponse(r.getResponse(), ar); ar.attributes().put(REQUEST_DISPATCHED, "true"); String refid = ar.getHeader(WebSocketConstants.DEFAULT_REQUEST_ID_KEY); if (refid != null) { @@ -138,12 +141,17 @@ public class DefaultProtocolInterceptor extends AtmosphereInterceptorAdapter { } } catch (Exception e) { LOG.log(Level.WARNING, "Error during request dispatching", e); + if (response == null) { + response = new WrappedAtmosphereResponse(r.getResponse(), request); + } if (e instanceof InvalidPathException) { - response.setStatus(400); + response.setIntHeader(WebSocketUtils.SC_KEY, 400); } else { - response.setStatus(500); + response.setIntHeader(WebSocketUtils.SC_KEY, 500); } - response.getOutputStream().write(createResponse(response, null, true)); + OutputStream out = response.getOutputStream(); + out.write(createResponse(response, null, true)); + out.close(); } return Action.CANCELLED; } catch (IOException e) { @@ -228,7 +236,11 @@ public class DefaultProtocolInterceptor extends AtmosphereInterceptorAdapter { } if (parent) { // include the status code and content-type and those matched headers - headers.put(WebSocketUtils.SC_KEY, Integer.toString(response.getStatus())); + String sc = response.getHeader(WebSocketUtils.SC_KEY); + if (sc == null) { + sc = Integer.toString(response.getStatus()); + } + headers.put(WebSocketUtils.SC_KEY, sc); if (payload != null && payload.length > 0) { headers.put("Content-Type", response.getContentType()); } @@ -273,47 +285,62 @@ public class DefaultProtocolInterceptor extends AtmosphereInterceptorAdapter { // a workaround to flush the header data upon close when no write operation occurs private class WrappedAtmosphereResponse extends AtmosphereResponse { - WrappedAtmosphereResponse(AtmosphereResponse resp, AtmosphereRequest req) { - super((HttpServletResponse)resp.getResponse(), resp.getAsyncIOWriter(), req, resp.isDestroyable()); + final AtmosphereResponse response; + final ServletOutputStream delegate; + ServletOutputStream sout; + WrappedAtmosphereResponse(AtmosphereResponse resp, AtmosphereRequest req) throws IOException { + super((HttpServletResponse)resp.getResponse(), null, req, resp.isDestroyable()); + response = resp; + response.request(req); + delegate = super.getOutputStream(); } @Override public ServletOutputStream getOutputStream() throws IOException { - final ServletOutputStream delegate = super.getOutputStream(); - return new ServletOutputStream() { - private boolean written; - - @Override - public void write(int i) throws IOException { - written = true; - delegate.write(i); - } - - @Override - public void close() throws IOException { - if (!written) { - delegate.write(createResponse(WrappedAtmosphereResponse.this, null, true)); + if (sout == null) { + sout = new ServletOutputStream() { + CachedOutputStream out = new CachedOutputStream(); + OutputStream getOut() { + if (out == null) { + out = new CachedOutputStream(); + } + return out; + } + void send(boolean complete) throws IOException { + if (out == null) { + return; + } + if (response.getStatus() >= 400) { + int i = response.getStatus(); + response.setStatus(200); + response.addIntHeader(WebSocketUtils.SC_KEY, i); + } + out.flush(); + out.lockOutputStream(); + out.writeCacheTo(delegate); + delegate.flush(); + out.close(); + out = null; } - delegate.close(); - } - - @Override - public void flush() throws IOException { - delegate.flush(); - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - written = true; - delegate.write(b, off, len); - } - - @Override - public void write(byte[] b) throws IOException { - written = true; - delegate.write(b); - } - }; + public void write(int i) throws IOException { + getOut().write(i); + } + public void close() throws IOException { + send(true); + delegate.close(); + } + public void flush() throws IOException { + send(false); + } + public void write(byte[] b, int off, int len) throws IOException { + getOut().write(b, off, len); + } + public void write(byte[] b) throws IOException { + getOut().write(b); + } + }; + } + return sout; } } } http://git-wip-us.apache.org/repos/asf/cxf/blob/531f9a9f/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java index 39e64fc..888ada0 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java @@ -121,6 +121,7 @@ public class BookStoreWebSocket { for (int i = 2; i <= 5; i++) { Thread.sleep(500); out.write(new Book("WebSocket" + i, i)); + out.getEntityStream().flush(); } } catch (Exception e) { e.printStackTrace(); @@ -166,6 +167,7 @@ public class BookStoreWebSocket { OutputStream out = it.next(); try { out.write(("News: event " + name + " created").getBytes()); + out.flush(); } catch (IOException e) { it.remove(); e.printStackTrace();