cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject git commit: [CXF-5809] WebSocket's concurrent calls handling update; add a test for it
Date Wed, 18 Jun 2014 14:17:19 GMT
Repository: cxf
Updated Branches:
  refs/heads/master d3b9fc560 -> 442972837


[CXF-5809] WebSocket's concurrent calls handling update; add a test for it


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/44297283
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/44297283
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/44297283

Branch: refs/heads/master
Commit: 4429728376a8bf0870d14785f20fd3edc7340ebe
Parents: d3b9fc5
Author: Akitoshi Yoshida <ay@apache.org>
Authored: Wed Jun 18 15:25:54 2014 +0200
Committer: Akitoshi Yoshida <ay@apache.org>
Committed: Wed Jun 18 16:10:20 2014 +0200

----------------------------------------------------------------------
 .../atmosphere/AtmosphereWebSocketHandler.java  | 47 ++++++++++--------
 .../AtmosphereWebSocketServletDestination.java  | 10 ++++
 .../websocket/jetty/JettyWebSocketManager.java  | 14 +++---
 .../jetty/JettyWebSocketManagerTest.java        | 52 ++++++++++++++++++--
 .../jaxrs/websocket/BookStoreWebSocket.java     | 13 +++++
 .../JAXRSClientServerWebSocketTest.java         | 22 +++++++++
 6 files changed, 126 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/44297283/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
index 2b17d16..6a5550d 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
@@ -55,7 +55,7 @@ public class AtmosphereWebSocketHandler implements WebSocketProtocol {
     private static final Logger LOG = LogUtils.getL7dLogger(AtmosphereWebSocketHandler.class);
 
     protected AtmosphereWebSocketServletDestination destination;
-    
+
     //REVISIT make these keys configurable
     private String requestIdKey = WebSocketConstants.DEFAULT_REQUEST_ID_KEY;
     private String responseIdKey = WebSocketConstants.DEFAULT_RESPONSE_ID_KEY;
@@ -95,29 +95,34 @@ public class AtmosphereWebSocketHandler implements WebSocketProtocol {
         return invokeService(webSocket, new ByteArrayInputStream(data, offset, length));
     }
     
-    protected List<AtmosphereRequest> invokeService(WebSocket webSocket,  InputStream stream) {
+    protected List<AtmosphereRequest> invokeService(final WebSocket webSocket,  final InputStream stream) {
         LOG.info("invokeService(WebSocket, InputStream)");
-        // invoke the service directly as atmosphere's onMessage is not synchronously blocked
-        HttpServletRequest request = null;
-        HttpServletResponse response = null;        
-        try {
-            WebSocketServletHolder webSocketHolder = new AtmosphereWebSocketServletHolder(webSocket);
-            response = createServletResponse(webSocketHolder);
-            request = createServletRequest(webSocketHolder, stream);
-            if (destination != null) {
-                String reqid = request.getHeader(requestIdKey);
-                if (reqid != null) {
-                    response.setHeader(responseIdKey, reqid);
+        // invoke the service directly as onMessage is synchronously blocked (in jetty)
+        destination.getExecutor().execute(new Runnable() {
+            @Override
+            public void run() {
+                HttpServletRequest request = null;
+                HttpServletResponse response = null;
+                try {
+                    WebSocketServletHolder webSocketHolder = new AtmosphereWebSocketServletHolder(webSocket);
+                    response = createServletResponse(webSocketHolder);
+                    request = createServletRequest(webSocketHolder, stream);
+                    if (destination != null) {
+                        String reqid = request.getHeader(requestIdKey);
+                        if (reqid != null) {
+                            response.setHeader(responseIdKey, reqid);
+                        }
+                        ((WebSocketDestinationService)destination).invokeInternal(null,
+                            webSocket.resource().getRequest().getServletContext(),
+                            request, response);
+                    }
+                } catch (InvalidPathException ex) {
+                    reportErrorStatus(response, 400);
+                } catch (Exception e) {
+                    LOG.log(Level.WARNING, "Failed to invoke service", e);
                 }
-                ((WebSocketDestinationService)destination).invokeInternal(null, 
-                    webSocket.resource().getRequest().getServletContext(),
-                    request, response);
             }
-        } catch (InvalidPathException ex) { 
-            reportErrorStatus(response, 400);
-        } catch (Exception e) {
-            LOG.log(Level.WARNING, "Failed to invoke service", e);
-        }
+        });
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/cxf/blob/44297283/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java
b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java
index ffeeb5c..508bf2d 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java
@@ -20,6 +20,7 @@
 package org.apache.cxf.transport.websocket.atmosphere;
 
 import java.io.IOException;
+import java.util.concurrent.Executor;
 
 import javax.servlet.ServletConfig;
 import javax.servlet.ServletContext;
@@ -32,6 +33,7 @@ import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.transport.http.DestinationRegistry;
 import org.apache.cxf.transport.servlet.ServletDestination;
 import org.apache.cxf.transport.websocket.WebSocketDestinationService;
+import org.apache.cxf.workqueue.WorkQueueManager;
 import org.atmosphere.cpr.ApplicationConfig;
 import org.atmosphere.cpr.AtmosphereFramework;
 import org.atmosphere.cpr.AtmosphereRequest;
@@ -45,6 +47,7 @@ import org.atmosphere.websocket.WebSocketProtocol;
 public class AtmosphereWebSocketServletDestination extends ServletDestination implements
     WebSocketDestinationService {
     private AtmosphereFramework framework;
+    private Executor executor;
 
     public AtmosphereWebSocketServletDestination(Bus bus, DestinationRegistry registry, EndpointInfo ei, 
                                                  String path) throws IOException {
@@ -62,6 +65,10 @@ public class AtmosphereWebSocketServletDestination extends ServletDestination im
         if (wsp instanceof AtmosphereWebSocketHandler) {
             ((AtmosphereWebSocketHandler)wsp).setDestination(this);
         }
+
+        // the executor for decoupling the service invocation from websocket's onMessage call which is
+        // synchronously blocked
+        executor = bus.getExtension(WorkQueueManager.class).getAutomaticWorkQueue();
     }
 
     @Override
@@ -84,4 +91,7 @@ public class AtmosphereWebSocketServletDestination extends ServletDestination im
         super.invoke(config, context, req, resp);
     }
 
+    Executor getExecutor() {
+        return executor;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/44297283/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketManager.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketManager.java
b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketManager.java
index 5e7f4f7..7aef4b7 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketManager.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketManager.java
@@ -31,14 +31,15 @@ import javax.servlet.http.HttpServletResponse;
 
 import org.apache.cxf.transport.http.AbstractHTTPDestination;
 import org.apache.cxf.transport.websocket.WebSocketDestinationService;
-import org.apache.cxf.workqueue.OneShotAsyncExecutor;
+import org.apache.cxf.workqueue.WorkQueueManager;
 import org.eclipse.jetty.websocket.WebSocketFactory;
 import org.eclipse.jetty.websocket.WebSocketFactory.Acceptor;
 
 /**
- * 
+ * This class is used to provide the common functionality used by
+ * the two jetty websocket destination classes: JettyWebSocketDestination and JettyWebSocketServletDestination.
  */
-public class JettyWebSocketManager {
+class JettyWebSocketManager {
     private WebSocketFactory webSocketFactory;
     private AbstractHTTPDestination destination;
     private ServletContext servletContext;
@@ -50,10 +51,9 @@ public class JettyWebSocketManager {
         //TODO customize websocket factory configuration options when using the destination.
         webSocketFactory = new WebSocketFactory((Acceptor)dest, 8192);
 
-        //FIXME get the bus's executor for async service invocation to decouple
-        // the service invocation from websocket's onMessage call which is synchronously
-        // blocked.
-        executor = OneShotAsyncExecutor.getInstance();
+        // the executor for decoupling the service invocation from websocket's onMessage call which is
+        // synchronously blocked
+        executor = dest.getBus().getExtension(WorkQueueManager.class).getAutomaticWorkQueue();
     }
 
     void setServletContext(ServletContext servletContext) {

http://git-wip-us.apache.org/repos/asf/cxf/blob/44297283/rt/transports/websocket/src/test/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketManagerTest.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/test/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketManagerTest.java
b/rt/transports/websocket/src/test/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketManagerTest.java
index 85ce439..f595fea 100644
--- a/rt/transports/websocket/src/test/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketManagerTest.java
+++ b/rt/transports/websocket/src/test/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketManagerTest.java
@@ -24,6 +24,10 @@ import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.cxf.Bus;
+import org.apache.cxf.transport.http.AbstractHTTPDestination;
+import org.apache.cxf.workqueue.AutomaticWorkQueue;
+import org.apache.cxf.workqueue.WorkQueueManager;
 import org.easymock.EasyMock;
 import org.easymock.IMocksControl;
 
@@ -45,18 +49,20 @@ public class JettyWebSocketManagerTest extends Assert {
     @Test
     public void testServiceUsingJettyDestination() throws Exception {
         JettyWebSocketManager jwsm = new JettyWebSocketManager();
-        
+
         JettyWebSocketDestination dest = control.createMock(JettyWebSocketDestination.class);
-                
+        setupDestination(dest);
+
         HttpServletRequest request = control.createMock(HttpServletRequest.class);
         HttpServletResponse response = control.createMock(HttpServletResponse.class);
         
         dest.invokeInternal(EasyMock.isNull(ServletConfig.class), EasyMock.anyObject(ServletContext.class),
                     EasyMock.eq(request), EasyMock.eq(response));
         EasyMock.expectLastCall();
+        
         control.replay();
         jwsm.init(dest);
-        
+
         jwsm.service(request, response);
         control.verify();
     }
@@ -66,7 +72,8 @@ public class JettyWebSocketManagerTest extends Assert {
         JettyWebSocketManager jwsm = new JettyWebSocketManager();
         
         JettyWebSocketServletDestination dest = control.createMock(JettyWebSocketServletDestination.class);
-                
+        setupDestination(dest);
+
         HttpServletRequest request = control.createMock(HttpServletRequest.class);
         HttpServletResponse response = control.createMock(HttpServletResponse.class);
         
@@ -79,4 +86,41 @@ public class JettyWebSocketManagerTest extends Assert {
         jwsm.service(request, response);
         control.verify();
     }
+
+
+    private void setupDestination(AbstractHTTPDestination dest) {
+        Bus bus = control.createMock(Bus.class);
+        WorkQueueManager wqm = control.createMock(WorkQueueManager.class);
+
+        EasyMock.expect(dest.getBus()).andReturn(bus);
+        EasyMock.expect(bus.getExtension(WorkQueueManager.class)).andReturn(wqm);
+        EasyMock.expect(wqm.getAutomaticWorkQueue()).andReturn(
+            new AutomaticWorkQueue() {
+                @Override
+                public void execute(Runnable work, long timeout) {
+                }
+
+                @Override
+                public void schedule(Runnable work, long delay) {
+                }
+
+                @Override
+                public void execute(Runnable command) {
+                }
+
+                @Override
+                public String getName() {
+                    return null;
+                }
+
+                @Override
+                public void shutdown(boolean processRemainingWorkItems) {
+                }
+
+                @Override
+                public boolean isShutdown() {
+                    return false;
+                }
+            });
+    }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/44297283/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java
index d2c70e1..857bccc 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java
@@ -22,6 +22,7 @@ package org.apache.cxf.systest.jaxrs.websocket;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.Date;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -123,6 +124,18 @@ public class BookStoreWebSocket {
         };
     }
     
+    @GET
+    @Path("/hold/{t}")
+    @Produces("text/plain")
+    public String hold(@PathParam("t") long t) {
+        Date from = new Date();
+        try {
+            Thread.sleep(t);
+        } catch (InterruptedException e) {
+            // ignore
+        }
+        return "Held from " + from + " for " + t + " ms";
+    }
 }
 
 

http://git-wip-us.apache.org/repos/asf/cxf/blob/44297283/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientServerWebSocketTest.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientServerWebSocketTest.java
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientServerWebSocketTest.java
index 46d8fc2..b30984f 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientServerWebSocketTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientServerWebSocketTest.java
@@ -306,6 +306,28 @@ public class JAXRSClientServerWebSocketTest extends AbstractBusClientServerTestB
         }
     }
     
+    @Test
+    public void testCallsInParallel() throws Exception {
+        String address = "ws://localhost:" + getPort() + "/websocket/web/bookstore";
+
+        WebSocketTestClient wsclient = new WebSocketTestClient(address);
+        wsclient.connect();
+        try {
+            // call the GET service that takes a long time to response
+            wsclient.reset(2);
+            wsclient.sendTextMessage(
+                "GET /websocket/web/bookstore/hold/3000");
+            wsclient.sendTextMessage(
+                "GET /websocket/web/bookstore/hold/3000");
+            // each call takes 3 seconds but executed in parallel, so waiting 4 secs is sufficient
+            assertTrue("response expected", wsclient.await(4));
+            List<WebSocketTestClient.Response> received = wsclient.getReceivedResponses();
+            assertEquals(2, received.size());
+        } finally {
+            wsclient.close();
+        }
+    }
+
     protected String getPort() {
         return PORT;
     }


Mime
View raw message