Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 617D41971C for ; Mon, 4 Apr 2016 16:09:14 +0000 (UTC) Received: (qmail 5445 invoked by uid 500); 4 Apr 2016 16:09:14 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 5260 invoked by uid 500); 4 Apr 2016 16:09:13 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 3709 invoked by uid 99); 4 Apr 2016 16:09:11 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 04 Apr 2016 16:09:11 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9DFD4E0615; Mon, 4 Apr 2016 16:09:11 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jbertram@apache.org To: commits@activemq.apache.org Date: Mon, 04 Apr 2016 16:09:42 -0000 Message-Id: <3739819c05bc4d9cb928dfe4c88a326b@git.apache.org> In-Reply-To: <6f87f3c2123e429bbe038ffe6eeaf089@git.apache.org> References: <6f87f3c2123e429bbe038ffe6eeaf089@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [33/42] activemq-artemis git commit: ARTEMIS-463 Improvement to the openwire testsuite https://issues.apache.org/jira/browse/ARTEMIS-463 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/AutoFailTestSupport.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/AutoFailTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/AutoFailTestSupport.java new file mode 100644 index 0000000..f47620f --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/AutoFailTestSupport.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.atomic.AtomicBoolean; + +import junit.framework.TestCase; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Enforces a test case to run for only an allotted time to prevent them from + * hanging and breaking the whole testing. + * + * + */ + +public abstract class AutoFailTestSupport extends TestCase { + public static final int EXIT_SUCCESS = 0; + public static final int EXIT_ERROR = 1; + private static final Logger LOG = LoggerFactory.getLogger(AutoFailTestSupport.class); + + private long maxTestTime = 5 * 60 * 1000; // 5 mins by default + private Thread autoFailThread; + + private boolean verbose = true; + private boolean useAutoFail; // Disable auto fail by default + private AtomicBoolean isTestSuccess; + + protected void setUp() throws Exception { + // Runs the auto fail thread before performing any setup + if (isAutoFail()) { + startAutoFailThread(); + } + super.setUp(); + } + + protected void tearDown() throws Exception { + super.tearDown(); + + // Stops the auto fail thread only after performing any clean up + stopAutoFailThread(); + } + + /** + * Manually start the auto fail thread. To start it automatically, just set + * the auto fail to true before calling any setup methods. As a rule, this + * method is used only when you are not sure, if the setUp and tearDown + * method is propagated correctly. + */ + public void startAutoFailThread() { + setAutoFail(true); + isTestSuccess = new AtomicBoolean(false); + autoFailThread = new Thread(new Runnable() { + public void run() { + try { + // Wait for test to finish succesfully + Thread.sleep(getMaxTestTime()); + } catch (InterruptedException e) { + // This usually means the test was successful + } finally { + // Check if the test was able to tear down succesfully, + // which usually means, it has finished its run. + if (!isTestSuccess.get()) { + LOG.error("Test case has exceeded the maximum allotted time to run of: " + getMaxTestTime() + " ms."); + dumpAllThreads(getName()); + if (System.getProperty("org.apache.activemq.AutoFailTestSupport.disableSystemExit") == null) { + System.exit(EXIT_ERROR); + } else { + LOG.error("No system.exit as it kills surefire - forkedProcessTimeoutInSeconds (surefire.timeout) will kick in eventually see pom.xml surefire plugin config"); + } + } + } + } + }, "AutoFailThread"); + + if (verbose) { + LOG.info("Starting auto fail thread..."); + } + + LOG.info("Starting auto fail thread..."); + autoFailThread.start(); + } + + /** + * Manually stops the auto fail thread. As a rule, this method is used only + * when you are not sure, if the setUp and tearDown method is propagated + * correctly. + */ + public void stopAutoFailThread() { + if (isAutoFail() && autoFailThread != null && autoFailThread.isAlive()) { + isTestSuccess.set(true); + + if (verbose) { + LOG.info("Stopping auto fail thread..."); + } + + LOG.info("Stopping auto fail thread..."); + autoFailThread.interrupt(); + } + } + + /** + * Sets the auto fail value. As a rule, this should be used only before any + * setup methods is called to automatically enable the auto fail thread in + * the setup method of the test case. + * + * @param val + */ + public void setAutoFail(boolean val) { + this.useAutoFail = val; + } + + public boolean isAutoFail() { + return this.useAutoFail; + } + + /** + * The assigned value will only be reflected when the auto fail thread has + * started its run. Value is in milliseconds. + * + * @param val + */ + public void setMaxTestTime(long val) { + this.maxTestTime = val; + } + + public long getMaxTestTime() { + return this.maxTestTime; + } + + + public static void dumpAllThreads(String prefix) { + Map stacks = Thread.getAllStackTraces(); + for (Entry stackEntry : stacks.entrySet()) { + System.err.println(prefix + " " + stackEntry.getKey()); + for(StackTraceElement element : stackEntry.getValue()) { + System.err.println(" " + element); + } + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/CombinationTestSupport.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/CombinationTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/CombinationTestSupport.java index d7caafa..dc8f138 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/CombinationTestSupport.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/CombinationTestSupport.java @@ -116,7 +116,7 @@ public abstract class CombinationTestSupport extends AutoFailTestSupport { public static void checkStopped() throws Exception { ArtemisBrokerHelper.stopArtemisBroker(); boolean notStopped = BrokerService.checkStopped(); - TcpTransportFactory.setBrokerName(null); + TcpTransportFactory.clearService(); if (notStopped) { fail("brokers not stopped see exceptions above"); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java index 5e5b993..b8397e2 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java @@ -52,22 +52,50 @@ public class ConnectionCleanupTest extends TestCase { try { connection.setClientID("test"); - // fail("Should have received JMSException"); + fail("Should have received JMSException"); } catch (JMSException e) { } - connection.cleanup(); + connection.doCleanup(true); connection.setClientID("test"); connection.createSession(false, Session.AUTO_ACKNOWLEDGE); try { connection.setClientID("test"); - // fail("Should have received JMSException"); + fail("Should have received JMSException"); } catch (JMSException e) { } } + public void testChangeClientIDDenied() throws JMSException { + + connection.setClientID("test"); + connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + try { + connection.setClientID("test"); + fail("Should have received JMSException"); + } catch (JMSException e) { + } + + connection.cleanup(); + + try { + connection.setClientID("test"); + fail("Should have received JMSException"); + } catch (JMSException e) { + } + + connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + try { + connection.setClientID("test"); + fail("Should have received JMSException"); + } catch (JMSException e) { + } + } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java index fa58ebe..1e6a227 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java @@ -16,15 +16,23 @@ */ package org.apache.activemq; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; +import org.apache.activemq.artemis.core.server.JournalType; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl; +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; +import org.junit.rules.TemporaryFolder; import org.springframework.jms.core.JmsTemplate; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; +import java.io.File; /** * A useful base class which creates and closes an embedded broker @@ -32,17 +40,27 @@ import javax.jms.Destination; public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport { protected BrokerService broker; - // protected String bindAddress = "tcp://localhost:61616"; - protected String bindAddress = "vm://localhost"; + protected EmbeddedJMS artemisBroker; + protected String bindAddress = "tcp://localhost:61616"; protected ConnectionFactory connectionFactory; protected boolean useTopic; protected ActiveMQDestination destination; protected JmsTemplate template; + protected boolean disableWrapper = false; + + public TemporaryFolder temporaryFolder; + + public String CLUSTER_PASSWORD = "OPENWIRECLUSTER"; - @Override protected void setUp() throws Exception { - if (broker == null) { - broker = createBroker(); + BrokerService.disableWrapper = disableWrapper; + File tmpRoot = new File("./target/tmp"); + tmpRoot.mkdirs(); + temporaryFolder = new TemporaryFolder(tmpRoot); + temporaryFolder.create(); + + if (artemisBroker == null) { + artemisBroker = createArtemisBroker(); } startBroker(); @@ -58,13 +76,43 @@ public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport { @Override protected void tearDown() throws Exception { - if (broker != null) { + if (artemisBroker != null) { try { - broker.stop(); + artemisBroker.stop(); + artemisBroker = null; } catch (Exception e) { } } + temporaryFolder.delete(); + } + + public String getTmp() { + return getTmpFile().getAbsolutePath(); + } + + public File getTmpFile() { + return temporaryFolder.getRoot(); + } + + protected String getJournalDir(int serverID, boolean backup) { + return getTmp() + "/journal_" + serverID + "_" + backup; + } + + protected String getBindingsDir(int serverID, boolean backup) { + return getTmp() + "/binding_" + serverID + "_" + backup; + } + + protected String getPageDir(int serverID, boolean backup) { + return getTmp() + "/paging_" + serverID + "_" + backup; + } + + protected String getLargeMessagesDir(int serverID, boolean backup) { + return getTmp() + "/paging_" + serverID + "_" + backup; + } + + protected static String newURI(String localhostAddress, int serverID) { + return "tcp://" + localhostAddress + ":" + (61616 + serverID); } /** @@ -114,20 +162,44 @@ public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport { return new ActiveMQConnectionFactory(bindAddress); } - /** - * Factory method to create a new broker - * - * @throws Exception - */ + + public EmbeddedJMS createArtemisBroker() throws Exception { + Configuration config0 = createConfig("localhost", 0); + EmbeddedJMS newbroker = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl()); + return newbroker; + } + + protected Configuration createConfig(final String hostAddress, final int serverID) throws Exception { + ConfigurationImpl configuration = new ConfigurationImpl().setJMXManagementEnabled(false). + setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(1000 * 1024).setJournalType(JournalType.NIO). + setJournalDirectory(getJournalDir(serverID, false)). + setBindingsDirectory(getBindingsDir(serverID, false)). + setPagingDirectory(getPageDir(serverID, false)). + setLargeMessagesDirectory(getLargeMessagesDir(serverID, false)). + setJournalCompactMinFiles(0). + setJournalCompactPercentage(0). + setClusterPassword(CLUSTER_PASSWORD); + + configuration.addAddressesSetting("#", new AddressSettings().setAutoCreateJmsQueues(true).setAutoDeleteJmsQueues(true)); + + configuration.addAcceptorConfiguration("netty", newURI(hostAddress, serverID)); + configuration.addConnectorConfiguration("netty-connector", newURI(hostAddress, serverID)); + + return configuration; + } + + //we keep this because some other tests uses it. + //we'll delete this when those tests are dealt with. protected BrokerService createBroker() throws Exception { BrokerService answer = new BrokerService(); answer.setPersistent(isPersistent()); + answer.getManagementContext().setCreateConnector(false); answer.addConnector(bindAddress); return answer; } protected void startBroker() throws Exception { - broker.start(); + artemisBroker.start(); } /** http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerStartupDestinationTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerStartupDestinationTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerStartupDestinationTest.java index 06fbbbe..1e0555c 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerStartupDestinationTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerStartupDestinationTest.java @@ -98,6 +98,7 @@ public class ExclusiveConsumerStartupDestinationTest extends EmbeddedBrokerTestS } } + //Exclusive consumer not implemented yet. public void testFailoverToAnotherExclusiveConsumerCreatedFirst() throws JMSException, InterruptedException { Connection conn = createConnection(true); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerTest.java index 0287a77..5bc647e 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerTest.java @@ -26,7 +26,9 @@ import javax.jms.Session; import junit.framework.TestCase; +import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper; import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.transport.tcp.TcpTransportFactory; public class ExclusiveConsumerTest extends TestCase { @@ -43,6 +45,7 @@ public class ExclusiveConsumerTest extends TestCase { @Override protected void tearDown() throws Exception { + TcpTransportFactory.clearService(); super.tearDown(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java index 6bf47f6..6274890 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java @@ -35,11 +35,18 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; +import javax.management.MBeanServer; +import javax.management.MBeanServerInvocationHandler; import javax.management.ObjectName; import junit.framework.Test; -import org.apache.activemq.broker.jmx.DestinationViewMBean; +import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder; +import org.apache.activemq.artemis.api.jms.management.DestinationControl; +import org.apache.activemq.artemis.api.jms.management.JMSQueueControl; +import org.apache.activemq.artemis.api.jms.management.JMSServerControl; +import org.apache.activemq.artemis.api.jms.management.TopicControl; +import org.apache.activemq.broker.artemiswrapper.ArtemisBrokerWrapper; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.slf4j.Logger; @@ -855,7 +862,7 @@ public class JMSConsumerTest extends JmsTestSupport { } public void initCombosForTestAckOfExpired() { - addCombinationValues("destinationType", new Object[]{Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)}); + addCombinationValues("destinationType", new Object[]{Byte.valueOf(ActiveMQDestination.QUEUE_TYPE)}); } public void testAckOfExpired() throws Exception { @@ -867,6 +874,7 @@ public class JMSConsumerTest extends JmsTestSupport { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = (ActiveMQDestination) (destinationType == ActiveMQDestination.QUEUE_TYPE ? session.createQueue("test") : session.createTopic("test")); + createManagedDestinationOnServer(destination); MessageConsumer consumer = session.createConsumer(destination); connection.setStatsEnabled(true); @@ -900,25 +908,43 @@ public class JMSConsumerTest extends JmsTestSupport { } assertEquals("consumer has expiredMessages", count, amqConsumer.getConsumerStats().getExpiredMessageCount().getCount()); - DestinationViewMBean view = createView(destination); + DestinationControl view = createView(destination); + + assertEquals("Wrong inFlightCount: " + view.getDeliveringCount(), 0, view.getDeliveringCount()); + assertEquals("Wrong dispatch count: " + view.getMessagesAdded(), 8, view.getMessagesAdded()); + } - assertEquals("Wrong inFlightCount: " + view.getInFlightCount(), 0, view.getInFlightCount()); - assertEquals("Wrong dispatch count: " + view.getDispatchCount(), 8, view.getDispatchCount()); - assertEquals("Wrong dequeue count: " + view.getDequeueCount(), 8, view.getDequeueCount()); - assertEquals("Wrong expired count: " + view.getExpiredCount(), 4, view.getExpiredCount()); + private void createManagedDestinationOnServer(ActiveMQDestination destination) throws Exception { + String destName = destination.getPhysicalName(); + ArtemisBrokerWrapper wrapper = (ArtemisBrokerWrapper) broker.getBroker(); + MBeanServer beanServer = wrapper.getMbeanServer(); + ObjectName objName = ObjectNameBuilder.DEFAULT.getJMSServerObjectName(); + JMSServerControl serverControl = MBeanServerInvocationHandler.newProxyInstance(beanServer, objName, JMSServerControl.class, false); + serverControl.createQueue(destName); } - protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception { + protected DestinationControl createView(ActiveMQDestination destination) throws Exception { - String domain = "org.apache.activemq"; - ObjectName name; + String destName = destination.getPhysicalName(); if (destination.isQueue()) { - name = new ObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=test"); + return createJMSQueueControl(destName); } else { - name = new ObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=test"); + return createJMSTopicControl(destName); } - return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true); } + private JMSQueueControl createJMSQueueControl(String destName) throws Exception { + ArtemisBrokerWrapper wrapper = (ArtemisBrokerWrapper) broker.getBroker(); + MBeanServer beanServer = wrapper.getMbeanServer(); + ObjectName objName = ObjectNameBuilder.DEFAULT.getJMSQueueObjectName(destName); + return MBeanServerInvocationHandler.newProxyInstance(beanServer, objName, JMSQueueControl.class, false); + } + + private TopicControl createJMSTopicControl(String destName) throws Exception { + ArtemisBrokerWrapper wrapper = (ArtemisBrokerWrapper) broker.getBroker(); + MBeanServer beanServer = wrapper.getMbeanServer(); + ObjectName objName = ObjectNameBuilder.DEFAULT.getJMSTopicObjectName(destName); + return MBeanServerInvocationHandler.newProxyInstance(beanServer, objName, TopicControl.class, false); + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableQueueWildcardSendReceiveTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableQueueWildcardSendReceiveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableQueueWildcardSendReceiveTest.java index bf1535a..309fec9 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableQueueWildcardSendReceiveTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableQueueWildcardSendReceiveTest.java @@ -21,7 +21,7 @@ import javax.jms.DeliveryMode; import org.apache.activemq.test.JmsTopicSendReceiveTest; /** - * + * https://issues.apache.org/jira/browse/ARTEMIS-189 */ public class JmsDurableQueueWildcardSendReceiveTest extends JmsTopicSendReceiveTest { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java index 12e7827..6a3cd19 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java @@ -263,6 +263,7 @@ public class JmsQueueBrowserTest extends JmsTestSupport { consumer.close(); } + //ref: https://issues.apache.org/jira/browse/ARTEMIS-384 public void testBrowseReceive() throws Exception { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); ActiveMQQueue destination = new ActiveMQQueue("TEST"); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java new file mode 100755 index 0000000..b7c2e94 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.ArrayList; +import java.util.Enumeration; + +import org.apache.activemq.test.JmsResourceProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class JmsQueueTransactionTest extends JmsTransactionTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(JmsQueueTransactionTest.class); + + /** + * @see org.apache.activemq.JmsTransactionTestSupport#getJmsResourceProvider() + */ + protected JmsResourceProvider getJmsResourceProvider() { + JmsResourceProvider p = new JmsResourceProvider(); + p.setTopic(false); + return p; + } + + /** + * Tests if the the connection gets reset, the messages will still be + * received. + * + * @throws Exception + */ + public void testReceiveTwoThenCloseConnection() throws Exception { + Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")}; + + // lets consume any outstanding messages from previous test runs + beginTx(); + while (consumer.receive(1000) != null) { + } + commitTx(); + + beginTx(); + producer.send(outbound[0]); + producer.send(outbound[1]); + commitTx(); + + LOG.info("Sent 0: " + outbound[0]); + LOG.info("Sent 1: " + outbound[1]); + + ArrayList messages = new ArrayList(); + beginTx(); + Message message = consumer.receive(2000); + assertEquals(outbound[0], message); + + message = consumer.receive(2000); + assertNotNull(message); + assertEquals(outbound[1], message); + + // Close and reopen connection. + reconnect(); + + // Consume again.. the previous message should + // get redelivered. + beginTx(); + message = consumer.receive(2000); + assertNotNull("Should have re-received the first message again!", message); + messages.add(message); + assertEquals(outbound[0], message); + + message = consumer.receive(5000); + assertNotNull("Should have re-received the second message again!", message); + messages.add(message); + assertEquals(outbound[1], message); + commitTx(); + + Message inbound[] = new Message[messages.size()]; + messages.toArray(inbound); + + assertTextMessagesEqual("Rollback did not work", outbound, inbound); + } + + /** + * Tests sending and receiving messages with two sessions(one for producing + * and another for consuming). + * + * @throws Exception + */ + public void testSendReceiveInSeperateSessionTest() throws Exception { + session.close(); + int batchCount = 10; + + for (int i = 0; i < batchCount; i++) { + // Session that sends messages + { + Session session = resourceProvider.createSession(connection); + this.session = session; + MessageProducer producer = resourceProvider.createProducer(session, destination); + // consumer = resourceProvider.createConsumer(session, + // destination); + beginTx(); + producer.send(session.createTextMessage("Test Message: " + i)); + commitTx(); + session.close(); + } + + // Session that consumes messages + { + Session session = resourceProvider.createSession(connection); + this.session = session; + MessageConsumer consumer = resourceProvider.createConsumer(session, destination); + + beginTx(); + TextMessage message = (TextMessage)consumer.receive(1000 * 5); + assertNotNull("Received only " + i + " messages in batch ", message); + assertEquals("Test Message: " + i, message.getText()); + + commitTx(); + session.close(); + } + } + } + + /** + * Tests the queue browser. Browses the messages then the consumer tries to + * receive them. The messages should still be in the queue even when it was + * browsed. + * + * @throws Exception + */ + public void testReceiveBrowseReceive() throws Exception { + Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message"), session.createTextMessage("Third Message")}; + + // lets consume any outstanding messages from previous test runs + beginTx(); + while (consumer.receive(1000) != null) { + } + commitTx(); + + beginTx(); + producer.send(outbound[0]); + producer.send(outbound[1]); + producer.send(outbound[2]); + commitTx(); + + // Get the first. + beginTx(); + assertEquals(outbound[0], consumer.receive(1000)); + consumer.close(); + commitTx(); + + beginTx(); + QueueBrowser browser = session.createBrowser((Queue)destination); + Enumeration enumeration = browser.getEnumeration(); + + // browse the second + assertTrue("should have received the second message", enumeration.hasMoreElements()); + assertEquals(outbound[1], (Message)enumeration.nextElement()); + + // browse the third. + assertTrue("Should have received the third message", enumeration.hasMoreElements()); + assertEquals(outbound[2], (Message)enumeration.nextElement()); + + LOG.info("Check for more..."); + // There should be no more. + boolean tooMany = false; + while (enumeration.hasMoreElements()) { + LOG.info("Got extra message: " + ((TextMessage)enumeration.nextElement()).getText()); + tooMany = true; + } + assertFalse(tooMany); + LOG.info("close browser..."); + browser.close(); + + LOG.info("reopen and consume..."); + // Re-open the consumer. + consumer = resourceProvider.createConsumer(session, destination); + // Receive the second. + assertEquals(outbound[1], consumer.receive(1000)); + // Receive the third. + assertEquals(outbound[2], consumer.receive(1000)); + consumer.close(); + + commitTx(); + } + + public void testCloseConsumer() throws Exception { + Destination dest = session.createQueue(getSubject() + "?consumer.prefetchSize=0"); + producer = session.createProducer(dest); + beginTx(); + producer.send(session.createTextMessage("message 1")); + producer.send(session.createTextMessage("message 2")); + commitTx(); + + beginTx(); + consumer = session.createConsumer(dest); + Message message1 = consumer.receive(1000); + String text1 = ((TextMessage)message1).getText(); + assertNotNull(message1); + assertEquals("message 1", text1); + + consumer.close(); + + consumer = session.createConsumer(dest); + + Message message2 = consumer.receive(1000); + String text2 = ((TextMessage)message2).getText(); + assertNotNull(message2); + assertEquals("message 2", text2); + commitTx(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueWildcardSendReceiveTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueWildcardSendReceiveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueWildcardSendReceiveTest.java index 296a56e..9a6dcb1 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueWildcardSendReceiveTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueWildcardSendReceiveTest.java @@ -29,7 +29,7 @@ import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.test.JmsTopicSendReceiveTest; /** - * + * https://issues.apache.org/jira/browse/ARTEMIS-189 */ public class JmsQueueWildcardSendReceiveTest extends JmsTopicSendReceiveTest { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java index 8a64a85..c57845d 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java @@ -116,7 +116,7 @@ public class JmsRollbackRedeliveryTest { Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(destinationName); MessageConsumer consumer = session.createConsumer(destination); - TextMessage msg = (TextMessage) consumer.receive(6000000); + TextMessage msg = (TextMessage) consumer.receive(5000); if (msg != null) { if (rolledback.put(msg.getText(), Boolean.TRUE) != null) { LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsTest.java index b5dcacc..216ed10 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsTest.java @@ -16,6 +16,8 @@ */ package org.apache.activemq; +import org.apache.activemq.transport.tcp.TcpTransportFactory; + import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; @@ -109,5 +111,6 @@ public class JmsTopicSendReceiveWithTwoConnectionsTest extends JmsSendReceiveTes receiveSession.close(); sendConnection.close(); receiveConnection.close(); + TcpTransportFactory.clearService(); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java new file mode 100755 index 0000000..abaf52d --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java @@ -0,0 +1,722 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.test.JmsResourceProvider; +import org.apache.activemq.test.TestSupport; +import org.apache.activemq.transport.tcp.TcpTransportFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public abstract class JmsTransactionTestSupport extends TestSupport implements MessageListener { + + private static final Logger LOG = LoggerFactory.getLogger(JmsTransactionTestSupport.class); + private static final int MESSAGE_COUNT = 5; + private static final String MESSAGE_TEXT = "message"; + + protected ConnectionFactory connectionFactory; + protected Connection connection; + protected Session session; + protected MessageConsumer consumer; + protected MessageProducer producer; + protected JmsResourceProvider resourceProvider; + protected Destination destination; + protected int batchCount = 10; + protected int batchSize = 20; + protected BrokerService broker; + + // for message listener test + private final List unackMessages = new ArrayList(MESSAGE_COUNT); + private final List ackMessages = new ArrayList(MESSAGE_COUNT); + private boolean resendPhase; + + public JmsTransactionTestSupport() { + super(); + } + + public JmsTransactionTestSupport(String name) { + super(name); + } + + /* + * (non-Javadoc) + * + * @see junit.framework.TestCase#setUp() + */ + @Override + protected void setUp() throws Exception { + broker = createBroker(); + broker.start(); + broker.waitUntilStarted(); + + resourceProvider = getJmsResourceProvider(); + topic = resourceProvider.isTopic(); + // We will be using transacted sessions. + setSessionTransacted(); + connectionFactory = newConnectionFactory(); + reconnect(); + } + + protected void setSessionTransacted() { + resourceProvider.setTransacted(true); + } + + protected ConnectionFactory newConnectionFactory() throws Exception { + return resourceProvider.createConnectionFactory(); + } + + protected void beginTx() throws Exception { + //no-op for local tx + } + + protected void commitTx() throws Exception { + session.commit(); + } + + protected void rollbackTx() throws Exception { + session.rollback(); + } + + /** + */ + protected BrokerService createBroker() throws Exception, URISyntaxException { + return BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false")); + } + + /* + * (non-Javadoc) + * + * @see junit.framework.TestCase#tearDown() + */ + @Override + protected void tearDown() throws Exception { + LOG.info("Closing down connection"); + + try { + session.close(); + session = null; + connection.close(); + connection = null; + } catch (Exception e) { + LOG.info("Caught exception while closing resources."); + } + + try { + broker.stop(); + broker.waitUntilStopped(); + broker = null; + } catch (Exception e) { + LOG.info("Caught exception while shutting down the Broker", e); + } + + LOG.info("Connection closed."); + } + + protected abstract JmsResourceProvider getJmsResourceProvider(); + + /** + * Sends a batch of messages and validates that the messages are received. + * + * @throws Exception + */ + public void testSendReceiveTransactedBatches() throws Exception { + + TextMessage message = session.createTextMessage("Batch Message"); + for (int j = 0; j < batchCount; j++) { + LOG.info("Producing bacth " + j + " of " + batchSize + " messages"); + + beginTx(); + for (int i = 0; i < batchSize; i++) { + producer.send(message); + } + messageSent(); + commitTx(); + LOG.info("Consuming bacth " + j + " of " + batchSize + " messages"); + + beginTx(); + for (int i = 0; i < batchSize; i++) { + message = (TextMessage)consumer.receive(1000 * 5); + assertNotNull("Received only " + i + " messages in batch " + j, message); + assertEquals("Batch Message", message.getText()); + } + + commitTx(); + } + } + + protected void messageSent() throws Exception { + } + + /** + * Sends a batch of messages and validates that the rollbacked message was + * not consumed. + * + * @throws Exception + */ + public void testSendRollback() throws Exception { + Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")}; + + // sends a message + beginTx(); + producer.send(outbound[0]); + commitTx(); + + // sends a message that gets rollbacked + beginTx(); + producer.send(session.createTextMessage("I'm going to get rolled back.")); + rollbackTx(); + + // sends a message + beginTx(); + producer.send(outbound[1]); + commitTx(); + + // receives the first message + beginTx(); + ArrayList messages = new ArrayList(); + LOG.info("About to consume message 1"); + Message message = consumer.receive(1000); + messages.add(message); + LOG.info("Received: " + message); + + // receives the second message + LOG.info("About to consume message 2"); + message = consumer.receive(4000); + messages.add(message); + LOG.info("Received: " + message); + + // validates that the rollbacked was not consumed + commitTx(); + Message inbound[] = new Message[messages.size()]; + messages.toArray(inbound); + assertTextMessagesEqual("Rollback did not work.", outbound, inbound); + } + + /** + * spec section 3.6 acking a message with automation acks has no effect. + * @throws Exception + */ + public void testAckMessageInTx() throws Exception { + Message[] outbound = new Message[] {session.createTextMessage("First Message")}; + + // sends a message + beginTx(); + producer.send(outbound[0]); + outbound[0].acknowledge(); + commitTx(); + outbound[0].acknowledge(); + + // receives the first message + beginTx(); + ArrayList messages = new ArrayList(); + LOG.info("About to consume message 1"); + Message message = consumer.receive(1000); + messages.add(message); + LOG.info("Received: " + message); + + // validates that the rollbacked was not consumed + commitTx(); + Message inbound[] = new Message[messages.size()]; + messages.toArray(inbound); + assertTextMessagesEqual("Message not delivered.", outbound, inbound); + } + + /** + * Sends a batch of messages and validates that the message sent before + * session close is not consumed. + * + * This test only works with local transactions, not xa. + * @throws Exception + */ + public void testSendSessionClose() throws Exception { + Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")}; + + // sends a message + beginTx(); + producer.send(outbound[0]); + commitTx(); + + // sends a message that gets rollbacked + beginTx(); + producer.send(session.createTextMessage("I'm going to get rolled back.")); + consumer.close(); + + reconnectSession(); + + // sends a message + producer.send(outbound[1]); + commitTx(); + + // receives the first message + ArrayList messages = new ArrayList(); + LOG.info("About to consume message 1"); + beginTx(); + Message message = consumer.receive(1000); + messages.add(message); + LOG.info("Received: " + message); + + // receives the second message + LOG.info("About to consume message 2"); + message = consumer.receive(4000); + messages.add(message); + LOG.info("Received: " + message); + + // validates that the rollbacked was not consumed + commitTx(); + Message inbound[] = new Message[messages.size()]; + messages.toArray(inbound); + assertTextMessagesEqual("Rollback did not work.", outbound, inbound); + } + + /** + * Sends a batch of messages and validates that the message sent before + * session close is not consumed. + * + * @throws Exception + */ + public void testSendSessionAndConnectionClose() throws Exception { + Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")}; + + // sends a message + beginTx(); + producer.send(outbound[0]); + commitTx(); + + // sends a message that gets rollbacked + beginTx(); + producer.send(session.createTextMessage("I'm going to get rolled back.")); + consumer.close(); + session.close(); + + reconnect(); + + // sends a message + beginTx(); + producer.send(outbound[1]); + commitTx(); + + // receives the first message + ArrayList messages = new ArrayList(); + LOG.info("About to consume message 1"); + beginTx(); + Message message = consumer.receive(1000); + messages.add(message); + LOG.info("Received: " + message); + + // receives the second message + LOG.info("About to consume message 2"); + message = consumer.receive(4000); + messages.add(message); + LOG.info("Received: " + message); + + // validates that the rollbacked was not consumed + commitTx(); + Message inbound[] = new Message[messages.size()]; + messages.toArray(inbound); + assertTextMessagesEqual("Rollback did not work.", outbound, inbound); + } + + /** + * Sends a batch of messages and validates that the rollbacked message was + * redelivered. + * + * @throws Exception + */ + public void testReceiveRollback() throws Exception { + Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")}; + + // lets consume any outstanding messages from prev test runs + beginTx(); + while (consumer.receive(1000) != null) { + } + commitTx(); + + // sent both messages + beginTx(); + producer.send(outbound[0]); + producer.send(outbound[1]); + commitTx(); + + LOG.info("Sent 0: " + outbound[0]); + LOG.info("Sent 1: " + outbound[1]); + + ArrayList messages = new ArrayList(); + beginTx(); + Message message = consumer.receive(1000); + messages.add(message); + assertEquals(outbound[0], message); + commitTx(); + + // rollback so we can get that last message again. + beginTx(); + message = consumer.receive(1000); + assertNotNull(message); + assertEquals(outbound[1], message); + rollbackTx(); + + // Consume again.. the prev message should + // get redelivered. + beginTx(); + message = consumer.receive(5000); + assertNotNull("Should have re-received the message again!", message); + messages.add(message); + commitTx(); + + Message inbound[] = new Message[messages.size()]; + messages.toArray(inbound); + assertTextMessagesEqual("Rollback did not work", outbound, inbound); + } + + /** + * Sends a batch of messages and validates that the rollbacked message was + * redelivered. + * + * @throws Exception + */ + public void testReceiveTwoThenRollback() throws Exception { + Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")}; + + // lets consume any outstanding messages from prev test runs + beginTx(); + while (consumer.receive(1000) != null) { + } + commitTx(); + + // + beginTx(); + producer.send(outbound[0]); + producer.send(outbound[1]); + commitTx(); + + LOG.info("Sent 0: " + outbound[0]); + LOG.info("Sent 1: " + outbound[1]); + + ArrayList messages = new ArrayList(); + beginTx(); + Message message = consumer.receive(1000); + assertEquals(outbound[0], message); + + message = consumer.receive(1000); + assertNotNull(message); + assertEquals(outbound[1], message); + rollbackTx(); + + // Consume again.. the prev message should + // get redelivered. + beginTx(); + message = consumer.receive(5000); + assertNotNull("Should have re-received the first message again!", message); + messages.add(message); + assertEquals(outbound[0], message); + message = consumer.receive(5000); + assertNotNull("Should have re-received the second message again!", message); + messages.add(message); + assertEquals(outbound[1], message); + + assertNull(consumer.receiveNoWait()); + commitTx(); + + Message inbound[] = new Message[messages.size()]; + messages.toArray(inbound); + assertTextMessagesEqual("Rollback did not work", outbound, inbound); + } + + /** + * Sends a batch of messages and validates that the rollbacked message was + * not consumed. + * + * @throws Exception + */ + public void testSendReceiveWithPrefetchOne() throws Exception { + setPrefetchToOne(); + Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message"), session.createTextMessage("Third Message"), + session.createTextMessage("Fourth Message")}; + + beginTx(); + for (int i = 0; i < outbound.length; i++) { + // sends a message + producer.send(outbound[i]); + } + commitTx(); + + // receives the first message + beginTx(); + for (int i = 0; i < outbound.length; i++) { + LOG.info("About to consume message 1"); + Message message = consumer.receive(1000); + assertNotNull(message); + LOG.info("Received: " + message); + } + + // validates that the rollbacked was not consumed + commitTx(); + } + + /** + * Perform the test that validates if the rollbacked message was redelivered + * multiple times. + * + * @throws Exception + */ + public void testReceiveTwoThenRollbackManyTimes() throws Exception { + for (int i = 0; i < 5; i++) { + testReceiveTwoThenRollback(); + } + } + + /** + * Sends a batch of messages and validates that the rollbacked message was + * not consumed. This test differs by setting the message prefetch to one. + * + * @throws Exception + */ + public void testSendRollbackWithPrefetchOfOne() throws Exception { + setPrefetchToOne(); + testSendRollback(); + } + + /** + * Sends a batch of messages and and validates that the rollbacked message + * was redelivered. This test differs by setting the message prefetch to + * one. + * + * @throws Exception + */ + public void testReceiveRollbackWithPrefetchOfOne() throws Exception { + setPrefetchToOne(); + testReceiveRollback(); + } + + /** + * Tests if the messages can still be received if the consumer is closed + * (session is not closed). + * + * @throws Exception see http://jira.codehaus.org/browse/AMQ-143 + */ + public void testCloseConsumerBeforeCommit() throws Exception { + TextMessage[] outbound = new TextMessage[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")}; + + // lets consume any outstanding messages from prev test runs + beginTx(); + while (consumer.receiveNoWait() != null) { + } + + commitTx(); + + // sends the messages + beginTx(); + producer.send(outbound[0]); + producer.send(outbound[1]); + commitTx(); + LOG.info("Sent 0: " + outbound[0]); + LOG.info("Sent 1: " + outbound[1]); + + beginTx(); + TextMessage message = (TextMessage)consumer.receive(1000); + assertEquals(outbound[0].getText(), message.getText()); + // Close the consumer before the commit. This should not cause the + // received message + // to rollback. + consumer.close(); + commitTx(); + + // Create a new consumer + consumer = resourceProvider.createConsumer(session, destination); + LOG.info("Created consumer: " + consumer); + + beginTx(); + message = (TextMessage)consumer.receive(1000); + assertEquals(outbound[1].getText(), message.getText()); + commitTx(); + } + + public void testChangeMutableObjectInObjectMessageThenRollback() throws Exception { + ArrayList list = new ArrayList(); + list.add("First"); + Message outbound = session.createObjectMessage(list); + outbound.setStringProperty("foo", "abc"); + + beginTx(); + producer.send(outbound); + commitTx(); + + LOG.info("About to consume message 1"); + beginTx(); + Message message = consumer.receive(5000); + + List body = assertReceivedObjectMessageWithListBody(message); + + // now lets try mutate it + try { + message.setStringProperty("foo", "def"); + fail("Cannot change properties of the object!"); + } catch (JMSException e) { + LOG.info("Caught expected exception: " + e, e); + } + body.clear(); + body.add("This should never be seen!"); + rollbackTx(); + + beginTx(); + message = consumer.receive(5000); + List secondBody = assertReceivedObjectMessageWithListBody(message); + assertNotSame("Second call should return a different body", secondBody, body); + commitTx(); + } + + @SuppressWarnings("unchecked") + protected List assertReceivedObjectMessageWithListBody(Message message) throws JMSException { + assertNotNull("Should have received a message!", message); + assertEquals("foo header", "abc", message.getStringProperty("foo")); + + assertTrue("Should be an object message but was: " + message, message instanceof ObjectMessage); + ObjectMessage objectMessage = (ObjectMessage)message; + List body = (List)objectMessage.getObject(); + LOG.info("Received body: " + body); + + assertEquals("Size of list should be 1", 1, body.size()); + assertEquals("element 0 of list", "First", body.get(0)); + return body; + } + + /** + * Recreates the connection. + * + * @throws javax.jms.JMSException + */ + protected void reconnect() throws Exception { + + if (connection != null) { + // Close the prev connection. + connection.close(); + } + session = null; + connection = resourceProvider.createConnection(connectionFactory); + reconnectSession(); + connection.start(); + } + + /** + * Recreates the connection. + * + * @throws javax.jms.JMSException + */ + protected void reconnectSession() throws JMSException { + if (session != null) { + session.close(); + } + + session = resourceProvider.createSession(connection); + destination = resourceProvider.createDestination(session, getSubject()); + producer = resourceProvider.createProducer(session, destination); + consumer = resourceProvider.createConsumer(session, destination); + } + + /** + * Sets the prefeftch policy to one. + */ + protected void setPrefetchToOne() { + ActiveMQPrefetchPolicy prefetchPolicy = getPrefetchPolicy(); + prefetchPolicy.setQueuePrefetch(1); + prefetchPolicy.setTopicPrefetch(1); + prefetchPolicy.setDurableTopicPrefetch(1); + prefetchPolicy.setOptimizeDurableTopicPrefetch(1); + } + + protected ActiveMQPrefetchPolicy getPrefetchPolicy() { + return ((ActiveMQConnection)connection).getPrefetchPolicy(); + } + + //This test won't work with xa tx so no beginTx() has been added. + public void testMessageListener() throws Exception { + // send messages + for (int i = 0; i < MESSAGE_COUNT; i++) { + producer.send(session.createTextMessage(MESSAGE_TEXT + i)); + } + commitTx(); + consumer.setMessageListener(this); + // wait receive + waitReceiveUnack(); + assertEquals(unackMessages.size(), MESSAGE_COUNT); + // resend phase + waitReceiveAck(); + assertEquals(ackMessages.size(), MESSAGE_COUNT); + // should no longer re-receive + consumer.setMessageListener(null); + assertNull(consumer.receive(500)); + reconnect(); + } + + @Override + public void onMessage(Message message) { + if (!resendPhase) { + unackMessages.add(message); + if (unackMessages.size() == MESSAGE_COUNT) { + try { + rollbackTx(); + resendPhase = true; + } catch (Exception e) { + e.printStackTrace(); + } + } + } else { + ackMessages.add(message); + if (ackMessages.size() == MESSAGE_COUNT) { + try { + commitTx(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } + + private void waitReceiveUnack() throws Exception { + for (int i = 0; i < 100 && !resendPhase; i++) { + Thread.sleep(100); + } + assertTrue(resendPhase); + } + + private void waitReceiveAck() throws Exception { + for (int i = 0; i < 100 && ackMessages.size() < MESSAGE_COUNT; i++) { + Thread.sleep(100); + } + assertFalse(ackMessages.size() < MESSAGE_COUNT); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeStreamletTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeStreamletTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeStreamletTest.java deleted file mode 100644 index 37899e8..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeStreamletTest.java +++ /dev/null @@ -1,170 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq; - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Random; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.Destination; -import javax.jms.Session; - -import junit.framework.TestCase; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author rnewson - */ -public final class LargeStreamletTest extends TestCase { - - private static final Logger LOG = LoggerFactory.getLogger(LargeStreamletTest.class); - private static final String BROKER_URL = "vm://localhost?broker.persistent=false"; - private static final int BUFFER_SIZE = 1 * 1024; - private static final int MESSAGE_COUNT = 10 * 1024; - - protected Exception writerException; - protected Exception readerException; - - private final AtomicInteger totalRead = new AtomicInteger(); - private final AtomicInteger totalWritten = new AtomicInteger(); - private final AtomicBoolean stopThreads = new AtomicBoolean(false); - - public void testStreamlets() throws Exception { - final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL); - - final ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection(); - connection.start(); - try { - final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - try { - final Destination destination = session.createQueue("wibble"); - final Thread readerThread = new Thread(new Runnable() { - - @Override - public void run() { - totalRead.set(0); - try { - final InputStream inputStream = connection.createInputStream(destination); - try { - int read; - final byte[] buf = new byte[BUFFER_SIZE]; - while (!stopThreads.get() && (read = inputStream.read(buf)) != -1) { - totalRead.addAndGet(read); - } - } - finally { - inputStream.close(); - } - } - catch (Exception e) { - readerException = e; - e.printStackTrace(); - } - finally { - LOG.info(totalRead + " total bytes read."); - } - } - }); - - final Thread writerThread = new Thread(new Runnable() { - private final Random random = new Random(); - - @Override - public void run() { - totalWritten.set(0); - int count = MESSAGE_COUNT; - try { - final OutputStream outputStream = connection.createOutputStream(destination); - try { - final byte[] buf = new byte[BUFFER_SIZE]; - random.nextBytes(buf); - while (count > 0 && !stopThreads.get()) { - outputStream.write(buf); - totalWritten.addAndGet(buf.length); - count--; - } - } - finally { - outputStream.close(); - } - } - catch (Exception e) { - writerException = e; - e.printStackTrace(); - } - finally { - LOG.info(totalWritten + " total bytes written."); - } - } - }); - - readerThread.start(); - writerThread.start(); - - // Wait till reader is has finished receiving all the messages - // or he has stopped - // receiving messages. - Thread.sleep(1000); - int lastRead = totalRead.get(); - while (readerThread.isAlive()) { - readerThread.join(1000); - // No progress?? then stop waiting.. - if (lastRead == totalRead.get()) { - break; - } - lastRead = totalRead.get(); - } - - stopThreads.set(true); - - assertTrue("Should not have received a reader exception", readerException == null); - assertTrue("Should not have received a writer exception", writerException == null); - - assertEquals("Not all messages accounted for", totalWritten.get(), totalRead.get()); - - } - finally { - session.close(); - } - } - finally { - connection.close(); - } - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/QueueConsumerPriorityTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/QueueConsumerPriorityTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/QueueConsumerPriorityTest.java index 0358323..4ae2feb 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/QueueConsumerPriorityTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/QueueConsumerPriorityTest.java @@ -26,8 +26,11 @@ import javax.jms.Session; import junit.framework.TestCase; +import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper; import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.transport.tcp.TcpTransportFactory; +//https://issues.apache.org/jira/browse/ARTEMIS-196 public class QueueConsumerPriorityTest extends TestCase { private static final String VM_BROKER_URL = "vm://localhost?broker.persistent=false&broker.useJmx=true"; @@ -43,6 +46,7 @@ public class QueueConsumerPriorityTest extends TestCase { @Override protected void tearDown() throws Exception { + TcpTransportFactory.clearService(); super.tearDown(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ReconnectWithSameClientIDTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ReconnectWithSameClientIDTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ReconnectWithSameClientIDTest.java index c6f60f8..d737f2c 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ReconnectWithSameClientIDTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ReconnectWithSameClientIDTest.java @@ -96,12 +96,13 @@ public class ReconnectWithSameClientIDTest extends EmbeddedBrokerTestSupport { @Override protected ConnectionFactory createConnectionFactory() throws Exception { - return new ActiveMQConnectionFactory((useFailover ? "failover:" : "") + broker.getTransportConnectors().get(0).getPublishableConnectString()); + return new ActiveMQConnectionFactory((useFailover ? "failover:" : "") + newURI("localhost", 0)); } @Override protected void setUp() throws Exception { bindAddress = "tcp://localhost:0"; + disableWrapper = true; super.setUp(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/RemoveDestinationTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/RemoveDestinationTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/RemoveDestinationTest.java index 894abe3..ff1c6c6 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/RemoveDestinationTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/RemoveDestinationTest.java @@ -55,6 +55,7 @@ public class RemoveDestinationTest { @Before public void setUp() throws Exception { + BrokerService.disableWrapper = true; broker = BrokerFactory.createBroker(new URI(BROKER_URL)); broker.start(); broker.waitUntilStarted(); @@ -62,6 +63,7 @@ public class RemoveDestinationTest { @After public void tearDown() throws Exception { + BrokerService.disableWrapper = false; broker.stop(); broker.waitUntilStopped(); broker = null; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TimeStampTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TimeStampTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TimeStampTest.java index 87c5fc9..d423442 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TimeStampTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TimeStampTest.java @@ -26,6 +26,7 @@ import javax.jms.Session; import junit.framework.TestCase; +import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper; import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; @@ -34,6 +35,16 @@ import org.apache.activemq.broker.view.ConnectionDotFilePlugin; public class TimeStampTest extends TestCase { + @Override + public void setUp() throws Exception { + BrokerService.disableWrapper = true; + } + @Override + public void tearDown() { + ArtemisBrokerHelper.stopArtemisBroker(); + BrokerService.disableWrapper = false; + } + public void test() throws Exception { BrokerService broker = new BrokerService(); broker.setPersistent(false); @@ -91,6 +102,5 @@ public class TimeStampTest extends TestCase { consumer.close(); session.close(); connection.close(); - broker.stop(); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TransactionContextTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TransactionContextTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TransactionContextTest.java index 8d239e7..d2e917d 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TransactionContextTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TransactionContextTest.java @@ -23,9 +23,14 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.jms.TransactionRolledBackException; +import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper; +import org.apache.activemq.broker.BrokerService; import org.apache.activemq.transaction.Synchronization; +import org.apache.activemq.transport.tcp.TcpTransportFactory; import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; public class TransactionContextTest { @@ -36,13 +41,22 @@ public class TransactionContextTest { @Before public void setup() throws Exception { - connection = factory.createActiveMQConnection(); - underTest = new TransactionContext(connection); + try { + connection = factory.createActiveMQConnection(); + underTest = new TransactionContext(connection); + } + catch (Exception e) { + e.printStackTrace(); + throw e; + } } @After public void tearDown() throws Exception { - connection.close(); + if (connection != null) { + connection.close(); + } + TcpTransportFactory.clearService(); } @Test @@ -104,6 +118,7 @@ public class TransactionContextTest { @Test public void testSyncIndexCleared() throws Exception { + System.out.println("================================= test testSyncIndexCleared ==========="); final AtomicInteger beforeEndCountA = new AtomicInteger(0); final AtomicInteger rollbackCountA = new AtomicInteger(0); Synchronization sync = new Synchronization() { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java index d207da7..a9a564b 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java @@ -26,10 +26,11 @@ import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl; +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.Subscription; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ConsumerControl; @@ -349,8 +350,10 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport { assertEquals("broker config prefetch in effect", 0, consumer.info.getCurrentPrefetchSize()); // verify sub view broker - Subscription sub = broker.getRegionBroker().getDestinationMap().get(ActiveMQDestination.transform(brokerZeroQueue)).getConsumers().get(0); - assertEquals("broker sub prefetch is correct", 0, sub.getConsumerInfo().getCurrentPrefetchSize()); + // I comment out this because it checks broker internal + // which doesn't apply to artemis broker. + //Subscription sub = broker.getRegionBroker().getDestinationMap().get(ActiveMQDestination.transform(brokerZeroQueue)).getConsumers().get(0); + //assertEquals("broker sub prefetch is correct", 0, sub.getConsumerInfo().getCurrentPrefetchSize()); // manipulate Prefetch (like failover and stomp) ConsumerControl consumerControl = new ConsumerControl(); @@ -361,22 +364,22 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport { Object reply = ((ActiveMQConnection) connection).getTransport().request(consumerControl); assertTrue("good request", !(reply instanceof ExceptionResponse)); assertEquals("broker config prefetch in effect", 0, consumer.info.getCurrentPrefetchSize()); - assertEquals("broker sub prefetch is correct", 0, sub.getConsumerInfo().getCurrentPrefetchSize()); } @Override - protected BrokerService createBroker() throws Exception { - BrokerService brokerService = super.createBroker(); - PolicyMap policyMap = new PolicyMap(); - PolicyEntry zeroPrefetchPolicy = new PolicyEntry(); - zeroPrefetchPolicy.setQueuePrefetch(0); - policyMap.put(ActiveMQDestination.transform(brokerZeroQueue), zeroPrefetchPolicy); - brokerService.setDestinationPolicy(policyMap); - return brokerService; + public EmbeddedJMS createArtemisBroker() throws Exception { + Configuration config0 = createConfig("localhost", 0); + String coreQueueAddress = "jms.queue." + brokerZeroQueue.getQueueName(); + AddressSettings addrSettings = new AddressSettings(); + addrSettings.setQueuePrefetch(0); + config0.getAddressesSettings().put(coreQueueAddress, addrSettings); + EmbeddedJMS newbroker = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl()); + return newbroker; } @Override protected void setUp() throws Exception { + disableWrapper = true; bindAddress = "tcp://localhost:0"; super.setUp(); @@ -388,11 +391,12 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport { @Override protected void startBroker() throws Exception { super.startBroker(); - bindAddress = broker.getTransportConnectors().get(0).getConnectUri().toString(); + bindAddress = newURI("localhost", 0); } @Override protected void tearDown() throws Exception { + BrokerService.disableWrapper = false; connection.close(); super.tearDown(); }