Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 26EE417CE2 for ; Thu, 30 Apr 2015 19:00:06 +0000 (UTC) Received: (qmail 91554 invoked by uid 500); 30 Apr 2015 19:00:06 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 91517 invoked by uid 500); 30 Apr 2015 19:00:06 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 91507 invoked by uid 99); 30 Apr 2015 19:00:06 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 30 Apr 2015 19:00:06 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D8A01E0061; Thu, 30 Apr 2015 19:00:05 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tabish@apache.org To: commits@activemq.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: activemq git commit: https://issues.apache.org/jira/browse/AMQ-5755 Date: Thu, 30 Apr 2015 19:00:05 +0000 (UTC) Repository: activemq Updated Branches: refs/heads/master 82200b6e7 -> f05f83b15 https://issues.apache.org/jira/browse/AMQ-5755 Unit tests for some STOMP over WebSockets functionality and some fixes for resource cleanup. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f05f83b1 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f05f83b1 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f05f83b1 Branch: refs/heads/master Commit: f05f83b15dbee58927c4b231ece0419122bff4bf Parents: 82200b6 Author: Timothy Bish Authored: Thu Apr 30 14:41:59 2015 -0400 Committer: Timothy Bish Committed: Thu Apr 30 14:41:59 2015 -0400 ---------------------------------------------------------------------- .../transport/ws/AbstractStompSocket.java | 17 +- .../transport/ws/jetty8/StompSocket.java | 19 +- .../transport/ws/jetty9/StompSocket.java | 9 + .../transport/ws/StompWSConnection.java | 147 ++++++++++ .../transport/ws/StompWSTransportTest.java | 283 +++++++++++++++++++ .../activemq/transport/ws/WSTransportTest.java | 68 +---- .../transport/ws/WSTransportTestSupport.java | 160 +++++++++++ .../src/test/resources/log4j.properties | 2 + 8 files changed, 643 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/f05f83b1/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java index b74bf5f..472561a 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java @@ -61,11 +61,9 @@ public abstract class AbstractStompSocket extends TransportSupport implements St } @Override - public abstract void sendToStomp(StompFrame command) throws IOException; - - @Override protected void doStop(ServiceStopper stopper) throws Exception { stompInactivityMonitor.stop(); + handleStopped(); } @Override @@ -74,6 +72,19 @@ public abstract class AbstractStompSocket extends TransportSupport implements St stompInactivityMonitor.setTransportListener(getTransportListener()); } + //----- Abstract methods for subclasses to implement ---------------------// + + @Override + public abstract void sendToStomp(StompFrame command) throws IOException; + + /** + * Called when the transport is stopping to allow the dervied classes + * a chance to close WebSocket resources. + * + * @throws IOException if an error occurs during the stop. + */ + public abstract void handleStopped() throws IOException; + //----- Accessor methods -------------------------------------------------// @Override http://git-wip-us.apache.org/repos/asf/activemq/blob/f05f83b1/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/StompSocket.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/StompSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/StompSocket.java index be1c8d1..23357bd 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/StompSocket.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/StompSocket.java @@ -35,6 +35,20 @@ class StompSocket extends AbstractStompSocket implements WebSocket.OnTextMessage private Connection outbound; @Override + public void handleStopped() throws IOException { + if (outbound != null && outbound.isOpen()) { + outbound.close(); + } + } + + @Override + public void sendToStomp(StompFrame command) throws IOException { + outbound.sendMessage(command.format()); + } + + //----- WebSocket.OnTextMessage callback handlers ------------------------// + + @Override public void onOpen(Connection connection) { this.outbound = connection; } @@ -52,9 +66,4 @@ class StompSocket extends AbstractStompSocket implements WebSocket.OnTextMessage public void onMessage(String data) { processStompFrame(data); } - - @Override - public void sendToStomp(StompFrame command) throws IOException { - outbound.sendMessage(command.format()); - } } http://git-wip-us.apache.org/repos/asf/activemq/blob/f05f83b1/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java index a07ccd0..be7dc30 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java @@ -41,6 +41,15 @@ class StompSocket extends AbstractStompSocket implements WebSocketListener { } @Override + public void handleStopped() throws IOException { + if (session != null && session.isOpen()) { + session.close(); + } + } + + //----- WebSocketListener event callbacks --------------------------------// + + @Override public void onWebSocketBinary(byte[] arg0, int arg1, int arg2) { } http://git-wip-us.apache.org/repos/asf/activemq/blob/f05f83b1/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSConnection.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSConnection.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSConnection.java new file mode 100644 index 0000000..09ec106 --- /dev/null +++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSConnection.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.ws; + +import java.io.IOException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.transport.stomp.StompFrame; +import org.eclipse.jetty.websocket.WebSocket; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * STOMP over WS based Connection class + */ +public class StompWSConnection implements WebSocket, WebSocket.OnTextMessage { + + private static final Logger LOG = LoggerFactory.getLogger(StompWSConnection.class); + + private Connection connection; + private final CountDownLatch connectLatch = new CountDownLatch(1); + + private final BlockingQueue prefetch = new LinkedBlockingDeque(); + + private int closeCode = -1; + private String closeMessage; + + public boolean isConnected() { + return connection != null ? connection.isOpen() : false; + } + + public void close() { + if (connection != null) { + connection.close(); + } + } + + //---- Send methods ------------------------------------------------------// + + public void sendRawFrame(String rawFrame) throws Exception { + checkConnected(); + connection.sendMessage(rawFrame); + } + + public void sendFrame(StompFrame frame) throws Exception { + checkConnected(); + connection.sendMessage(frame.format()); + } + + public void keepAlive() throws Exception { + checkConnected(); + connection.sendMessage("\n"); + } + + //----- Receive methods --------------------------------------------------// + + public String receive() throws Exception { + checkConnected(); + return prefetch.take(); + } + + public String receive(long timeout, TimeUnit unit) throws Exception { + checkConnected(); + return prefetch.poll(timeout, unit); + } + + public String receiveNoWait() throws Exception { + checkConnected(); + return prefetch.poll(); + } + + //---- Blocking state change calls ---------------------------------------// + + public void awaitConnection() throws InterruptedException { + connectLatch.await(); + } + + public boolean awaitConnection(long time, TimeUnit unit) throws InterruptedException { + return connectLatch.await(time, unit); + } + + //----- Property Accessors -----------------------------------------------// + + public int getCloseCode() { + return closeCode; + } + + public String getCloseMessage() { + return closeMessage; + } + + //----- WebSocket callback handlers --------------------------------------// + + @Override + public void onMessage(String data) { + if (data == null) { + return; + } + + if (data.equals("\n")) { + LOG.debug("New incoming heartbeat read"); + } else { + LOG.trace("New incoming STOMP Frame read: \n{}", data); + prefetch.add(data); + } + } + + @Override + public void onOpen(Connection connection) { + this.connection = connection; + this.connectLatch.countDown(); + } + + @Override + public void onClose(int closeCode, String message) { + LOG.trace("STOMP WS Connection closed, code:{} message:{}", closeCode, message); + + this.connection = null; + this.closeCode = closeCode; + this.closeMessage = message; + } + + //----- Internal implementation ------------------------------------------// + + private void checkConnected() throws IOException { + if (!isConnected()) { + throw new IOException("STOMP WS Connection is closed."); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/f05f83b1/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSTransportTest.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSTransportTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSTransportTest.java new file mode 100644 index 0000000..c6bfdd4 --- /dev/null +++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSTransportTest.java @@ -0,0 +1,283 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.ws; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.transport.stomp.Stomp; +import org.apache.activemq.transport.stomp.StompFrame; +import org.apache.activemq.util.Wait; +import org.eclipse.jetty.websocket.WebSocketClient; +import org.eclipse.jetty.websocket.WebSocketClientFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test STOMP over WebSockets functionality. + */ +public class StompWSTransportTest extends WSTransportTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(StompWSTransportTest.class); + + protected WebSocketClient wsClient; + protected StompWSConnection wsStompConnection; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + + WebSocketClientFactory clientFactory = new WebSocketClientFactory(); + clientFactory.start(); + + wsClient = clientFactory.newWebSocketClient(); + wsStompConnection = new StompWSConnection(); + + wsClient.open(wsConnectUri, wsStompConnection); + if (!wsStompConnection.awaitConnection(30, TimeUnit.SECONDS)) { + throw new IOException("Could not connect to STOMP WS endpoint"); + } + } + + @Override + @After + public void tearDown() throws Exception { + if (wsStompConnection != null) { + wsStompConnection.close(); + wsStompConnection = null; + wsClient = null; + } + + super.tearDown(); + } + + @Test(timeout = 60000) + public void testConnect() throws Exception { + String connectFrame = "STOMP\n" + + "login:system\n" + + "passcode:manager\n" + + "accept-version:1.2\n" + + "host:localhost\n" + + "\n" + Stomp.NULL; + + wsStompConnection.sendRawFrame(connectFrame); + + String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS); + assertNotNull(incoming); + assertTrue(incoming.startsWith("CONNECTED")); + + assertTrue("Connection should close", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToBroker().getCurrentConnectionsCount() == 1; + } + })); + + wsStompConnection.sendFrame(new StompFrame(Stomp.Commands.DISCONNECT)); + wsStompConnection.close(); + + assertTrue("Connection should close", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToBroker().getCurrentConnectionsCount() == 0; + } + })); + } + + @Test(timeout = 60000) + public void testConnectWithVersionOptions() throws Exception { + String connectFrame = "STOMP\n" + + "login:system\n" + + "passcode:manager\n" + + "accept-version:1.0,1.1\n" + + "host:localhost\n" + + "\n" + Stomp.NULL; + wsStompConnection.sendRawFrame(connectFrame); + + String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS); + + assertTrue(incoming.startsWith("CONNECTED")); + assertTrue(incoming.indexOf("version:1.1") >= 0); + assertTrue(incoming.indexOf("session:") >= 0); + + wsStompConnection.sendFrame(new StompFrame(Stomp.Commands.DISCONNECT)); + wsStompConnection.close(); + } + + @Test(timeout = 60000) + public void testRejectInvalidHeartbeats1() throws Exception { + String connectFrame = "STOMP\n" + + "login:system\n" + + "passcode:manager\n" + + "accept-version:1.1\n" + + "heart-beat:0\n" + + "host:localhost\n" + + "\n" + Stomp.NULL; + wsStompConnection.sendRawFrame(connectFrame); + + String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS); + + assertTrue(incoming.startsWith("ERROR")); + assertTrue(incoming.indexOf("heart-beat") >= 0); + assertTrue(incoming.indexOf("message:") >= 0); + + assertTrue("Connection should close", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToBroker().getCurrentConnectionsCount() == 0; + } + })); + } + + @Test(timeout = 60000) + public void testRejectInvalidHeartbeats2() throws Exception { + String connectFrame = "STOMP\n" + + "login:system\n" + + "passcode:manager\n" + + "accept-version:1.1\n" + + "heart-beat:T,0\n" + + "host:localhost\n" + + "\n" + Stomp.NULL; + wsStompConnection.sendRawFrame(connectFrame); + + String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS); + + assertTrue(incoming.startsWith("ERROR")); + assertTrue(incoming.indexOf("heart-beat") >= 0); + assertTrue(incoming.indexOf("message:") >= 0); + + assertTrue("Connection should close", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToBroker().getCurrentConnectionsCount() == 0; + } + })); + } + + @Test(timeout = 60000) + public void testRejectInvalidHeartbeats3() throws Exception { + String connectFrame = "STOMP\n" + + "login:system\n" + + "passcode:manager\n" + + "accept-version:1.1\n" + + "heart-beat:100,10,50\n" + + "host:localhost\n" + + "\n" + Stomp.NULL; + wsStompConnection.sendRawFrame(connectFrame); + + String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS); + + assertTrue(incoming.startsWith("ERROR")); + assertTrue(incoming.indexOf("heart-beat") >= 0); + assertTrue(incoming.indexOf("message:") >= 0); + + assertTrue("Connection should close", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToBroker().getCurrentConnectionsCount() == 0; + } + })); + } + + @Test(timeout = 60000) + public void testHeartbeatsDropsIdleConnection() throws Exception { + String connectFrame = "STOMP\n" + + "login:system\n" + + "passcode:manager\n" + + "accept-version:1.1\n" + + "heart-beat:1000,0\n" + + "host:localhost\n" + + "\n" + Stomp.NULL; + + wsStompConnection.sendRawFrame(connectFrame); + String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS); + assertTrue(incoming.startsWith("CONNECTED")); + assertTrue(incoming.indexOf("version:1.1") >= 0); + assertTrue(incoming.indexOf("heart-beat:") >= 0); + assertTrue(incoming.indexOf("session:") >= 0); + + assertTrue("Broker should have closed WS connection:", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return !wsStompConnection.isConnected(); + } + })); + } + + @Test(timeout = 60000) + public void testHeartbeatsKeepsConnectionOpen() throws Exception { + String connectFrame = "STOMP\n" + + "login:system\n" + + "passcode:manager\n" + + "accept-version:1.1\n" + + "heart-beat:2000,0\n" + + "host:localhost\n" + + "\n" + Stomp.NULL; + + wsStompConnection.sendRawFrame(connectFrame); + String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS); + assertTrue(incoming.startsWith("CONNECTED")); + assertTrue(incoming.indexOf("version:1.1") >= 0); + assertTrue(incoming.indexOf("heart-beat:") >= 0); + assertTrue(incoming.indexOf("session:") >= 0); + + String message = "SEND\n" + "destination:/queue/" + getTestName() + "\n\n" + "Hello World" + Stomp.NULL; + wsStompConnection.sendRawFrame(message); + + ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); + + service.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + LOG.info("Sending next KeepAlive"); + wsStompConnection.keepAlive(); + } catch (Exception e) { + } + } + }, 1, 1, TimeUnit.SECONDS); + + TimeUnit.SECONDS.sleep(15); + + String frame = "SUBSCRIBE\n" + "destination:/queue/" + getTestName() + "\n" + + "id:12345\n" + "ack:auto\n\n" + Stomp.NULL; + wsStompConnection.sendRawFrame(frame); + + incoming = wsStompConnection.receive(30, TimeUnit.SECONDS); + assertTrue(incoming.startsWith("MESSAGE")); + + service.shutdownNow(); + service.awaitTermination(5, TimeUnit.SECONDS); + + wsStompConnection.sendFrame(new StompFrame(Stomp.Commands.DISCONNECT)); + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/f05f83b1/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTest.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTest.java index 140356e..546209e 100644 --- a/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTest.java +++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTest.java @@ -21,29 +21,22 @@ import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; -import java.net.ServerSocket; import java.net.Socket; import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; -import javax.net.ServerSocketFactory; - -import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.spring.SpringSslContext; import org.apache.activemq.transport.SocketConnectorFactory; import org.apache.activemq.transport.stomp.StompConnection; import org.apache.activemq.util.Wait; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.webapp.WebAppContext; - import org.junit.After; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; - import org.openqa.selenium.By; import org.openqa.selenium.WebDriver; import org.openqa.selenium.WebElement; @@ -54,43 +47,25 @@ import org.openqa.selenium.firefox.FirefoxProfile; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class WSTransportTest { +public class WSTransportTest extends WSTransportTestSupport { private static final Logger LOG = LoggerFactory.getLogger(WSTransportTest.class); private static final int MESSAGE_COUNT = 1000; - private BrokerService broker; private Server server; private WebDriver driver; private File profileDir; private String stompUri; - private int proxyPort = 0; - protected String wsUri; private StompConnection stompConnection = new StompConnection(); - protected BrokerService createBroker(boolean deleteMessages) throws Exception { - BrokerService broker = BrokerFactory.createBroker( - new URI("broker:()/localhost?persistent=false&useJmx=false")); - - SpringSslContext context = new SpringSslContext(); - context.setKeyStore("src/test/resources/server.keystore"); - context.setKeyStoreKeyPassword("password"); - context.setTrustStore("src/test/resources/client.keystore"); - context.setTrustStorePassword("password"); - context.afterPropertiesSet(); - broker.setSslContext(context); - - stompUri = broker.addConnector("stomp://localhost:0").getPublishableConnectString(); - wsUri = broker.addConnector(getWSConnectorURI()).getPublishableConnectString(); - broker.setDeleteAllMessagesOnStartup(deleteMessages); - broker.start(); - broker.waitUntilStarted(); - - return broker; + @Override + protected void addAdditionalConnectors(BrokerService service) throws Exception { + stompUri = service.addConnector("stomp://localhost:0").getPublishableConnectString(); } + @Override protected String getWSConnectorURI() { return "ws://127.0.0.1:61623?websocket.maxTextMessageSize=99999&transport.maxIdleTime=1001"; } @@ -114,31 +89,13 @@ public class WSTransportTest { return server; } - protected int getProxyPort() { - if (proxyPort == 0) { - ServerSocket ss = null; - try { - ss = ServerSocketFactory.getDefault().createServerSocket(0); - proxyPort = ss.getLocalPort(); - } catch (IOException e) { // ignore - } finally { - try { - if (ss != null ) { - ss.close(); - } - } catch (IOException e) { // ignore - } - } - } - return proxyPort; - } - protected Connector createJettyConnector(Server server) throws Exception { Connector c = new SocketConnectorFactory().createConnector(server); c.getClass().getMethod("setPort", Integer.TYPE).invoke(c, getProxyPort()); return c; } + @Override protected void stopBroker() throws Exception { if (broker != null) { broker.stop(); @@ -147,14 +104,16 @@ public class WSTransportTest { } } + @Override @Before public void setUp() throws Exception { + super.setUp(); profileDir = new File("activemq-data/profiles"); - broker = createBroker(true); stompConnect(); server = createWebServer(); } + @Override @After public void tearDown() throws Exception { try { @@ -163,10 +122,11 @@ public class WSTransportTest { // Some tests explicitly disconnect from stomp so can ignore } finally { try { - stopBroker(); - } catch (Exception e) { - LOG.warn("Error on Broker stop."); + super.tearDown(); + } catch (Exception ex) { + LOG.warn("Error on super tearDown()"); } + if (driver != null) { try { driver.quit(); @@ -234,7 +194,7 @@ public class WSTransportTest { protected String getTestURI() { int port = getProxyPort(); - return "http://localhost:" + port + "/websocket.html#" + wsUri; + return "http://localhost:" + port + "/websocket.html#" + wsConnectUri; } public void doTestWebSockets(WebDriver driver) throws Exception { http://git-wip-us.apache.org/repos/asf/activemq/blob/f05f83b1/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java new file mode 100644 index 0000000..9c4abc8 --- /dev/null +++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.ws; + +import java.io.IOException; +import java.net.ServerSocket; +import java.net.URI; + +import javax.jms.JMSException; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import javax.net.ServerSocketFactory; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.BrokerViewMBean; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.jmx.TopicViewMBean; +import org.apache.activemq.spring.SpringSslContext; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Basic infrastructure for test WebSocket connections. + */ +public class WSTransportTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(WSTransportTestSupport.class); + + @Rule + public TestName name = new TestName(); + + private int proxyPort = 0; + + protected BrokerService broker; + protected URI wsConnectUri; + + @Before + public void setUp() throws Exception { + broker = createBroker(true); + } + + @After + public void tearDown() throws Exception { + try { + stopBroker(); + } catch(Exception e) { + LOG.warn("Error on Broker stop."); + } + } + + protected String getWSConnectorURI() { + return "ws://127.0.0.1:" + getProxyPort() + "?websocket.maxTextMessageSize=99999&transport.maxIdleTime=1001"; + } + + protected void addAdditionalConnectors(BrokerService service) throws Exception { + + } + + protected BrokerService createBroker(boolean deleteMessages) throws Exception { + + BrokerService broker = new BrokerService(); + + SpringSslContext context = new SpringSslContext(); + context.setKeyStore("src/test/resources/server.keystore"); + context.setKeyStoreKeyPassword("password"); + context.setTrustStore("src/test/resources/client.keystore"); + context.setTrustStorePassword("password"); + context.afterPropertiesSet(); + broker.setSslContext(context); + + wsConnectUri = broker.addConnector(getWSConnectorURI()).getPublishableConnectURI(); + + broker.setUseJmx(true); + broker.getManagementContext().setCreateConnector(false); + broker.setPersistent(isPersistent()); + broker.setDeleteAllMessagesOnStartup(deleteMessages); + broker.start(); + broker.waitUntilStarted(); + + addAdditionalConnectors(broker); + + return broker; + } + + protected boolean isPersistent() { + return false; + } + + protected String getTestName() { + return name.getMethodName(); + } + + protected int getProxyPort() { + if (proxyPort == 0) { + ServerSocket ss = null; + try { + ss = ServerSocketFactory.getDefault().createServerSocket(0); + proxyPort = ss.getLocalPort(); + } catch (IOException e) { // ignore + } finally { + try { + if (ss != null ) { + ss.close(); + } + } catch (IOException e) { // ignore + } + } + } + + return proxyPort; + } + + protected void stopBroker() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + broker = null; + } + } + + protected BrokerViewMBean getProxyToBroker() throws MalformedObjectNameException, JMSException { + ObjectName brokerViewMBean = new ObjectName( + "org.apache.activemq:type=Broker,brokerName=localhost"); + BrokerViewMBean proxy = (BrokerViewMBean) broker.getManagementContext() + .newProxyInstance(brokerViewMBean, BrokerViewMBean.class, true); + return proxy; + } + + protected QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException { + ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+name); + QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext() + .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); + return proxy; + } + + protected TopicViewMBean getProxyToTopic(String name) throws MalformedObjectNameException, JMSException { + ObjectName topicViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Topic,destinationName="+name); + TopicViewMBean proxy = (TopicViewMBean) broker.getManagementContext() + .newProxyInstance(topicViewMBeanName, TopicViewMBean.class, true); + return proxy; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/f05f83b1/activemq-http/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/activemq-http/src/test/resources/log4j.properties b/activemq-http/src/test/resources/log4j.properties index 7cc1941..f28c2a8 100755 --- a/activemq-http/src/test/resources/log4j.properties +++ b/activemq-http/src/test/resources/log4j.properties @@ -20,6 +20,8 @@ # log4j.rootLogger=INFO, out, stdout +log4j.logger.org.apache.activemq.transport.ws=DEBUG + #log4j.logger.org.apache.activemq.broker.scheduler=DEBUG #log4j.logger.org.apache.activemq.network.DemandForwardingBridgeSupport=DEBUG #log4j.logger.org.apache.activemq.transport.failover=TRACE