Return-Path: X-Original-To: apmail-cxf-commits-archive@www.apache.org Delivered-To: apmail-cxf-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id F1DD419AB7 for ; Tue, 5 Apr 2016 12:35:15 +0000 (UTC) Received: (qmail 597 invoked by uid 500); 5 Apr 2016 12:35:10 -0000 Delivered-To: apmail-cxf-commits-archive@cxf.apache.org Received: (qmail 389 invoked by uid 500); 5 Apr 2016 12:35:10 -0000 Mailing-List: contact commits-help@cxf.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cxf.apache.org Delivered-To: mailing list commits@cxf.apache.org Received: (qmail 223 invoked by uid 99); 5 Apr 2016 12:35:10 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 05 Apr 2016 12:35:10 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6430FE0A2A; Tue, 5 Apr 2016 12:35:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ay@apache.org To: commits@cxf.apache.org Date: Tue, 05 Apr 2016 12:35:13 -0000 Message-Id: <375a60b869df473b8a433d07a7d70c9b@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [4/8] cxf git commit: [CXF-5855] enable atmosphere's sse handling; update the sample [CXF-5855] enable atmosphere's sse handling; update the sample Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/a529d270 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/a529d270 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/a529d270 Branch: refs/heads/3.1.x-fixes Commit: a529d270668dabd1cfa6b040769cb25af97973db Parents: 91197fd Author: Akitoshi Yoshida Authored: Thu Mar 17 00:11:58 2016 +0100 Committer: Akitoshi Yoshida Committed: Tue Apr 5 13:36:57 2016 +0200 ---------------------------------------------------------------------- .../release/samples/jax_rs/websocket/README.txt | 26 ++- .../release/samples/jax_rs/websocket/pom.xml | 10 +- .../java/demo/jaxrs/server/CustomerService.java | 105 ++++++++- .../websocket/src/test/resources/client.js | 232 +++++++++++++++++++ .../websocket/atmosphere/AtmosphereUtils.java | 18 ++ .../AtmosphereWebSocketJettyDestination.java | 3 +- .../AtmosphereWebSocketServletDestination.java | 3 +- .../atmosphere/DefaultProtocolInterceptor.java | 90 ++++++- 8 files changed, 463 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/a529d270/distribution/src/main/release/samples/jax_rs/websocket/README.txt ---------------------------------------------------------------------- diff --git a/distribution/src/main/release/samples/jax_rs/websocket/README.txt b/distribution/src/main/release/samples/jax_rs/websocket/README.txt index 09c43e8..f1d3455 100644 --- a/distribution/src/main/release/samples/jax_rs/websocket/README.txt +++ b/distribution/src/main/release/samples/jax_rs/websocket/README.txt @@ -6,6 +6,13 @@ This is a websocket transport version of JAX-RS Basic Demo. A RESTful customer service is provided on URL ws://localhost:9000/customers. Users access this URI to operate on customer. +This sample includes two convenient clients: a plain javascript browser client +and a node.js client based on atmosphere. + + +Connecting to the server +--------------------------------------- + Open a websocket to ws://localhost:9000/ and send requests over the websocket. A GET request to path /customerservice/customers/123 @@ -102,7 +109,6 @@ Please review the README in the samples directory before continuing. - Building and running the demo using maven --------------------------------------- @@ -118,6 +124,8 @@ Using either UNIX or Windows: To remove the target dir, run mvn clean". +Using Javascript client in Browser +-------- Using a web browser that natively supports WebSocket (Safari, Chrome, Firefox): After starting the server (see above), open the index.html page located at @@ -140,3 +148,19 @@ Content-Type: text/xml; charset="utf-8" Jack ------------------------------------------------------------------------ + + +Using Node.js client +-------- + +Go to samples/jax_rs/websocket/src/test/resources and at the console + +Assuming node (>=v4) and npm are installed, execute the following shell commands. + +% npm install atmosphere.js +% node client.js + +This client program supports websocket and sse and allows +you to choose your preferred protocol. + + http://git-wip-us.apache.org/repos/asf/cxf/blob/a529d270/distribution/src/main/release/samples/jax_rs/websocket/pom.xml ---------------------------------------------------------------------- diff --git a/distribution/src/main/release/samples/jax_rs/websocket/pom.xml b/distribution/src/main/release/samples/jax_rs/websocket/pom.xml index abe9d3b..353e544 100644 --- a/distribution/src/main/release/samples/jax_rs/websocket/pom.xml +++ b/distribution/src/main/release/samples/jax_rs/websocket/pom.xml @@ -32,6 +32,7 @@ ${project.version} 1.8.5 + 2.3.7 8.1.15.v20140411 9.2.2.v20140723 ${cxf.jetty8.version} @@ -207,6 +208,13 @@ org.springframework spring-core - + + + + org.atmosphere + atmosphere-runtime + ${cxf.atmosphere.version} + + http://git-wip-us.apache.org/repos/asf/cxf/blob/a529d270/distribution/src/main/release/samples/jax_rs/websocket/src/main/java/demo/jaxrs/server/CustomerService.java ---------------------------------------------------------------------- diff --git a/distribution/src/main/release/samples/jax_rs/websocket/src/main/java/demo/jaxrs/server/CustomerService.java b/distribution/src/main/release/samples/jax_rs/websocket/src/main/java/demo/jaxrs/server/CustomerService.java index 967e978..cc039ff 100644 --- a/distribution/src/main/release/samples/jax_rs/websocket/src/main/java/demo/jaxrs/server/CustomerService.java +++ b/distribution/src/main/release/samples/jax_rs/websocket/src/main/java/demo/jaxrs/server/CustomerService.java @@ -25,6 +25,9 @@ import java.util.Iterator; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.servlet.http.HttpServletResponse; import javax.ws.rs.DELETE; import javax.ws.rs.GET; @@ -43,13 +46,15 @@ import org.apache.cxf.transport.websocket.WebSocketConstants; @Path("/customerservice/") @Produces("text/xml") public class CustomerService { + private static final int MAX_ERROR_COUNT = 5; private static ExecutorService executor = Executors.newSingleThreadExecutor(); long currentId = 123; Map customers = new HashMap(); Map orders = new HashMap(); - Map monitors = new HashMap(); - + Map> monitors = new HashMap>(); + Map> monitors2 = new HashMap>(); + public CustomerService() { init(); } @@ -60,7 +65,9 @@ public class CustomerService { System.out.println("----invoking getCustomer, Customer id is: " + id); long idNumber = Long.parseLong(id); Customer customer = customers.get(idNumber); - sendCustomerEvent("retrieved", customer); + if (customer != null) { + sendCustomerEvent("retrieved", customer); + } return customer; } @@ -129,35 +136,83 @@ public class CustomerService { final String key = reqid == null ? "*" : reqid; return new StreamingOutput() { public void write(final OutputStream out) throws IOException, WebApplicationException { - monitors.put(key, out); + monitors.put(key, new WriterHolder(out, MAX_ERROR_COUNT)); out.write(("Subscribed at " + new java.util.Date()).getBytes()); } + }; } @GET + @Path("/monitor2") + @Produces("text/*") + public void monitorCustomers2( + final @javax.ws.rs.core.Context HttpServletResponse httpResponse, + @HeaderParam(WebSocketConstants.DEFAULT_REQUEST_ID_KEY) String reqid) { + final String key = reqid == null ? "*" : reqid; + monitors2.put(key, new WriterHolder(httpResponse, MAX_ERROR_COUNT)); + try { + httpResponse.getOutputStream().write(("Subscribed at " + new java.util.Date()).getBytes()); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @GET @Path("/unmonitor/{key}") @Produces("text/*") public String unmonitorCustomers(@PathParam("key") String key) { return (monitors.remove(key) != null ? "Removed: " : "Already removed: ") + key; } + @GET + @Path("/unmonitor2/{key}") + @Produces("text/*") + public String unmonitorCustomers2(@PathParam("key") String key) { + return (monitors2.remove(key) != null ? "Removed: " : "Already removed: ") + key; + } + private void sendCustomerEvent(final String msg, final Customer customer) { executor.execute(new Runnable() { public void run() { try { String t = msg + ": " + customer.getId() + "/" + customer.getName(); - for (Iterator it = monitors.values().iterator(); it.hasNext();) { - OutputStream out = it.next(); + for (Iterator> it = monitors.values().iterator(); it.hasNext();) { + WriterHolder wh = it.next(); try { - out.write(t.getBytes()); + wh.getValue().write(t.getBytes()); + wh.getValue().flush(); + wh.reset(); } catch (IOException e) { - try { - out.close(); - } catch (IOException e2) { - // ignore; + System.out.println("----error writing to " + wh.getValue() + " " + wh.get()); + if (wh.increment()) { + try { + wh.getValue().close(); + } catch (IOException e2) { + // ignore; + } + it.remove(); + System.out.println("----purged " + wh.getValue()); + } + } + } + for (Iterator> it = monitors2.values().iterator(); it.hasNext();) { + WriterHolder wh = it.next(); + try { + wh.getValue().getOutputStream().write(t.getBytes()); + wh.getValue().getOutputStream().flush(); + wh.reset(); + } catch (IOException e) { + System.out.println("----error writing to " + wh.getValue() + " " + wh.get()); + if (wh.increment()) { + try { + wh.getValue().getOutputStream().close(); + } catch (IOException e2) { + // ignore; + } + it.remove(); + System.out.println("----purged " + wh.getValue()); } - it.remove(); } } } catch (Exception e) { @@ -183,4 +238,30 @@ public class CustomerService { orders.put(o.getId(), o); } + private static class WriterHolder { + final private T value; + final private int max; + final private AtomicInteger errorCount; + + public WriterHolder(T object, int max) { + this.value = object; + this.max = max; + this.errorCount = new AtomicInteger(); + } + + public T getValue() { + return value; + } + + public int get() { + return errorCount.get(); + } + public boolean increment() { + return max < errorCount.getAndIncrement(); + } + + public void reset() { + errorCount.getAndSet(0); + } + } } http://git-wip-us.apache.org/repos/asf/cxf/blob/a529d270/distribution/src/main/release/samples/jax_rs/websocket/src/test/resources/client.js ---------------------------------------------------------------------- diff --git a/distribution/src/main/release/samples/jax_rs/websocket/src/test/resources/client.js b/distribution/src/main/release/samples/jax_rs/websocket/src/test/resources/client.js new file mode 100644 index 0000000..7eb55f3 --- /dev/null +++ b/distribution/src/main/release/samples/jax_rs/websocket/src/test/resources/client.js @@ -0,0 +1,232 @@ +/** + * client.js + * + * A client program to interact with samples/jax_rs/websocket's server. + * + * + */ + +"use strict"; + +var HOST_URL = 'http://localhost:9100/'; + +var reader = require('readline'); +var prompt = reader.createInterface(process.stdin, process.stdout); + +var atmosphere = require('atmosphere.js'); + +var request = { url: HOST_URL, + transport : 'websocket', + trackMessageLength: false, + dropHeaders: false, + reconnectInterval : 5000}; +var isopen = false; + +const TRANSPORT_NAMES = ["websocket", "sse"]; + +const COMMAND_LIST = + [["add name", "Add a new consumer and return the customer instance."], + ["delete id", "Delete the customer."], + ["get id", "Return the customere."], + ["quit", "Quit the application."], + ["subscribe", "Subscribe to the customer updatese."], + ["unsubscribe", "Unsubscribe from the customer updatese."], + ["update id name", "Update the customer."]]; + +function selectOption(c, opts) { + var i = parseInt(c); + if (!(i >= 0 && i < opts.length)) { + console.log('Invalid selection: ' + c + '; Using ' + opts[0]); + i = 0; + } + return opts[i]; +} + +function getArgs(name, msg) { + var sp = name.length; + if (msg.length > name.length && msg.charAt(name.length) != ' ') { + // remove the command suffix + sp = msg.indexOf(' ', name.length); + if (sp < 0) { + sp = msg.length; + } + } + return msg.substring(sp).trim().split(' '); +} + +function createAddCustomerPayload(name) { + return "\n\n " + name + "\n\n"; +} + +function createUpdateCustomerPayload(id, name) { + return "\n\n " + name + "\n " + id + "\n\n"; +} + +/// + +function doHelp() { + console.log('Available commands'); + for (var i = 0; i < COMMAND_LIST.length; i++) { + var c = COMMAND_LIST[i][0]; + console.log(c + " ".substring(0, 20 - c.length) + COMMAND_LIST[i][1]); + } +} + +function doAdd(v) { + var req; + if (transport == 'websocket') { + req = "POST /customerservice/customers\r\nContent-Type: text/xml; charset='utf-8'\r\nAccept: text/xml\r\n\r\n" + + createAddCustomerPayload(v[0]); + } else if (transport == 'sse') { + req = {"method": "POST", "url": HOST_URL + "customerservice/customers", "headers": {"content-type": "text/xml; charset=utf-8", "accept": "text/xml"}, "data": createAddCustomerPayload(v[0])} + } + console.log("TRACE: sending ", req); + subSocket.push(req); +} + +function doDelete(v) { + var req; + if (transport == 'websocket') { + req = "DELETE /customerservice/customers/" + v[0]; + } else if (transport == 'sse') { + req = {"method": "DELETE", "url": HOST_URL + "customerservice/customers/" + v[0]} + } + console.log("TRACE: sending ", req); + subSocket.push(req); +} + +function doGet(v) { + var req; + if (transport == 'websocket') { + req = "GET /customerservice/customers/" + v[0]; + } else if (transport == 'sse') { + req = {"method": "GET", "url": HOST_URL + "customerservice/customers/" + v[0]} + } + console.log("TRACE: sending ", req); + subSocket.push(req); +} + +function doSubscribe() { + var req; + if (transport == 'websocket') { + req = "GET /customerservice/monitor\r\nAccept: text/plain\r\n"; + } else if (transport == 'sse') { + req = {"method": "GET", "url": HOST_URL + "customerservice/monitor2", "headers": {"accept": "text/plain"}} + } + console.log("TRACE: sending ", req); + subSocket.push(req); +} + +function doUnsubscribe() { + var req; + if (transport == 'websocket') { + req = "GET /customerservice/unmonitor/*\r\nAccept: text/plain\r\n"; + } else if (transport == 'sse') { + req = {"method": "GET", "url": HOST_URL + "customerservice/unmonitor2/*", "headers": {"accept": "text/plain"}} + } + console.log("TRACE: sending ", req); + subSocket.push(req); +} + +function doUpdate(v) { + var req; + if (transport == 'websocket') { + req = "PUT /customerservice/customers\r\nContent-Type: text/xml; charset='utf-8'\r\nAccept: text/xml\r\n\r\n" + + createUpdateCustomerPayload(v[0], v[1]); + } else if (transport == 'sse') { + req = {"method": "PUT", "url": HOST_URL + "customerservice/customers", "headers": {"content-type": "text/xml; charset=utf-8", "accept": "text/xml"}, "data": createUpdateCustomerPayload(v[0], v[1])} + } + console.log("TRACE: sending ", req); + subSocket.push(req); +} + +function doQuit() { + subSocket.close(); + process.exit(0); +} + +/// + +request.onOpen = function(response) { + isopen = true; + console.log('Connected using ' + response.transport); + prompt.setPrompt("> ", 2); + prompt.prompt(); +}; + +request.onMessage = function (response) { + var message = response.responseBody; + console.log('Received: ', message); + console.log('------------------------------------------------------------------------'); + prompt.setPrompt("> ", 2); + prompt.prompt(); +}; + +request.onReconnect = function(response) { + console.log('Reconnecting ...'); +} + +request.onReopen = function(response) { + isopen = true; + console.log('Reconnected using ' + response.transport); + prompt.setPrompt("> ", 2); + prompt.prompt(); +} + +request.onClose = function(response) { + isopen = false; +} + +request.onError = function(response) { + console.log("Sorry, something went wrong: " + response.responseBody); +}; + +var transport = null; +var subSocket = null; +var author = null; + +console.log("Select transport ..."); +for (var i = 0; i < TRANSPORT_NAMES.length; i++) { + console.log(i + ": " + TRANSPORT_NAMES[i]); +} +prompt.setPrompt("select: ", 6); +prompt.prompt(); + +prompt. +on('line', function(line) { + var msg = line.trim(); + if (transport == null) { + transport = selectOption(msg, TRANSPORT_NAMES); + request.transport = transport; + subSocket = atmosphere.subscribe(request); + console.log("Connecting using " + transport); + setTimeout(function() { + if (!isopen) { + console.log("Unable to open a connection. Terminated."); + process.exit(0); + } + }, 3000); + } else if (msg.length == 0) { + doHelp(); + } else if (msg.indexOf("add") == 0) { + doAdd(getArgs("add", msg)); + } else if (msg.indexOf("del") == 0) { + doDelete(getArgs("del", msg)); + } else if (msg.indexOf("get") == 0) { + doGet(getArgs("get", msg)); + } else if (msg.indexOf("quit") == 0) { + doQuit(); + } else if (msg.indexOf("sub") == 0) { + doSubscribe(getArgs("sub", msg)); + } else if (msg.indexOf("unsub") == 0) { + doUnsubscribe(getArgs("unsub", msg)); + } else if (msg.indexOf("update") == 0) { + doUpdate(getArgs("update", msg)); + } + prompt.setPrompt("> ", 2); + prompt.prompt(); +}). +on('close', function() { + console.log("close"); + process.exit(0); +}); http://git-wip-us.apache.org/repos/asf/cxf/blob/a529d270/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereUtils.java ---------------------------------------------------------------------- diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereUtils.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereUtils.java index 1a4a9b5..079792c 100644 --- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereUtils.java +++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereUtils.java @@ -21,10 +21,18 @@ package org.apache.cxf.transport.websocket.atmosphere; import java.util.List; +import javax.servlet.http.HttpServletRequest; + import org.apache.cxf.Bus; import org.apache.cxf.helpers.CastUtils; import org.atmosphere.cpr.AtmosphereFramework; import org.atmosphere.cpr.AtmosphereInterceptor; +import org.atmosphere.cpr.HeaderConfig; +import org.atmosphere.interceptor.CacheHeadersInterceptor; +import org.atmosphere.interceptor.HeartbeatInterceptor; +import org.atmosphere.interceptor.JavaScriptProtocol; +import org.atmosphere.interceptor.SSEAtmosphereInterceptor; +import org.atmosphere.util.Utils; /** * @@ -36,6 +44,10 @@ public final class AtmosphereUtils { public static void addInterceptors(AtmosphereFramework framework, Bus bus) { Object ais = bus.getProperty("atmosphere.interceptors"); + // pre-install those atmosphere default interceptors before the custom interceptors. + framework.interceptor(new CacheHeadersInterceptor()).interceptor(new HeartbeatInterceptor()) + .interceptor(new SSEAtmosphereInterceptor()).interceptor(new JavaScriptProtocol()); + if (ais == null || ais instanceof AtmosphereInterceptor) { framework.interceptor(ais == null ? new DefaultProtocolInterceptor() : (AtmosphereInterceptor)ais); @@ -43,9 +55,15 @@ public final class AtmosphereUtils { } if (ais instanceof List) { List icps = CastUtils.cast((List)ais); + // add the custom interceptors for (AtmosphereInterceptor icp : icps) { framework.interceptor(icp); } } } + + public static boolean useAtmosphere(HttpServletRequest req) { + return Utils.webSocketEnabled(req) + || req.getParameter(HeaderConfig.X_ATMOSPHERE_TRANSPORT) != null; + } } http://git-wip-us.apache.org/repos/asf/cxf/blob/a529d270/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketJettyDestination.java ---------------------------------------------------------------------- diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketJettyDestination.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketJettyDestination.java index 0e02923..d0cc806 100644 --- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketJettyDestination.java +++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketJettyDestination.java @@ -45,7 +45,6 @@ import org.atmosphere.cpr.AtmosphereRequest; import org.atmosphere.cpr.AtmosphereResource; import org.atmosphere.cpr.AtmosphereResponse; import org.atmosphere.handler.AbstractReflectorAtmosphereHandler; -import org.atmosphere.util.Utils; import org.eclipse.jetty.server.Request; @@ -119,7 +118,7 @@ public class AtmosphereWebSocketJettyDestination extends JettyHTTPDestination im @Override public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { - if (Utils.webSocketEnabled(request)) { + if (AtmosphereUtils.useAtmosphere(request)) { try { framework.doCometSupport(AtmosphereRequest.wrap(request), AtmosphereResponse.wrap(response)); http://git-wip-us.apache.org/repos/asf/cxf/blob/a529d270/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java ---------------------------------------------------------------------- diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java index 6459150..a4e702c 100644 --- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java +++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java @@ -41,7 +41,6 @@ import org.atmosphere.cpr.AtmosphereRequest; import org.atmosphere.cpr.AtmosphereResource; import org.atmosphere.cpr.AtmosphereResponse; import org.atmosphere.handler.AbstractReflectorAtmosphereHandler; -import org.atmosphere.util.Utils; /** * @@ -69,7 +68,7 @@ public class AtmosphereWebSocketServletDestination extends ServletDestination im @Override public void invoke(ServletConfig config, ServletContext context, HttpServletRequest req, HttpServletResponse resp) throws IOException { - if (Utils.webSocketEnabled(req)) { + if (AtmosphereUtils.useAtmosphere(req)) { try { framework.doCometSupport(AtmosphereRequest.wrap(req), AtmosphereResponse.wrap(resp)); http://git-wip-us.apache.org/repos/asf/cxf/blob/a529d270/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java ---------------------------------------------------------------------- diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java index 1a2cd9a..2631d51 100644 --- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java +++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java @@ -37,8 +37,8 @@ import org.apache.cxf.io.CachedOutputStream; import org.apache.cxf.transport.websocket.InvalidPathException; import org.apache.cxf.transport.websocket.WebSocketConstants; import org.apache.cxf.transport.websocket.WebSocketUtils; -import org.atmosphere.config.service.AtmosphereInterceptorService; import org.atmosphere.cpr.Action; +import org.atmosphere.cpr.ApplicationConfig; import org.atmosphere.cpr.AsyncIOInterceptor; import org.atmosphere.cpr.AsyncIOInterceptorAdapter; import org.atmosphere.cpr.AsyncIOWriter; @@ -48,20 +48,24 @@ import org.atmosphere.cpr.AtmosphereInterceptorAdapter; import org.atmosphere.cpr.AtmosphereInterceptorWriter; import org.atmosphere.cpr.AtmosphereRequest; import org.atmosphere.cpr.AtmosphereResource; +import org.atmosphere.cpr.AtmosphereResourceEvent; +import org.atmosphere.cpr.AtmosphereResourceEventListenerAdapter; import org.atmosphere.cpr.AtmosphereResponse; import org.atmosphere.cpr.FrameworkConfig; /** * DefaultProtocolInterceptor provides the default CXF's WebSocket protocol that uses. * + * This interceptor is automatically engaged when no atmosphere interceptor is configured. */ -@AtmosphereInterceptorService public class DefaultProtocolInterceptor extends AtmosphereInterceptorAdapter { private static final Logger LOG = LogUtils.getL7dLogger(DefaultProtocolInterceptor.class); private static final String REQUEST_DISPATCHED = "request.dispatched"; private static final String RESPONSE_PARENT = "response.parent"; + private Map suspendedResponses = new HashMap(); + private final AsyncIOInterceptor interceptor = new Interceptor(); private Pattern includedheaders; @@ -102,10 +106,77 @@ public class DefaultProtocolInterceptor extends AtmosphereInterceptorAdapter { this.excludedheaders = excludedheaders; } - @SuppressWarnings("deprecation") @Override public Action inspect(final AtmosphereResource r) { LOG.log(Level.FINE, "inspect"); + if (AtmosphereResource.TRANSPORT.WEBSOCKET != r.transport() + && AtmosphereResource.TRANSPORT.SSE != r.transport() + && AtmosphereResource.TRANSPORT.POLLING != r.transport()) { + LOG.fine("Skipping ignorable request"); + return Action.CONTINUE; + } + if (AtmosphereResource.TRANSPORT.POLLING == r.transport()) { + final String saruuid = (String)r.getRequest() + .getAttribute(ApplicationConfig.SUSPENDED_ATMOSPHERE_RESOURCE_UUID); + final AtmosphereResponse suspendedResponse = suspendedResponses.get(saruuid); + LOG.fine("Attaching a proxy writer to suspended response"); + r.getResponse().asyncIOWriter(new AtmosphereInterceptorWriter() { + @Override + public AsyncIOWriter write(AtmosphereResponse r, String data) throws IOException { + suspendedResponse.write(data); + suspendedResponse.flushBuffer(); + return this; + } + + @Override + public AsyncIOWriter write(AtmosphereResponse r, byte[] data) throws IOException { + suspendedResponse.write(data); + suspendedResponse.flushBuffer(); + return this; + } + + @Override + public AsyncIOWriter write(AtmosphereResponse r, byte[] data, int offset, int length) + throws IOException { + suspendedResponse.write(data, offset, length); + suspendedResponse.flushBuffer(); + return this; + } + }); + // REVISIT we need to keep this response's asyncwriter alive so that data can be written to the + // suspended response, but investigate if there is a better alternative. + r.getResponse().destroyable(false); + return Action.CONTINUE; + } + + r.addEventListener(new AtmosphereResourceEventListenerAdapter() { + @Override + public void onSuspend(AtmosphereResourceEvent event) { + final String srid = (String)event.getResource().getRequest() + .getAttribute(ApplicationConfig.SUSPENDED_ATMOSPHERE_RESOURCE_UUID); + LOG.log(Level.FINE, "Registrering suspended resource: {}", srid); + suspendedResponses.put(srid, event.getResource().getResponse()); + + AsyncIOWriter writer = event.getResource().getResponse().getAsyncIOWriter(); + if (writer == null) { + writer = new AtmosphereInterceptorWriter(); + r.getResponse().asyncIOWriter(writer); + } + if (writer instanceof AtmosphereInterceptorWriter) { + ((AtmosphereInterceptorWriter)writer).interceptor(interceptor); + } + } + + @Override + public void onDisconnect(AtmosphereResourceEvent event) { + super.onDisconnect(event); + final String srid = (String)event.getResource().getRequest() + .getAttribute(ApplicationConfig.SUSPENDED_ATMOSPHERE_RESOURCE_UUID); + LOG.log(Level.FINE, "Unregistrering suspended resource: {}", srid); + suspendedResponses.remove(srid); + } + + }); AtmosphereRequest request = r.getRequest(); if (request.getAttribute(REQUEST_DISPATCHED) == null) { @@ -115,6 +186,11 @@ public class DefaultProtocolInterceptor extends AtmosphereInterceptorAdapter { try { byte[] data = WebSocketUtils.readBody(request.getInputStream()); if (data.length == 0) { + if (AtmosphereResource.TRANSPORT.WEBSOCKET == r.transport() + || AtmosphereResource.TRANSPORT.SSE == r.transport()) { + r.suspend(); + return Action.SUSPEND; + } return Action.CANCELLED; } @@ -124,10 +200,10 @@ public class DefaultProtocolInterceptor extends AtmosphereInterceptorAdapter { try { AtmosphereRequest ar = createAtmosphereRequest(request, data); response = new WrappedAtmosphereResponse(r.getResponse(), ar); - ar.attributes().put(REQUEST_DISPATCHED, "true"); + ar.localAttributes().put(REQUEST_DISPATCHED, "true"); String refid = ar.getHeader(WebSocketConstants.DEFAULT_REQUEST_ID_KEY); if (refid != null) { - ar.attributes().put(WebSocketConstants.DEFAULT_REQUEST_ID_KEY, refid); + ar.localAttributes().put(WebSocketConstants.DEFAULT_REQUEST_ID_KEY, refid); } // This is a new request, we must clean the Websocket AtmosphereResource. request.removeAttribute(FrameworkConfig.INJECTED_ATMOSPHERE_RESOURCE); @@ -156,7 +232,6 @@ public class DefaultProtocolInterceptor extends AtmosphereInterceptorAdapter { return Action.CANCELLED; } catch (IOException e) { LOG.log(Level.WARNING, "Error during protocol processing", e); - return Action.CONTINUE; } } else { request.destroyable(false); @@ -229,6 +304,9 @@ public class DefaultProtocolInterceptor extends AtmosphereInterceptorAdapter { AtmosphereRequest request = response.request(); String refid = (String)request.getAttribute(WebSocketConstants.DEFAULT_REQUEST_ID_KEY); + if (AtmosphereResource.TRANSPORT.WEBSOCKET != response.resource().transport()) { + return payload; + } Map headers = new HashMap(); if (refid != null) { response.addHeader(WebSocketConstants.DEFAULT_RESPONSE_ID_KEY, refid);