Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8168A18647 for ; Tue, 15 Mar 2016 20:21:46 +0000 (UTC) Received: (qmail 92671 invoked by uid 500); 15 Mar 2016 20:21:45 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 92563 invoked by uid 500); 15 Mar 2016 20:21:45 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 91929 invoked by uid 99); 15 Mar 2016 20:21:45 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 15 Mar 2016 20:21:45 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4F62AE03EC; Tue, 15 Mar 2016 20:21:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Tue, 15 Mar 2016 20:21:59 -0000 Message-Id: <160e4b6e30c74f2b93d1b27d08d99045@git.apache.org> In-Reply-To: <738f05eef7984484a9153210c822d18d@git.apache.org> References: <738f05eef7984484a9153210c822d18d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [16/59] [abbrv] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4517Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4517Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4517Test.java deleted file mode 100644 index 8d94998..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4517Test.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.advisory.AdvisorySupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.policy.DeadLetterStrategy; -import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class AMQ4517Test { - - private BrokerService brokerService; - private String connectionUri; - - @Before - public void setup() throws Exception { - brokerService = new BrokerService(); - - connectionUri = brokerService.addConnector("tcp://localhost:0").getPublishableConnectString(); - - // Configure Dead Letter Strategy - DeadLetterStrategy strategy = new IndividualDeadLetterStrategy(); - ((IndividualDeadLetterStrategy) strategy).setUseQueueForQueueMessages(true); - ((IndividualDeadLetterStrategy) strategy).setQueuePrefix("DLQ."); - strategy.setProcessNonPersistent(false); - strategy.setProcessExpired(false); - - // Add policy and individual DLQ strategy - PolicyEntry policy = new PolicyEntry(); - policy.setTimeBeforeDispatchStarts(3000); - policy.setDeadLetterStrategy(strategy); - - PolicyMap pMap = new PolicyMap(); - pMap.setDefaultEntry(policy); - - brokerService.setDestinationPolicy(pMap); - brokerService.setPersistent(false); - brokerService.start(); - } - - @After - public void stop() throws Exception { - brokerService.stop(); - } - - @Test(timeout = 360000) - public void test() throws Exception { - - final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionUri); - - final AtomicBoolean advised = new AtomicBoolean(false); - Connection connection = cf.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination dlqDestination = session.createTopic(AdvisorySupport.MESSAGE_DLQ_TOPIC_PREFIX + ">"); - MessageConsumer consumer = session.createConsumer(dlqDestination); - consumer.setMessageListener(new MessageListener() { - - @Override - public void onMessage(Message message) { - advised.set(true); - } - }); - connection.start(); - - ExecutorService service = Executors.newSingleThreadExecutor(); - - service.execute(new Runnable() { - @Override - public void run() { - try { - ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination destination = session.createTemporaryQueue(); - MessageProducer producer = session.createProducer(destination); - producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - producer.setTimeToLive(400); - producer.send(session.createTextMessage()); - producer.send(session.createTextMessage()); - TimeUnit.MILLISECONDS.sleep(500); - connection.close(); - } - catch (Exception e) { - } - } - }); - - service.shutdown(); - assertTrue(service.awaitTermination(1, TimeUnit.MINUTES)); - assertFalse("Should not get any Advisories for DLQ'd Messages", advised.get()); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4518Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4518Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4518Test.java deleted file mode 100644 index 92021bf..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4518Test.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.advisory.AdvisorySupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.policy.DeadLetterStrategy; -import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class AMQ4518Test { - - private BrokerService brokerService; - private String connectionUri; - - @Before - public void setup() throws Exception { - brokerService = new BrokerService(); - - connectionUri = brokerService.addConnector("tcp://localhost:0").getPublishableConnectString(); - - // Configure Dead Letter Strategy - DeadLetterStrategy strategy = new IndividualDeadLetterStrategy(); - ((IndividualDeadLetterStrategy) strategy).setUseQueueForQueueMessages(true); - ((IndividualDeadLetterStrategy) strategy).setQueuePrefix("DLQ."); - strategy.setProcessNonPersistent(false); - strategy.setProcessExpired(false); - - // Add policy and individual DLQ strategy - PolicyEntry policy = new PolicyEntry(); - policy.setTimeBeforeDispatchStarts(3000); - policy.setDeadLetterStrategy(strategy); - - PolicyMap pMap = new PolicyMap(); - pMap.setDefaultEntry(policy); - - brokerService.setDestinationPolicy(pMap); - brokerService.setPersistent(false); - brokerService.start(); - } - - @After - public void stop() throws Exception { - brokerService.stop(); - } - - @Test(timeout = 360000) - public void test() throws Exception { - - final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionUri); - - final AtomicBoolean advised = new AtomicBoolean(false); - Connection connection = cf.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination dlqDestination = session.createTopic(AdvisorySupport.EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX + ">"); - MessageConsumer consumer = session.createConsumer(dlqDestination); - consumer.setMessageListener(new MessageListener() { - - @Override - public void onMessage(Message message) { - advised.set(true); - } - }); - connection.start(); - - ExecutorService service = Executors.newSingleThreadExecutor(); - - service.execute(new Runnable() { - @Override - public void run() { - try { - ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination destination = session.createTemporaryQueue(); - MessageProducer producer = session.createProducer(destination); - producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - producer.setTimeToLive(400); - producer.send(session.createTextMessage()); - producer.send(session.createTextMessage()); - TimeUnit.MILLISECONDS.sleep(500); - connection.close(); - } - catch (Exception e) { - } - } - }); - - service.shutdown(); - assertTrue(service.awaitTermination(1, TimeUnit.MINUTES)); - assertFalse("Should not get any Advisories for Expired Messages", advised.get()); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4530Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4530Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4530Test.java deleted file mode 100644 index d57501e..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4530Test.java +++ /dev/null @@ -1,115 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.core.Is.is; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThat; - -import java.util.Map; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; -import javax.management.openmbean.CompositeData; -import javax.management.openmbean.TabularDataSupport; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.jmx.CompositeDataConstants; -import org.apache.activemq.broker.jmx.QueueViewMBean; -import org.apache.activemq.command.ActiveMQQueue; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class AMQ4530Test { - - private static BrokerService brokerService; - private static String TEST_QUEUE = "testQueue"; - private static ActiveMQQueue queue = new ActiveMQQueue(TEST_QUEUE); - private static String BROKER_ADDRESS = "tcp://localhost:0"; - private static String KEY = "testproperty"; - private static String VALUE = "propvalue"; - - private ActiveMQConnectionFactory connectionFactory; - private String connectionUri; - - @Before - public void setUp() throws Exception { - brokerService = new BrokerService(); - brokerService.setPersistent(false); - brokerService.setUseJmx(true); - connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString(); - brokerService.start(); - brokerService.waitUntilStarted(); - - connectionFactory = new ActiveMQConnectionFactory(connectionUri); - sendMessage(); - } - - public void sendMessage() throws Exception { - final Connection conn = connectionFactory.createConnection(); - try { - conn.start(); - final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - final Destination queue = session.createQueue(TEST_QUEUE); - final Message toSend = session.createMessage(); - toSend.setStringProperty(KEY, VALUE); - final MessageProducer producer = session.createProducer(queue); - producer.send(queue, toSend); - } - finally { - conn.close(); - } - } - - @After - public void tearDown() throws Exception { - brokerService.stop(); - brokerService.waitUntilStopped(); - } - - @SuppressWarnings("unchecked") - @Test - public void testStringPropertiesFromCompositeData() throws Exception { - final QueueViewMBean queueView = getProxyToQueueViewMBean(); - final CompositeData message = queueView.browse()[0]; - assertNotNull(message); - TabularDataSupport stringProperties = (TabularDataSupport) message.get(CompositeDataConstants.STRING_PROPERTIES); - assertNotNull(stringProperties); - assertThat(stringProperties.size(), is(greaterThan(0))); - Map.Entry compositeDataEntry = (Map.Entry) stringProperties.entrySet().toArray()[0]; - CompositeData stringEntry = (CompositeData) compositeDataEntry.getValue(); - assertThat(String.valueOf(stringEntry.get("key")), equalTo(KEY)); - assertThat(String.valueOf(stringEntry.get("value")), equalTo(VALUE)); - } - - private QueueViewMBean getProxyToQueueViewMBean() throws MalformedObjectNameException, NullPointerException, JMSException { - final ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + queue.getQueueName()); - final QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); - return proxy; - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4531Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4531Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4531Test.java deleted file mode 100644 index d303561..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4531Test.java +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.io.PrintWriter; -import java.io.StringWriter; -import java.io.Writer; -import java.lang.management.ManagementFactory; -import java.util.concurrent.CountDownLatch; - -import javax.management.MBeanServer; -import javax.management.ObjectName; - -import junit.framework.Test; -import junit.framework.TestCase; -import junit.framework.TestSuite; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.util.Wait; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Unit test for simple App. - */ -public class AMQ4531Test extends TestCase { - - private final Logger LOG = LoggerFactory.getLogger(AMQ4531Test.class); - - private String connectionURI; - private MBeanServer mbeanServer; - private BrokerService broker; - - @Override - protected void setUp() throws Exception { - super.setUp(); - broker = new BrokerService(); - connectionURI = broker.addConnector("tcp://0.0.0.0:0?maximumConnections=1").getPublishableConnectString(); - broker.setPersistent(false); - broker.start(); - mbeanServer = ManagementFactory.getPlatformMBeanServer(); - } - - @Override - protected void tearDown() throws Exception { - broker.stop(); - super.tearDown(); - } - - /** - * Create the test case - * - * @param testName name of the test case - */ - public AMQ4531Test(String testName) { - super(testName); - } - - /** - * @return the suite of tests being tested - */ - public static Test suite() { - return new TestSuite(AMQ4531Test.class); - } - - public void testFDSLeak() throws Exception { - - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionURI); - ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection(); - connection.start(); - - int connections = 100; - final long original = openFileDescriptorCount(); - LOG.info("FD count: " + original); - final CountDownLatch done = new CountDownLatch(connections); - for (int i = 0; i < connections; i++) { - new Thread("worker: " + i) { - @Override - public void run() { - ActiveMQConnection connection = null; - try { - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionURI); - connection = (ActiveMQConnection) factory.createConnection(); - connection.start(); - } - catch (Exception e) { - LOG.debug(getStack(e)); - } - finally { - try { - connection.close(); - } - catch (Exception e) { - LOG.debug(getStack(e)); - } - done.countDown(); - LOG.debug("Latch count down called."); - } - } - }.start(); - } - - // Wait for all the clients to finish - LOG.info("Waiting for latch..."); - done.await(); - LOG.info("Latch complete."); - LOG.info("FD count: " + openFileDescriptorCount()); - - assertTrue("Too many open file descriptors: " + openFileDescriptorCount(), Wait.waitFor(new Wait.Condition() { - - @Override - public boolean isSatisified() throws Exception { - long openFDs = openFileDescriptorCount(); - LOG.info("Current FD count [{}], original FD count[{}]", openFDs, original); - return (openFDs - original) < 10; - } - })); - } - - private long openFileDescriptorCount() throws Exception { - return ((Long) mbeanServer.getAttribute(new ObjectName("java.lang:type=OperatingSystem"), "OpenFileDescriptorCount")).longValue(); - } - - private String getStack(Throwable aThrowable) { - final Writer result = new StringWriter(); - final PrintWriter printWriter = new PrintWriter(result); - aThrowable.printStackTrace(printWriter); - return result.toString(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4554Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4554Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4554Test.java deleted file mode 100644 index 1113ee4..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4554Test.java +++ /dev/null @@ -1,107 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import javax.jms.Connection; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import junit.framework.Test; -import junit.framework.TestCase; -import junit.framework.TestSuite; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Unit test for simple App. - */ -public class AMQ4554Test extends TestCase { - - private final Logger LOG = LoggerFactory.getLogger(AMQ4554Test.class); - - private String connectionURI; - private BrokerService broker; - - @Override - protected void setUp() throws Exception { - super.setUp(); - broker = new BrokerService(); - connectionURI = broker.addConnector("tcp://0.0.0.0:0?maximumConnections=1").getPublishableConnectString(); - broker.setPersistent(false); - broker.start(); - } - - @Override - protected void tearDown() throws Exception { - broker.stop(); - super.tearDown(); - } - - /** - * Create the test case - * - * @param testName name of the test case - */ - public AMQ4554Test(String testName) { - super(testName); - } - - /** - * @return the suite of tests being tested - */ - public static Test suite() { - return new TestSuite(AMQ4554Test.class); - } - - public void testMSXProducerTXID() throws Exception { - - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionURI); - Connection connection = factory.createConnection(); - connection.start(); - - Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED); - MessageProducer producer = producerSession.createProducer(producerSession.createQueue("myQueue")); - TextMessage producerMessage = producerSession.createTextMessage("Test Message"); - producer.send(producerMessage); - producer.close(); - producerSession.commit(); - producerSession.close(); - - Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED); - MessageConsumer consumer = consumerSession.createConsumer(consumerSession.createQueue("myQueue")); - Message consumerMessage = consumer.receive(1000); - try { - String txId = consumerMessage.getStringProperty("JMSXProducerTXID"); - assertNotNull(txId); - } - catch (Exception e) { - LOG.info("Caught Exception that was not expected:", e); - fail("Should not throw"); - } - consumer.close(); - consumerSession.commit(); - consumerSession.close(); - connection.close(); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4582Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4582Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4582Test.java deleted file mode 100644 index 9612a34..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4582Test.java +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.io.IOException; - -import javax.jms.Connection; -import javax.jms.Session; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.util.ConsumerThread; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ4582Test { - - private static final transient Logger LOG = LoggerFactory.getLogger(AMQ4582Test.class); - - BrokerService broker; - Connection connection; - Session session; - - public static final String KEYSTORE_TYPE = "jks"; - public static final String PASSWORD = "password"; - public static final String SERVER_KEYSTORE = "src/test/resources/server.keystore"; - public static final String TRUST_KEYSTORE = "src/test/resources/client.keystore"; - - public static final int PRODUCER_COUNT = 10; - public static final int CONSUMER_COUNT = 10; - public static final int MESSAGE_COUNT = 1000; - - final ConsumerThread[] consumers = new ConsumerThread[CONSUMER_COUNT]; - - @Before - public void setUp() throws Exception { - System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE); - System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD); - System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE); - System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE); - System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE); - System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD); - } - - @After - public void tearDown() throws Exception { - if (broker != null) { - try { - broker.stop(); - } - catch (Exception e) { - } - } - } - - @Rule - public ExpectedException thrown = ExpectedException.none(); - - @Test - public void simpleTest() throws Exception { - thrown.expect(IOException.class); - thrown.expectMessage("enabledCipherSuites=BADSUITE"); - - broker = new BrokerService(); - broker.setPersistent(false); - broker.setUseJmx(false); - try { - broker.addConnector("ssl://localhost:0?transport.needClientAuth=true&transport.enabledCipherSuites=BADSUITE"); - broker.start(); - broker.waitUntilStarted(); - } - catch (Exception e) { - LOG.info("BrokerService threw:", e); - throw e; - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java deleted file mode 100644 index 3c16bab..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java +++ /dev/null @@ -1,158 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.net.URI; -import java.util.Date; -import java.util.Enumeration; -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.JMSException; -import javax.jms.MessageProducer; -import javax.jms.QueueBrowser; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.TransportConnector; -import org.apache.activemq.command.ActiveMQQueue; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.junit.Assert.assertEquals; - -public class AMQ4595Test { - - private static final Logger LOG = LoggerFactory.getLogger(AMQ4595Test.class); - - private BrokerService broker; - private URI connectUri; - private ActiveMQConnectionFactory factory; - - @Before - public void startBroker() throws Exception { - broker = new BrokerService(); - TransportConnector connector = broker.addConnector("vm://localhost"); - broker.deleteAllMessages(); - - //PolicyMap pMap = new PolicyMap(); - //PolicyEntry policyEntry = new PolicyEntry(); - //policyEntry.setMaxBrowsePageSize(10000); - //pMap.put(new ActiveMQQueue(">"), policyEntry); - // when no policy match, browserSub has maxMessages==0 - //broker.setDestinationPolicy(pMap); - - broker.getSystemUsage().getMemoryUsage().setLimit(256 * 1024 * 1024); - broker.start(); - broker.waitUntilStarted(); - connectUri = connector.getConnectUri(); - factory = new ActiveMQConnectionFactory(connectUri); - } - - @After - public void stopBroker() throws Exception { - broker.stop(); - broker.waitUntilStopped(); - } - - @Test(timeout = 120000) - public void testBrowsingSmallBatch() throws JMSException { - doTestBrowsing(100); - } - - @Test(timeout = 160000) - public void testBrowsingMediumBatch() throws JMSException { - doTestBrowsing(1000); - } - - @Test(timeout = 300000) - public void testBrowsingLargeBatch() throws JMSException { - doTestBrowsing(10000); - } - - private void doTestBrowsing(int messageToSend) throws JMSException { - ActiveMQQueue queue = new ActiveMQQueue("TEST"); - - // Send the messages to the Queue. - ActiveMQConnection producerConnection = (ActiveMQConnection) factory.createConnection(); - producerConnection.setUseAsyncSend(true); - producerConnection.start(); - Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = producerSession.createProducer(queue); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - - for (int i = 1; i <= messageToSend; i++) { - String msgStr = provideMessageText(i, 8192); - producer.send(producerSession.createTextMessage(msgStr)); - if ((i % 1000) == 0) { - LOG.info("P&C: {}", msgStr.substring(0, 100)); - } - } - producerConnection.close(); - - LOG.info("Mem usage after producer done: " + broker.getSystemUsage().getMemoryUsage().getPercentUsage() + "%"); - - // Browse the queue. - Connection connection = factory.createConnection(); - connection.start(); - Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - - QueueBrowser browser = session.createBrowser(queue); - Enumeration enumeration = browser.getEnumeration(); - int browsed = 0; - while (enumeration.hasMoreElements()) { - TextMessage m = (TextMessage) enumeration.nextElement(); - browsed++; - if ((browsed % 1000) == 0) { - LOG.info("B[{}]: {}", browsed, m.getText().substring(0, 100)); - } - } - browser.close(); - session.close(); - connection.close(); - - LOG.info("Mem usage after browser closed: " + broker.getSystemUsage().getMemoryUsage().getPercentUsage() + "%"); - - // The number of messages browsed should be equal to the number of messages sent. - assertEquals(messageToSend, browsed); - - browser.close(); - } - - public String provideMessageText(int messageNumber, int messageSize) { - StringBuilder buf = new StringBuilder(); - buf.append("Message: "); - if (messageNumber > 0) { - buf.append(messageNumber); - } - buf.append(" sent at: ").append(new Date()); - - if (buf.length() > messageSize) { - return buf.substring(0, messageSize); - } - for (int i = buf.length(); i < messageSize; i++) { - buf.append(' '); - } - return buf.toString(); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java deleted file mode 100644 index 527309b..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java +++ /dev/null @@ -1,263 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.lang.Thread.UncaughtExceptionHandler; -import java.net.URI; -import java.util.Arrays; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Map.Entry; -import javax.jms.Destination; -import javax.jms.MessageConsumer; - -import junit.framework.Test; - -import org.apache.activemq.JmsMultipleBrokersTestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.jmx.ManagementContext; -import org.apache.activemq.broker.jmx.QueueViewMBean; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory; -import org.apache.activemq.network.NetworkConnector; -import org.apache.activemq.util.Wait; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ4607Test extends JmsMultipleBrokersTestSupport implements UncaughtExceptionHandler { - - private static final Logger LOG = LoggerFactory.getLogger(AMQ4607Test.class); - - public static final int BROKER_COUNT = 3; - public static final int CONSUMER_COUNT = 1; - public static final int MESSAGE_COUNT = 0; - public static final boolean CONDUIT = true; - public static final int TIMEOUT = 20000; - - public boolean duplex = true; - protected Map consumerMap; - final Map unhandeledExceptions = new HashMap<>(); - - private void assertNoUnhandeledExceptions() { - for (Entry e : unhandeledExceptions.entrySet()) { - LOG.error("Thread:" + e.getKey() + " Had unexpected: " + e.getValue()); - } - assertTrue("There are no unhandelled exceptions, see: log for detail on: " + unhandeledExceptions, unhandeledExceptions.isEmpty()); - } - - public NetworkConnector bridge(String from, String to) throws Exception { - NetworkConnector networkConnector = bridgeBrokers(from, to, true, -1, CONDUIT); - networkConnector.setSuppressDuplicateQueueSubscriptions(true); - networkConnector.setDecreaseNetworkConsumerPriority(true); - networkConnector.setConsumerTTL(1); - networkConnector.setDuplex(duplex); - return networkConnector; - } - - public static Test suite() { - return suite(AMQ4607Test.class); - } - - public void initCombos() { - addCombinationValues("duplex", new Boolean[]{Boolean.TRUE, Boolean.FALSE}); - } - - public void testMigratingConsumer() throws Exception { - bridge("Broker0", "Broker1"); - if (!duplex) - bridge("Broker1", "Broker0"); - - bridge("Broker1", "Broker2"); - if (!duplex) - bridge("Broker2", "Broker1"); - - bridge("Broker0", "Broker2"); - if (!duplex) - bridge("Broker2", "Broker0"); - - startAllBrokers(); - this.waitForBridgeFormation(); - - Destination dest = createDestination("TEST.FOO", false); - sendMessages("Broker0", dest, 1); - - for (int i = 0; i < BROKER_COUNT; i++) { - MessageConsumer messageConsumer = createConsumer("Broker" + i, dest, "DoNotConsume = 'true'"); - - for (int J = 0; J < BROKER_COUNT; J++) { - assertExactConsumersConnect("Broker" + J, dest, CONSUMER_COUNT, TIMEOUT); - } - - assertNoUnhandeledExceptions(); - - assertExactMessageCount("Broker" + i, dest, 1, TIMEOUT); - - messageConsumer.close(); - LOG.info("Check for no consumers.."); - for (int J = 0; J < BROKER_COUNT; J++) { - assertExactConsumersConnect("Broker" + J, dest, 0, TIMEOUT); - } - } - - // now consume the message - final String brokerId = "Broker2"; - MessageConsumer messageConsumer = createConsumer(brokerId, dest); - assertTrue("Consumed ok", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return brokers.get(brokerId).allMessages.getMessageIds().size() == 1; - } - })); - messageConsumer.close(); - - } - - public void testMigratingConsumerFullCircle() throws Exception { - bridge("Broker0", "Broker1"); - if (!duplex) - bridge("Broker1", "Broker0"); - - bridge("Broker1", "Broker2"); - if (!duplex) - bridge("Broker2", "Broker1"); - - bridge("Broker0", "Broker2"); - if (!duplex) - bridge("Broker2", "Broker0"); - - // allow full loop, immediate replay back to 0 from 2 - ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory = new ConditionalNetworkBridgeFilterFactory(); - conditionalNetworkBridgeFilterFactory.setReplayDelay(0); - conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true); - brokers.get("Broker2").broker.getDestinationPolicy().getDefaultEntry().setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory); - startAllBrokers(); - this.waitForBridgeFormation(); - - Destination dest = createDestination("TEST.FOO", false); - - sendMessages("Broker0", dest, 1); - - for (int i = 0; i < BROKER_COUNT; i++) { - MessageConsumer messageConsumer = createConsumer("Broker" + i, dest, "DoNotConsume = 'true'"); - - for (int J = 0; J < BROKER_COUNT; J++) { - assertExactConsumersConnect("Broker" + J, dest, CONSUMER_COUNT, TIMEOUT); - } - - assertNoUnhandeledExceptions(); - - // validate the message has been forwarded - assertExactMessageCount("Broker" + i, dest, 1, TIMEOUT); - - messageConsumer.close(); - LOG.info("Check for no consumers.."); - for (int J = 0; J < BROKER_COUNT; J++) { - assertExactConsumersConnect("Broker" + J, dest, 0, TIMEOUT); - } - } - - // now consume the message from the origin - LOG.info("Consume from origin..."); - final String brokerId = "Broker0"; - MessageConsumer messageConsumer = createConsumer(brokerId, dest); - assertTrue("Consumed ok", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return brokers.get(brokerId).allMessages.getMessageIds().size() == 1; - } - })); - messageConsumer.close(); - - } - - protected void assertExactMessageCount(final String brokerName, - Destination destination, - final int count, - long timeout) throws Exception { - ManagementContext context = brokers.get(brokerName).broker.getManagementContext(); - final QueueViewMBean queueViewMBean = (QueueViewMBean) context.newProxyInstance(brokers.get(brokerName).broker.getAdminView().getQueues()[0], QueueViewMBean.class, false); - assertTrue("Excepected queue depth: " + count + " on: " + brokerName, Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - long currentCount = queueViewMBean.getQueueSize(); - LOG.info("On " + brokerName + " current queue size for " + queueViewMBean + ", " + currentCount); - if (count != currentCount) { - LOG.info("Sub IDs: " + Arrays.asList(queueViewMBean.getSubscriptions())); - } - return currentCount == count; - } - }, timeout)); - } - - protected void assertExactConsumersConnect(final String brokerName, - Destination destination, - final int count, - long timeout) throws Exception { - final ManagementContext context = brokers.get(brokerName).broker.getManagementContext(); - assertTrue("Excepected consumers count: " + count + " on: " + brokerName, Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - try { - QueueViewMBean queueViewMBean = (QueueViewMBean) context.newProxyInstance(brokers.get(brokerName).broker.getAdminView().getQueues()[0], QueueViewMBean.class, false); - long currentCount = queueViewMBean.getConsumerCount(); - LOG.info("On " + brokerName + " current consumer count for " + queueViewMBean + ", " + currentCount); - if (count != currentCount) { - LOG.info("Sub IDs: " + Arrays.asList(queueViewMBean.getSubscriptions())); - } - return currentCount == count; - } - catch (Exception e) { - LOG.warn("Unexpected: " + e, e); - return false; - } - } - }, timeout)); - } - - @Override - public void setUp() throws Exception { - super.setUp(); - - unhandeledExceptions.clear(); - Thread.setDefaultUncaughtExceptionHandler(this); - - // Setup n brokers - for (int i = 0; i < BROKER_COUNT; i++) { - createBroker(new URI("broker:(tcp://localhost:6161" + i + ")/Broker" + i + "?persistent=false&useJmx=true")); - } - - consumerMap = new LinkedHashMap<>(); - } - - @Override - protected void configureBroker(BrokerService brokerService) { - PolicyEntry policyEntry = new PolicyEntry(); - policyEntry.setExpireMessagesPeriod(0); - PolicyMap policyMap = new PolicyMap(); - policyMap.setDefaultEntry(policyEntry); - brokerService.setDestinationPolicy(policyMap); - } - - @Override - public void uncaughtException(Thread t, Throwable e) { - synchronized (unhandeledExceptions) { - unhandeledExceptions.put(t, e); - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java deleted file mode 100644 index 9cb9c66..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java +++ /dev/null @@ -1,263 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.io.IOException; -import java.sql.SQLException; -import java.util.concurrent.CountDownLatch; -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.Topic; -import javax.jms.TopicSubscriber; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.store.jdbc.DataSourceServiceSupport; -import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; -import org.apache.activemq.store.jdbc.LeaseDatabaseLocker; -import org.apache.activemq.store.jdbc.TransactionContext; -import org.apache.activemq.util.IOHelper; -import org.apache.activemq.util.LeaseLockerIOExceptionHandler; -import org.apache.derby.jdbc.EmbeddedDataSource; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.junit.Assert.fail; - -/** - * Testing how the broker reacts when a SQL Exception is thrown from - * org.apache.activemq.store.jdbc.TransactionContext.executeBatch(). - *
- * see https://issues.apache.org/jira/browse/AMQ-4636 - */ -public class AMQ4636Test { - - private static final String MY_TEST_TOPIC = "MY_TEST_TOPIC"; - private static final Logger LOG = LoggerFactory.getLogger(AMQ4636Test.class); - private String transportUrl = "tcp://0.0.0.0:0"; - private BrokerService broker; - EmbeddedDataSource embeddedDataSource; - CountDownLatch throwSQLException = new CountDownLatch(0); - - @Before - public void startBroker() throws Exception { - broker = createBroker(); - broker.deleteAllMessages(); - broker.start(); - broker.waitUntilStarted(); - LOG.info("Broker started..."); - } - - @After - public void stopBroker() throws Exception { - if (broker != null) { - LOG.info("Stopping broker..."); - broker.stop(); - broker.waitUntilStopped(); - } - try { - if (embeddedDataSource != null) { - // ref http://svn.apache.org/viewvc/db/derby/code/trunk/java/testing/org/apache/derbyTesting/junit/JDBCDataSource.java?view=markup - embeddedDataSource.setShutdownDatabase("shutdown"); - embeddedDataSource.getConnection(); - } - } - catch (Exception ignored) { - } - finally { - embeddedDataSource.setShutdownDatabase(null); - } - } - - protected BrokerService createBroker() throws Exception { - - embeddedDataSource = (EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory()); - embeddedDataSource.setCreateDatabase("create"); - embeddedDataSource.getConnection().close(); - - //wire in a TestTransactionContext (wrapper to TransactionContext) that has an executeBatch() - // method that can be configured to throw a SQL exception on demand - JDBCPersistenceAdapter jdbc = new TestJDBCPersistenceAdapter(); - jdbc.setDataSource(embeddedDataSource); - - jdbc.setLockKeepAlivePeriod(1000L); - LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker(); - leaseDatabaseLocker.setLockAcquireSleepInterval(2000L); - jdbc.setLocker(leaseDatabaseLocker); - - broker = new BrokerService(); - PolicyMap policyMap = new PolicyMap(); - PolicyEntry defaultEntry = new PolicyEntry(); - defaultEntry.setExpireMessagesPeriod(0); - policyMap.setDefaultEntry(defaultEntry); - broker.setDestinationPolicy(policyMap); - broker.setPersistenceAdapter(jdbc); - - broker.setIoExceptionHandler(new LeaseLockerIOExceptionHandler()); - - transportUrl = broker.addConnector(transportUrl).getPublishableConnectString(); - return broker; - } - - /** - * adding a TestTransactionContext (wrapper to TransactionContext) so an SQLException is triggered - * during TransactionContext.executeBatch() when called in the broker. - *
- * Expectation: SQLException triggers a connection shutdown and failover should kick and try to redeliver the - * message. SQLException should NOT be returned to client - */ - @Test - public void testProducerWithDBShutdown() throws Exception { - - // failover but timeout in 1 seconds so the test does not hang - String failoverTransportURL = "failover:(" + transportUrl + ")?timeout=1000"; - - this.createDurableConsumer(MY_TEST_TOPIC, failoverTransportURL); - - this.sendMessage(MY_TEST_TOPIC, failoverTransportURL, false, false); - - } - - @Test - public void testTransactedProducerCommitWithDBShutdown() throws Exception { - - // failover but timeout in 1 seconds so the test does not hang - String failoverTransportURL = "failover:(" + transportUrl + ")?timeout=1000"; - - this.createDurableConsumer(MY_TEST_TOPIC, failoverTransportURL); - - try { - this.sendMessage(MY_TEST_TOPIC, failoverTransportURL, true, true); - fail("Expect rollback after failover - inddoubt commit"); - } - catch (javax.jms.TransactionRolledBackException expectedInDoubt) { - LOG.info("Got rollback after failover failed commit", expectedInDoubt); - } - } - - @Test - public void testTransactedProducerRollbackWithDBShutdown() throws Exception { - - // failover but timeout in 1 seconds so the test does not hang - String failoverTransportURL = "failover:(" + transportUrl + ")?timeout=1000"; - - this.createDurableConsumer(MY_TEST_TOPIC, failoverTransportURL); - - this.sendMessage(MY_TEST_TOPIC, failoverTransportURL, true, false); - } - - public void createDurableConsumer(String topic, String transportURL) throws JMSException { - Connection connection = null; - LOG.info("*** createDurableConsumer() called ..."); - - try { - - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(transportURL); - - connection = factory.createConnection(); - connection.setClientID("myconn1"); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination destination = session.createTopic(topic); - - TopicSubscriber topicSubscriber = session.createDurableSubscriber((Topic) destination, "MySub1"); - } - finally { - if (connection != null) { - connection.close(); - } - } - } - - public void sendMessage(String topic, String transportURL, boolean transacted, boolean commit) throws JMSException { - Connection connection = null; - - try { - - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(transportURL); - - connection = factory.createConnection(); - Session session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); - Destination destination = session.createTopic(topic); - MessageProducer producer = session.createProducer(destination); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - - Message m = session.createTextMessage("testMessage"); - LOG.info("*** send message to broker..."); - - // trigger SQL exception in transactionContext - throwSQLException = new CountDownLatch(1); - producer.send(m); - - if (transacted) { - if (commit) { - session.commit(); - } - else { - session.rollback(); - } - } - - LOG.info("*** Finished send message to broker"); - - } - finally { - if (connection != null) { - connection.close(); - } - } - } - - /* - * Mock classes used for testing - */ - - public class TestJDBCPersistenceAdapter extends JDBCPersistenceAdapter { - - @Override - public TransactionContext getTransactionContext() throws IOException { - return new TestTransactionContext(this); - } - } - - public class TestTransactionContext extends TransactionContext { - - public TestTransactionContext(JDBCPersistenceAdapter jdbcPersistenceAdapter) throws IOException { - super(jdbcPersistenceAdapter); - } - - @Override - public void executeBatch() throws SQLException { - if (throwSQLException.getCount() > 0) { - // only throw exception once - throwSQLException.countDown(); - throw new SQLException("TEST SQL EXCEPTION"); - } - super.executeBatch(); - } - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4656Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4656Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4656Test.java deleted file mode 100644 index 0fb900a..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4656Test.java +++ /dev/null @@ -1,153 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.util.Arrays; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.Topic; -import javax.management.ObjectName; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.jmx.BrokerView; -import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean; -import org.apache.activemq.broker.region.policy.FilePendingDurableSubscriberMessageStoragePolicy; -import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@RunWith(value = Parameterized.class) -public class AMQ4656Test { - - private static final transient Logger LOG = LoggerFactory.getLogger(AMQ4656Test.class); - private static BrokerService brokerService; - private static String BROKER_ADDRESS = "tcp://localhost:0"; - - private String connectionUri; - - @Parameterized.Parameter - public PendingDurableSubscriberMessageStoragePolicy pendingDurableSubPolicy; - - @Parameterized.Parameters(name = "{0}") - public static Iterable getTestParameters() { - return Arrays.asList(new Object[][]{{new FilePendingDurableSubscriberMessageStoragePolicy()}, {new StorePendingDurableSubscriberMessageStoragePolicy()}}); - } - - @Before - public void setUp() throws Exception { - brokerService = new BrokerService(); - PolicyMap policyMap = new PolicyMap(); - PolicyEntry defaultEntry = new PolicyEntry(); - defaultEntry.setPendingDurableSubscriberPolicy(pendingDurableSubPolicy); - policyMap.setDefaultEntry(defaultEntry); - brokerService.setDestinationPolicy(policyMap); - brokerService.setPersistent(false); - brokerService.setUseJmx(true); - brokerService.setDeleteAllMessagesOnStartup(true); - connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString(); - brokerService.start(); - brokerService.waitUntilStarted(); - } - - @After - public void tearDown() throws Exception { - brokerService.stop(); - brokerService.waitUntilStopped(); - } - - @Test - public void testDurableConsumerEnqueueCountWithZeroPrefetch() throws Exception { - - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri); - - Connection connection = connectionFactory.createConnection(); - connection.setClientID(getClass().getName()); - connection.start(); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination destination = session.createTopic("DurableTopic"); - - MessageConsumer consumer = session.createDurableSubscriber((Topic) destination, "EnqueueSub"); - - BrokerView view = brokerService.getAdminView(); - view.getDurableTopicSubscribers(); - - ObjectName subName = view.getDurableTopicSubscribers()[0]; - - DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean) brokerService.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true); - - assertEquals(0, sub.getEnqueueCounter()); - assertEquals(0, sub.getDequeueCounter()); - assertEquals(0, sub.getPendingQueueSize()); - assertEquals(0, sub.getDispatchedCounter()); - assertEquals(0, sub.getDispatchedQueueSize()); - - consumer.close(); - - MessageProducer producer = session.createProducer(destination); - for (int i = 0; i < 20; i++) { - producer.send(session.createMessage()); - } - producer.close(); - - consumer = session.createDurableSubscriber((Topic) destination, "EnqueueSub"); - - Thread.sleep(1000); - - assertEquals(20, sub.getEnqueueCounter()); - assertEquals(0, sub.getDequeueCounter()); - assertEquals(0, sub.getPendingQueueSize()); - assertEquals(20, sub.getDispatchedCounter()); - assertEquals(20, sub.getDispatchedQueueSize()); - - LOG.info("Pending Queue Size with no receives: {}", sub.getPendingQueueSize()); - - assertNotNull(consumer.receive(1000)); - assertNotNull(consumer.receive(1000)); - - consumer.close(); - - Thread.sleep(2000); - - LOG.info("Pending Queue Size with two receives: {}", sub.getPendingQueueSize()); - - assertEquals(20, sub.getEnqueueCounter()); - assertEquals(2, sub.getDequeueCounter()); - assertEquals(18, sub.getPendingQueueSize()); - assertEquals(20, sub.getDispatchedCounter()); - assertEquals(0, sub.getDispatchedQueueSize()); - - session.close(); - connection.close(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4671Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4671Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4671Test.java deleted file mode 100644 index 165d5fd..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4671Test.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.fail; - -import javax.jms.Connection; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ4671Test { - - private static final transient Logger LOG = LoggerFactory.getLogger(AMQ4671Test.class); - private static BrokerService brokerService; - private static String BROKER_ADDRESS = "tcp://localhost:0"; - - private String connectionUri; - - @Before - public void setUp() throws Exception { - brokerService = new BrokerService(); - brokerService.setPersistent(false); - brokerService.setUseJmx(true); - brokerService.setDeleteAllMessagesOnStartup(true); - connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString(); - connectionUri = connectionUri + "?trace=true"; - brokerService.start(); - brokerService.waitUntilStarted(); - } - - @After - public void tearDown() throws Exception { - brokerService.stop(); - brokerService.waitUntilStopped(); - } - - @Test - public void testNonDurableSubscriberInvalidUnsubscribe() throws Exception { - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri); - - Connection connection = connectionFactory.createConnection(); - connection.setClientID(getClass().getName()); - connection.start(); - - try { - Session ts = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - try { - ts.unsubscribe("invalid-subscription-name"); - fail("this should fail"); - } - catch (javax.jms.InvalidDestinationException e) { - LOG.info("Test caught correct invalid destination exception"); - } - } - finally { - connection.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4677Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4677Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4677Test.java deleted file mode 100644 index d7da045..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4677Test.java +++ /dev/null @@ -1,182 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.io.FilenameFilter; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import javax.jms.*; -import javax.management.ObjectName; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.leveldb.LevelDBStore; -import org.apache.activemq.leveldb.LevelDBStoreViewMBean; -import org.apache.activemq.util.Wait; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ4677Test { - - private static final transient Logger LOG = LoggerFactory.getLogger(AMQ4677Test.class); - private static BrokerService brokerService; - - @Rule - public TestName name = new TestName(); - - private File dataDirFile; - - @Before - public void setUp() throws Exception { - - dataDirFile = new File("target/LevelDBCleanupTest"); - - brokerService = new BrokerService(); - brokerService.setBrokerName("LevelDBBroker"); - brokerService.setPersistent(true); - brokerService.setUseJmx(true); - brokerService.setAdvisorySupport(false); - brokerService.setDeleteAllMessagesOnStartup(true); - brokerService.setDataDirectoryFile(dataDirFile); - - LevelDBStore persistenceFactory = new LevelDBStore(); - persistenceFactory.setDirectory(dataDirFile); - brokerService.setPersistenceAdapter(persistenceFactory); - brokerService.start(); - brokerService.waitUntilStarted(); - } - - @After - public void tearDown() throws Exception { - brokerService.stop(); - brokerService.waitUntilStopped(); - } - - @Test - public void testSendAndReceiveAllMessages() throws Exception { - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://LevelDBBroker"); - - Connection connection = connectionFactory.createConnection(); - connection.setClientID(getClass().getName()); - connection.start(); - - final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - Destination destination = session.createQueue(name.toString()); - MessageProducer producer = session.createProducer(destination); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - - final LevelDBStoreViewMBean levelDBView = getLevelDBStoreMBean(); - assertNotNull(levelDBView); - levelDBView.compact(); - - final int SIZE = 6 * 1024 * 5; - final int MSG_COUNT = 60000; - final CountDownLatch done = new CountDownLatch(MSG_COUNT); - - byte buffer[] = new byte[SIZE]; - for (int i = 0; i < SIZE; ++i) { - buffer[i] = (byte) 128; - } - - for (int i = 0; i < MSG_COUNT; ++i) { - BytesMessage message = session.createBytesMessage(); - message.writeBytes(buffer); - producer.send(message); - - if ((i % 1000) == 0) { - LOG.info("Sent message #{}", i); - session.commit(); - } - } - - session.commit(); - - LOG.info("Finished sending all messages."); - - MessageConsumer consumer = session.createConsumer(destination); - consumer.setMessageListener(new MessageListener() { - - @Override - public void onMessage(Message message) { - if ((done.getCount() % 1000) == 0) { - try { - LOG.info("Received message #{}", MSG_COUNT - done.getCount()); - session.commit(); - } - catch (JMSException e) { - } - } - done.countDown(); - } - }); - - done.await(15, TimeUnit.MINUTES); - session.commit(); - LOG.info("Finished receiving all messages."); - - assertTrue("Should < 3 logfiles left.", Wait.waitFor(new Wait.Condition() { - - @Override - public boolean isSatisified() throws Exception { - levelDBView.compact(); - return countLogFiles() < 3; - } - }, TimeUnit.MINUTES.toMillis(5), (int) TimeUnit.SECONDS.toMillis(30))); - - levelDBView.compact(); - LOG.info("Current number of logs {}", countLogFiles()); - } - - protected long countLogFiles() { - String[] logFiles = dataDirFile.list(new FilenameFilter() { - - @Override - public boolean accept(File dir, String name) { - if (name.endsWith("log")) { - return true; - } - return false; - } - }); - - LOG.info("Current number of logs {}", logFiles.length); - return logFiles.length; - } - - protected LevelDBStoreViewMBean getLevelDBStoreMBean() throws Exception { - ObjectName levelDbViewMBeanQuery = new ObjectName("org.apache.activemq:type=Broker,brokerName=LevelDBBroker,service=PersistenceAdapter,instanceName=LevelDB*"); - - Set names = brokerService.getManagementContext().queryNames(null, levelDbViewMBeanQuery); - if (names.isEmpty() || names.size() > 1) { - throw new java.lang.IllegalStateException("Can't find levelDB store name."); - } - - LevelDBStoreViewMBean proxy = (LevelDBStoreViewMBean) brokerService.getManagementContext().newProxyInstance(names.iterator().next(), LevelDBStoreViewMBean.class, true); - return proxy; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4853Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4853Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4853Test.java deleted file mode 100644 index ad04d96..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4853Test.java +++ /dev/null @@ -1,304 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertEquals; - -import java.util.ArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.advisory.AdvisoryBroker; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ConnectionId; -import org.apache.activemq.command.ConnectionInfo; -import org.apache.activemq.command.ConsumerId; -import org.apache.activemq.command.ConsumerInfo; -import org.apache.activemq.command.SessionId; -import org.apache.activemq.command.SessionInfo; -import org.junit.After; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ4853Test { - - private static final transient Logger LOG = LoggerFactory.getLogger(AMQ4853Test.class); - private static BrokerService brokerService; - private static final String BROKER_ADDRESS = "tcp://localhost:0"; - private static final ActiveMQQueue DESTINATION = new ActiveMQQueue("TEST.QUEUE"); - private CountDownLatch cycleDoneLatch; - - private String connectionUri; - - @Before - public void setUp() throws Exception { - brokerService = new BrokerService(); - brokerService.setPersistent(false); - brokerService.setUseJmx(false); - brokerService.setAdvisorySupport(true); - brokerService.setDeleteAllMessagesOnStartup(true); - connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString(); - - brokerService.start(); - brokerService.waitUntilStarted(); - } - - @After - public void tearDown() throws Exception { - brokerService.stop(); - brokerService.waitUntilStopped(); - } - - /** - * Test to shows the performance of the removing consumers while other stay active. - * - * @throws Exception - */ - @Ignore - @Test - public void test() throws Exception { - - // Create a stable set of consumers to fill in the advisory broker's consumer list. - ArrayList fixedConsumers = new ArrayList<>(100); - for (int i = 0; i < 200; ++i) { - fixedConsumers.add(new Consumer()); - } - - // Create a set of consumers that comes online for a short time and then - // goes offline again. Cycles will repeat as each batch completes - final int fixedDelayConsumers = 300; - final int fixedDelayCycles = 25; - - final CountDownLatch fixedDelayCycleLatch = new CountDownLatch(fixedDelayCycles); - - // Update so done method can track state. - cycleDoneLatch = fixedDelayCycleLatch; - - CyclicBarrier barrier = new CyclicBarrier(fixedDelayConsumers, new Runnable() { - @Override - public void run() { - LOG.info("Fixed delay consumers cycle {} completed.", fixedDelayCycleLatch.getCount()); - fixedDelayCycleLatch.countDown(); - } - }); - - for (int i = 0; i < fixedDelayConsumers; ++i) { - new Thread(new FixedDelyConsumer(barrier)).start(); - } - - fixedDelayCycleLatch.await(10, TimeUnit.MINUTES); - - // Clean up. - - for (Consumer consumer : fixedConsumers) { - consumer.close(); - } - fixedConsumers.clear(); - } - - private ConnectionInfo createConnectionInfo() { - ConnectionId id = new ConnectionId(); - id.setValue("ID:123456789:0:1"); - - ConnectionInfo info = new ConnectionInfo(); - info.setConnectionId(id); - return info; - } - - private SessionInfo createSessionInfo(ConnectionInfo connection) { - SessionId id = new SessionId(connection.getConnectionId(), 1); - - SessionInfo info = new SessionInfo(); - info.setSessionId(id); - - return info; - } - - public ConsumerInfo createConsumerInfo(SessionInfo session, int value, ActiveMQDestination destination) { - ConsumerId id = new ConsumerId(); - id.setConnectionId(session.getSessionId().getConnectionId()); - id.setSessionId(1); - id.setValue(value); - - ConsumerInfo info = new ConsumerInfo(); - info.setConsumerId(id); - info.setDestination(destination); - return info; - } - - /** - * Test to shows the performance impact of removing consumers in various scenarios. - * - * @throws Exception - */ - @Ignore - @Test - public void testPerformanceOfRemovals() throws Exception { - // setup - AdvisoryBroker testObj = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class); - ActiveMQDestination destination = new ActiveMQQueue("foo"); - ConnectionInfo connectionInfo = createConnectionInfo(); - ConnectionContext connectionContext = new ConnectionContext(connectionInfo); - connectionContext.setBroker(brokerService.getBroker()); - SessionInfo sessionInfo = createSessionInfo(connectionInfo); - - long start = System.currentTimeMillis(); - - for (int i = 0; i < 200; ++i) { - - for (int j = 1; j <= 500; j++) { - ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, j, destination); - testObj.addConsumer(connectionContext, consumerInfo); - } - - for (int j = 500; j > 0; j--) { - ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, j, destination); - testObj.removeConsumer(connectionContext, consumerInfo); - } - - for (int j = 1; j <= 500; j++) { - ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, j, destination); - testObj.addConsumer(connectionContext, consumerInfo); - } - - for (int j = 1; j <= 500; j++) { - ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, j, destination); - testObj.removeConsumer(connectionContext, consumerInfo); - } - } - - long finish = System.currentTimeMillis(); - - long totalTime = finish - start; - - LOG.info("Total test time: {} seconds", TimeUnit.MILLISECONDS.toSeconds(totalTime)); - - assertEquals(0, testObj.getAdvisoryConsumers().size()); - } - - @Test - public void testEqualsNeeded() throws Exception { - // setup - AdvisoryBroker testObj = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class); - ActiveMQDestination destination = new ActiveMQQueue("foo"); - ConnectionInfo connectionInfo = createConnectionInfo(); - ConnectionContext connectionContext = new ConnectionContext(connectionInfo); - connectionContext.setBroker(brokerService.getBroker()); - SessionInfo sessionInfo = createSessionInfo(connectionInfo); - - for (int j = 1; j <= 5; j++) { - ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, j, destination); - testObj.addConsumer(connectionContext, consumerInfo); - } - - for (int j = 1; j <= 5; j++) { - ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, j, destination); - testObj.removeConsumer(connectionContext, consumerInfo); - } - - assertEquals(0, testObj.getAdvisoryConsumers().size()); - } - - private boolean done() { - if (cycleDoneLatch == null) { - return true; - } - return cycleDoneLatch.getCount() == 0; - } - - class Consumer implements MessageListener { - - Connection connection; - Session session; - Destination destination; - MessageConsumer consumer; - - Consumer() throws JMSException { - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); - connection = factory.createConnection(); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - consumer = session.createConsumer(DESTINATION); - consumer.setMessageListener(this); - connection.start(); - } - - @Override - public void onMessage(Message message) { - } - - public void close() { - try { - connection.close(); - } - catch (Exception e) { - } - - connection = null; - session = null; - consumer = null; - } - } - - class FixedDelyConsumer implements Runnable { - - private final CyclicBarrier barrier; - private final int sleepInterval; - - public FixedDelyConsumer(CyclicBarrier barrier) { - this.barrier = barrier; - this.sleepInterval = 1000; - } - - public FixedDelyConsumer(CyclicBarrier barrier, int sleepInterval) { - this.barrier = barrier; - this.sleepInterval = sleepInterval; - } - - @Override - public void run() { - while (!done()) { - - try { - Consumer consumer = new Consumer(); - TimeUnit.MILLISECONDS.sleep(sleepInterval); - consumer.close(); - barrier.await(); - } - catch (Exception ex) { - return; - } - } - } - } - -}