Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6976518DF1 for ; Tue, 16 Feb 2016 20:25:23 +0000 (UTC) Received: (qmail 53490 invoked by uid 500); 16 Feb 2016 20:10:22 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 53412 invoked by uid 500); 16 Feb 2016 20:10:22 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 52841 invoked by uid 99); 16 Feb 2016 20:10:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 16 Feb 2016 20:10:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BB983E699E; Tue, 16 Feb 2016 20:10:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Tue, 16 Feb 2016 20:10:32 -0000 Message-Id: <8658c8905b3a41ec804bfd32de65c0f9@git.apache.org> In-Reply-To: <245ab439bc114dfc8df95c6666f056d7@git.apache.org> References: <245ab439bc114dfc8df95c6666f056d7@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [18/48] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/740d40bb/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java index 5016e30..dc91873 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java @@ -16,22 +16,41 @@ */ package org.apache.activemq.transport.failover; -public class TwoBrokerFailoverClusterTest extends FailoverClusterTestSupport { +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl; +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; +import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; - private static final String BROKER_A_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61616"; - private static final String BROKER_B_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61617"; - private static final String BROKER_A_NOB_TC_ADDRESS = "tcp://127.0.0.1:61626"; - private static final String BROKER_B_NOB_TC_ADDRESS = "tcp://127.0.0.1:61627"; - private static final String BROKER_A_NAME = "BROKERA"; - private static final String BROKER_B_NAME = "BROKERB"; +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +public class TwoBrokerFailoverClusterTest extends OpenwireArtemisBaseTest { + + private static final int NUMBER_OF_CLIENTS = 30; + private final List connections = new ArrayList(); + private EmbeddedJMS server0; + private EmbeddedJMS server1; + private String clientUrl; + + @Test public void testTwoBrokersRestart() throws Exception { - createBrokerA(false, "", null, null); - createBrokerB(false, "", null, null); - getBroker(BROKER_B_NAME).waitUntilStarted(); Thread.sleep(2000); - setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")"); createClients(); Thread.sleep(5000); @@ -39,59 +58,106 @@ public class TwoBrokerFailoverClusterTest extends FailoverClusterTestSupport { assertClientsConnectedToTwoBrokers(); assertClientsConnectionsEvenlyDistributed(.35); - getBroker(BROKER_A_NAME).stop(); - getBroker(BROKER_A_NAME).waitUntilStopped(); - removeBroker(BROKER_A_NAME); + server0.stop(); + Assert.assertTrue(server1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 1)); Thread.sleep(1000); - assertAllConnectedTo(BROKER_B_CLIENT_TC_ADDRESS); + assertAllConnectedTo(newURI("127.0.0.1", 1)); Thread.sleep(5000); - createBrokerA(false, "", null, null); - getBroker(BROKER_A_NAME).waitUntilStarted(); + server0.start(); + Assert.assertTrue(server0.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2)); + Assert.assertTrue(server1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2)); Thread.sleep(5000); + //need update-cluster-clients, -on-remove and rebalance set to true. assertClientsConnectedToTwoBrokers(); assertClientsConnectionsEvenlyDistributed(.35); } - private void createBrokerA(boolean multi, - String params, - String clusterFilter, - String destinationFilter) throws Exception { - final String tcParams = (params == null) ? "" : params; - if (getBroker(BROKER_A_NAME) == null) { - addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME)); - addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS + tcParams, true); - if (multi) { - addTransportConnector(getBroker(BROKER_A_NAME), "network", BROKER_A_NOB_TC_ADDRESS + tcParams, false); - addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter); - } - else { - addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter); - } - getBroker(BROKER_A_NAME).start(); + + @Before + public void setUp() throws Exception { + Configuration config0 = createConfig("127.0.0.1", 0); + Configuration config1 = createConfig("127.0.0.1", 1); + + deployClusterConfiguration(config0, 1); + deployClusterConfiguration(config1, 0); + + server0 = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl()); + server1 = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl()); + + clientUrl = "failover://(" + newURI("127.0.0.1", 0) + "," + newURI("127.0.0.1", 1) + ")"; + + server0.start(); + server1.start(); + Assert.assertTrue(server0.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2)); + Assert.assertTrue(server1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2)); + } + + @After + public void tearDown() throws Exception { + server0.stop(); + server1.stop(); + } + + protected void createClients() throws Exception { + createClients(NUMBER_OF_CLIENTS); + } + + protected void createClients(int numOfClients) throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(clientUrl); + for (int i = 0; i < numOfClients; i++) { + ActiveMQConnection c = (ActiveMQConnection) factory.createConnection(); + c.start(); + Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = s.createQueue(getClass().getName()); + MessageConsumer consumer = s.createConsumer(queue); + connections.add(c); } } - private void createBrokerB(boolean multi, - String params, - String clusterFilter, - String destinationFilter) throws Exception { - final String tcParams = (params == null) ? "" : params; - if (getBroker(BROKER_B_NAME) == null) { - addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME)); - addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS + tcParams, true); - if (multi) { - addTransportConnector(getBroker(BROKER_B_NAME), "network", BROKER_B_NOB_TC_ADDRESS + tcParams, false); - addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter); + protected void assertClientsConnectedToTwoBrokers() { + Set set = new HashSet(); + for (ActiveMQConnection c : connections) { + if (c.getTransportChannel().getRemoteAddress() != null) { + set.add(c.getTransportChannel().getRemoteAddress()); } - else { - addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter); + } + Assert.assertTrue("Only 2 connections should be found: " + set, set.size() == 2); + } + + protected void assertClientsConnectionsEvenlyDistributed(double minimumPercentage) { + Map clientConnectionCounts = new HashMap(); + int total = 0; + for (ActiveMQConnection c : connections) { + String key = c.getTransportChannel().getRemoteAddress(); + if (key != null) { + total++; + if (clientConnectionCounts.containsKey(key)) { + double count = clientConnectionCounts.get(key); + count += 1.0; + clientConnectionCounts.put(key, count); + } + else { + clientConnectionCounts.put(key, 1.0); + } } - getBroker(BROKER_B_NAME).start(); + } + Set keys = clientConnectionCounts.keySet(); + for (String key : keys) { + double count = clientConnectionCounts.get(key); + double percentage = count / total; + System.out.println(count + " of " + total + " connections for " + key + " = " + percentage); + Assert.assertTrue("Connections distribution expected to be >= than " + minimumPercentage + ". Actuall distribution was " + percentage + " for connection " + key, percentage >= minimumPercentage); + } + } + + protected void assertAllConnectedTo(String url) throws Exception { + for (ActiveMQConnection c : connections) { + Assert.assertEquals(url, c.getTransportChannel().getRemoteAddress()); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/740d40bb/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTest.java index dc369be..a372b79 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTest.java @@ -26,45 +26,42 @@ import javax.jms.Session; import junit.framework.TestCase; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest; import org.apache.activemq.util.MessageIdList; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; -public class FanoutTest extends TestCase { +public class FanoutTest extends OpenwireArtemisBaseTest { - BrokerService broker1; - BrokerService broker2; + EmbeddedJMS[] servers = new EmbeddedJMS[2]; ActiveMQConnectionFactory producerFactory = new ActiveMQConnectionFactory("fanout:(static:(tcp://localhost:61616,tcp://localhost:61617))?fanOutQueues=true"); Connection producerConnection; Session producerSession; int messageCount = 100; - @Override + @Before public void setUp() throws Exception { - broker1 = BrokerFactory.createBroker("broker:(tcp://localhost:61616)/brokerA?persistent=false&useJmx=false"); - broker2 = BrokerFactory.createBroker("broker:(tcp://localhost:61617)/brokerB?persistent=false&useJmx=false"); - - broker1.start(); - broker2.start(); - - broker1.waitUntilStarted(); - broker2.waitUntilStarted(); + setUpNonClusterServers(servers); producerConnection = producerFactory.createConnection(); producerConnection.start(); producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); } - @Override + @After public void tearDown() throws Exception { producerSession.close(); producerConnection.close(); - broker1.stop(); - broker2.stop(); + shutDownNonClusterServers(servers); } + @Test public void testSendReceive() throws Exception { MessageProducer prod = createProducer(); @@ -76,7 +73,6 @@ public class FanoutTest extends TestCase { assertMessagesReceived("tcp://localhost:61616"); assertMessagesReceived("tcp://localhost:61617"); - } protected MessageProducer createProducer() throws Exception { @@ -95,7 +91,7 @@ public class FanoutTest extends TestCase { listener.assertMessagesReceived(messageCount); consumer.close(); - consumerConnection.close(); consumerSession.close(); + consumerConnection.close(); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/740d40bb/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTransportBrokerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTransportBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTransportBrokerTest.java index 7e52f13..2cfc136 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTransportBrokerTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTransportBrokerTest.java @@ -18,51 +18,111 @@ package org.apache.activemq.transport.fanout; import java.io.IOException; import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import javax.jms.DeliveryMode; +import javax.jms.MessageNotWriteableException; -import junit.framework.Test; - +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl; +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; import org.apache.activemq.broker.StubConnection; +import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.command.MessageId; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.SessionInfo; -import org.apache.activemq.network.NetworkTestSupport; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportFilter; import org.apache.activemq.transport.mock.MockTransport; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class FanoutTransportBrokerTest extends NetworkTestSupport { +@RunWith(Parameterized.class) +public class FanoutTransportBrokerTest extends OpenwireArtemisBaseTest { + public static final boolean FAST_NO_MESSAGE_LEFT_ASSERT = System.getProperty("FAST_NO_MESSAGE_LEFT_ASSERT", "true").equals("true"); + + protected ArrayList connections = new ArrayList(); + protected long idGenerator; + protected int msgIdGenerator; + protected int maxWait = 10000; private static final Logger LOG = LoggerFactory.getLogger(FanoutTransportBrokerTest.class); - public ActiveMQDestination destination; - public int deliveryMode; + private EmbeddedJMS server; + private EmbeddedJMS remoteServer; + + private ActiveMQDestination destination; + private int deliveryMode; - public static Test suite() { - return suite(FanoutTransportBrokerTest.class); + @Parameterized.Parameters(name="test-{index}") + public static Collection getParams() + { + return Arrays.asList(new Object[][]{ + {Integer.valueOf(DeliveryMode.NON_PERSISTENT), new ActiveMQQueue("TEST")}, + {Integer.valueOf(DeliveryMode.NON_PERSISTENT), new ActiveMQTopic("TEST")}, + {Integer.valueOf(DeliveryMode.PERSISTENT), new ActiveMQQueue("TEST")}, + {Integer.valueOf(DeliveryMode.PERSISTENT), new ActiveMQTopic("TEST")} + }); } - public static void main(String[] args) { - junit.textui.TestRunner.run(suite()); + public FanoutTransportBrokerTest(int deliveryMode, ActiveMQDestination destination) { + this.deliveryMode = deliveryMode; + this.destination = destination; } - public void initCombosForTestPublisherFansout() { - addCombinationValues("deliveryMode", new Object[]{Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); - addCombinationValues("destination", new Object[]{new ActiveMQTopic("TEST"), new ActiveMQQueue("TEST")}); + @Before + public void setUp() throws Exception { + Configuration config0 = createConfig(0); + server = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl()); + Configuration config1 = createConfig(1); + remoteServer = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl()); + server.start(); + remoteServer.start(); + + } + @After + public void tearDown() throws Exception { + for (StubConnection conn : connections) { + try { + conn.stop(); + } + catch (Exception e) { + } + } + try { + remoteServer.stop(); + } + catch (Exception e) { + } + try { + server.stop(); + } + catch (Exception e) { + } } + @Test public void testPublisherFansout() throws Exception { - // Start a normal consumer on the local broker StubConnection connection1 = createConnection(); ConnectionInfo connectionInfo1 = createConnectionInfo(); @@ -94,21 +154,28 @@ public class FanoutTransportBrokerTest extends NetworkTestSupport { // Send the message using the fail over publisher. connection3.request(createMessage(producerInfo3, destination, deliveryMode)); - assertNotNull(receiveMessage(connection1)); + Assert.assertNotNull(receiveMessage(connection1)); assertNoMessagesLeft(connection1); - assertNotNull(receiveMessage(connection2)); + Assert.assertNotNull(receiveMessage(connection2)); assertNoMessagesLeft(connection2); } + /* public void initCombosForTestPublisherWaitsForServerToBeUp() { addCombinationValues("deliveryMode", new Object[]{Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); addCombinationValues("destination", new Object[]{new ActiveMQTopic("TEST")}); } +*/ + @Test public void testPublisherWaitsForServerToBeUp() throws Exception { + if (name.getMethodName().contains("test-0") || name.getMethodName().contains("test-2")) { + System.out.println("Discarding invalid test: " + name.getMethodName()); + return; + } // Start a normal consumer on the local broker StubConnection connection1 = createConnection(); ConnectionInfo connectionInfo1 = createConnectionInfo(); @@ -140,19 +207,18 @@ public class FanoutTransportBrokerTest extends NetworkTestSupport { // Send the message using the fail over publisher. connection3.request(createMessage(producerInfo3, destination, deliveryMode)); - assertNotNull(receiveMessage(connection1)); + Assert.assertNotNull(receiveMessage(connection1)); assertNoMessagesLeft(connection1); - assertNotNull(receiveMessage(connection2)); + Assert.assertNotNull(receiveMessage(connection2)); assertNoMessagesLeft(connection2); final CountDownLatch publishDone = new CountDownLatch(1); // The MockTransport is on the remote connection. // Slip in a new transport filter after the MockTransport - MockTransport mt = connection3.getTransport().narrow(MockTransport.class); + MockTransport mt = (MockTransport) connection3.getTransport().narrow(MockTransport.class); mt.install(new TransportFilter(mt.getNext()) { - @Override public void oneway(Object command) throws IOException { LOG.info("Dropping: " + command); // just eat it! to simulate a recent failure. @@ -161,7 +227,6 @@ public class FanoutTransportBrokerTest extends NetworkTestSupport { // Send a message (async) as this will block new Thread() { - @Override public void run() { // Send the message using the fail over publisher. try { @@ -175,7 +240,7 @@ public class FanoutTransportBrokerTest extends NetworkTestSupport { }.start(); // Assert that we block: - assertFalse(publishDone.await(3, TimeUnit.SECONDS)); + Assert.assertFalse(publishDone.await(3, TimeUnit.SECONDS)); // Restart the remote server. State should be re-played and the publish // should continue. @@ -184,26 +249,127 @@ public class FanoutTransportBrokerTest extends NetworkTestSupport { LOG.info("Broker Restarted"); // This should reconnect, and resend - assertTrue(publishDone.await(20, TimeUnit.SECONDS)); + Assert.assertTrue(publishDone.await(20, TimeUnit.SECONDS)); } - @Override protected String getLocalURI() { return "tcp://localhost:61616"; } - @Override protected String getRemoteURI() { return "tcp://localhost:61617"; } protected StubConnection createFanoutConnection() throws Exception { - URI fanoutURI = new URI("fanout://(static://(" + connector.getServer().getConnectURI() + "," + "mock://" + remoteConnector.getServer().getConnectURI() + "))?fanOutQueues=true"); + URI fanoutURI = new URI("fanout://(static://(" + newURI(0) + "," + "mock://" + newURI(1) + "))?fanOutQueues=true"); Transport transport = TransportFactory.connect(fanoutURI); StubConnection connection = new StubConnection(transport); connections.add(connection); return connection; } + + protected StubConnection createConnection() throws Exception { + Transport transport = TransportFactory.connect(new URI(newURI(0))); + StubConnection connection = new StubConnection(transport); + connections.add(connection); + return connection; + } + + protected StubConnection createRemoteConnection() throws Exception { + Transport transport = TransportFactory.connect(new URI(newURI(1))); + StubConnection connection = new StubConnection(transport); + connections.add(connection); + return connection; + } + + protected ConnectionInfo createConnectionInfo() throws Exception { + ConnectionInfo info = new ConnectionInfo(); + info.setConnectionId(new ConnectionId("connection:" + (++idGenerator))); + info.setClientId(info.getConnectionId().getValue()); + return info; + } + + protected SessionInfo createSessionInfo(ConnectionInfo connectionInfo) throws Exception { + SessionInfo info = new SessionInfo(connectionInfo, ++idGenerator); + return info; + } + + protected ConsumerInfo createConsumerInfo(SessionInfo sessionInfo, + ActiveMQDestination destination) throws Exception { + ConsumerInfo info = new ConsumerInfo(sessionInfo, ++idGenerator); + info.setBrowser(false); + info.setDestination(destination); + info.setPrefetchSize(1000); + info.setDispatchAsync(false); + return info; + } + + protected ProducerInfo createProducerInfo(SessionInfo sessionInfo) throws Exception { + ProducerInfo info = new ProducerInfo(sessionInfo, ++idGenerator); + return info; + } + + protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination, int deliveryMode) { + Message message = createMessage(producerInfo, destination); + message.setPersistent(deliveryMode == DeliveryMode.PERSISTENT); + return message; + } + + protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination) { + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setMessageId(new MessageId(producerInfo, ++msgIdGenerator)); + message.setDestination(destination); + message.setPersistent(false); + try { + message.setText("Test Message Payload."); + } + catch (MessageNotWriteableException e) { + } + return message; + } + + public Message receiveMessage(StubConnection connection) throws InterruptedException { + return receiveMessage(connection, maxWait); + } + + public Message receiveMessage(StubConnection connection, long timeout) throws InterruptedException { + while (true) { + Object o = connection.getDispatchQueue().poll(timeout, TimeUnit.MILLISECONDS); + + if (o == null) { + return null; + } + if (o instanceof MessageDispatch) { + + MessageDispatch dispatch = (MessageDispatch) o; + if (dispatch.getMessage() == null) { + return null; + } + dispatch.setMessage(dispatch.getMessage().copy()); + dispatch.getMessage().setRedeliveryCounter(dispatch.getRedeliveryCounter()); + return dispatch.getMessage(); + } + } + } + + protected void assertNoMessagesLeft(StubConnection connection) throws InterruptedException { + long wait = FAST_NO_MESSAGE_LEFT_ASSERT ? 0 : maxWait; + while (true) { + Object o = connection.getDispatchQueue().poll(wait, TimeUnit.MILLISECONDS); + if (o == null) { + return; + } + if (o instanceof MessageDispatch && ((MessageDispatch) o).getMessage() != null) { + Assert.fail("Received a message: " + ((MessageDispatch) o).getMessage().getMessageId()); + } + } + } + protected void restartRemoteBroker() throws Exception { + remoteServer.stop(); + Thread.sleep(2000); + remoteServer.start(); + } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/740d40bb/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java index 619190f..01f6963 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java @@ -173,11 +173,8 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra } public void testClientHang() throws Exception { - - // // Manually create a client transport so that it does not send KeepAlive - // packets. - // this should simulate a client hang. + // packets. this should simulate a client hang. clientTransport = new TcpTransport(new OpenWireFormat(), SocketFactory.getDefault(), new URI("tcp://localhost:" + serverPort), null); clientTransport.setTransportListener(new TransportListener() { @Override @@ -205,9 +202,10 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra public void transportResumed() { } }); + clientTransport.start(); WireFormatInfo info = new WireFormatInfo(); - info.setVersion(OpenWireFormat.DEFAULT_VERSION); + info.setVersion(OpenWireFormat.DEFAULT_LEGACY_VERSION); info.setMaxInactivityDuration(1000); clientTransport.oneway(info); @@ -242,19 +240,17 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra * @throws URISyntaxException */ public void initCombosForTestNoClientHangWithServerBlock() throws Exception { - startClient(); - addCombinationValues("clientInactivityLimit", new Object[]{Long.valueOf(1000)}); - addCombinationValues("serverInactivityLimit", new Object[]{Long.valueOf(1000)}); - addCombinationValues("serverRunOnCommand", new Object[]{new Runnable() { + addCombinationValues("clientInactivityLimit", new Object[] {Long.valueOf(1000)}); + addCombinationValues("serverInactivityLimit", new Object[] {Long.valueOf(1000)}); + addCombinationValues("serverRunOnCommand", new Object[] {new Runnable() { @Override public void run() { try { LOG.info("Sleeping"); Thread.sleep(4000); - } - catch (InterruptedException e) { + } catch (InterruptedException e) { } } }}); @@ -272,5 +268,4 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra assertEquals(0, clientErrorCount.get()); assertEquals(0, serverErrorCount.get()); } - } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/740d40bb/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java index 9ae82ac..9d3c347 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java @@ -23,7 +23,6 @@ import junit.framework.Test; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.EmbeddedBrokerTestSupport; -import org.apache.activemq.broker.BrokerService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -183,15 +182,6 @@ public class TransportUriTest extends EmbeddedBrokerTestSupport { super.tearDown(); } - @Override - protected BrokerService createBroker() throws Exception { - BrokerService answer = new BrokerService(); - answer.setUseJmx(false); - answer.setPersistent(isPersistent()); - answer.addConnector(bindAddress); - return answer; - } - public static Test suite() { return suite(TransportUriTest.class); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/740d40bb/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java deleted file mode 100644 index 3791848..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.vm; - -import java.net.URI; - -import javax.jms.Connection; - -import junit.framework.TestCase; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerRegistry; - -public class VMTransportBrokerNameTest extends TestCase { - - private static final String MY_BROKER = "myBroker"; - final String vmUrl = "vm:(broker:(tcp://localhost:61616)/" + MY_BROKER + "?persistent=false)"; - - public void testBrokerName() throws Exception { - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(vmUrl)); - ActiveMQConnection c1 = (ActiveMQConnection) cf.createConnection(); - assertTrue("Transport has name in it: " + c1.getTransport(), c1.getTransport().toString().contains(MY_BROKER)); - - // verify Broker is there with name - ActiveMQConnectionFactory cfbyName = new ActiveMQConnectionFactory(new URI("vm://" + MY_BROKER + "?create=false")); - Connection c2 = cfbyName.createConnection(); - - assertNotNull(BrokerRegistry.getInstance().lookup(MY_BROKER)); - assertEquals(BrokerRegistry.getInstance().findFirst().getBrokerName(), MY_BROKER); - assertEquals(BrokerRegistry.getInstance().getBrokers().size(), 1); - - c1.close(); - c2.close(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/740d40bb/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerTest.java deleted file mode 100644 index 52e4b88..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerTest.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.vm; - -import junit.framework.Test; - -import org.apache.activemq.transport.TransportBrokerTestSupport; - -public class VMTransportBrokerTest extends TransportBrokerTestSupport { - - @Override - protected String getBindLocation() { - return "vm://localhost"; - } - - public static Test suite() { - return suite(VMTransportBrokerTest.class); - } - - public static void main(String[] args) { - junit.textui.TestRunner.run(suite()); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/740d40bb/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportEmbeddedBrokerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportEmbeddedBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportEmbeddedBrokerTest.java deleted file mode 100644 index dbc7f29..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportEmbeddedBrokerTest.java +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.vm; - -import java.net.URI; -import java.net.URISyntaxException; - -import javax.jms.DeliveryMode; - -import org.apache.activemq.broker.BrokerRegistry; -import org.apache.activemq.broker.BrokerTestSupport; -import org.apache.activemq.broker.StubConnection; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ConnectionInfo; -import org.apache.activemq.command.ConsumerInfo; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.ProducerInfo; -import org.apache.activemq.command.SessionInfo; -import org.apache.activemq.transport.Transport; -import org.apache.activemq.transport.TransportFactory; -import org.apache.activemq.util.IOExceptionSupport; - -/** - * Used to see if the VM transport starts an embedded broker on demand. - */ -public class VMTransportEmbeddedBrokerTest extends BrokerTestSupport { - - public static void main(String[] args) { - junit.textui.TestRunner.run(VMTransportEmbeddedBrokerTest.class); - } - - public void testConsumerPrefetchAtOne() throws Exception { - - // Make sure the broker is created due to the connection being started. - assertNull(BrokerRegistry.getInstance().lookup("localhost")); - StubConnection connection = createConnection(); - assertNotNull(BrokerRegistry.getInstance().lookup("localhost")); - - // Start a producer and consumer - ConnectionInfo connectionInfo = createConnectionInfo(); - SessionInfo sessionInfo = createSessionInfo(connectionInfo); - ProducerInfo producerInfo = createProducerInfo(sessionInfo); - connection.send(connectionInfo); - connection.send(sessionInfo); - connection.send(producerInfo); - - ActiveMQQueue destination = new ActiveMQQueue("TEST"); - - ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); - consumerInfo.setPrefetchSize(1); - connection.send(consumerInfo); - - // Send 2 messages to the broker. - connection.send(createMessage(producerInfo, destination, DeliveryMode.NON_PERSISTENT)); - connection.send(createMessage(producerInfo, destination, DeliveryMode.NON_PERSISTENT)); - - // Make sure only 1 message was delivered. - Message m = receiveMessage(connection); - assertNotNull(m); - assertNoMessagesLeft(connection); - - // Make sure the broker is shutdown when the connection is stopped. - assertNotNull(BrokerRegistry.getInstance().lookup("localhost")); - connection.stop(); - assertNull(BrokerRegistry.getInstance().lookup("localhost")); - } - - @Override - protected void setUp() throws Exception { - // Don't call super since it manually starts up a broker. - } - - @Override - protected void tearDown() throws Exception { - // Don't call super since it manually tears down a broker. - } - - @Override - protected StubConnection createConnection() throws Exception { - try { - Transport transport = TransportFactory.connect(new URI("vm://localhost?broker.persistent=false")); - StubConnection connection = new StubConnection(transport); - return connection; - } - catch (URISyntaxException e) { - throw IOExceptionSupport.create(e); - } - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/740d40bb/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java deleted file mode 100644 index 2268048..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java +++ /dev/null @@ -1,937 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.vm; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.IOException; -import java.net.URI; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.activemq.command.BaseCommand; -import org.apache.activemq.command.ExceptionResponse; -import org.apache.activemq.command.Response; -import org.apache.activemq.command.ShutdownInfo; -import org.apache.activemq.state.CommandVisitor; -import org.apache.activemq.transport.FutureResponse; -import org.apache.activemq.transport.MutexTransport; -import org.apache.activemq.transport.ResponseCallback; -import org.apache.activemq.transport.ResponseCorrelator; -import org.apache.activemq.transport.Transport; -import org.apache.activemq.transport.TransportDisposedIOException; -import org.apache.activemq.transport.TransportListener; -import org.apache.activemq.util.Wait; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class VMTransportThreadSafeTest { - - private static final Logger LOG = LoggerFactory.getLogger(VMTransportThreadSafeTest.class); - - private final static String location1 = "vm://transport1"; - private final static String location2 = "vm://transport2"; - - private final ConcurrentLinkedQueue localReceived = new ConcurrentLinkedQueue<>(); - private final ConcurrentLinkedQueue remoteReceived = new ConcurrentLinkedQueue<>(); - - private class DummyCommand extends BaseCommand { - - public final int sequenceId; - - public DummyCommand() { - this.sequenceId = 0; - } - - public DummyCommand(int id) { - this.sequenceId = id; - } - - @Override - public Response visit(CommandVisitor visitor) throws Exception { - return null; - } - - @Override - public byte getDataStructureType() { - return 42; - } - } - - private class VMTestTransportListener implements TransportListener { - - protected final Queue received; - - public boolean shutdownReceived = false; - - public VMTestTransportListener(Queue receiveQueue) { - this.received = receiveQueue; - } - - @Override - public void onCommand(Object command) { - - if (command instanceof ShutdownInfo) { - shutdownReceived = true; - } - else { - received.add((DummyCommand) command); - } - } - - @Override - public void onException(IOException error) { - } - - @Override - public void transportInterupted() { - } - - @Override - public void transportResumed() { - } - } - - private class VMResponderTransportListener implements TransportListener { - - protected final Queue received; - - private final Transport peer; - - public VMResponderTransportListener(Queue receiveQueue, Transport peer) { - this.received = receiveQueue; - this.peer = peer; - } - - @Override - public void onCommand(Object command) { - - if (command instanceof ShutdownInfo) { - return; - } - else { - received.add((DummyCommand) command); - - if (peer != null) { - try { - peer.oneway(command); - } - catch (IOException e) { - } - } - } - } - - @Override - public void onException(IOException error) { - } - - @Override - public void transportInterupted() { - } - - @Override - public void transportResumed() { - } - } - - private class SlowVMTestTransportListener extends VMTestTransportListener { - - private final TimeUnit delayUnit; - private final long delay; - - public SlowVMTestTransportListener(Queue receiveQueue) { - this(receiveQueue, 10, TimeUnit.MILLISECONDS); - } - - public SlowVMTestTransportListener(Queue receiveQueue, long delay, TimeUnit delayUnit) { - super(receiveQueue); - - this.delay = delay; - this.delayUnit = delayUnit; - } - - @Override - public void onCommand(Object command) { - super.onCommand(command); - try { - delayUnit.sleep(delay); - } - catch (InterruptedException e) { - } - } - } - - private class GatedVMTestTransportListener extends VMTestTransportListener { - - private final CountDownLatch gate; - - public GatedVMTestTransportListener(Queue receiveQueue) { - this(receiveQueue, new CountDownLatch(1)); - } - - public GatedVMTestTransportListener(Queue receiveQueue, CountDownLatch gate) { - super(receiveQueue); - - this.gate = gate; - } - - @Override - public void onCommand(Object command) { - super.onCommand(command); - try { - gate.await(); - } - catch (InterruptedException e) { - } - } - } - - private void assertMessageAreOrdered(ConcurrentLinkedQueue queue) { - int lastSequenceId = 0; - for (DummyCommand command : queue) { - int id = command.sequenceId; - assertTrue("Last id: " + lastSequenceId + " should be less than current id: " + id, id > lastSequenceId); - } - } - - @Before - public void setUp() throws Exception { - localReceived.clear(); - remoteReceived.clear(); - } - - @After - public void tearDown() throws Exception { - } - - @Test(timeout = 60000) - public void testStartWthoutListenerIOE() throws Exception { - - final VMTransport local = new VMTransport(new URI(location1)); - final VMTransport remote = new VMTransport(new URI(location2)); - - local.setPeer(remote); - remote.setPeer(local); - - remote.setTransportListener(new VMTestTransportListener(localReceived)); - - try { - local.start(); - fail("Should have thrown an IOExcoption"); - } - catch (IOException e) { - } - } - - @Test(timeout = 60000) - public void testOnewayOnStoppedTransportTDE() throws Exception { - - final VMTransport local = new VMTransport(new URI(location1)); - final VMTransport remote = new VMTransport(new URI(location2)); - - local.setPeer(remote); - remote.setPeer(local); - - local.setTransportListener(new VMTestTransportListener(localReceived)); - remote.setTransportListener(new VMTestTransportListener(remoteReceived)); - - local.start(); - local.stop(); - - try { - local.oneway(new DummyCommand()); - fail("Should have thrown a TransportDisposedException"); - } - catch (TransportDisposedIOException e) { - } - } - - @Test(timeout = 60000) - public void testStopSendsShutdownToPeer() throws Exception { - - final VMTransport local = new VMTransport(new URI(location1)); - final VMTransport remote = new VMTransport(new URI(location2)); - - local.setPeer(remote); - remote.setPeer(local); - - final VMTestTransportListener remoteListener = new VMTestTransportListener(remoteReceived); - - local.setTransportListener(new VMTestTransportListener(localReceived)); - remote.setTransportListener(remoteListener); - - local.start(); - local.stop(); - - assertTrue(Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return remoteListener.shutdownReceived; - } - })); - } - - @Test(timeout = 60000) - public void testRemoteStopSendsExceptionToPendingRequests() throws Exception { - - final VMTransport local = new VMTransport(new URI(location1)); - final VMTransport remote = new VMTransport(new URI(location2)); - - local.setPeer(remote); - remote.setPeer(local); - - final VMTestTransportListener remoteListener = new VMTestTransportListener(remoteReceived); - remote.setTransportListener(remoteListener); - remote.start(); - - final Response[] answer = new Response[1]; - ResponseCorrelator responseCorrelator = new ResponseCorrelator(local); - responseCorrelator.setTransportListener(new VMTestTransportListener(localReceived)); - responseCorrelator.start(); - responseCorrelator.asyncRequest(new DummyCommand(), new ResponseCallback() { - @Override - public void onCompletion(FutureResponse resp) { - try { - answer[0] = resp.getResult(); - } - catch (IOException e) { - e.printStackTrace(); - } - } - }); - - // simulate broker stop - remote.stop(); - - assertTrue(Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - LOG.info("answer: " + answer[0]); - return answer[0] instanceof ExceptionResponse && ((ExceptionResponse) answer[0]).getException() instanceof TransportDisposedIOException; - } - })); - - local.stop(); - } - - @Test(timeout = 60000) - public void testMultipleStartsAndStops() throws Exception { - - final VMTransport local = new VMTransport(new URI(location1)); - final VMTransport remote = new VMTransport(new URI(location2)); - - local.setPeer(remote); - remote.setPeer(local); - - local.setTransportListener(new VMTestTransportListener(localReceived)); - remote.setTransportListener(new VMTestTransportListener(remoteReceived)); - - local.start(); - remote.start(); - - local.start(); - remote.start(); - - for (int i = 0; i < 100; ++i) { - local.oneway(new DummyCommand()); - } - - for (int i = 0; i < 100; ++i) { - remote.oneway(new DummyCommand()); - } - - local.start(); - remote.start(); - - assertTrue(Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return remoteReceived.size() == 100; - } - })); - - assertTrue(Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return localReceived.size() == 100; - } - })); - - local.stop(); - local.stop(); - remote.stop(); - remote.stop(); - } - - @Test(timeout = 60000) - public void testStartWithPeerNotStartedEnqueusCommandsNonAsync() throws Exception { - doTestStartWithPeerNotStartedEnqueusCommands(false); - } - - private void doTestStartWithPeerNotStartedEnqueusCommands(boolean async) throws Exception { - - final VMTransport local = new VMTransport(new URI(location1)); - final VMTransport remote = new VMTransport(new URI(location2)); - - remote.setAsync(async); - - local.setPeer(remote); - remote.setPeer(local); - - local.setTransportListener(new VMTestTransportListener(localReceived)); - remote.setTransportListener(new VMTestTransportListener(remoteReceived)); - - local.start(); - - for (int i = 0; i < 100; ++i) { - local.oneway(new DummyCommand()); - } - - assertEquals(100, remote.getMessageQueue().size()); - - remote.start(); - - assertTrue(Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return remoteReceived.size() == 100; - } - })); - - local.stop(); - remote.stop(); - } - - @Test(timeout = 60000) - public void testBlockedOnewayEnqeueAandStopTransportAsync() throws Exception { - doTestBlockedOnewayEnqeueAandStopTransport(true); - } - - @Test(timeout = 60000) - public void testBlockedOnewayEnqeueAandStopTransportNonAsync() throws Exception { - doTestBlockedOnewayEnqeueAandStopTransport(false); - } - - private void doTestBlockedOnewayEnqeueAandStopTransport(boolean async) throws Exception { - - final VMTransport local = new VMTransport(new URI(location1)); - final VMTransport remote = new VMTransport(new URI(location2)); - - final AtomicInteger sequenceId = new AtomicInteger(); - - remote.setAsync(async); - remote.setAsyncQueueDepth(99); - - local.setPeer(remote); - remote.setPeer(local); - - local.setTransportListener(new VMTestTransportListener(localReceived)); - remote.setTransportListener(new VMTestTransportListener(remoteReceived)); - - local.start(); - - Thread t = new Thread(new Runnable() { - - @Override - public void run() { - for (int i = 0; i < 100; ++i) { - try { - local.oneway(new DummyCommand(sequenceId.incrementAndGet())); - } - catch (Exception e) { - } - } - - } - }); - t.start(); - - LOG.debug("Started async delivery, wait for remote's queue to fill up"); - - assertTrue(Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return remote.getMessageQueue().remainingCapacity() == 0; - } - })); - - LOG.debug("Remote messageQ is full, start it and stop all"); - - remote.start(); - local.stop(); - remote.stop(); - } - - @Test(timeout = 60000) - public void testBlockedOnewayEnqeueWhileStartedDetectsStop() throws Exception { - final VMTransport local = new VMTransport(new URI(location1)); - final VMTransport remote = new VMTransport(new URI(location2)); - - final AtomicInteger sequenceId = new AtomicInteger(); - - remote.setAsync(true); - remote.setAsyncQueueDepth(2); - - local.setPeer(remote); - remote.setPeer(local); - - local.setTransportListener(new VMTestTransportListener(localReceived)); - remote.setTransportListener(new GatedVMTestTransportListener(remoteReceived)); - - local.start(); - remote.start(); - - Thread t = new Thread(new Runnable() { - - @Override - public void run() { - for (int i = 0; i < 3; ++i) { - try { - local.oneway(new DummyCommand(sequenceId.incrementAndGet())); - } - catch (Exception e) { - } - } - - } - }); - t.start(); - - LOG.debug("Started async delivery, wait for remote's queue to fill up"); - assertTrue(Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return remote.getMessageQueue().remainingCapacity() == 0; - } - })); - - LOG.debug("Starting async gate open."); - Thread gateman = new Thread(new Runnable() { - @Override - public void run() { - try { - Thread.sleep(100); - } - catch (InterruptedException e) { - } - ((GatedVMTestTransportListener) remote.getTransportListener()).gate.countDown(); - } - }); - gateman.start(); - - remote.stop(); - local.stop(); - - assertEquals(1, remoteReceived.size()); - assertMessageAreOrdered(remoteReceived); - } - - @Test(timeout = 60000) - public void testStopWhileStartingAsyncWithNoAsyncLimit() throws Exception { - // In the async case the iterate method should see that we are stopping and - // drop out before we dispatch all the messages but it should get at least 49 since - // the stop thread waits 500 mills and the listener is waiting 10 mills on each receive. - doTestStopWhileStartingWithNoAsyncLimit(true, 49); - } - - @Test(timeout = 60000) - public void testStopWhileStartingNonAsyncWithNoAsyncLimit() throws Exception { - // In the non-async case the start dispatches all messages up front and then continues on - doTestStopWhileStartingWithNoAsyncLimit(false, 100); - } - - private void doTestStopWhileStartingWithNoAsyncLimit(boolean async, final int expect) throws Exception { - - final VMTransport local = new VMTransport(new URI(location1)); - final VMTransport remote = new VMTransport(new URI(location2)); - - remote.setAsync(async); - - local.setPeer(remote); - remote.setPeer(local); - - local.setTransportListener(new VMTestTransportListener(localReceived)); - remote.setTransportListener(new SlowVMTestTransportListener(remoteReceived)); - - local.start(); - - for (int i = 0; i < 100; ++i) { - local.oneway(new DummyCommand(i)); - } - - Thread t = new Thread(new Runnable() { - - @Override - public void run() { - try { - Thread.sleep(1000); - remote.stop(); - } - catch (Exception e) { - } - } - }); - - remote.start(); - - t.start(); - - assertTrue("Remote should receive: " + expect + ", commands but got: " + remoteReceived.size(), Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return remoteReceived.size() >= expect; - } - })); - - LOG.debug("Remote listener received " + remoteReceived.size() + " messages"); - - local.stop(); - - assertTrue("Remote transport never was disposed.", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return remote.isDisposed(); - } - })); - } - - @Test(timeout = 120000) - public void TestTwoWayMessageThroughPutSync() throws Exception { - - long totalTimes = 0; - final long executions = 20; - - for (int i = 0; i < 20; ++i) { - totalTimes += doTestTwoWayMessageThroughPut(false); - } - - LOG.info("Total time of one way sync send throughput test: " + (totalTimes / executions) + "ms"); - } - - @Test(timeout = 120000) - public void TestTwoWayMessageThroughPutAsnyc() throws Exception { - - long totalTimes = 0; - final long executions = 50; - - for (int i = 0; i < executions; ++i) { - totalTimes += doTestTwoWayMessageThroughPut(false); - } - - LOG.info("Total time of one way async send throughput test: " + (totalTimes / executions) + "ms"); - } - - private long doTestTwoWayMessageThroughPut(boolean async) throws Exception { - - final VMTransport local = new VMTransport(new URI(location1)); - final VMTransport remote = new VMTransport(new URI(location2)); - - final AtomicInteger sequenceId = new AtomicInteger(); - - remote.setAsync(async); - - local.setPeer(remote); - remote.setPeer(local); - - local.setTransportListener(new VMTestTransportListener(localReceived)); - remote.setTransportListener(new VMTestTransportListener(remoteReceived)); - - final int messageCount = 200000; - - local.start(); - remote.start(); - - long startTime = System.currentTimeMillis(); - - Thread localSend = new Thread(new Runnable() { - - @Override - public void run() { - for (int i = 0; i < messageCount; ++i) { - try { - local.oneway(new DummyCommand(sequenceId.incrementAndGet())); - } - catch (Exception e) { - } - } - - } - }); - - Thread remoteSend = new Thread(new Runnable() { - - @Override - public void run() { - for (int i = 0; i < messageCount; ++i) { - try { - remote.oneway(new DummyCommand(sequenceId.incrementAndGet())); - } - catch (Exception e) { - } - } - - } - }); - - localSend.start(); - remoteSend.start(); - - // Wait for both to finish and then check that each side go the correct amount - localSend.join(); - remoteSend.join(); - - long endTime = System.currentTimeMillis(); - - assertTrue(Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return remoteReceived.size() == messageCount; - } - })); - - assertTrue(Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return localReceived.size() == messageCount; - } - })); - - LOG.debug("All messages sent,stop all"); - - local.stop(); - remote.stop(); - - localReceived.clear(); - remoteReceived.clear(); - - return endTime - startTime; - } - - @Test(timeout = 120000) - public void TestOneWayMessageThroughPutSync() throws Exception { - - long totalTimes = 0; - final long executions = 30; - - for (int i = 0; i < executions; ++i) { - totalTimes += doTestOneWayMessageThroughPut(false); - } - - LOG.info("Total time of one way sync send throughput test: " + (totalTimes / executions) + "ms"); - } - - @Test(timeout = 120000) - public void TestOneWayMessageThroughPutAsnyc() throws Exception { - - long totalTimes = 0; - final long executions = 20; - - for (int i = 0; i < 20; ++i) { - totalTimes += doTestOneWayMessageThroughPut(true); - } - - LOG.info("Total time of one way async send throughput test: " + (totalTimes / executions) + "ms"); - } - - private long doTestOneWayMessageThroughPut(boolean async) throws Exception { - - final VMTransport local = new VMTransport(new URI(location1)); - final VMTransport remote = new VMTransport(new URI(location2)); - - final AtomicInteger sequenceId = new AtomicInteger(); - - remote.setAsync(async); - - local.setPeer(remote); - remote.setPeer(local); - - local.setTransportListener(new VMTestTransportListener(localReceived)); - remote.setTransportListener(new VMTestTransportListener(remoteReceived)); - - final int messageCount = 100000; - - local.start(); - remote.start(); - - long startTime = System.currentTimeMillis(); - - Thread localSend = new Thread(new Runnable() { - - @Override - public void run() { - for (int i = 0; i < messageCount; ++i) { - try { - local.oneway(new DummyCommand(sequenceId.incrementAndGet())); - } - catch (Exception e) { - } - } - - } - }); - - localSend.start(); - - // Wait for both to finish and then check that each side go the correct amount - localSend.join(); - - long endTime = System.currentTimeMillis(); - - assertTrue(Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return remoteReceived.size() == messageCount; - } - })); - - LOG.debug("All messages sent,stop all"); - - local.stop(); - remote.stop(); - - localReceived.clear(); - remoteReceived.clear(); - - return endTime - startTime; - } - - @Test(timeout = 120000) - public void testTwoWayTrafficWithMutexTransportSync1() throws Exception { - - for (int i = 0; i < 20; ++i) { - doTestTwoWayTrafficWithMutexTransport(false, false); - } - } - - @Test(timeout = 120000) - public void testTwoWayTrafficWithMutexTransportSync2() throws Exception { - - for (int i = 0; i < 20; ++i) { - doTestTwoWayTrafficWithMutexTransport(true, false); - } - } - - @Test(timeout = 120000) - public void testTwoWayTrafficWithMutexTransportSync3() throws Exception { - - for (int i = 0; i < 20; ++i) { - doTestTwoWayTrafficWithMutexTransport(false, true); - } - } - - @Test(timeout = 120000) - public void testTwoWayTrafficWithMutexTransportSync4() throws Exception { - - for (int i = 0; i < 20; ++i) { - doTestTwoWayTrafficWithMutexTransport(false, false); - } - } - - public void doTestTwoWayTrafficWithMutexTransport(boolean localAsync, boolean remoteAsync) throws Exception { - - final VMTransport vmlocal = new VMTransport(new URI(location1)); - final VMTransport vmremote = new VMTransport(new URI(location2)); - - final MutexTransport local = new MutexTransport(vmlocal); - final MutexTransport remote = new MutexTransport(vmremote); - - final AtomicInteger sequenceId = new AtomicInteger(); - - vmlocal.setAsync(localAsync); - vmremote.setAsync(remoteAsync); - - vmlocal.setPeer(vmremote); - vmremote.setPeer(vmlocal); - - local.setTransportListener(new VMTestTransportListener(localReceived)); - remote.setTransportListener(new VMResponderTransportListener(remoteReceived, remote)); - - final int messageCount = 200000; - - Thread localSend = new Thread(new Runnable() { - - @Override - public void run() { - for (int i = 0; i < messageCount; ++i) { - try { - local.oneway(new DummyCommand(sequenceId.incrementAndGet())); - } - catch (Exception e) { - } - } - } - }); - - Thread remoteSend = new Thread(new Runnable() { - - @Override - public void run() { - for (int i = 0; i < messageCount; ++i) { - try { - remote.oneway(new DummyCommand(sequenceId.incrementAndGet())); - } - catch (Exception e) { - } - } - } - }); - - localSend.start(); - remoteSend.start(); - - Thread.sleep(10); - - local.start(); - remote.start(); - - // Wait for both to finish and then check that each side go the correct amount - localSend.join(); - remoteSend.join(); - - assertTrue("Remote should have received (" + messageCount + ") but got ()" + remoteReceived.size(), Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return remoteReceived.size() == messageCount; - } - })); - - assertTrue("Local should have received (" + messageCount * 2 + ") but got ()" + localReceived.size(), Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return localReceived.size() == messageCount * 2; - } - })); - - LOG.debug("All messages sent,stop all"); - - local.stop(); - remote.stop(); - - localReceived.clear(); - remoteReceived.clear(); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/740d40bb/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java deleted file mode 100644 index dd14d67..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java +++ /dev/null @@ -1,139 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.vm; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.IOException; -import java.net.URI; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import javax.jms.JMSException; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerRegistry; -import org.apache.activemq.broker.BrokerService; -import org.junit.After; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class VMTransportWaitForTest { - - static final Logger LOG = LoggerFactory.getLogger(VMTransportWaitForTest.class); - - private static final int WAIT_TIME = 20000; - private static final int SHORT_WAIT_TIME = 5000; - - private static final String VM_BROKER_URI_NO_WAIT = "vm://localhost?broker.persistent=false&create=false"; - - private static final String VM_BROKER_URI_WAIT_FOR_START = VM_BROKER_URI_NO_WAIT + "&waitForStart=" + WAIT_TIME; - - private static final String VM_BROKER_URI_SHORT_WAIT_FOR_START = VM_BROKER_URI_NO_WAIT + "&waitForStart=" + SHORT_WAIT_TIME; - - CountDownLatch started = new CountDownLatch(1); - CountDownLatch gotConnection = new CountDownLatch(1); - - @After - public void after() throws IOException { - BrokerRegistry.getInstance().unbind("localhost"); - } - - @Test(timeout = 90000) - public void testWaitFor() throws Exception { - try { - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI_NO_WAIT)); - cf.createConnection(); - fail("expect broker not exist exception"); - } - catch (JMSException expectedOnNoBrokerAndNoCreate) { - } - - // spawn a thread that will wait for an embedded broker to start via - // vm://.. - Thread t = new Thread("ClientConnectionThread") { - @Override - public void run() { - try { - started.countDown(); - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI_WAIT_FOR_START)); - cf.createConnection(); - gotConnection.countDown(); - } - catch (Exception e) { - e.printStackTrace(); - fail("unexpected exception: " + e); - } - } - }; - t.start(); - started.await(20, TimeUnit.SECONDS); - Thread.yield(); - assertFalse("has not got connection", gotConnection.await(2, TimeUnit.SECONDS)); - - BrokerService broker = new BrokerService(); - broker.setPersistent(false); - broker.start(); - assertTrue("has got connection", gotConnection.await(5, TimeUnit.SECONDS)); - broker.stop(); - } - - @Test(timeout = 90000) - public void testWaitForNoBrokerInRegistry() throws Exception { - - long startTime = System.currentTimeMillis(); - - try { - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI_SHORT_WAIT_FOR_START)); - cf.createConnection(); - fail("expect broker not exist exception"); - } - catch (JMSException expectedOnNoBrokerAndNoCreate) { - } - - long endTime = System.currentTimeMillis(); - - LOG.info("Total wait time was: {}", endTime - startTime); - assertTrue(endTime - startTime >= SHORT_WAIT_TIME - 100); - } - - @Test(timeout = 90000) - public void testWaitForNotStartedButInRegistry() throws Exception { - - BrokerService broker = new BrokerService(); - broker.setPersistent(false); - BrokerRegistry.getInstance().bind("localhost", broker); - - long startTime = System.currentTimeMillis(); - - try { - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI_SHORT_WAIT_FOR_START)); - cf.createConnection(); - fail("expect broker not exist exception"); - } - catch (JMSException expectedOnNoBrokerAndNoCreate) { - } - - long endTime = System.currentTimeMillis(); - - LOG.info("Total wait time was: {}", endTime - startTime); - assertTrue(endTime - startTime >= SHORT_WAIT_TIME - 100); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/740d40bb/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java deleted file mode 100644 index 2b97cff..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java +++ /dev/null @@ -1,151 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.vm; - -import java.net.URI; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; - -import junit.framework.TestCase; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.bugs.embedded.ThreadExplorer; -import org.apache.activemq.network.NetworkConnector; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class VmTransportNetworkBrokerTest extends TestCase { - - private static final Logger LOG = LoggerFactory.getLogger(VmTransportNetworkBrokerTest.class); - - private static final String VM_BROKER_URI = "vm://localhost?create=false"; - - CountDownLatch started = new CountDownLatch(1); - CountDownLatch gotConnection = new CountDownLatch(1); - - public void testNoThreadLeak() throws Exception { - - // with VMConnection and simple discovery network connector - int originalThreadCount = Thread.activeCount(); - LOG.debug(ThreadExplorer.show("threads at beginning")); - - BrokerService broker = new BrokerService(); - broker.setDedicatedTaskRunner(true); - broker.setPersistent(false); - broker.addConnector("tcp://localhost:61616"); - NetworkConnector networkConnector = broker.addNetworkConnector("static:(tcp://wrongHostname1:61617,tcp://wrongHostname2:61618)?useExponentialBackOff=false"); - networkConnector.setDuplex(true); - broker.start(); - - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI)); - Connection connection = cf.createConnection("system", "manager"); - connection.start(); - - // let it settle - TimeUnit.SECONDS.sleep(5); - - int threadCountAfterStart = Thread.activeCount(); - TimeUnit.SECONDS.sleep(30); - int threadCountAfterSleep = Thread.activeCount(); - - assertTrue("Threads are leaking: " + ThreadExplorer.show("active sleep") + ", threadCount=" + threadCountAfterStart + " threadCountAfterSleep=" + threadCountAfterSleep, threadCountAfterSleep < threadCountAfterStart + 8); - - connection.close(); - broker.stop(); - broker.waitUntilStopped(); - - // testNoDanglingThreadsAfterStop with tcp transport - broker = new BrokerService(); - broker.setSchedulerSupport(true); - broker.setDedicatedTaskRunner(true); - broker.setPersistent(false); - broker.addConnector("tcp://localhost:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000"); - broker.start(); - - cf = new ActiveMQConnectionFactory("tcp://localhost:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000"); - connection = cf.createConnection("system", "manager"); - connection.start(); - connection.close(); - broker.stop(); - broker.waitUntilStopped(); - - // let it settle - TimeUnit.SECONDS.sleep(5); - - // get final threads but filter out any daemon threads that the JVM may have created. - Thread[] threads = filterDaemonThreads(ThreadExplorer.listThreads()); - int threadCountAfterStop = threads.length; - - // lets see the thread counts at INFO level so they are always in the test log - LOG.info(ThreadExplorer.show("active after stop")); - LOG.info("originalThreadCount=" + originalThreadCount + " threadCountAfterStop=" + threadCountAfterStop); - - assertTrue("Threads are leaking: " + - ThreadExplorer.show("active after stop") + - ". originalThreadCount=" + - originalThreadCount + - " threadCountAfterStop=" + - threadCountAfterStop, threadCountAfterStop <= originalThreadCount); - } - - /** - * Filters any daemon threads from the thread list. - * - * Thread counts before and after the test should ideally be equal. - * However there is no guarantee that the JVM does not create any - * additional threads itself. - * E.g. on Mac OSX there is a JVM internal thread called - * "Poller SunPKCS11-Darwin" created after the test go started and - * under the main thread group. - * When debugging tests in Eclipse another so called "Reader" thread - * is created by Eclipse. - * So we cannot assume that the JVM does not create additional threads - * during the test. However for the time being we assume that any such - * additionally created threads are daemon threads. - * - * @param threads - the array of threads to parse - * @return a new array with any daemon threads removed - */ - public Thread[] filterDaemonThreads(Thread[] threads) throws Exception { - - List threadList = new ArrayList<>(Arrays.asList(threads)); - - // Can't use an Iterator as it would raise a - // ConcurrentModificationException when trying to remove an element - // from the list, so using standard walk through - for (int i = 0; i < threadList.size(); i++) { - - Thread thread = threadList.get(i); - LOG.debug("Inspecting thread " + thread.getName()); - if (thread.isDaemon()) { - LOG.debug("Removing deamon thread."); - threadList.remove(thread); - Thread.sleep(100); - - } - } - LOG.debug("Converting list back to Array"); - return threadList.toArray(new Thread[0]); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/740d40bb/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/LockFileTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/LockFileTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/LockFileTest.java new file mode 100644 index 0000000..03e0d2e --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/LockFileTest.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.util; + +import java.io.File; + +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class LockFileTest { + + @Test + public void testNoDeleteOnUnlockIfNotLocked() throws Exception { + + File lockFile = new File(IOHelper.getDefaultDataDirectory(), "lockToTest1"); + IOHelper.mkdirs(lockFile.getParentFile()); + lockFile.createNewFile(); + + LockFile underTest = new LockFile(lockFile, true); + + underTest.lock(); + + lockFile.delete(); + + assertFalse("no longer valid", underTest.keepAlive()); + + // a slave gets in + lockFile.createNewFile(); + + underTest.unlock(); + + assertTrue("file still exists after unlock when not locked", lockFile.exists()); + + } + + @Test + public void testDeleteOnUnlockIfLocked() throws Exception { + + File lockFile = new File(IOHelper.getDefaultDataDirectory(), "lockToTest2"); + IOHelper.mkdirs(lockFile.getParentFile()); + lockFile.createNewFile(); + + LockFile underTest = new LockFile(lockFile, true); + + underTest.lock(); + + assertTrue("valid", underTest.keepAlive()); + + underTest.unlock(); + + assertFalse("file deleted on unlock", lockFile.exists()); + + } +}