Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2DE5A195C1 for ; Wed, 16 Mar 2016 01:53:26 +0000 (UTC) Received: (qmail 65008 invoked by uid 500); 16 Mar 2016 01:53:26 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 64876 invoked by uid 500); 16 Mar 2016 01:53:26 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 64758 invoked by uid 99); 16 Mar 2016 01:53:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 Mar 2016 01:53:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BB0D0DFFB9; Wed, 16 Mar 2016 01:53:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Wed, 16 Mar 2016 01:53:28 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [04/60] [abbrv] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java index 2779f52..75c27d7 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java @@ -22,6 +22,8 @@ import java.util.Vector; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Destination; import javax.jms.JMSException; @@ -37,97 +39,105 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQMessageConsumer; import org.apache.activemq.ActiveMQMessageTransformation; import org.apache.activemq.ActiveMQSession; -import org.apache.activemq.broker.BrokerPlugin; -import org.apache.activemq.broker.BrokerPluginSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext; +import org.apache.activemq.artemis.core.server.impl.QueueImpl; +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; +import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest; import org.apache.activemq.command.ConsumerId; -import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.SessionId; import org.apache.activemq.util.Wait; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMRules; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; +import org.junit.Assert; +import org.junit.Before; +import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.junit.After; import org.junit.Test; // see https://issues.apache.org/activemq/browse/AMQ-2573 -public class FailoverConsumerUnconsumedTest { - +@RunWith(BMUnitRunner.class) +public class FailoverConsumerUnconsumedTest extends OpenwireArtemisBaseTest { private static final Logger LOG = LoggerFactory.getLogger(FailoverConsumerUnconsumedTest.class); private static final String QUEUE_NAME = "FailoverWithUnconsumed"; - private static final String TRANSPORT_URI = "tcp://localhost:0"; - private String url; + private static final AtomicBoolean doByteman = new AtomicBoolean(false); + + private static int maxConsumers = 2; + private static AtomicInteger consumerCount = new AtomicInteger(0); + private static CountDownLatch brokerStopLatch = new CountDownLatch(1); + private static AtomicBoolean watchTopicAdvisories = new AtomicBoolean(false); + + private String url = newURI(0); final int prefetch = 10; - BrokerService broker; + private static EmbeddedJMS broker; @After - public void stopBroker() throws Exception { + public void tearDown() throws Exception { if (broker != null) { broker.stop(); + broker = null; } } - public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception { - broker = createBroker(deleteAllMessagesOnStartup); - broker.start(); - } - - public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception { - return createBroker(deleteAllMessagesOnStartup, TRANSPORT_URI); - } - - public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception { - broker = new BrokerService(); - broker.addConnector(bindAddress); - broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup); - - this.url = broker.getTransportConnectors().get(0).getConnectUri().toString(); - - return broker; + @Before + public void setUp() throws Exception { + consumerCount.set(0); } @Test + @BMRules( + rules = { + @BMRule( + name = "set no return response and stop the broker", + targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection", + targetMethod = "processAddConsumer", + targetLocation = "ENTRY", + binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()", + action = "org.apache.activemq.transport.failover.FailoverConsumerUnconsumedTest.holdResponseAndStopBroker2(context)") + } + ) public void testFailoverConsumerDups() throws Exception { + watchTopicAdvisories.set(true); doTestFailoverConsumerDups(true); } @Test + @BMRules( + rules = { + @BMRule( + name = "set no return response and stop the broker", + targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection", + targetMethod = "processAddConsumer", + targetLocation = "ENTRY", + binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()", + action = "org.apache.activemq.transport.failover.FailoverConsumerUnconsumedTest.holdResponseAndStopBroker2(context)") + } + ) public void testFailoverConsumerDupsNoAdvisoryWatch() throws Exception { + watchTopicAdvisories.set(false); doTestFailoverConsumerDups(false); } @SuppressWarnings("unchecked") @Test + @BMRules( + rules = { + @BMRule( + name = "set no return response and stop the broker", + targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection", + targetMethod = "processAddConsumer", + targetLocation = "ENTRY", + binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()", + action = "org.apache.activemq.transport.failover.FailoverConsumerUnconsumedTest.holdResponseAndStopBroker(context)") + } + ) public void testFailoverClientAckMissingRedelivery() throws Exception { - - final int maxConsumers = 2; - broker = createBroker(true); - - broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() { - int consumerCount; - - // broker is killed on x create consumer - @Override - public Subscription addConsumer(ConnectionContext context, final ConsumerInfo info) throws Exception { - if (++consumerCount == maxConsumers) { - context.setDontSendReponse(true); - Executors.newSingleThreadExecutor().execute(new Runnable() { - @Override - public void run() { - LOG.info("Stopping broker on consumer: " + info.getConsumerId()); - try { - broker.stop(); - } - catch (Exception e) { - e.printStackTrace(); - } - } - }); - } - return super.addConsumer(context, info); - } - }}); + maxConsumers = 2; + brokerStopLatch = new CountDownLatch(1); + broker = createBroker(); broker.start(); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); @@ -139,7 +149,9 @@ public class FailoverConsumerUnconsumedTest { final Session consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); final Queue destination = consumerSession.createQueue(QUEUE_NAME + "?jms.consumer.prefetch=" + prefetch); - final Vector testConsumers = new Vector<>(); + doByteman.set(true); + + final Vector testConsumers = new Vector(); TestConsumer testConsumer = new TestConsumer(consumerSession, destination, connection); testConsumer.setMessageListener(new MessageListener() { @Override @@ -157,7 +169,6 @@ public class FailoverConsumerUnconsumedTest { produceMessage(consumerSession, destination, maxConsumers * prefetch); assertTrue("add messages are delivered", Wait.waitFor(new Wait.Condition() { - @Override public boolean isSatisified() throws Exception { int totalDelivered = 0; for (TestConsumer testConsumer : testConsumers) { @@ -172,7 +183,6 @@ public class FailoverConsumerUnconsumedTest { final CountDownLatch shutdownConsumerAdded = new CountDownLatch(1); Executors.newSingleThreadExecutor().execute(new Runnable() { - @Override public void run() { try { LOG.info("add last consumer..."); @@ -198,17 +208,16 @@ public class FailoverConsumerUnconsumedTest { } }); - // will be stopped by the plugin - broker.waitUntilStopped(); + brokerStopLatch.await(); + doByteman.set(false); - broker = createBroker(false, this.url); + broker = createBroker(); broker.start(); assertTrue("consumer added through failover", shutdownConsumerAdded.await(30, TimeUnit.SECONDS)); // each should again get prefetch messages - all unacked deliveries should be rolledback assertTrue("after restart all messages are re dispatched", Wait.waitFor(new Wait.Condition() { - @Override public boolean isSatisified() throws Exception { int totalDelivered = 0; for (TestConsumer testConsumer : testConsumers) { @@ -220,55 +229,19 @@ public class FailoverConsumerUnconsumedTest { } })); - assertTrue("after restart each got prefetch amount", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - for (TestConsumer testConsumer : testConsumers) { - long delivered = testConsumer.deliveredSize(); - LOG.info(testConsumer.getConsumerId() + " delivered: " + delivered); - if (delivered != prefetch) { - return false; - } - } - return true; - } - })); - connection.close(); } @SuppressWarnings("unchecked") public void doTestFailoverConsumerDups(final boolean watchTopicAdvisories) throws Exception { - final int maxConsumers = 4; - broker = createBroker(true); - - broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() { - int consumerCount; - - // broker is killed on x create consumer - @Override - public Subscription addConsumer(ConnectionContext context, final ConsumerInfo info) throws Exception { - if (++consumerCount == maxConsumers + (watchTopicAdvisories ? 1 : 0)) { - context.setDontSendReponse(true); - Executors.newSingleThreadExecutor().execute(new Runnable() { - @Override - public void run() { - LOG.info("Stopping broker on consumer: " + info.getConsumerId()); - try { - broker.stop(); - } - catch (Exception e) { - e.printStackTrace(); - } - } - }); - } - return super.addConsumer(context, info); - } - }}); + maxConsumers = 4; + broker = createBroker(); broker.start(); + brokerStopLatch = new CountDownLatch(1); + doByteman.set(true); + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); cf.setWatchTopicAdvisories(watchTopicAdvisories); @@ -283,10 +256,11 @@ public class FailoverConsumerUnconsumedTest { testConsumers.add(new TestConsumer(consumerSession, destination, connection)); } + assureQueueMessages(0, new SimpleString("jms.queue." + QUEUE_NAME)); + produceMessage(consumerSession, destination, maxConsumers * prefetch); assertTrue("add messages are dispatched", Wait.waitFor(new Wait.Condition() { - @Override public boolean isSatisified() throws Exception { int totalUnconsumed = 0; for (TestConsumer testConsumer : testConsumers) { @@ -301,7 +275,6 @@ public class FailoverConsumerUnconsumedTest { final CountDownLatch shutdownConsumerAdded = new CountDownLatch(1); Executors.newSingleThreadExecutor().execute(new Runnable() { - @Override public void run() { try { LOG.info("add last consumer..."); @@ -315,12 +288,8 @@ public class FailoverConsumerUnconsumedTest { } }); - // will be stopped by the plugin - broker.waitUntilStopped(); - // verify interrupt assertTrue("add messages dispatched and unconsumed are cleaned up", Wait.waitFor(new Wait.Condition() { - @Override public boolean isSatisified() throws Exception { int totalUnconsumed = 0; for (TestConsumer testConsumer : testConsumers) { @@ -332,14 +301,16 @@ public class FailoverConsumerUnconsumedTest { } })); - broker = createBroker(false, this.url); + brokerStopLatch.await(); + doByteman.set(false); + + broker = createBroker(); broker.start(); assertTrue("consumer added through failover", shutdownConsumerAdded.await(30, TimeUnit.SECONDS)); // each should again get prefetch messages - all unconsumed deliveries should be rolledback assertTrue("after start all messages are re dispatched", Wait.waitFor(new Wait.Condition() { - @Override public boolean isSatisified() throws Exception { int totalUnconsumed = 0; for (TestConsumer testConsumer : testConsumers) { @@ -354,6 +325,11 @@ public class FailoverConsumerUnconsumedTest { connection.close(); } + private void assureQueueMessages(int num, SimpleString queueName) { + QueueImpl queue = (QueueImpl) broker.getActiveMQServer().getPostOffice().getBinding(queueName).getBindable(); + Assert.assertEquals(num, queue.getMessageCount()); + } + private void produceMessage(final Session producerSession, Queue destination, long count) throws JMSException { MessageProducer producer = producerSession.createProducer(destination); for (int i = 0; i < count; i++) { @@ -385,4 +361,44 @@ public class FailoverConsumerUnconsumedTest { idGen -= 5; return idGen; } + + public static void holdResponseAndStopBroker(AMQConnectionContext context) { + if (doByteman.get()) { + if (consumerCount.incrementAndGet() == maxConsumers) { + context.setDontSendReponse(true); + Executors.newSingleThreadExecutor().execute(new Runnable() { + public void run() { + try { + broker.stop(); + brokerStopLatch.countDown(); + } + catch (Exception e) { + e.printStackTrace(); + } + } + }); + } + } + } + + public static void holdResponseAndStopBroker2(AMQConnectionContext context) { + if (doByteman.get()) { + if (consumerCount.incrementAndGet() == maxConsumers + (watchTopicAdvisories.get() ? 1 : 0)) { + context.setDontSendReponse(true); + Executors.newSingleThreadExecutor().execute(new Runnable() { + public void run() { + try { + broker.stop(); + Assert.assertEquals(1, brokerStopLatch.getCount()); + brokerStopLatch.countDown(); + } + catch (Exception e) { + e.printStackTrace(); + } + } + }); + } + } + } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDuplicateTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDuplicateTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDuplicateTest.java index cb15940..e801b3c 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDuplicateTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDuplicateTest.java @@ -33,25 +33,35 @@ import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.TestSupport; -import org.apache.activemq.broker.BrokerPlugin; -import org.apache.activemq.broker.BrokerPluginSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.ProducerBrokerExchange; -import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext; +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; +import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest; import org.apache.activemq.util.Wait; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMRules; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class FailoverDuplicateTest extends TestSupport { +@RunWith(BMUnitRunner.class) +public class FailoverDuplicateTest extends OpenwireArtemisBaseTest { private static final Logger LOG = LoggerFactory.getLogger(FailoverDuplicateTest.class); private static final String QUEUE_NAME = "TestQueue"; - private static final String TRANSPORT_URI = "tcp://localhost:0"; - private String url; - BrokerService broker; - @Override + private static final AtomicBoolean doByteman = new AtomicBoolean(false); + private static final AtomicBoolean first = new AtomicBoolean(false); + private static final CountDownLatch gotMessageLatch = new CountDownLatch(1); + private static final CountDownLatch producersDone = new CountDownLatch(1); + + private String url = newURI(0); + EmbeddedJMS broker; + + @After public void tearDown() throws Exception { stopBroker(); } @@ -63,72 +73,38 @@ public class FailoverDuplicateTest extends TestSupport { } public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception { - broker = createBroker(deleteAllMessagesOnStartup); + broker = createBroker(); broker.start(); } - public void startBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception { - broker = createBroker(deleteAllMessagesOnStartup, bindAddress); + public void startBroker() throws Exception { + broker = createBroker(); broker.start(); } - public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception { - return createBroker(deleteAllMessagesOnStartup, TRANSPORT_URI); - } - - public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception { - broker = new BrokerService(); - broker.setUseJmx(false); - broker.setAdvisorySupport(false); - broker.addConnector(bindAddress); - broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup); - - url = broker.getTransportConnectors().get(0).getConnectUri().toString(); - - return broker; - } - public void configureConnectionFactory(ActiveMQConnectionFactory factory) { factory.setAuditMaximumProducerNumber(2048); factory.setOptimizeAcknowledge(true); } @SuppressWarnings("unchecked") + @Test + @BMRules( + rules = { + @BMRule( + name = "set no return response and stop the broker", + targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection", + targetMethod = "processMessage", + targetLocation = "EXIT", + binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()", + action = "org.apache.activemq.transport.failover.FailoverDuplicateTest.holdResponseAndStopConn(context)") + } + ) public void testFailoverSendReplyLost() throws Exception { - broker = createBroker(true); - setDefaultPersistenceAdapter(broker); - - final CountDownLatch gotMessageLatch = new CountDownLatch(1); - final CountDownLatch producersDone = new CountDownLatch(1); - final AtomicBoolean first = new AtomicBoolean(false); - broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() { - @Override - public void send(final ProducerBrokerExchange producerExchange, - org.apache.activemq.command.Message messageSend) throws Exception { - // so send will hang as if reply is lost - super.send(producerExchange, messageSend); - if (first.compareAndSet(false, true)) { - producerExchange.getConnectionContext().setDontSendReponse(true); - Executors.newSingleThreadExecutor().execute(new Runnable() { - @Override - public void run() { - try { - LOG.info("Waiting for recepit"); - assertTrue("message received on time", gotMessageLatch.await(60, TimeUnit.SECONDS)); - assertTrue("new producers done on time", producersDone.await(120, TimeUnit.SECONDS)); - LOG.info("Stopping connection post send and receive and multiple producers"); - producerExchange.getConnectionContext().getConnection().stop(); - } - catch (Exception e) { - e.printStackTrace(); - } - } - }); - } - } - }}); + broker = createBroker(); broker.start(); + doByteman.set(true); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.watchTopicAdvisories=false"); configureConnectionFactory(cf); @@ -164,14 +140,14 @@ public class FailoverDuplicateTest extends TestSupport { } catch (JMSException e) { LOG.error("got send exception: ", e); - fail("got unexpected send exception" + e); + Assert.fail("got unexpected send exception" + e); } sendDoneLatch.countDown(); LOG.info("done async send"); } }); - assertTrue("one message got through on time", gotMessageLatch.await(20, TimeUnit.SECONDS)); + Assert.assertTrue("one message got through on time", gotMessageLatch.await(20, TimeUnit.SECONDS)); // send more messages, blow producer audit final int numProducers = 1050; final int numPerProducer = 2; @@ -186,7 +162,7 @@ public class FailoverDuplicateTest extends TestSupport { } } - assertTrue("message sent complete through failover", sendDoneLatch.await(30, TimeUnit.SECONDS)); + Assert.assertTrue("message sent complete through failover", sendDoneLatch.await(30, TimeUnit.SECONDS)); Wait.waitFor(new Wait.Condition() { @Override @@ -195,29 +171,16 @@ public class FailoverDuplicateTest extends TestSupport { return totalSent <= receivedCount.get(); } }); - assertEquals("we got all produced messages", totalSent, receivedCount.get()); + Assert.assertEquals("we got all produced messages", totalSent, receivedCount.get()); sendConnection.close(); receiveConnection.close(); - // verify stats - assertEquals("expect all messages are dequeued with one duplicate to dlq", totalSent + 2, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount()); - - Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - LOG.info("dequeues : " + ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount()); - return totalSent + 1 <= ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount(); - } - }); - assertEquals("dequeue correct, including duplicate dispatch poisoned", totalSent + 1, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount()); - // ensure no dangling messages with fresh broker etc broker.stop(); - broker.waitUntilStopped(); + doByteman.set(false); LOG.info("Checking for remaining/hung messages with second restart.."); - broker = createBroker(false, url); - setDefaultPersistenceAdapter(broker); + broker = createBroker(); broker.start(); // after restart, ensure no dangling messages @@ -231,7 +194,7 @@ public class FailoverDuplicateTest extends TestSupport { if (msg == null) { msg = consumer.receive(5000); } - assertNull("no messges left dangling but got: " + msg, msg); + Assert.assertNull("no messges left dangling but got: " + msg, msg); sendConnection.close(); } @@ -247,4 +210,28 @@ public class FailoverDuplicateTest extends TestSupport { } producer.close(); } + + public static void holdResponseAndStopConn(final AMQConnectionContext context) { + if (doByteman.get()) { + if (first.compareAndSet(false, true)) { + context.setDontSendReponse(true); + Executors.newSingleThreadExecutor().execute(new Runnable() { + @Override + public void run() { + try { + LOG.info("Waiting for recepit"); + Assert.assertTrue("message received on time", gotMessageLatch.await(60, TimeUnit.SECONDS)); + Assert.assertTrue("new producers done on time", producersDone.await(120, TimeUnit.SECONDS)); + LOG.info("Stopping connection post send and receive and multiple producers"); + context.getConnection().fail(null, "test Failoverduplicatetest"); + } + catch (Exception e) { + e.printStackTrace(); + } + } + }); + } + } + } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPrefetchZeroTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPrefetchZeroTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPrefetchZeroTest.java index 57899ba..fcb60e5 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPrefetchZeroTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPrefetchZeroTest.java @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,10 +16,6 @@ */ package org.apache.activemq.transport.failover; -import java.util.Vector; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -27,31 +23,42 @@ import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; +import java.util.Vector; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerPlugin; -import org.apache.activemq.broker.BrokerPluginSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.command.MessagePull; -import org.apache.activemq.command.Response; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext; +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; +import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMRules; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; import org.junit.After; import org.junit.Test; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.junit.Assert.assertTrue; // see: https://issues.apache.org/activemq/browse/AMQ-2877 -public class FailoverPrefetchZeroTest { +@RunWith(BMUnitRunner.class) +public class FailoverPrefetchZeroTest extends OpenwireArtemisBaseTest { private static final Logger LOG = LoggerFactory.getLogger(FailoverPrefetchZeroTest.class); private static final String QUEUE_NAME = "FailoverPrefetchZero"; - private static final String TRANSPORT_URI = "tcp://localhost:0"; - private String url; + + private static final AtomicBoolean doByteman = new AtomicBoolean(false); + private static final CountDownLatch pullDone = new CountDownLatch(1); + private static CountDownLatch brokerStopLatch = new CountDownLatch(1); + + private String url = newURI(0); final int prefetch = 0; - BrokerService broker; + private static EmbeddedJMS broker; @After public void stopBroker() throws Exception { @@ -60,52 +67,25 @@ public class FailoverPrefetchZeroTest { } } - public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception { - broker = createBroker(deleteAllMessagesOnStartup); + public void startBroker() throws Exception { + broker = createBroker(); broker.start(); } - public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception { - return createBroker(deleteAllMessagesOnStartup, TRANSPORT_URI); - } - - public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception { - broker = new BrokerService(); - broker.addConnector(bindAddress); - broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup); - - url = broker.getTransportConnectors().get(0).getConnectUri().toString(); - - return broker; - } - @SuppressWarnings("unchecked") @Test + @BMRules( + rules = {@BMRule( + name = "set no return response and stop the broker", + targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection", + targetMethod = "processMessagePull", + targetLocation = "ENTRY", + binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()", + action = "org.apache.activemq.transport.failover.FailoverPrefetchZeroTest.holdResponseAndStopBroker(context)")}) public void testPrefetchZeroConsumerThroughRestart() throws Exception { - broker = createBroker(true); - - final CountDownLatch pullDone = new CountDownLatch(1); - broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() { - @Override - public Response messagePull(ConnectionContext context, final MessagePull pull) throws Exception { - context.setDontSendReponse(true); - pullDone.countDown(); - Executors.newSingleThreadExecutor().execute(new Runnable() { - @Override - public void run() { - LOG.info("Stopping broker on pull: " + pull); - try { - broker.stop(); - } - catch (Exception e) { - e.printStackTrace(); - } - } - }); - return null; - } - }}); + broker = createBroker(); broker.start(); + doByteman.set(true); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); cf.setWatchTopicAdvisories(false); @@ -122,7 +102,6 @@ public class FailoverPrefetchZeroTest { final CountDownLatch receiveDone = new CountDownLatch(1); final Vector received = new Vector<>(); Executors.newSingleThreadExecutor().execute(new Runnable() { - @Override public void run() { try { LOG.info("receive one..."); @@ -141,8 +120,9 @@ public class FailoverPrefetchZeroTest { // will be stopped by the plugin assertTrue("pull completed on broker", pullDone.await(30, TimeUnit.SECONDS)); - broker.waitUntilStopped(); - broker = createBroker(false, url); + brokerStopLatch.await(); + doByteman.set(false); + broker = createBroker(); broker.start(); assertTrue("receive completed through failover", receiveDone.await(30, TimeUnit.SECONDS)); @@ -160,4 +140,25 @@ public class FailoverPrefetchZeroTest { } producer.close(); } + + public static void holdResponseAndStopBroker(final AMQConnectionContext context) { + if (doByteman.get()) { + context.setDontSendReponse(true); + pullDone.countDown(); + Executors.newSingleThreadExecutor().execute(new Runnable() { + public void run() { + try { + broker.stop(); + } + catch (Exception e) { + e.printStackTrace(); + } + finally { + brokerStopLatch.countDown(); + } + } + }); + } + } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java index b8860a7..6e559e7 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java @@ -16,58 +16,106 @@ */ package org.apache.activemq.transport.failover; +import java.util.ArrayList; import java.util.HashMap; - +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl; +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; +import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class FailoverPriorityTest extends FailoverClusterTestSupport { +import javax.jms.Connection; +import javax.jms.JMSException; + +public class FailoverPriorityTest extends OpenwireArtemisBaseTest { protected final Logger LOG = LoggerFactory.getLogger(getClass()); private static final String BROKER_A_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61616"; private static final String BROKER_B_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61617"; private static final String BROKER_C_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61618"; - private final HashMap urls = new HashMap<>(); + private final HashMap urls = new HashMap<>(); + + private final List connections = new ArrayList(); + private EmbeddedJMS[] servers = new EmbeddedJMS[3]; + private String clientUrl; - @Override + @Before public void setUp() throws Exception { - super.setUp(); - urls.put(BROKER_A_NAME, BROKER_A_CLIENT_TC_ADDRESS); - urls.put(BROKER_B_NAME, BROKER_B_CLIENT_TC_ADDRESS); + urls.put(0, BROKER_A_CLIENT_TC_ADDRESS); + urls.put(1, BROKER_B_CLIENT_TC_ADDRESS); } - private static final String BROKER_A_NAME = "BROKERA"; - private static final String BROKER_B_NAME = "BROKERB"; - private static final String BROKER_C_NAME = "BROKERC"; + @After + public void tearDown() throws Exception { + shutdownClients(); + for (EmbeddedJMS server : servers) { + if (server != null) { + server.stop(); + } + } + } + @Test public void testPriorityBackup() throws Exception { - createBrokerA(); - createBrokerB(); - getBroker(BROKER_B_NAME).waitUntilStarted(); + Configuration config0 = createConfig("127.0.0.1", 0); + Configuration config1 = createConfig("127.0.0.1", 1); + + deployClusterConfiguration(config0, 1); + deployClusterConfiguration(config1, 0); + + servers[0] = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl()); + servers[1] = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl()); + servers[0].start(); + servers[1].start(); + + Assert.assertTrue(servers[0].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2)); + Assert.assertTrue(servers[1].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2)); + Thread.sleep(1000); setClientUrl("failover:(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?randomize=false&priorityBackup=true&initialReconnectDelay=1000&useExponentialBackOff=false"); createClients(5); - assertAllConnectedTo(urls.get(BROKER_A_NAME)); + assertAllConnectedTo(urls.get(0)); - restart(false, BROKER_A_NAME, BROKER_B_NAME); + restart(false, 0, 1); for (int i = 0; i < 3; i++) { - restart(true, BROKER_A_NAME, BROKER_B_NAME); + restart(true, 0, 1); } Thread.sleep(5000); - restart(false, BROKER_A_NAME, BROKER_B_NAME); + restart(false, 0, 1); } + @Test public void testPriorityBackupList() throws Exception { - createBrokerA(); - createBrokerB(); - getBroker(BROKER_B_NAME).waitUntilStarted(); + Configuration config0 = createConfig("127.0.0.1", 0); + Configuration config1 = createConfig("127.0.0.1", 1); + + deployClusterConfiguration(config0, 1); + deployClusterConfiguration(config1, 0); + + servers[0] = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl()); + servers[1] = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl()); + servers[0].start(); + servers[1].start(); + + Assert.assertTrue(servers[0].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2)); + Assert.assertTrue(servers[1].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2)); Thread.sleep(1000); setClientUrl("failover:(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?randomize=false&priorityBackup=true&priorityURIs=tcp://127.0.0.1:61617&initialReconnectDelay=1000&useExponentialBackOff=false"); @@ -75,154 +123,166 @@ public class FailoverPriorityTest extends FailoverClusterTestSupport { Thread.sleep(3000); - assertAllConnectedTo(urls.get(BROKER_B_NAME)); + assertAllConnectedTo(urls.get(1)); - restart(false, BROKER_B_NAME, BROKER_A_NAME); + restart(false, 1, 0); for (int i = 0; i < 3; i++) { - restart(true, BROKER_B_NAME, BROKER_A_NAME); + restart(true, 1, 0); } - restart(false, BROKER_B_NAME, BROKER_A_NAME); - + restart(false, 1, 0); } + @Test public void testThreeBrokers() throws Exception { - // Broker A - addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME)); - addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS, false); - addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null); - addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_C_Bridge", "static://(" + BROKER_C_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null); - getBroker(BROKER_A_NAME).start(); - - // Broker B - addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME)); - addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS, false); - addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null); - addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_C_Bridge", "static://(" + BROKER_C_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null); - getBroker(BROKER_B_NAME).start(); - - // Broker C - addBroker(BROKER_C_NAME, createBroker(BROKER_C_NAME)); - addTransportConnector(getBroker(BROKER_C_NAME), "openwire", BROKER_C_CLIENT_TC_ADDRESS, false); - addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null); - addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null); - getBroker(BROKER_C_NAME).start(); - - getBroker(BROKER_C_NAME).waitUntilStarted(); + commonSetup(); Thread.sleep(1000); setClientUrl("failover:(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + "," + BROKER_C_CLIENT_TC_ADDRESS + ")?randomize=false&priorityBackup=true&initialReconnectDelay=1000&useExponentialBackOff=false"); createClients(5); - assertAllConnectedTo(urls.get(BROKER_A_NAME)); - - restart(true, BROKER_A_NAME, BROKER_B_NAME); + assertAllConnectedTo(urls.get(0)); + restart(true, 0, 1, 3); } + @Test public void testPriorityBackupAndUpdateClients() throws Exception { - // Broker A - addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME)); - addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS, true); - addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null); - getBroker(BROKER_A_NAME).start(); - - // Broker B - addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME)); - addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS, true); - addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null); - getBroker(BROKER_B_NAME).start(); - - getBroker(BROKER_B_NAME).waitUntilStarted(); + Configuration config0 = createConfig("127.0.0.1", 0); + Configuration config1 = createConfig("127.0.0.1", 1); + + deployClusterConfiguration(config0, 1); + deployClusterConfiguration(config1, 0); + + servers[0] = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl()); + servers[1] = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl()); + servers[0].start(); + servers[1].start(); + + Assert.assertTrue(servers[0].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2)); + Assert.assertTrue(servers[1].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2)); + Thread.sleep(1000); setClientUrl("failover:(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?randomize=false&priorityBackup=true&initialReconnectDelay=1000&useExponentialBackOff=false"); - LOG.info("Client URI will be: " + getClientUrl()); - createClients(5); // Let's wait a little bit longer just in case it takes a while to realize that the // Broker A is the one with higher priority. Thread.sleep(5000); - assertAllConnectedTo(urls.get(BROKER_A_NAME)); + assertAllConnectedTo(urls.get(0)); } - private void restart(boolean primary, String primaryName, String secondaryName) throws Exception { + private void restart(boolean primary, int primaryID, int secondaryID) throws Exception { + restart(primary, primaryID, secondaryID, 2); + } + + private void restart(boolean primary, int primaryID, int secondaryID, int total) throws Exception { Thread.sleep(1000); if (primary) { - LOG.info("Stopping " + primaryName); - stopBroker(primaryName); + LOG.info("Stopping " + primaryID); + stopBroker(primaryID); + Assert.assertTrue(servers[secondaryID].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, total - 1)); } else { - LOG.info("Stopping " + secondaryName); - stopBroker(secondaryName); + LOG.info("Stopping " + secondaryID); + stopBroker(secondaryID); + Assert.assertTrue(servers[primaryID].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, total - 1)); } Thread.sleep(5000); if (primary) { - assertAllConnectedTo(urls.get(secondaryName)); + assertAllConnectedTo(urls.get(secondaryID)); } else { - assertAllConnectedTo(urls.get(primaryName)); + assertAllConnectedTo(urls.get(primaryID)); } if (primary) { - LOG.info("Starting " + primaryName); - createBrokerByName(primaryName); - getBroker(primaryName).waitUntilStarted(); + Configuration config = createConfig("127.0.0.1", primaryID); + + deployClusterConfiguration(config, secondaryID); + + servers[primaryID] = new EmbeddedJMS().setConfiguration(config).setJmsConfiguration(new JMSConfigurationImpl()); + servers[primaryID].start(); + + Assert.assertTrue(servers[primaryID].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, total)); + Assert.assertTrue(servers[secondaryID].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, total)); } else { - LOG.info("Starting " + secondaryName); - createBrokerByName(secondaryName); - getBroker(secondaryName).waitUntilStarted(); + Configuration config = createConfig("127.0.0.1", secondaryID); + + deployClusterConfiguration(config, primaryID); + + servers[secondaryID] = new EmbeddedJMS().setConfiguration(config).setJmsConfiguration(new JMSConfigurationImpl()); + servers[secondaryID].start(); + + Assert.assertTrue(servers[primaryID].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, total)); + Assert.assertTrue(servers[secondaryID].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, total)); } Thread.sleep(5000); - assertAllConnectedTo(urls.get(primaryName)); + assertAllConnectedTo(urls.get(primaryID)); } - private void createBrokerByName(String name) throws Exception { - if (name.equals(BROKER_A_NAME)) { - createBrokerA(); - } - else if (name.equals(BROKER_B_NAME)) { - createBrokerB(); - } - else { - throw new Exception("Unknown broker " + name); + private void stopBroker(int serverID) throws Exception { + servers[serverID].stop(); + } + + public void setClientUrl(String clientUrl) { + this.clientUrl = clientUrl; + } + + protected void createClients(int numOfClients) throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(clientUrl); + for (int i = 0; i < numOfClients; i++) { + ActiveMQConnection c = (ActiveMQConnection) factory.createConnection(); + c.start(); + connections.add(c); } } - private void createBrokerA() throws Exception { - if (getBroker(BROKER_A_NAME) == null) { - addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME)); - addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS, false); - addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null); - getBroker(BROKER_A_NAME).start(); + protected void shutdownClients() throws JMSException { + for (Connection c : connections) { + c.close(); } } - private void createBrokerB() throws Exception { - if (getBroker(BROKER_B_NAME) == null) { - addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME)); - addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS, false); - addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null); - getBroker(BROKER_B_NAME).start(); + protected void assertAllConnectedTo(String url) throws Exception { + for (ActiveMQConnection c : connections) { + Assert.assertEquals(url, c.getTransportChannel().getRemoteAddress()); } } - @Override - protected void tearDown() throws Exception { - shutdownClients(); - destroyBrokerCluster(); + //default setup for most tests + private void commonSetup() throws Exception { + Configuration config0 = createConfig("127.0.0.1", 0); + Configuration config1 = createConfig("127.0.0.1", 1); + Configuration config2 = createConfig("127.0.0.1", 2); + + deployClusterConfiguration(config0, 1, 2); + deployClusterConfiguration(config1, 0, 2); + deployClusterConfiguration(config2, 0, 1); + + servers[0] = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl()); + servers[1] = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl()); + servers[2] = new EmbeddedJMS().setConfiguration(config2).setJmsConfiguration(new JMSConfigurationImpl()); + + servers[0].start(); + servers[1].start(); + servers[2].start(); + + Assert.assertTrue(servers[0].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 3)); + Assert.assertTrue(servers[1].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 3)); + Assert.assertTrue(servers[2].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 3)); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRandomTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRandomTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRandomTest.java index 54dd3e3..80f83db 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRandomTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRandomTest.java @@ -17,47 +17,59 @@ package org.apache.activemq.transport.failover; -import junit.framework.TestCase; - import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl; +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; +import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; -public class FailoverRandomTest extends TestCase { +public class FailoverRandomTest extends OpenwireArtemisBaseTest { - BrokerService brokerA, brokerB; + private EmbeddedJMS server0, server1; - @Override + @Before public void setUp() throws Exception { - brokerA = createBroker("A"); - brokerB = createBroker("B"); - } + Configuration config0 = createConfig(0); + Configuration config1 = createConfig(1); - @Override - public void tearDown() throws Exception { - brokerA.stop(); - brokerB.stop(); + deployClusterConfiguration(config0, 1); + deployClusterConfiguration(config1, 0); + + server0 = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl()); + server1 = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl()); + + server0.start(); + server1.start(); + + server0.getActiveMQServer().setIdentity("BrokerA"); + server1.getActiveMQServer().setIdentity("BrokerB"); + + Assert.assertTrue(server0.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2)); + Assert.assertTrue(server1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2)); } - private BrokerService createBroker(String name) throws Exception { - BrokerService broker = new BrokerService(); - broker.setBrokerName("Broker" + name); - broker.addConnector("tcp://localhost:0"); - broker.getManagementContext().setCreateConnector(false); - broker.setPersistent(false); - broker.setUseJmx(false); - broker.start(); - return broker; + @After + public void tearDown() throws Exception { + server0.stop(); + server1.stop(); } + @Test public void testRandomConnections() throws Exception { - String failoverUrl = "failover:(" + brokerA.getTransportConnectors().get(0).getConnectUri() + "," + brokerB.getTransportConnectors().get(0).getConnectUri() + ")"; + String failoverUrl = "failover:(" + newURI(0) + "," + newURI(1) + ")"; ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(failoverUrl); ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection(); connection.start(); String brokerName1 = connection.getBrokerName(); - assertNotNull(brokerName1); + Assert.assertNotNull(brokerName1); connection.close(); String brokerName2 = brokerName1; @@ -66,9 +78,9 @@ public class FailoverRandomTest extends TestCase { connection = (ActiveMQConnection) cf.createConnection(); connection.start(); brokerName2 = connection.getBrokerName(); - assertNotNull(brokerName2); + Assert.assertNotNull(brokerName2); connection.close(); } - assertTrue(brokerName1 + "!=" + brokerName2, !brokerName1.equals(brokerName2)); + Assert.assertTrue(brokerName1 + "!=" + brokerName2, !brokerName1.equals(brokerName2)); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java index 6b7a2bb..3be2593 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java @@ -16,19 +16,15 @@ */ package org.apache.activemq.transport.failover; -import junit.framework.Test; - import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; +import org.junit.Test; public class FailoverRedeliveryTransactionTest extends FailoverTransactionTest { - public static Test suite() { - return suite(FailoverRedeliveryTransactionTest.class); - } - @Override public void configureConnectionFactory(ActiveMQConnectionFactory factory) { super.configureConnectionFactory(factory); @@ -36,26 +32,24 @@ public class FailoverRedeliveryTransactionTest extends FailoverTransactionTest { } @Override - public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception { - BrokerService brokerService = super.createBroker(deleteAllMessagesOnStartup, bindAddress); + public EmbeddedJMS createBroker() throws Exception { + EmbeddedJMS brokerService = super.createBroker(); PolicyMap policyMap = new PolicyMap(); PolicyEntry defaultEntry = new PolicyEntry(); defaultEntry.setPersistJMSRedelivered(true); policyMap.setDefaultEntry(defaultEntry); - brokerService.setDestinationPolicy(policyMap); + //revisit: do we support sth like persistJMSRedelivered? + //brokerService.setDestinationPolicy(policyMap); return brokerService; } // no point rerunning these @Override + @Test public void testFailoverProducerCloseBeforeTransaction() throws Exception { } @Override - public void initCombosForTestFailoverCommitReplyLost() { - } - - @Override public void testFailoverCommitReplyLost() throws Exception { } @@ -64,18 +58,10 @@ public class FailoverRedeliveryTransactionTest extends FailoverTransactionTest { } @Override - public void initCombosForTestFailoverSendReplyLost() { - } - - @Override public void testFailoverSendReplyLost() throws Exception { } @Override - public void initCombosForTestFailoverConnectionSendReplyLost() { - } - - @Override public void testFailoverConnectionSendReplyLost() throws Exception { } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java index 07a8436..c5ee02f 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java @@ -30,40 +30,43 @@ import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl; +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class FailoverTimeoutTest { +public class FailoverTimeoutTest extends OpenwireArtemisBaseTest { private static final Logger LOG = LoggerFactory.getLogger(FailoverTimeoutTest.class); private static final String QUEUE_NAME = "test.failovertimeout"; - BrokerService bs; + EmbeddedJMS server; URI tcpUri; @Before public void setUp() throws Exception { - bs = new BrokerService(); - bs.setUseJmx(false); - bs.addConnector("tcp://localhost:0"); - bs.start(); - tcpUri = bs.getTransportConnectors().get(0).getConnectUri(); + Configuration config = createConfig(0); + server = new EmbeddedJMS().setConfiguration(config).setJmsConfiguration(new JMSConfigurationImpl()); + server.start(); + tcpUri = new URI(newURI(0)); } @After public void tearDown() throws Exception { - if (bs != null) { - bs.stop(); + if (server != null) { + server.stop(); } } @Test public void testTimoutDoesNotFailConnectionAttempts() throws Exception { - bs.stop(); + server.stop(); long timeout = 1000; long startTime = System.currentTimeMillis(); @@ -99,7 +102,7 @@ public class FailoverTimeoutTest { TextMessage message = session.createTextMessage("Test message"); producer.send(message); - bs.stop(); + server.stop(); try { producer.send(message); @@ -108,15 +111,14 @@ public class FailoverTimeoutTest { assertEquals("Failover timeout of " + timeout + " ms reached.", jmse.getMessage()); } - bs = new BrokerService(); - bs.setUseJmx(false); - bs.addConnector(tcpUri); - bs.start(); - bs.waitUntilStarted(); + Configuration config = createConfig(0); + server = new EmbeddedJMS().setConfiguration(config).setJmsConfiguration(new JMSConfigurationImpl()); + server.start(); producer.send(message); - bs.stop(); + server.stop(); + server = null; } @Test