Return-Path: X-Original-To: apmail-cxf-commits-archive@www.apache.org Delivered-To: apmail-cxf-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id AF507173FF for ; Fri, 31 Oct 2014 12:49:49 +0000 (UTC) Received: (qmail 19340 invoked by uid 500); 31 Oct 2014 12:49:49 -0000 Delivered-To: apmail-cxf-commits-archive@cxf.apache.org Received: (qmail 19280 invoked by uid 500); 31 Oct 2014 12:49:49 -0000 Mailing-List: contact commits-help@cxf.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cxf.apache.org Delivered-To: mailing list commits@cxf.apache.org Received: (qmail 19271 invoked by uid 99); 31 Oct 2014 12:49:49 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 31 Oct 2014 12:49:49 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 10218A07F9E; Fri, 31 Oct 2014 12:49:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ay@apache.org To: commits@cxf.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: [CXF-6075] NPE may occur at websocket destination under high load Date: Fri, 31 Oct 2014 12:49:48 +0000 (UTC) Repository: cxf Updated Branches: refs/heads/master fe9aaf634 -> 56c0db051 [CXF-6075] NPE may occur at websocket destination under high load Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/56c0db05 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/56c0db05 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/56c0db05 Branch: refs/heads/master Commit: 56c0db05126292a61a782f05848321b9b8b8b80c Parents: fe9aaf6 Author: Akitoshi Yoshida Authored: Fri Oct 31 13:49:12 2014 +0100 Committer: Akitoshi Yoshida Committed: Fri Oct 31 13:49:38 2014 +0100 ---------------------------------------------------------------------- .../atmosphere/AtmosphereWebSocketHandler.java | 16 ++++++++++++++-- .../transport/websocket/jetty/JettyWebSocket.java | 16 ++++++++++++++-- .../jetty9/Jetty9WebSocketDestination.java | 18 +++++++++++++++++- 3 files changed, 45 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/56c0db05/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java ---------------------------------------------------------------------- diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java index 776741e..38e6599 100644 --- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java +++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java @@ -27,6 +27,7 @@ import java.security.Principal; import java.util.Enumeration; import java.util.List; import java.util.Locale; +import java.util.concurrent.RejectedExecutionException; import java.util.logging.Level; import java.util.logging.Logger; @@ -99,11 +100,11 @@ public class AtmosphereWebSocketHandler implements WebSocketProtocol { protected List invokeService(final WebSocket webSocket, final InputStream stream) { LOG.fine("invokeService(WebSocket, InputStream)"); - // invoke the service directly as onMessage is synchronously blocked (in jetty) + // invoke the service asynchronously as onMessage is synchronously blocked (in jetty) // make sure the byte array passed to this method is immutable, as the websocket framework // may corrupt the byte array after this method is returned (i.e., before the data is returned in // the executor's thread. - destination.getExecutor().execute(new Runnable() { + executeServiceTask(new Runnable() { @Override public void run() { HttpServletRequest request = null; @@ -131,6 +132,17 @@ public class AtmosphereWebSocketHandler implements WebSocketProtocol { return null; } + private void executeServiceTask(Runnable r) { + try { + destination.getExecutor().execute(r); + } catch (RejectedExecutionException e) { + LOG.warning( + "Executor queue is full, run the service invocation task in caller thread." + + " Users can specify a larger executor queue to avoid this."); + r.run(); + } + } + // may want to move this error reporting code to WebSocketServletHolder protected void reportErrorStatus(HttpServletResponse response, int status) { if (response != null) { http://git-wip-us.apache.org/repos/asf/cxf/blob/56c0db05/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java ---------------------------------------------------------------------- diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java index 5bab332..6ae3c9f 100644 --- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java +++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java @@ -28,6 +28,7 @@ import java.util.HashMap; import java.util.Locale; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.RejectedExecutionException; import java.util.logging.Level; import java.util.logging.Logger; @@ -109,7 +110,7 @@ class JettyWebSocket implements WebSocket.OnBinaryMessage, WebSocket.OnTextMessa // make sure the byte array passed to this method is immutable, as the websocket framework // may corrupt the byte array after this method is returned (i.e., before the data is returned in // the executor's thread. - manager.getExecutor().execute(new Runnable() { + executeServiceTask(new Runnable() { @Override public void run() { HttpServletRequest request = null; @@ -133,7 +134,18 @@ class JettyWebSocket implements WebSocket.OnBinaryMessage, WebSocket.OnTextMessa } }); } - + + private void executeServiceTask(Runnable r) { + try { + manager.getExecutor().execute(r); + } catch (RejectedExecutionException e) { + LOG.warning( + "Executor queue is full, run the service invocation task in caller thread." + + " Users can specify a larger executor queue to avoid this."); + r.run(); + } + } + // may want to move this error reporting code to WebSocketServletHolder private void reportErrorStatus(HttpServletResponse response, int status) { if (response != null) { http://git-wip-us.apache.org/repos/asf/cxf/blob/56c0db05/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty9/Jetty9WebSocketDestination.java ---------------------------------------------------------------------- diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty9/Jetty9WebSocketDestination.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty9/Jetty9WebSocketDestination.java index ec2eeb2..32e681e 100644 --- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty9/Jetty9WebSocketDestination.java +++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty9/Jetty9WebSocketDestination.java @@ -28,6 +28,8 @@ import java.util.Enumeration; import java.util.Locale; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; +import java.util.logging.Logger; import javax.servlet.DispatcherType; import javax.servlet.ServletConfig; @@ -37,6 +39,7 @@ import javax.servlet.http.HttpServletResponse; import org.apache.cxf.Bus; import org.apache.cxf.common.classloader.ClassLoaderUtils; +import org.apache.cxf.common.logging.LogUtils; import org.apache.cxf.service.model.EndpointInfo; import org.apache.cxf.transport.http.DestinationRegistry; import org.apache.cxf.transport.http_jetty.JettyHTTPDestination; @@ -62,6 +65,7 @@ import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; */ public class Jetty9WebSocketDestination extends JettyHTTPDestination implements WebSocketDestinationService { + private static final Logger LOG = LogUtils.getL7dLogger(Jetty9WebSocketDestination.class); //REVISIT make these keys configurable private String requestIdKey = WebSocketConstants.DEFAULT_REQUEST_ID_KEY; @@ -130,7 +134,7 @@ public class Jetty9WebSocketDestination extends JettyHTTPDestination implements // make sure the byte array passed to this method is immutable, as the websocket framework // may corrupt the byte array after this method is returned (i.e., before the data is returned in // the executor's thread. - executor.execute(new Runnable() { + executeServiceTask(new Runnable() { @Override public void run() { HttpServletRequest request = null; @@ -155,6 +159,18 @@ public class Jetty9WebSocketDestination extends JettyHTTPDestination implements }); } + + private void executeServiceTask(Runnable r) { + try { + executor.execute(r); + } catch (RejectedExecutionException e) { + LOG.warning( + "Executor queue is full, run the service invocation task in caller thread." + + " Users can specify a larger executor queue to avoid this."); + r.run(); + } + } + private void reportErrorStatus(Session session, int i, HttpServletResponse resp) { try { resp.sendError(i);