Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9058C187B2 for ; Thu, 18 Feb 2016 17:04:09 +0000 (UTC) Received: (qmail 88635 invoked by uid 500); 18 Feb 2016 17:04:09 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 88530 invoked by uid 500); 18 Feb 2016 17:04:09 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 88304 invoked by uid 99); 18 Feb 2016 17:04:09 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 Feb 2016 17:04:09 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id AA3BCE38C8; Thu, 18 Feb 2016 17:04:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Thu, 18 Feb 2016 17:04:15 -0000 Message-Id: <0b876d6b3ed742f9a2bedc8c3a9ea27f@git.apache.org> In-Reply-To: <2c86ca8b16fd49a4bfff9e0b12c6faf0@git.apache.org> References: <2c86ca8b16fd49a4bfff9e0b12c6faf0@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [08/39] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a55b8686/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java index f80b09a..3d75905 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.Collection; -import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; @@ -34,22 +33,26 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.TransactionRolledBackException; - -import junit.framework.TestCase; +import javax.management.MBeanServer; +import javax.management.MBeanServerFactory; import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.TransportConnector; -import org.apache.activemq.broker.region.Destination; -import org.apache.activemq.broker.region.Queue; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.server.impl.QueueImpl; +import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl; +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; +import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest; import org.apache.log4j.Logger; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; /** * TestCase showing the message-destroying described in AMQ-1925 */ -public class AMQ1925Test extends TestCase implements ExceptionListener { +public class AMQ1925Test extends OpenwireArtemisBaseTest implements ExceptionListener { private static final Logger log = Logger.getLogger(AMQ1925Test.class); @@ -57,7 +60,7 @@ public class AMQ1925Test extends TestCase implements ExceptionListener { private static final String PROPERTY_MSG_NUMBER = "NUMBER"; private static final int MESSAGE_COUNT = 10000; - private BrokerService bs; + private EmbeddedJMS bs; private URI tcpUri; private ActiveMQConnectionFactory cf; @@ -74,17 +77,13 @@ public class AMQ1925Test extends TestCase implements ExceptionListener { final CountDownLatch starter = new CountDownLatch(1); final AtomicBoolean restarted = new AtomicBoolean(); new Thread(new Runnable() { - @Override public void run() { try { starter.await(); // Simulate broker failure & restart bs.stop(); - bs = new BrokerService(); - bs.setPersistent(true); - bs.setUseJmx(true); - bs.addConnector(tcpUri); + bs = createNewServer(); bs.start(); restarted.set(true); @@ -97,21 +96,21 @@ public class AMQ1925Test extends TestCase implements ExceptionListener { for (int i = 0; i < MESSAGE_COUNT; i++) { Message message = consumer.receive(500); - assertNotNull("No Message " + i + " found", message); + Assert.assertNotNull("No Message " + i + " found", message); if (i < 10) - assertFalse("Timing problem, restarted too soon", restarted.get()); + Assert.assertFalse("Timing problem, restarted too soon", restarted.get()); if (i == 10) { starter.countDown(); } if (i > MESSAGE_COUNT - 100) { - assertTrue("Timing problem, restarted too late", restarted.get()); + Assert.assertTrue("Timing problem, restarted too late", restarted.get()); } - assertEquals(i, message.getIntProperty(PROPERTY_MSG_NUMBER)); + Assert.assertEquals(i, message.getIntProperty(PROPERTY_MSG_NUMBER)); session.commit(); } - assertNull(consumer.receive(500)); + Assert.assertNull(consumer.receive(500)); consumer.close(); session.close(); @@ -133,17 +132,13 @@ public class AMQ1925Test extends TestCase implements ExceptionListener { final CountDownLatch starter = new CountDownLatch(1); final AtomicBoolean restarted = new AtomicBoolean(); new Thread(new Runnable() { - @Override public void run() { try { starter.await(); // Simulate broker failure & restart bs.stop(); - bs = new BrokerService(); - bs.setPersistent(true); - bs.setUseJmx(true); - bs.addConnector(tcpUri); + bs = createNewServer(); bs.start(); restarted.set(true); @@ -172,12 +167,12 @@ public class AMQ1925Test extends TestCase implements ExceptionListener { } if (i < 10) - assertFalse("Timing problem, restarted too soon", restarted.get()); + Assert.assertFalse("Timing problem, restarted too soon", restarted.get()); if (i == 10) { starter.countDown(); } if (i > MESSAGE_COUNT - 50) { - assertTrue("Timing problem, restarted too late", restarted.get()); + Assert.assertTrue("Timing problem, restarted too late", restarted.get()); } if (message1 != null) { @@ -189,8 +184,8 @@ public class AMQ1925Test extends TestCase implements ExceptionListener { session2.commit(); } } - assertNull(consumer1.receive(500)); - assertNull(consumer2.receive(500)); + Assert.assertNull(consumer1.receive(500)); + Assert.assertNull(consumer2.receive(500)); consumer1.close(); session1.close(); @@ -203,7 +198,7 @@ public class AMQ1925Test extends TestCase implements ExceptionListener { foundMissingMessages = tryToFetchMissingMessages(); } for (int i = 0; i < MESSAGE_COUNT; i++) { - assertTrue("Message-Nr " + i + " not found (" + results.size() + " total, " + foundMissingMessages + " have been found 'lingering' in the queue)", results.contains(i)); + Assert.assertTrue("Message-Nr " + i + " not found (" + results.size() + " total, " + foundMissingMessages + " have been found 'lingering' in the queue)", results.contains(i)); } assertQueueEmpty(); } @@ -231,6 +226,7 @@ public class AMQ1925Test extends TestCase implements ExceptionListener { return count; } + @Test public void testAMQ1925_TXBegin() throws Exception { Connection connection = cf.createConnection(); connection.start(); @@ -241,20 +237,17 @@ public class AMQ1925Test extends TestCase implements ExceptionListener { boolean restartDone = false; for (int i = 0; i < MESSAGE_COUNT; i++) { Message message = consumer.receive(5000); - assertNotNull(message); + Assert.assertNotNull(message); if (i == 222 && !restartDone) { // Simulate broker failure & restart bs.stop(); - bs = new BrokerService(); - bs.setPersistent(true); - bs.setUseJmx(true); - bs.addConnector(tcpUri); + bs = createNewServer(); bs.start(); restartDone = true; } - assertEquals(i, message.getIntProperty(PROPERTY_MSG_NUMBER)); + Assert.assertEquals(i, message.getIntProperty(PROPERTY_MSG_NUMBER)); try { session.commit(); } @@ -263,16 +256,17 @@ public class AMQ1925Test extends TestCase implements ExceptionListener { i--; } } - assertNull(consumer.receive(500)); + Assert.assertNull(consumer.receive(500)); consumer.close(); session.close(); connection.close(); assertQueueEmpty(); - assertNull("no exception on connection listener: " + exception, exception); + Assert.assertNull("no exception on connection listener: " + exception, exception); } + @Test public void testAMQ1925_TXCommited() throws Exception { Connection connection = cf.createConnection(); connection.start(); @@ -281,22 +275,19 @@ public class AMQ1925Test extends TestCase implements ExceptionListener { for (int i = 0; i < MESSAGE_COUNT; i++) { Message message = consumer.receive(5000); - assertNotNull(message); + Assert.assertNotNull(message); - assertEquals(i, message.getIntProperty(PROPERTY_MSG_NUMBER)); + Assert.assertEquals(i, message.getIntProperty(PROPERTY_MSG_NUMBER)); session.commit(); if (i == 222) { // Simulate broker failure & restart bs.stop(); - bs = new BrokerService(); - bs.setPersistent(true); - bs.setUseJmx(true); - bs.addConnector(tcpUri); + bs = createNewServer(); bs.start(); } } - assertNull(consumer.receive(500)); + Assert.assertNull(consumer.receive(500)); consumer.close(); session.close(); @@ -313,7 +304,7 @@ public class AMQ1925Test extends TestCase implements ExceptionListener { Message msg = consumer.receive(500); if (msg != null) { - fail(msg.toString()); + Assert.fail(msg.toString()); } consumer.close(); @@ -324,9 +315,12 @@ public class AMQ1925Test extends TestCase implements ExceptionListener { } private void assertQueueLength(int len) throws Exception, IOException { - Set destinations = bs.getBroker().getDestinations(new ActiveMQQueue(QUEUE_NAME)); - Queue queue = (Queue) destinations.iterator().next(); - assertEquals(len, queue.getMessageStore().getMessageCount()); + QueueImpl queue = (QueueImpl) bs.getActiveMQServer().getPostOffice().getBinding(new SimpleString("jms.queue." + QUEUE_NAME)).getBindable(); + if (len > queue.getMessageCount()) { + //we wait for a moment as the tx might still in afterCommit stage (async op) + Thread.sleep(5000); + } + Assert.assertEquals(len, queue.getMessageCount()); } private void sendMessagesToQueue() throws Exception { @@ -349,30 +343,41 @@ public class AMQ1925Test extends TestCase implements ExceptionListener { assertQueueLength(MESSAGE_COUNT); } - @Override - protected void setUp() throws Exception { + @Before + public void setUp() throws Exception { exception = null; - bs = new BrokerService(); - bs.setDeleteAllMessagesOnStartup(true); - bs.setPersistent(true); - bs.setUseJmx(true); - TransportConnector connector = bs.addConnector("tcp://localhost:0"); + bs = createNewServer(); bs.start(); - tcpUri = connector.getConnectUri(); + //auto created queue can't survive a restart, so we need this + bs.getJMSServerManager().createQueue(false, QUEUE_NAME, null, true, QUEUE_NAME); + + tcpUri = new URI(newURI(0)); cf = new ActiveMQConnectionFactory("failover://(" + tcpUri + ")"); sendMessagesToQueue(); } - @Override - protected void tearDown() throws Exception { - new ServiceStopper().stop(bs); + @After + public void tearDown() throws Exception { + try { + if (bs != null) { + bs.stop(); + bs = null; + } + } catch (Exception e) { + log.error(e); + } + } - @Override public void onException(JMSException exception) { this.exception = exception; } + private EmbeddedJMS createNewServer() throws Exception { + Configuration config = createConfig("localhost", 0); + EmbeddedJMS server = new EmbeddedJMS().setConfiguration(config).setJmsConfiguration(new JMSConfigurationImpl()); + return server; + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a55b8686/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/BadConnectionTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/BadConnectionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/BadConnectionTest.java deleted file mode 100644 index 8cac09a..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/BadConnectionTest.java +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.failover; - -import java.io.IOException; -import java.net.URI; - -import junit.framework.TestCase; - -import org.apache.activemq.command.ActiveMQMessage; -import org.apache.activemq.transport.Transport; -import org.apache.activemq.transport.TransportFactory; -import org.apache.activemq.transport.TransportListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * - */ -public class BadConnectionTest extends TestCase { - - private static final Logger LOG = LoggerFactory.getLogger(BadConnectionTest.class); - - protected Transport transport; - - public void testConnectingToUnavailableServer() throws Exception { - try { - transport.asyncRequest(new ActiveMQMessage(), null); - fail("This should never succeed"); - } - catch (IOException e) { - LOG.info("Caught expected exception: " + e, e); - } - } - - protected Transport createTransport() throws Exception { - return TransportFactory.connect(new URI("failover://(tcp://doesNotExist:1234)?useExponentialBackOff=false&maxReconnectAttempts=3&initialReconnectDelay=100")); - } - - @Override - protected void setUp() throws Exception { - transport = createTransport(); - transport.setTransportListener(new TransportListener() { - - @Override - public void onCommand(Object command) { - } - - @Override - public void onException(IOException error) { - } - - @Override - public void transportInterupted() { - } - - @Override - public void transportResumed() { - } - }); - transport.start(); - } - - @Override - protected void tearDown() throws Exception { - if (transport != null) { - transport.stop(); - } - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a55b8686/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ClusterUtil.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ClusterUtil.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ClusterUtil.java new file mode 100644 index 0000000..42f199f --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ClusterUtil.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.transport.failover; + +/** + * Utilities to create broker clusters + */ +public class ClusterUtil { + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a55b8686/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ConnectionHangOnStartupTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ConnectionHangOnStartupTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ConnectionHangOnStartupTest.java index 110e2fc..f40ee4f 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ConnectionHangOnStartupTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ConnectionHangOnStartupTest.java @@ -22,18 +22,19 @@ import java.util.concurrent.atomic.AtomicReference; import javax.jms.Connection; import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.xbean.BrokerFactoryBean; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl; +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; +import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest; import org.junit.After; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.core.io.ClassPathResource; /** * Tests for AMQ-3719 */ -public class ConnectionHangOnStartupTest { +public class ConnectionHangOnStartupTest extends OpenwireArtemisBaseTest { private static final Logger LOG = LoggerFactory.getLogger(ConnectionHangOnStartupTest.class); @@ -41,13 +42,13 @@ public class ConnectionHangOnStartupTest { // maxReconnectDelay so that the test runs faster (because it will retry // connection sooner) protected String uriString = "failover://(tcp://localhost:62001?wireFormat.maxInactivityDurationInitalDelay=1,tcp://localhost:62002?wireFormat.maxInactivityDurationInitalDelay=1)?randomize=false&maxReconnectDelay=200"; - protected BrokerService master = null; - protected AtomicReference slave = new AtomicReference<>(); + protected EmbeddedJMS master = null; + protected AtomicReference slave = new AtomicReference(); @After public void tearDown() throws Exception { - BrokerService brokerService = slave.get(); + EmbeddedJMS brokerService = slave.get(); if (brokerService != null) { brokerService.stop(); } @@ -60,28 +61,18 @@ public class ConnectionHangOnStartupTest { } protected void createMaster() throws Exception { - BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource(getMasterXml())); - brokerFactory.afterPropertiesSet(); - master = brokerFactory.getBroker(); + Configuration config = createConfig("localhost", 0, 62001); + master = new EmbeddedJMS().setConfiguration(config).setJmsConfiguration(new JMSConfigurationImpl()); master.start(); } protected void createSlave() throws Exception { - BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource(getSlaveXml())); - brokerFactory.afterPropertiesSet(); - BrokerService broker = brokerFactory.getBroker(); + Configuration config = createConfig("localhost", 1, 62002); + EmbeddedJMS broker = new EmbeddedJMS().setConfiguration(config).setJmsConfiguration(new JMSConfigurationImpl()); broker.start(); slave.set(broker); } - protected String getSlaveXml() { - return "org/apache/activemq/broker/ft/sharedFileSlave.xml"; - } - - protected String getMasterXml() { - return "org/apache/activemq/broker/ft/sharedFileMaster.xml"; - } - @Test(timeout = 60000) public void testInitialWireFormatNegotiationTimeout() throws Exception { final AtomicReference conn = new AtomicReference<>(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a55b8686/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverBackupLeakTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverBackupLeakTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverBackupLeakTest.java index cf1d43d..0875a61 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverBackupLeakTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverBackupLeakTest.java @@ -22,58 +22,59 @@ import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Session; +import javax.management.MBeanServer; +import javax.management.MBeanServerFactory; import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.artemis.api.jms.management.JMSServerControl; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.server.management.ManagementService; +import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl; +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; +import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest; import org.apache.activemq.util.Wait; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import java.util.concurrent.TimeUnit; + /** * Ensures connections aren't leaked when when we use backup=true and randomize=false */ -public class FailoverBackupLeakTest { +public class FailoverBackupLeakTest extends OpenwireArtemisBaseTest { + + private EmbeddedJMS s1, s2; + + @Before + public void setUp() throws Exception { - private static BrokerService s1, s2; + Configuration config0 = createConfig("127.0.0.1", 0); + Configuration config1 = createConfig("127.0.0.1", 1); - @BeforeClass - public static void setUp() throws Exception { - s1 = buildBroker("broker1"); - s2 = buildBroker("broker2"); + deployClusterConfiguration(config0, 1); + deployClusterConfiguration(config1, 0); + s1 = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl()); + s2 = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl()); s1.start(); - s1.waitUntilStarted(); s2.start(); - s2.waitUntilStarted(); + + Assert.assertTrue(s1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2)); + Assert.assertTrue(s2.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2)); } - @AfterClass - public static void tearDown() throws Exception { + @After + public void tearDown() throws Exception { if (s2 != null) { s2.stop(); - s2.waitUntilStopped(); } if (s1 != null) { s1.stop(); - s1.waitUntilStopped(); } } - private static String getConnectString(BrokerService service) throws Exception { - return service.getTransportConnectors().get(0).getPublishableConnectString(); - } - - private static BrokerService buildBroker(String brokerName) throws Exception { - BrokerService service = new BrokerService(); - service.setBrokerName(brokerName); - service.setUseJmx(false); - service.setPersistent(false); - service.setUseShutdownHook(false); - service.addConnector("tcp://0.0.0.0:0?transport.closeAsync=false"); - return service; - } - @Test public void backupNoRandomize() throws Exception { check("backup=true&randomize=false"); @@ -85,9 +86,12 @@ public class FailoverBackupLeakTest { } private void check(String connectionProperties) throws Exception { - String s1URL = getConnectString(s1), s2URL = getConnectString(s2); + String s1URL = newURI(0), s2URL = newURI(1); String uri = "failover://(" + s1URL + "," + s2URL + ")?" + connectionProperties; ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri); + final int initCount1 = getConnectionCount(s1); + final int initCount2 = getConnectionCount(s2); + for (int i = 0; i < 10; i++) { buildConnection(factory); } @@ -96,7 +100,7 @@ public class FailoverBackupLeakTest { @Override public boolean isSatisified() throws Exception { - return getConnectionCount(s1) == 0; + return getConnectionCount(s1) == initCount1; } })); @@ -104,16 +108,22 @@ public class FailoverBackupLeakTest { @Override public boolean isSatisified() throws Exception { - return getConnectionCount(s2) == 0; + return getConnectionCount(s2) == initCount2; } })); } - private int getConnectionCount(BrokerService service) { - return service.getTransportConnectors().get(0).getConnections().size(); + private int getConnectionCount(EmbeddedJMS server) throws Exception { + ManagementService managementService = server.getActiveMQServer().getManagementService(); + JMSServerControl jmsControl = (JMSServerControl) managementService.getResource("jms.server"); + String[] ids = jmsControl.listConnectionIDs(); + if (ids != null) { + return ids.length; + } + return 0; } - private void buildConnection(ConnectionFactory local) throws JMSException { + private void buildConnection(ConnectionFactory local) throws Exception { Connection conn = null; Session sess = null; try { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a55b8686/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java index c0c529d..bf43caa 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java @@ -16,146 +16,127 @@ */ package org.apache.activemq.transport.failover; +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; import java.net.URI; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; -import javax.jms.Connection; -import javax.jms.MessageConsumer; -import javax.jms.Queue; -import javax.jms.Session; - -import junit.framework.TestCase; +import java.util.concurrent.TimeUnit; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.TransportConnector; -import org.apache.activemq.network.NetworkConnector; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl; +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; +import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; -public class FailoverClusterTest extends TestCase { +public class FailoverClusterTest extends OpenwireArtemisBaseTest { private static final int NUMBER = 10; - private static final String BROKER_BIND_ADDRESS = "tcp://0.0.0.0:0"; - private static final String BROKER_A_NAME = "BROKERA"; - private static final String BROKER_B_NAME = "BROKERB"; - private BrokerService brokerA; - private BrokerService brokerB; private String clientUrl; private final List connections = new ArrayList<>(); + EmbeddedJMS server1; + EmbeddedJMS server2; + + + @Before + public void setUp() throws Exception { + Configuration config1 = createConfig(1); + Configuration config2 = createConfig(2); + + deployClusterConfiguration(config1, 2); + deployClusterConfiguration(config2, 1); + + server1 = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl()); + server2 = new EmbeddedJMS().setConfiguration(config2).setJmsConfiguration(new JMSConfigurationImpl()); + clientUrl = "failover://(" + newURI(1) + "," + newURI(2) + ")"; + } + + @After + public void tearDown() throws Exception { + for (Connection c : connections) { + c.close(); + } + server1.stop(); + server2.stop(); + } + + @Test public void testClusterConnectedAfterClients() throws Exception { + server1.start(); createClients(); - if (brokerB == null) { - brokerB = createBrokerB(BROKER_BIND_ADDRESS); - } - Thread.sleep(3000); Set set = new HashSet<>(); + server2.start(); + Assert.assertTrue(server1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2)); + Assert.assertTrue(server2.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2)); + + Thread.sleep(3000); + for (ActiveMQConnection c : connections) { + System.out.println("======> adding address: " + c.getTransportChannel().getRemoteAddress()); set.add(c.getTransportChannel().getRemoteAddress()); } - assertTrue(set.size() > 1); + System.out.println("============final size: " + set.size()); + Assert.assertTrue(set.size() > 1); } + //this test seems the same as the above one as long as artemis broker + //is concerned. + @Test public void testClusterURIOptionsStrip() throws Exception { + server1.start(); + createClients(); - if (brokerB == null) { - // add in server side only url param, should not be propagated - brokerB = createBrokerB(BROKER_BIND_ADDRESS + "?transport.closeAsync=false"); - } + server2.start(); + Assert.assertTrue(server1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2)); + Assert.assertTrue(server2.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2)); + Thread.sleep(3000); - Set set = new HashSet<>(); + + Set set = new HashSet(); for (ActiveMQConnection c : connections) { set.add(c.getTransportChannel().getRemoteAddress()); } - assertTrue(set.size() > 1); + Assert.assertTrue(set.size() > 1); } + @Test public void testClusterConnectedBeforeClients() throws Exception { - if (brokerB == null) { - brokerB = createBrokerB(BROKER_BIND_ADDRESS); - } - Thread.sleep(5000); + server1.start(); + server2.start(); + Assert.assertTrue(server1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2)); + Assert.assertTrue(server2.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2)); + createClients(); - Thread.sleep(2000); - brokerA.stop(); - Thread.sleep(2000); + server1.stop(); + Thread.sleep(1000); - URI brokerBURI = new URI(brokerB.getTransportConnectors().get(0).getPublishableConnectString()); + URI brokerBURI = new URI(newURI(2)); for (ActiveMQConnection c : connections) { String addr = c.getTransportChannel().getRemoteAddress(); - assertTrue(addr.indexOf("" + brokerBURI.getPort()) > 0); + Assert.assertTrue(addr.indexOf("" + brokerBURI.getPort()) > 0); } } - @Override - protected void setUp() throws Exception { - if (brokerA == null) { - brokerA = createBrokerA(BROKER_BIND_ADDRESS + "?transport.closeAsync=false"); - clientUrl = "failover://(" + brokerA.getTransportConnectors().get(0).getPublishableConnectString() + ")"; - } - } - - @Override - protected void tearDown() throws Exception { - for (Connection c : connections) { - c.close(); - } - if (brokerB != null) { - brokerB.stop(); - brokerB = null; - } - if (brokerA != null) { - brokerA.stop(); - brokerA = null; - } - } - - protected BrokerService createBrokerA(String uri) throws Exception { - BrokerService answer = new BrokerService(); - answer.setUseJmx(false); - configureConsumerBroker(answer, uri); - answer.start(); - return answer; - } - - protected void configureConsumerBroker(BrokerService answer, String uri) throws Exception { - answer.setBrokerName(BROKER_A_NAME); - answer.setPersistent(false); - TransportConnector connector = answer.addConnector(uri); - connector.setRebalanceClusterClients(true); - connector.setUpdateClusterClients(true); - answer.setUseShutdownHook(false); - } - - protected BrokerService createBrokerB(String uri) throws Exception { - BrokerService answer = new BrokerService(); - answer.setUseJmx(false); - configureNetwork(answer, uri); - answer.start(); - return answer; - } - - protected void configureNetwork(BrokerService answer, String uri) throws Exception { - answer.setBrokerName(BROKER_B_NAME); - answer.setPersistent(false); - NetworkConnector network = answer.addNetworkConnector("static://" + brokerA.getTransportConnectors().get(0).getPublishableConnectString()); - network.setDuplex(true); - TransportConnector connector = answer.addConnector(uri); - connector.setRebalanceClusterClients(true); - connector.setUpdateClusterClients(true); - answer.setUseShutdownHook(false); - } - - @SuppressWarnings("unused") protected void createClients() throws Exception { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(clientUrl); for (int i = 0; i < NUMBER; i++) { + System.out.println("*****create connection using url: " + clientUrl); ActiveMQConnection c = (ActiveMQConnection) factory.createConnection(); + System.out.println("got connection, starting it ..."); c.start(); + System.out.println("******Started"); Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = s.createQueue(getClass().getName()); MessageConsumer consumer = s.createConsumer(queue); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a55b8686/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java index 53f0689..1d902e3 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java @@ -16,7 +16,29 @@ */ package org.apache.activemq.transport.failover; -import org.apache.activemq.broker.TransportConnector; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl; +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; +import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; /** * Complex cluster test that will exercise the dynamic failover capabilities of @@ -25,36 +47,71 @@ import org.apache.activemq.broker.TransportConnector; * connections on the client should start with 3, then have two after the 3rd * broker is removed and then show 3 after the 3rd broker is reintroduced. */ -public class FailoverComplexClusterTest extends FailoverClusterTestSupport { +public class FailoverComplexClusterTest extends OpenwireArtemisBaseTest { private static final String BROKER_A_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61616"; private static final String BROKER_B_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61617"; - private static final String BROKER_C_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61618"; - private static final String BROKER_A_NOB_TC_ADDRESS = "tcp://127.0.0.1:61626"; - private static final String BROKER_B_NOB_TC_ADDRESS = "tcp://127.0.0.1:61627"; - private static final String BROKER_C_NOB_TC_ADDRESS = "tcp://127.0.0.1:61628"; - private static final String BROKER_A_NAME = "BROKERA"; - private static final String BROKER_B_NAME = "BROKERB"; - private static final String BROKER_C_NAME = "BROKERC"; + + private String clientUrl; + private EmbeddedJMS[] servers = new EmbeddedJMS[3]; + + private static final int NUMBER_OF_CLIENTS = 30; + private final List connections = new ArrayList(); + + + @Before + public void setUp() throws Exception { + } + + //default setup for most tests + private void commonSetup() throws Exception { + Configuration config0 = createConfig(0); + Configuration config1 = createConfig(1); + Configuration config2 = createConfig(2); + + deployClusterConfiguration(config0, 1, 2); + deployClusterConfiguration(config1, 0, 2); + deployClusterConfiguration(config2, 0, 1); + + servers[0] = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl()); + servers[1] = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl()); + servers[2] = new EmbeddedJMS().setConfiguration(config2).setJmsConfiguration(new JMSConfigurationImpl()); + + servers[0].start(); + servers[1].start(); + servers[2].start(); + + Assert.assertTrue(servers[0].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 3)); + Assert.assertTrue(servers[1].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 3)); + Assert.assertTrue(servers[2].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 3)); + } + + @After + public void tearDown() throws Exception { + shutdownClients(); + for (EmbeddedJMS server : servers) { + if (server != null) { + server.stop(); + } + } + } /** * Basic dynamic failover 3 broker test * * @throws Exception */ + @Test public void testThreeBrokerClusterSingleConnectorBasic() throws Exception { - - initSingleTcBroker("", null, null); - - Thread.sleep(2000); - + commonSetup(); setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")"); createClients(); - Thread.sleep(2000); + Thread.sleep(3000); runTests(false, null, null, null); } + /** * Tests a 3 broker configuration to ensure that the backup is random and * supported in a cluster. useExponentialBackOff is set to false and @@ -63,10 +120,9 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport { * * @throws Exception */ + @Test public void testThreeBrokerClusterSingleConnectorBackupFailoverConfig() throws Exception { - - initSingleTcBroker("", null, null); - + commonSetup(); Thread.sleep(2000); setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?backup=true&backupPoolSize=2&useExponentialBackOff=false&initialReconnectDelay=500"); @@ -84,10 +140,9 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport { * * @throws Exception */ + @Test public void testThreeBrokerClusterSingleConnectorWithParams() throws Exception { - - initSingleTcBroker("?transport.closeAsync=false", null, null); - + commonSetup(); Thread.sleep(2000); setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")"); createClients(); @@ -101,10 +156,9 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport { * * @throws Exception */ + @Test public void testThreeBrokerClusterWithClusterFilter() throws Exception { - - initSingleTcBroker("?transport.closeAsync=false", null, null); - + commonSetup(); Thread.sleep(2000); setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")"); createClients(); @@ -118,10 +172,9 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport { * * @throws Exception */ + @Test public void testThreeBrokerClusterMultipleConnectorBasic() throws Exception { - - initMultiTcCluster("", null); - + commonSetup(); Thread.sleep(2000); setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")"); @@ -136,9 +189,9 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport { * * @throws Exception */ + @Test public void testOriginalBrokerRestart() throws Exception { - initSingleTcBroker("", null, null); - + commonSetup(); Thread.sleep(2000); setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")"); @@ -147,16 +200,13 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport { assertClientsConnectedToThreeBrokers(); - getBroker(BROKER_A_NAME).stop(); - getBroker(BROKER_A_NAME).waitUntilStopped(); - removeBroker(BROKER_A_NAME); + stopServer(0); Thread.sleep(5000); assertClientsConnectedToTwoBrokers(); - createBrokerA(false, null, null, null); - getBroker(BROKER_A_NAME).waitUntilStarted(); + restartServer(0); Thread.sleep(5000); assertClientsConnectedToThreeBrokers(); @@ -168,10 +218,9 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport { * * @throws Exception */ + @Test public void testThreeBrokerClusterClientDistributions() throws Exception { - - initSingleTcBroker("", null, null); - + commonSetup(); Thread.sleep(2000); setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false&initialReconnectDelay=500"); createClients(100); @@ -186,10 +235,9 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport { * * @throws Exception */ + @Test public void testThreeBrokerClusterDestinationFilter() throws Exception { - - initSingleTcBroker("", null, null); - + commonSetup(); Thread.sleep(2000); setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")"); createClients(); @@ -197,28 +245,25 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport { runTests(false, null, null, "Queue.TEST.FOO.>"); } + @Test public void testFailOverWithUpdateClientsOnRemove() throws Exception { // Broker A - addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME)); - TransportConnector connectorA = getBroker(BROKER_A_NAME).addConnector(BROKER_A_CLIENT_TC_ADDRESS); - connectorA.setName("openwire"); - connectorA.setRebalanceClusterClients(true); - connectorA.setUpdateClusterClients(true); - connectorA.setUpdateClusterClientsOnRemove(true); //If set to false the test succeeds. - addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null); - getBroker(BROKER_A_NAME).start(); - + Configuration config0 = createConfig(0, "?rebalance-cluster-client=true&update-cluster-clients=true&update-cluster-clients-on-remove=true"); // Broker B - addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME)); - TransportConnector connectorB = getBroker(BROKER_B_NAME).addConnector(BROKER_B_CLIENT_TC_ADDRESS); - connectorB.setName("openwire"); - connectorB.setRebalanceClusterClients(true); - connectorB.setUpdateClusterClients(true); - connectorB.setUpdateClusterClientsOnRemove(true); //If set to false the test succeeds. - addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null); - getBroker(BROKER_B_NAME).start(); - - getBroker(BROKER_B_NAME).waitUntilStarted(); + Configuration config1 = createConfig(1, "?rebalance-cluster-client=true&update-cluster-clients=true&update-cluster-clients-on-remove=true"); + + deployClusterConfiguration(config0, 1); + deployClusterConfiguration(config1, 0); + + servers[0] = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl()); + servers[0].start(); + + servers[1] = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl()); + servers[1].start(); + + servers[0].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2); + servers[1].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2); + Thread.sleep(1000); // create client connecting only to A. It should receive broker B address whet it connects to A. @@ -227,9 +272,9 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport { Thread.sleep(5000); // We stop broker A. - logger.info("Stopping broker A whose address is: {}", BROKER_A_CLIENT_TC_ADDRESS); - getBroker(BROKER_A_NAME).stop(); - getBroker(BROKER_A_NAME).waitUntilStopped(); + servers[0].stop(); + servers[1].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 1); + Thread.sleep(5000); // Client should failover to B. @@ -258,138 +303,150 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport { String destinationFilter) throws Exception, InterruptedException { assertClientsConnectedToThreeBrokers(); - getBroker(BROKER_C_NAME).stop(); - getBroker(BROKER_C_NAME).waitUntilStopped(); - removeBroker(BROKER_C_NAME); + stopServer(2); Thread.sleep(5000); assertClientsConnectedToTwoBrokers(); - createBrokerC(multi, tcParams, clusterFilter, destinationFilter); - getBroker(BROKER_C_NAME).waitUntilStarted(); + restartServer(2); + Thread.sleep(5000); assertClientsConnectedToThreeBrokers(); } - /** - * @param multi - * @param tcParams - * @param clusterFilter - * @param destinationFilter - * @throws Exception - * @throws InterruptedException - */ + public void setClientUrl(String clientUrl) { + this.clientUrl = clientUrl; + } + + protected void createClients() throws Exception { + createClients(NUMBER_OF_CLIENTS); + } + + protected void createClients(int numOfClients) throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(clientUrl); + for (int i = 0; i < numOfClients; i++) { + ActiveMQConnection c = (ActiveMQConnection) factory.createConnection(); + c.start(); + Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = s.createQueue(getClass().getName()); + MessageConsumer consumer = s.createConsumer(queue); + connections.add(c); + } + } + + protected void shutdownClients() throws JMSException { + for (Connection c : connections) { + c.close(); + } + } + + protected void assertClientsConnectedToThreeBrokers() { + Set set = new HashSet(); + for (ActiveMQConnection c : connections) { + if (c.getTransportChannel().getRemoteAddress() != null) { + set.add(c.getTransportChannel().getRemoteAddress()); + } + } + Assert.assertTrue("Only 3 connections should be found: " + set, set.size() == 3); + } + + protected void assertClientsConnectedToTwoBrokers() { + Set set = new HashSet(); + for (ActiveMQConnection c : connections) { + if (c.getTransportChannel().getRemoteAddress() != null) { + set.add(c.getTransportChannel().getRemoteAddress()); + } + } + Assert.assertTrue("Only 2 connections should be found: " + set, set.size() == 2); + } + + private void stopServer(int serverID) throws Exception { + servers[serverID].stop(); + for (int i = 0; i < servers.length; i++) { + if (i != serverID) { + Assert.assertTrue(servers[i].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, servers.length - 1)); + } + } + } + + private void restartServer(int serverID) throws Exception { + servers[serverID].start(); + + for (int i = 0; i < servers.length; i++) { + Assert.assertTrue(servers[i].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, servers.length)); + } + } + private void runClientDistributionTests(boolean multi, String tcParams, String clusterFilter, String destinationFilter) throws Exception, InterruptedException { assertClientsConnectedToThreeBrokers(); - assertClientsConnectionsEvenlyDistributed(.25); + //if 2/3 or more of total connections connect to one node, we consider it wrong + //if 1/4 or less of total connects to one node, we consider it wrong + assertClientsConnectionsEvenlyDistributed(.25, .67); - getBroker(BROKER_C_NAME).stop(); - getBroker(BROKER_C_NAME).waitUntilStopped(); - removeBroker(BROKER_C_NAME); + stopServer(2); Thread.sleep(5000); assertClientsConnectedToTwoBrokers(); - assertClientsConnectionsEvenlyDistributed(.35); + //now there are only 2 nodes + //if 2/3 or more of total connections go to either node, we consider it wrong + //if 1/3 or less of total connections go to either node, we consider it wrong + assertClientsConnectionsEvenlyDistributed(.34, .67); - createBrokerC(multi, tcParams, clusterFilter, destinationFilter); - getBroker(BROKER_C_NAME).waitUntilStarted(); + restartServer(2); Thread.sleep(5000); assertClientsConnectedToThreeBrokers(); - assertClientsConnectionsEvenlyDistributed(.20); - } - - @Override - protected void setUp() throws Exception { + //now back to 3 nodes. We assume at least the new node will + //have 1/10 of the total connections, and any node's connections + //won't exceed 50% + assertClientsConnectionsEvenlyDistributed(.10, .50); } - @Override - protected void tearDown() throws Exception { - shutdownClients(); - Thread.sleep(2000); - destroyBrokerCluster(); - } - - private void initSingleTcBroker(String params, String clusterFilter, String destinationFilter) throws Exception { - createBrokerA(false, params, clusterFilter, null); - createBrokerB(false, params, clusterFilter, null); - createBrokerC(false, params, clusterFilter, null); - getBroker(BROKER_C_NAME).waitUntilStarted(); - } - - private void initMultiTcCluster(String params, String clusterFilter) throws Exception { - createBrokerA(true, params, clusterFilter, null); - createBrokerB(true, params, clusterFilter, null); - createBrokerC(true, params, clusterFilter, null); - getBroker(BROKER_C_NAME).waitUntilStarted(); - } - - private void createBrokerA(boolean multi, - String params, - String clusterFilter, - String destinationFilter) throws Exception { - final String tcParams = (params == null) ? "" : params; - if (getBroker(BROKER_A_NAME) == null) { - addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME)); - addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS + tcParams, true); - if (multi) { - addTransportConnector(getBroker(BROKER_A_NAME), "network", BROKER_A_NOB_TC_ADDRESS + tcParams, false); - addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter); - addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_C_Bridge", "static://(" + BROKER_C_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, null); - } - else { - addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter); - addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_C_Bridge", "static://(" + BROKER_C_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null); + protected void assertClientsConnectionsEvenlyDistributed(double minimumPercentage, double maximumPercentage) { + Map clientConnectionCounts = new HashMap(); + int total = 0; + for (ActiveMQConnection c : connections) { + String key = c.getTransportChannel().getRemoteAddress(); + if (key != null) { + total++; + if (clientConnectionCounts.containsKey(key)) { + double count = clientConnectionCounts.get(key); + count += 1.0; + clientConnectionCounts.put(key, count); + } + else { + clientConnectionCounts.put(key, 1.0); + } } - getBroker(BROKER_A_NAME).start(); } - } - - private void createBrokerB(boolean multi, - String params, - String clusterFilter, - String destinationFilter) throws Exception { - final String tcParams = (params == null) ? "" : params; - if (getBroker(BROKER_B_NAME) == null) { - addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME)); - addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS + tcParams, true); - if (multi) { - addTransportConnector(getBroker(BROKER_B_NAME), "network", BROKER_B_NOB_TC_ADDRESS + tcParams, false); - addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter); - addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_C_Bridge", "static://(" + BROKER_C_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, null); + Set keys = clientConnectionCounts.keySet(); + List errorMsgs = new ArrayList(); + for (String key : keys) { + double count = clientConnectionCounts.get(key); + double percentage = count / total; + if (percentage < minimumPercentage || percentage > maximumPercentage) { + errorMsgs.add("Connections distribution expected to be within range [ " + minimumPercentage + + ", " + maximumPercentage + "]. Actuall distribution was " + percentage + " for connection " + key); } - else { - addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter); - addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_C_Bridge", "static://(" + BROKER_C_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null); + if (errorMsgs.size() > 0) { + for (String err : errorMsgs) { + System.err.println(err); + } + Assert.fail("Test failed. Please see the log message for details"); } - getBroker(BROKER_B_NAME).start(); } } - private void createBrokerC(boolean multi, - String params, - String clusterFilter, - String destinationFilter) throws Exception { - final String tcParams = (params == null) ? "" : params; - if (getBroker(BROKER_C_NAME) == null) { - addBroker(BROKER_C_NAME, createBroker(BROKER_C_NAME)); - addTransportConnector(getBroker(BROKER_C_NAME), "openwire", BROKER_C_CLIENT_TC_ADDRESS + tcParams, true); - if (multi) { - addTransportConnector(getBroker(BROKER_C_NAME), "network", BROKER_C_NOB_TC_ADDRESS + tcParams, false); - addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_A_Bridge", "static://(" + BROKER_A_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter); - addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_B_Bridge", "static://(" + BROKER_B_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, null); - } - else { - addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter); - addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null); - } - getBroker(BROKER_C_NAME).start(); + protected void assertAllConnectedTo(String url) throws Exception { + for (ActiveMQConnection c : connections) { + Assert.assertEquals(url, c.getTransportChannel().getRemoteAddress()); } } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a55b8686/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java index e33e7ea..78a8a0b 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java @@ -39,63 +39,61 @@ import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerPlugin; -import org.apache.activemq.broker.BrokerPluginSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.command.TransactionId; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext; +import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl; +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; +import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMRules; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; +import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.junit.After; import org.junit.Test; -public class FailoverConsumerOutstandingCommitTest { - +@RunWith(BMUnitRunner.class) +public class FailoverConsumerOutstandingCommitTest extends OpenwireArtemisBaseTest { private static final Logger LOG = LoggerFactory.getLogger(FailoverConsumerOutstandingCommitTest.class); private static final String QUEUE_NAME = "FailoverWithOutstandingCommit"; private static final String MESSAGE_TEXT = "Test message "; - private static final String TRANSPORT_URI = "tcp://localhost:0"; - private String url; + private static final String url = newURI(0); final int prefetch = 10; - BrokerService broker; + private static EmbeddedJMS server; + private static final AtomicBoolean doByteman = new AtomicBoolean(false); + private static CountDownLatch brokerStopLatch = new CountDownLatch(1); @After public void stopBroker() throws Exception { - if (broker != null) { - broker.stop(); + if (server != null) { + server.stop(); } } - public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception { - broker = createBroker(deleteAllMessagesOnStartup); - broker.start(); - } - - public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception { - return createBroker(deleteAllMessagesOnStartup, TRANSPORT_URI); - } - - public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception { - broker = new BrokerService(); - broker.addConnector(bindAddress); - broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup); - PolicyMap policyMap = new PolicyMap(); - PolicyEntry defaultEntry = new PolicyEntry(); - - // optimizedDispatche and sync dispatch ensure that the dispatch happens - // before the commit reply that the consumer.clearDispatchList is waiting for. - defaultEntry.setOptimizedDispatch(true); - policyMap.setDefaultEntry(defaultEntry); - broker.setDestinationPolicy(policyMap); - - url = broker.getTransportConnectors().get(0).getConnectUri().toString(); - - return broker; + public void startServer() throws Exception { + server = createBroker(); + server.start(); } @Test + @BMRules( + rules = { + @BMRule( + name = "set no return response", + targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection", + targetMethod = "processCommitTransactionOnePhase", + targetLocation = "ENTRY", + binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()", + action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.holdResponse(context)"), + @BMRule( + name = "stop broker before commit", + targetClass = "org.apache.activemq.artemis.core.server.impl.ServerSessionImpl", + targetMethod = "commit", + targetLocation = "ENTRY", + action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.stopServerInTransaction()"), + } + ) public void testFailoverConsumerDups() throws Exception { doTestFailoverConsumerDups(true); } @@ -103,30 +101,9 @@ public class FailoverConsumerOutstandingCommitTest { @SuppressWarnings("unchecked") public void doTestFailoverConsumerDups(final boolean watchTopicAdvisories) throws Exception { - broker = createBroker(true); - - broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() { - @Override - public void commitTransaction(ConnectionContext context, - TransactionId xid, - boolean onePhase) throws Exception { - // so commit will hang as if reply is lost - context.setDontSendReponse(true); - Executors.newSingleThreadExecutor().execute(new Runnable() { - @Override - public void run() { - LOG.info("Stopping broker before commit..."); - try { - broker.stop(); - } - catch (Exception e) { - e.printStackTrace(); - } - } - }); - } - }}); - broker.start(); + server = createBroker(); + server.start(); + brokerStopLatch = new CountDownLatch(1); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); cf.setWatchTopicAdvisories(watchTopicAdvisories); @@ -144,9 +121,9 @@ public class FailoverConsumerOutstandingCommitTest { final CountDownLatch messagesReceived = new CountDownLatch(2); final MessageConsumer testConsumer = consumerSession.createConsumer(destination); + doByteman.set(true); testConsumer.setMessageListener(new MessageListener() { - @Override public void onMessage(Message message) { LOG.info("consume one and commit"); @@ -166,7 +143,6 @@ public class FailoverConsumerOutstandingCommitTest { // may block if broker shutodwn happens quickly Executors.newSingleThreadExecutor().execute(new Runnable() { - @Override public void run() { LOG.info("producer started"); try { @@ -183,9 +159,11 @@ public class FailoverConsumerOutstandingCommitTest { }); // will be stopped by the plugin - broker.waitUntilStopped(); - broker = createBroker(false, url); - broker.start(); + brokerStopLatch.await(); + server.stop(); + server = createBroker(); + doByteman.set(false); + server.start(); assertTrue("consumer added through failover", commitDoneLatch.await(20, TimeUnit.SECONDS)); assertTrue("another message was received after failover", messagesReceived.await(20, TimeUnit.SECONDS)); @@ -194,11 +172,41 @@ public class FailoverConsumerOutstandingCommitTest { } @Test + @BMRules( + rules = { + @BMRule( + name = "set no return response", + targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection", + targetMethod = "processCommitTransactionOnePhase", + targetLocation = "ENTRY", + binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()", + action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.holdResponse(context)"), + @BMRule( + name = "stop broker before commit", + targetClass = "org.apache.activemq.artemis.core.server.impl.ServerSessionImpl", + targetMethod = "commit", + targetLocation = "ENTRY", + action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.stopServerInTransaction();return")}) public void TestFailoverConsumerOutstandingSendTxIncomplete() throws Exception { doTestFailoverConsumerOutstandingSendTx(false); } @Test + @BMRules( + rules = { + @BMRule( + name = "set no return response", + targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection", + targetMethod = "processCommitTransactionOnePhase", + targetLocation = "ENTRY", + binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()", + action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.holdResponse(context)"), + @BMRule( + name = "stop broker after commit", + targetClass = "org.apache.activemq.artemis.core.server.impl.ServerSessionImpl", + targetMethod = "commit", + targetLocation = "AT EXIT", + action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.stopServerInTransaction()")}) public void TestFailoverConsumerOutstandingSendTxComplete() throws Exception { doTestFailoverConsumerOutstandingSendTx(true); } @@ -206,36 +214,9 @@ public class FailoverConsumerOutstandingCommitTest { @SuppressWarnings("unchecked") public void doTestFailoverConsumerOutstandingSendTx(final boolean doActualBrokerCommit) throws Exception { final boolean watchTopicAdvisories = true; - broker = createBroker(true); - - broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() { - @Override - public void commitTransaction(ConnectionContext context, - TransactionId xid, - boolean onePhase) throws Exception { - // from the consumer perspective whether the commit completed on the broker or - // not is irrelevant, the transaction is still in doubt in the absence of a reply - if (doActualBrokerCommit) { - LOG.info("doing actual broker commit..."); - super.commitTransaction(context, xid, onePhase); - } - // so commit will hang as if reply is lost - context.setDontSendReponse(true); - Executors.newSingleThreadExecutor().execute(new Runnable() { - @Override - public void run() { - LOG.info("Stopping broker before commit..."); - try { - broker.stop(); - } - catch (Exception e) { - e.printStackTrace(); - } - } - }); - } - }}); - broker.start(); + server = createBroker(); + server.start(); + brokerStopLatch = new CountDownLatch(1); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); cf.setWatchTopicAdvisories(watchTopicAdvisories); @@ -254,11 +235,11 @@ public class FailoverConsumerOutstandingCommitTest { final CountDownLatch commitDoneLatch = new CountDownLatch(1); final CountDownLatch messagesReceived = new CountDownLatch(3); final AtomicBoolean gotCommitException = new AtomicBoolean(false); - final ArrayList receivedMessages = new ArrayList<>(); + final ArrayList receivedMessages = new ArrayList(); final MessageConsumer testConsumer = consumerSession.createConsumer(destination); + doByteman.set(true); testConsumer.setMessageListener(new MessageListener() { - @Override public void onMessage(Message message) { LOG.info("consume one and commit: " + message); assertNotNull("got message", message); @@ -279,7 +260,6 @@ public class FailoverConsumerOutstandingCommitTest { // may block if broker shutdown happens quickly Executors.newSingleThreadExecutor().execute(new Runnable() { - @Override public void run() { LOG.info("producer started"); try { @@ -296,9 +276,11 @@ public class FailoverConsumerOutstandingCommitTest { }); // will be stopped by the plugin - broker.waitUntilStopped(); - broker = createBroker(false, url); - broker.start(); + brokerStopLatch.await(); + server.stop(); + doByteman.set(false); + server = createBroker(); + server.start(); assertTrue("commit done through failover", commitDoneLatch.await(20, TimeUnit.SECONDS)); assertTrue("commit failed", gotCommitException.get()); @@ -313,12 +295,13 @@ public class FailoverConsumerOutstandingCommitTest { assertEquals("get message 1 eventually", MESSAGE_TEXT + "1", receivedMessages.get(receivedIndex++).getText()); connection.close(); + server.stop(); } @Test public void testRollbackFailoverConsumerTx() throws Exception { - broker = createBroker(true); - broker.start(); + server = createBroker(); + server.start(); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); cf.setConsumerFailoverRedeliveryWaitPeriod(10000); @@ -340,10 +323,9 @@ public class FailoverConsumerOutstandingCommitTest { assertNotNull(msg); // restart with outstanding delivered message - broker.stop(); - broker.waitUntilStopped(); - broker = createBroker(false, url); - broker.start(); + server.stop(); + server = createBroker(); + server.start(); consumerSession.rollback(); @@ -379,4 +361,29 @@ public class FailoverConsumerOutstandingCommitTest { } producer.close(); } + + public static void holdResponse(AMQConnectionContext context) { + if (doByteman.get()) { + context.setDontSendReponse(true); + } + } + + public static void stopServerInTransaction() { + if (doByteman.get()) { + Executors.newSingleThreadExecutor().execute(new Runnable() { + public void run() { + LOG.info("Stopping broker in transaction..."); + try { + server.stop(); + } + catch (Exception e) { + e.printStackTrace(); + } + finally { + brokerStopLatch.countDown(); + } + } + }); + } + } }