Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6DA8419347 for ; Wed, 16 Mar 2016 15:21:32 +0000 (UTC) Received: (qmail 20807 invoked by uid 500); 16 Mar 2016 15:21:32 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 20712 invoked by uid 500); 16 Mar 2016 15:21:32 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 20516 invoked by uid 99); 16 Mar 2016 15:21:32 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 Mar 2016 15:21:32 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E6E4CDFD40; Wed, 16 Mar 2016 15:21:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Wed, 16 Mar 2016 15:21:47 -0000 Message-Id: <1d35f8af000d4211b117267368cdfc51@git.apache.org> In-Reply-To: <407517e763c6496895a78ae408224406@git.apache.org> References: <407517e763c6496895a78ae408224406@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [18/61] [abbrv] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java deleted file mode 100644 index 3504c1f..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java +++ /dev/null @@ -1,357 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.File; -import java.io.IOException; -import java.util.Collection; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.management.ObjectName; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; -import org.apache.activemq.store.kahadb.disk.journal.DataFile; -import org.apache.activemq.util.Wait; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ4212Test { - - private static final Logger LOG = LoggerFactory.getLogger(AMQ4212Test.class); - - private BrokerService service; - private String connectionUri; - private ActiveMQConnectionFactory cf; - - private final int MSG_COUNT = 256; - - @Before - public void setUp() throws IOException, Exception { - createBroker(true, false); - } - - public void createBroker(boolean deleteAllMessages, boolean recover) throws Exception { - service = new BrokerService(); - service.setBrokerName("InactiveSubTest"); - service.setDeleteAllMessagesOnStartup(deleteAllMessages); - service.setAdvisorySupport(false); - service.setPersistent(true); - service.setUseJmx(true); - service.setKeepDurableSubsActive(false); - - KahaDBPersistenceAdapter pa = new KahaDBPersistenceAdapter(); - File dataFile = new File("KahaDB"); - pa.setDirectory(dataFile); - pa.setJournalMaxFileLength(10 * 1024); - pa.setCheckpointInterval(TimeUnit.SECONDS.toMillis(5)); - pa.setCleanupInterval(TimeUnit.SECONDS.toMillis(5)); - pa.setForceRecoverIndex(recover); - - service.setPersistenceAdapter(pa); - service.start(); - service.waitUntilStarted(); - - connectionUri = "vm://InactiveSubTest?create=false"; - cf = new ActiveMQConnectionFactory(connectionUri); - } - - private void restartBroker() throws Exception { - stopBroker(); - createBroker(false, false); - } - - private void recoverBroker() throws Exception { - stopBroker(); - createBroker(false, true); - } - - @After - public void stopBroker() throws Exception { - if (service != null) { - service.stop(); - service.waitUntilStopped(); - service = null; - } - } - - @Test - public void testDirableSubPrefetchRecovered() throws Exception { - - ActiveMQQueue queue = new ActiveMQQueue("MyQueue"); - ActiveMQTopic topic = new ActiveMQTopic("MyDurableTopic"); - - // Send to a Queue to create some journal files - sendMessages(queue); - - LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles()); - - createInactiveDurableSub(topic); - - assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers(); - return subs != null && subs.length == 1 ? true : false; - } - })); - - // Now send some more to the queue to create even more files. - sendMessages(queue); - - LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles()); - assertTrue(getNumberOfJournalFiles() > 1); - - LOG.info("Restarting the broker."); - restartBroker(); - LOG.info("Restarted the broker."); - - LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles()); - assertTrue(getNumberOfJournalFiles() > 1); - - assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers(); - return subs != null && subs.length == 1 ? true : false; - } - })); - - // Clear out all queue data - service.getAdminView().removeQueue(queue.getQueueName()); - - assertTrue("Less than two journal files expected, was " + getNumberOfJournalFiles(), Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return getNumberOfJournalFiles() <= 2; - } - }, TimeUnit.MINUTES.toMillis(2))); - - LOG.info("Sending {} Messages to the Topic.", MSG_COUNT); - // Send some messages to the inactive destination - sendMessages(topic); - - LOG.info("Attempt to consume {} messages from the Topic.", MSG_COUNT); - assertEquals(MSG_COUNT, consumeFromInactiveDurableSub(topic)); - - LOG.info("Recovering the broker."); - recoverBroker(); - LOG.info("Recovering the broker."); - - assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers(); - return subs != null && subs.length == 1 ? true : false; - } - })); - } - - @Test - public void testDurableAcksNotDropped() throws Exception { - - ActiveMQQueue queue = new ActiveMQQueue("MyQueue"); - ActiveMQTopic topic = new ActiveMQTopic("MyDurableTopic"); - - // Create durable sub in first data file. - createInactiveDurableSub(topic); - - assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers(); - return subs != null && subs.length == 1 ? true : false; - } - })); - - // Send to a Topic - sendMessages(topic, 1); - - // Send to a Queue to create some journal files - sendMessages(queue); - - LOG.info("Before consume there are currently [{}] journal log files.", getNumberOfJournalFiles()); - - // Consume all the Messages leaving acks behind. - consumeDurableMessages(topic, 1); - - LOG.info("After consume there are currently [{}] journal log files.", getNumberOfJournalFiles()); - - // Now send some more to the queue to create even more files. - sendMessages(queue); - - LOG.info("More Queued. There are currently [{}] journal log files.", getNumberOfJournalFiles()); - assertTrue(getNumberOfJournalFiles() > 1); - - LOG.info("Restarting the broker."); - restartBroker(); - LOG.info("Restarted the broker."); - - LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles()); - assertTrue(getNumberOfJournalFiles() > 1); - - assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers(); - return subs != null && subs.length == 1 ? true : false; - } - })); - - // Clear out all queue data - service.getAdminView().removeQueue(queue.getQueueName()); - - assertTrue("Less than three journal file expected, was " + getNumberOfJournalFiles(), Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return getNumberOfJournalFiles() <= 3; - } - }, TimeUnit.MINUTES.toMillis(3))); - - // See if we receive any message they should all be acked. - tryConsumeExpectNone(topic); - - LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles()); - - LOG.info("Recovering the broker."); - recoverBroker(); - LOG.info("Recovering the broker."); - - LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles()); - - assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers(); - return subs != null && subs.length == 1 ? true : false; - } - })); - - // See if we receive any message they should all be acked. - tryConsumeExpectNone(topic); - - assertTrue("Less than three journal file expected, was " + getNumberOfJournalFiles(), Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return getNumberOfJournalFiles() == 1; - } - }, TimeUnit.MINUTES.toMillis(1))); - } - - private int getNumberOfJournalFiles() throws IOException { - Collection files = ((KahaDBPersistenceAdapter) service.getPersistenceAdapter()).getStore().getJournal().getFileMap().values(); - int reality = 0; - for (DataFile file : files) { - if (file != null) { - reality++; - } - } - - return reality; - } - - private void createInactiveDurableSub(Topic topic) throws Exception { - Connection connection = cf.createConnection(); - connection.setClientID("Inactive"); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive"); - consumer.close(); - connection.close(); - } - - private void consumeDurableMessages(Topic topic, int count) throws Exception { - Connection connection = cf.createConnection(); - connection.setClientID("Inactive"); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive"); - connection.start(); - for (int i = 0; i < count; ++i) { - if (consumer.receive(TimeUnit.SECONDS.toMillis(10)) == null) { - fail("should have received a message"); - } - } - consumer.close(); - connection.close(); - } - - private void tryConsumeExpectNone(Topic topic) throws Exception { - Connection connection = cf.createConnection(); - connection.setClientID("Inactive"); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive"); - connection.start(); - if (consumer.receive(TimeUnit.SECONDS.toMillis(10)) != null) { - fail("Should be no messages for this durable."); - } - consumer.close(); - connection.close(); - } - - private int consumeFromInactiveDurableSub(Topic topic) throws Exception { - Connection connection = cf.createConnection(); - connection.setClientID("Inactive"); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive"); - - int count = 0; - - while (consumer.receive(10000) != null) { - count++; - } - - consumer.close(); - connection.close(); - - return count; - } - - private void sendMessages(Destination destination) throws Exception { - sendMessages(destination, MSG_COUNT); - } - - private void sendMessages(Destination destination, int count) throws Exception { - Connection connection = cf.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(destination); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - for (int i = 0; i < count; ++i) { - TextMessage message = session.createTextMessage("Message #" + i + " for destination: " + destination); - producer.send(message); - } - connection.close(); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4213Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4213Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4213Test.java deleted file mode 100644 index c033e97..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4213Test.java +++ /dev/null @@ -1,88 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.fail; - -import javax.jms.JMSException; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerPlugin; -import org.apache.activemq.broker.BrokerPluginSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ProducerInfo; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class AMQ4213Test { - - private static BrokerService brokerService; - private static String BROKER_ADDRESS = "tcp://localhost:0"; - private static String TEST_QUEUE = "testQueue"; - private static ActiveMQQueue queue = new ActiveMQQueue(TEST_QUEUE); - - private String connectionUri; - - @SuppressWarnings("unchecked") - @Before - public void setUp() throws Exception { - brokerService = new BrokerService(); - brokerService.setPersistent(false); - brokerService.setUseJmx(true); - brokerService.setDeleteAllMessagesOnStartup(true); - connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString(); - - brokerService.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() { - - @Override - public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { - throw new javax.jms.JMSSecurityException(connectionUri); - } - }}); - - brokerService.start(); - brokerService.waitUntilStarted(); - } - - @After - public void tearDown() throws Exception { - brokerService.stop(); - brokerService.waitUntilStopped(); - } - - @Test - public void testExceptionOnProducerCreateThrows() throws Exception { - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); - ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection(); - - Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - - connection.start(); - - try { - session.createProducer(queue); - fail("Should not be able to create this producer."); - } - catch (JMSException ex) { - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4220Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4220Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4220Test.java deleted file mode 100644 index 7433b18..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4220Test.java +++ /dev/null @@ -1,119 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.ArrayList; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.store.PersistenceAdapter; -import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; -import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter; -import org.apache.activemq.util.Wait; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ4220Test { - - static final Logger LOG = LoggerFactory.getLogger(AMQ4220Test.class); - private final static int maxFileLength = 1024 * 1024 * 32; - private final static String destinationName = "TEST.QUEUE"; - BrokerService broker; - - @Before - public void setUp() throws Exception { - prepareBrokerWithMultiStore(true); - broker.start(); - broker.waitUntilStarted(); - } - - @After - public void tearDown() throws Exception { - broker.stop(); - } - - protected BrokerService createBroker(PersistenceAdapter kaha) throws Exception { - BrokerService broker = new BrokerService(); - broker.setUseJmx(true); - broker.setBrokerName("localhost"); - broker.setPersistenceAdapter(kaha); - return broker; - } - - @Test - public void testRestartAfterQueueDelete() throws Exception { - - // Ensure we have an Admin View. - assertTrue("Broker doesn't have an Admin View.", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return (broker.getAdminView()) != null; - } - })); - - LOG.info("Adding initial destination: {}", destinationName); - - broker.getAdminView().addQueue(destinationName); - - assertNotNull(broker.getDestination(new ActiveMQQueue(destinationName))); - - LOG.info("Removing initial destination: {}", destinationName); - - broker.getAdminView().removeQueue(destinationName); - - LOG.info("Adding back destination: {}", destinationName); - - broker.getAdminView().addQueue(destinationName); - - assertNotNull(broker.getDestination(new ActiveMQQueue(destinationName))); - } - - protected KahaDBPersistenceAdapter createStore(boolean delete) throws IOException { - KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter(); - kaha.setJournalMaxFileLength(maxFileLength); - kaha.setCleanupInterval(5000); - if (delete) { - kaha.deleteAllMessages(); - } - return kaha; - } - - public void prepareBrokerWithMultiStore(boolean deleteAllMessages) throws Exception { - - MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter(); - if (deleteAllMessages) { - multiKahaDBPersistenceAdapter.deleteAllMessages(); - } - ArrayList adapters = new ArrayList<>(); - - FilteredKahaDBPersistenceAdapter template = new FilteredKahaDBPersistenceAdapter(); - template.setPersistenceAdapter(createStore(deleteAllMessages)); - template.setPerDestination(true); - adapters.add(template); - - multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters); - broker = createBroker(multiKahaDBPersistenceAdapter); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4221Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4221Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4221Test.java deleted file mode 100644 index 0e9c488..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4221Test.java +++ /dev/null @@ -1,274 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.util.HashSet; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import junit.framework.Test; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.ActiveMQPrefetchPolicy; -import org.apache.activemq.TestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.TransportConnector; -import org.apache.activemq.broker.region.DestinationStatistics; -import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.store.kahadb.plist.PListStoreImpl; -import org.apache.activemq.util.DefaultTestAppender; -import org.apache.log4j.Level; -import org.apache.log4j.LogManager; -import org.apache.log4j.spi.LoggingEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ4221Test extends TestSupport { - - private static final Logger LOG = LoggerFactory.getLogger(AMQ4221Test.class); - public int PAYLOAD_SIZE_BYTES = 4 * 1024; - public int NUM_TO_SEND = 60000; - public int NUM_CONCURRENT_PRODUCERS = 20; - public int QUEUE_COUNT = 1; - public int TMP_JOURNAL_MAX_FILE_SIZE = 10 * 1024 * 1024; - - public int DLQ_PURGE_INTERVAL = 30000; - - public int MESSAGE_TIME_TO_LIVE = 20000; - public int EXPIRE_SWEEP_PERIOD = 200; - public int TMP_JOURNAL_GC_PERIOD = 50; - public int RECEIVE_POLL_PERIOD = 4000; - private int RECEIVE_BATCH = 5000; - - final byte[] payload = new byte[PAYLOAD_SIZE_BYTES]; - final AtomicInteger counter = new AtomicInteger(0); - final HashSet exceptions = new HashSet<>(); - BrokerService brokerService; - private String brokerUrlString; - ExecutorService executorService = Executors.newCachedThreadPool(); - final AtomicBoolean done = new AtomicBoolean(false); - - public static Test suite() { - return suite(AMQ4221Test.class); - } - - @Override - public void setUp() throws Exception { - - LogManager.getRootLogger().addAppender(new DefaultTestAppender() { - - @Override - public void doAppend(LoggingEvent event) { - if (event.getLevel().isGreaterOrEqual(Level.ERROR)) { - System.err.println("exit on error: " + event.getMessage()); - done.set(true); - new Thread() { - @Override - public void run() { - System.exit(787); - } - }.start(); - } - } - }); - - done.set(false); - brokerService = new BrokerService(); - brokerService.setDeleteAllMessagesOnStartup(true); - brokerService.setDestinations(new ActiveMQDestination[]{new ActiveMQQueue("ActiveMQ.DLQ")}); - - PolicyEntry defaultPolicy = new PolicyEntry(); - defaultPolicy.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy()); - defaultPolicy.setExpireMessagesPeriod(EXPIRE_SWEEP_PERIOD); - defaultPolicy.setProducerFlowControl(false); - defaultPolicy.setMemoryLimit(50 * 1024 * 1024); - - brokerService.getSystemUsage().getMemoryUsage().setLimit(50 * 1024 * 1024); - - PolicyMap destinationPolicyMap = new PolicyMap(); - destinationPolicyMap.setDefaultEntry(defaultPolicy); - brokerService.setDestinationPolicy(destinationPolicyMap); - - PListStoreImpl tempDataStore = new PListStoreImpl(); - tempDataStore.setDirectory(brokerService.getTmpDataDirectory()); - tempDataStore.setJournalMaxFileLength(TMP_JOURNAL_MAX_FILE_SIZE); - tempDataStore.setCleanupInterval(TMP_JOURNAL_GC_PERIOD); - tempDataStore.setIndexPageSize(200); - tempDataStore.setIndexEnablePageCaching(false); - - brokerService.setTempDataStore(tempDataStore); - brokerService.setAdvisorySupport(false); - TransportConnector tcp = brokerService.addConnector("tcp://localhost:0"); - brokerService.start(); - brokerUrlString = tcp.getPublishableConnectString(); - } - - @Override - public void tearDown() throws Exception { - brokerService.stop(); - brokerService.waitUntilStopped(); - executorService.shutdownNow(); - } - - public void testProduceConsumeExpireHalf() throws Exception { - - final org.apache.activemq.broker.region.Queue dlq = (org.apache.activemq.broker.region.Queue) getDestination(brokerService, new ActiveMQQueue("ActiveMQ.DLQ")); - - if (DLQ_PURGE_INTERVAL > 0) { - executorService.execute(new Runnable() { - @Override - public void run() { - while (!done.get()) { - try { - Thread.sleep(DLQ_PURGE_INTERVAL); - LOG.info("Purge DLQ, current size: " + dlq.getDestinationStatistics().getMessages().getCount()); - dlq.purge(); - } - catch (InterruptedException allDone) { - } - catch (Throwable e) { - e.printStackTrace(); - exceptions.add(e); - } - } - } - }); - - } - - final CountDownLatch latch = new CountDownLatch(QUEUE_COUNT); - for (int i = 0; i < QUEUE_COUNT; i++) { - final int id = i; - executorService.execute(new Runnable() { - @Override - public void run() { - try { - doProduceConsumeExpireHalf(id, latch); - } - catch (Throwable e) { - e.printStackTrace(); - exceptions.add(e); - } - } - }); - } - - while (!done.get()) { - done.set(latch.await(5, TimeUnit.SECONDS)); - } - executorService.shutdown(); - executorService.awaitTermination(5, TimeUnit.MINUTES); - - assertTrue("no exceptions:" + exceptions, exceptions.isEmpty()); - - } - - public void doProduceConsumeExpireHalf(int id, CountDownLatch latch) throws Exception { - - final ActiveMQQueue queue = new ActiveMQQueue("Q" + id); - - final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrlString); - ActiveMQPrefetchPolicy prefecthPolicy = new ActiveMQPrefetchPolicy(); - prefecthPolicy.setAll(0); - factory.setPrefetchPolicy(prefecthPolicy); - Connection connection = factory.createConnection(); - connection.start(); - final MessageConsumer consumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(queue, "on = 'true'"); - - executorService.execute(new Runnable() { - @Override - public void run() { - try { - while (!done.get()) { - Thread.sleep(RECEIVE_POLL_PERIOD); - for (int i = 0; i < RECEIVE_BATCH && !done.get(); i++) { - - Message message = consumer.receive(1000); - if (message != null) { - counter.incrementAndGet(); - if (counter.get() > 0 && counter.get() % 500 == 0) { - LOG.info("received: " + counter.get() + ", " + message.getJMSDestination().toString()); - } - } - } - } - } - catch (JMSException ignored) { - - } - catch (Exception e) { - e.printStackTrace(); - exceptions.add(e); - } - } - }); - - final AtomicInteger accumulator = new AtomicInteger(0); - final CountDownLatch producersDone = new CountDownLatch(NUM_CONCURRENT_PRODUCERS); - - for (int i = 0; i < NUM_CONCURRENT_PRODUCERS; i++) { - executorService.execute(new Runnable() { - @Override - public void run() { - try { - Connection sendConnection = factory.createConnection(); - sendConnection.start(); - Session sendSession = sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = sendSession.createProducer(queue); - producer.setTimeToLive(MESSAGE_TIME_TO_LIVE); - producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - - while (accumulator.incrementAndGet() < NUM_TO_SEND && !done.get()) { - BytesMessage message = sendSession.createBytesMessage(); - message.writeBytes(payload); - message.setStringProperty("on", String.valueOf(accumulator.get() % 2 == 0)); - producer.send(message); - - } - producersDone.countDown(); - } - catch (Exception e) { - e.printStackTrace(); - exceptions.add(e); - } - } - }); - } - - producersDone.await(10, TimeUnit.MINUTES); - - final DestinationStatistics view = getDestinationStatistics(brokerService, queue); - LOG.info("total expired so far " + view.getExpired().getCount() + ", " + queue.getQueueName()); - latch.countDown(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4222Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4222Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4222Test.java deleted file mode 100644 index adaf0e5..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4222Test.java +++ /dev/null @@ -1,187 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.lang.reflect.Field; -import java.net.URI; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.TestSupport; -import org.apache.activemq.broker.BrokerFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.ProducerBrokerExchange; -import org.apache.activemq.broker.TransportConnection; -import org.apache.activemq.broker.TransportConnector; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ProducerId; -import org.apache.activemq.transport.vm.VMTransportFactory; -import org.apache.activemq.util.Wait; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author Christian Posta - */ -public class AMQ4222Test extends TestSupport { - - private static final Logger LOG = LoggerFactory.getLogger(AMQ4222Test.class); - - protected BrokerService brokerService; - - @Override - protected void setUp() throws Exception { - super.setUp(); - topic = false; - brokerService = createBroker(); - } - - @Override - protected void tearDown() throws Exception { - brokerService.stop(); - brokerService.waitUntilStopped(); - } - - protected BrokerService createBroker() throws Exception { - BrokerService broker = BrokerFactory.createBroker(new URI("broker:()/localhost?persistent=false")); - broker.start(); - broker.waitUntilStarted(); - return broker; - } - - @Override - protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { - return new ActiveMQConnectionFactory("vm://localhost"); - } - - public void testTempQueueCleanedUp() throws Exception { - - Destination requestQueue = createDestination(); - - Connection producerConnection = createConnection(); - producerConnection.start(); - Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - MessageProducer producer = producerSession.createProducer(requestQueue); - Destination replyTo = producerSession.createTemporaryQueue(); - MessageConsumer producerSessionConsumer = producerSession.createConsumer(replyTo); - - final CountDownLatch countDownLatch = new CountDownLatch(1); - // let's listen to the response on the queue - producerSessionConsumer.setMessageListener(new MessageListener() { - @Override - public void onMessage(Message message) { - try { - if (message instanceof TextMessage) { - LOG.info("You got a message: " + ((TextMessage) message).getText()); - countDownLatch.countDown(); - } - } - catch (JMSException e) { - e.printStackTrace(); - } - } - }); - - producer.send(createRequest(producerSession, replyTo)); - - Connection consumerConnection = createConnection(); - consumerConnection.start(); - Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = consumerSession.createConsumer(requestQueue); - final MessageProducer consumerProducer = consumerSession.createProducer(null); - - consumer.setMessageListener(new MessageListener() { - @Override - public void onMessage(Message message) { - try { - consumerProducer.send(message.getJMSReplyTo(), message); - } - catch (JMSException e) { - LOG.error("error sending a response on the temp queue"); - e.printStackTrace(); - } - } - }); - - countDownLatch.await(2, TimeUnit.SECONDS); - - // producer has not gone away yet... - org.apache.activemq.broker.region.Destination tempDestination = getDestination(brokerService, (ActiveMQDestination) replyTo); - assertNotNull(tempDestination); - - // clean up - producer.close(); - producerSession.close(); - producerConnection.close(); - - // producer has gone away.. so the temp queue should not exist anymore... let's see.. - // producer has not gone away yet... - tempDestination = getDestination(brokerService, (ActiveMQDestination) replyTo); - assertNull(tempDestination); - - // now.. the connection on the broker side for the dude producing to the temp dest will - // still have a reference in his producerBrokerExchange.. this will keep the destination - // from being reclaimed by GC if there is never another send that producer makes... - // let's see if that reference is there... - final TransportConnector connector = VMTransportFactory.CONNECTORS.get("localhost"); - assertNotNull(connector); - assertTrue(Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return connector.getConnections().size() == 1; - } - })); - TransportConnection transportConnection = connector.getConnections().get(0); - Map exchanges = getProducerExchangeFromConn(transportConnection); - assertEquals(1, exchanges.size()); - ProducerBrokerExchange exchange = exchanges.values().iterator().next(); - - // so this is the reason for the test... we don't want these exchanges to hold a reference - // to a region destination.. after a send is completed, the destination is not used anymore on - // a producer exchange - assertNull(exchange.getRegionDestination()); - assertNull(exchange.getRegion()); - - } - - @SuppressWarnings("unchecked") - private Map getProducerExchangeFromConn(TransportConnection transportConnection) throws NoSuchFieldException, IllegalAccessException { - Field f = TransportConnection.class.getDeclaredField("producerExchanges"); - f.setAccessible(true); - Map producerExchanges = (Map) f.get(transportConnection); - return producerExchanges; - } - - private Message createRequest(Session session, Destination replyTo) throws JMSException { - Message message = session.createTextMessage("Payload"); - message.setJMSReplyTo(replyTo); - return message; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java deleted file mode 100644 index 415dad6..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java +++ /dev/null @@ -1,160 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.io.File; -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; -import org.apache.activemq.util.ConsumerThread; -import org.apache.activemq.util.ProducerThread; -import org.apache.activemq.util.Wait; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class AMQ4323Test { - - private static final Logger LOG = LoggerFactory.getLogger(AMQ4323Test.class); - - BrokerService broker = null; - File kahaDbDir = null; - private final Destination destination = new ActiveMQQueue("q"); - final String payload = new String(new byte[1024]); - - protected void startBroker(boolean delete) throws Exception { - broker = new BrokerService(); - - //Start with a clean directory - kahaDbDir = new File(broker.getBrokerDataDirectory(), "KahaDB"); - deleteDir(kahaDbDir); - - broker.setSchedulerSupport(false); - broker.setDeleteAllMessagesOnStartup(delete); - broker.setPersistent(true); - broker.setUseJmx(false); - broker.addConnector("tcp://localhost:0"); - - PolicyMap map = new PolicyMap(); - PolicyEntry entry = new PolicyEntry(); - entry.setUseCache(false); - map.setDefaultEntry(entry); - broker.setDestinationPolicy(map); - - configurePersistence(broker, delete); - - broker.start(); - LOG.info("Starting broker.."); - } - - protected void configurePersistence(BrokerService brokerService, boolean deleteAllOnStart) throws Exception { - KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter(); - - // ensure there are a bunch of data files but multiple entries in each - adapter.setJournalMaxFileLength(1024 * 20); - - // speed up the test case, checkpoint and cleanup early and often - adapter.setCheckpointInterval(500); - adapter.setCleanupInterval(500); - - if (!deleteAllOnStart) { - adapter.setForceRecoverIndex(true); - } - - } - - private boolean deleteDir(File dir) { - if (dir.isDirectory()) { - String[] children = dir.list(); - for (int i = 0; i < children.length; i++) { - boolean success = deleteDir(new File(dir, children[i])); - if (!success) { - return false; - } - } - } - - return dir.delete(); - } - - private int getFileCount(File dir) { - if (dir.isDirectory()) { - String[] children = dir.list(); - return children.length; - } - - return 0; - } - - @Test - public void testCleanupOfFiles() throws Exception { - final int messageCount = 500; - startBroker(true); - int fileCount = getFileCount(kahaDbDir); - assertEquals(4, fileCount); - - Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri()).createConnection(); - connection.start(); - Session producerSess = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Session consumerSess = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - ProducerThread producer = new ProducerThread(producerSess, destination) { - @Override - protected Message createMessage(int i) throws Exception { - return sess.createTextMessage(payload + "::" + i); - } - }; - producer.setMessageCount(messageCount); - ConsumerThread consumer = new ConsumerThread(consumerSess, destination); - consumer.setBreakOnNull(false); - consumer.setMessageCount(messageCount); - - producer.start(); - producer.join(); - - consumer.start(); - consumer.join(); - - assertEquals("consumer got all produced messages", producer.getMessageCount(), consumer.getReceived()); - - // verify cleanup - assertTrue("gc worked", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - int fileCount = getFileCount(kahaDbDir); - LOG.info("current filecount:" + fileCount); - return 4 == fileCount; - } - })); - - broker.stop(); - broker.waitUntilStopped(); - - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4356Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4356Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4356Test.java deleted file mode 100644 index 3d4ec84..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4356Test.java +++ /dev/null @@ -1,142 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - -import javax.jms.Connection; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class AMQ4356Test { - - private static BrokerService brokerService; - private static String BROKER_ADDRESS = "tcp://localhost:0"; - - private String connectionUri; - private ActiveMQConnectionFactory cf; - private final String CLIENT_ID = "AMQ4356Test"; - private final String SUBSCRIPTION_NAME = "AMQ4356Test"; - - private void createBroker(boolean deleteOnStart) throws Exception { - brokerService = new BrokerService(); - brokerService.setUseJmx(true); - brokerService.setDeleteAllMessagesOnStartup(deleteOnStart); - connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString(); - brokerService.start(); - brokerService.waitUntilStarted(); - - } - - private void startBroker() throws Exception { - createBroker(true); - } - - private void restartBroker() throws Exception { - brokerService.stop(); - brokerService.waitUntilStopped(); - createBroker(false); - } - - @Before - public void setUp() throws Exception { - startBroker(); - cf = new ActiveMQConnectionFactory(connectionUri); - } - - @After - public void tearDown() throws Exception { - brokerService.stop(); - brokerService.waitUntilStopped(); - } - - @Test - public void testVirtualTopicUnsubDurable() throws Exception { - Connection connection = cf.createConnection(); - connection.setClientID(CLIENT_ID); - connection.start(); - - // create consumer 'cluster' - ActiveMQQueue queue1 = new ActiveMQQueue(getVirtualTopicConsumerName()); - ActiveMQQueue queue2 = new ActiveMQQueue(getVirtualTopicConsumerName()); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer c1 = session.createConsumer(queue1); - c1.setMessageListener(new MessageListener() { - @Override - public void onMessage(Message message) { - } - }); - MessageConsumer c2 = session.createConsumer(queue2); - c2.setMessageListener(new MessageListener() { - @Override - public void onMessage(Message message) { - } - }); - - ActiveMQTopic topic = new ActiveMQTopic(getVirtualTopicName()); - MessageConsumer c3 = session.createDurableSubscriber(topic, SUBSCRIPTION_NAME); - - assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length); - assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length); - - c3.close(); - - // create topic producer - MessageProducer producer = session.createProducer(topic); - assertNotNull(producer); - - int total = 10; - for (int i = 0; i < total; i++) { - producer.send(session.createTextMessage("message: " + i)); - } - - assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length); - assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length); - - session.unsubscribe(SUBSCRIPTION_NAME); - connection.close(); - - assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length); - assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length); - - restartBroker(); - - assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length); - assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length); - } - - protected String getVirtualTopicName() { - return "VirtualTopic.TEST"; - } - - protected String getVirtualTopicConsumerName() { - return "Consumer.A.VirtualTopic.TEST"; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4361Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4361Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4361Test.java deleted file mode 100644 index 27c4f64..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4361Test.java +++ /dev/null @@ -1,160 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; - -import java.util.Random; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.MessageProducer; -import javax.jms.ObjectMessage; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy; -import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy; -import org.apache.activemq.command.ActiveMQDestination; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ4361Test { - - private static final Logger LOG = LoggerFactory.getLogger(AMQ4361Test.class); - - private BrokerService service; - private String brokerUrlString; - - @Before - public void setUp() throws Exception { - service = new BrokerService(); - service.setDeleteAllMessagesOnStartup(true); - service.setUseJmx(false); - - PolicyMap policyMap = new PolicyMap(); - PolicyEntry policy = new PolicyEntry(); - policy.setMemoryLimit(1); - policy.setPendingSubscriberPolicy(new VMPendingSubscriberMessageStoragePolicy()); - policy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy()); - policy.setProducerFlowControl(true); - policyMap.setDefaultEntry(policy); - service.setDestinationPolicy(policyMap); - - service.setAdvisorySupport(false); - brokerUrlString = service.addConnector("tcp://localhost:0").getPublishableConnectString(); - service.start(); - service.waitUntilStarted(); - } - - @After - public void tearDown() throws Exception { - if (service != null) { - service.stop(); - service.waitUntilStopped(); - } - } - - @Test - public void testCloseWhenHunk() throws Exception { - - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrlString); - connectionFactory.setProducerWindowSize(1024); - - // TINY QUEUE is flow controlled after 1024 bytes - final ActiveMQDestination destination = ActiveMQDestination.createDestination("queue://TINY_QUEUE", (byte) 0xff); - - Connection connection = connectionFactory.createConnection(); - connection.start(); - final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final MessageProducer producer = session.createProducer(destination); - producer.setTimeToLive(0); - producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - - final AtomicReference publishException = new AtomicReference<>(null); - final AtomicReference closeException = new AtomicReference<>(null); - final AtomicLong lastLoop = new AtomicLong(System.currentTimeMillis() + 100); - - Thread pubThread = new Thread(new Runnable() { - @Override - public void run() { - try { - byte[] data = new byte[1000]; - new Random(0xdeadbeef).nextBytes(data); - for (int i = 0; i < 10000; i++) { - lastLoop.set(System.currentTimeMillis()); - ObjectMessage objMsg = session.createObjectMessage(); - objMsg.setObject(data); - producer.send(destination, objMsg); - } - } - catch (Exception e) { - publishException.set(e); - } - } - }, "PublishingThread"); - pubThread.start(); - - // wait for publisher to deadlock - while (System.currentTimeMillis() - lastLoop.get() < 2000) { - Thread.sleep(100); - } - LOG.info("Publisher deadlock detected."); - - Thread closeThread = new Thread(new Runnable() { - @Override - public void run() { - try { - LOG.info("Attempting close.."); - producer.close(); - } - catch (Exception e) { - closeException.set(e); - } - } - }, "ClosingThread"); - closeThread.start(); - - try { - closeThread.join(30000); - } - catch (InterruptedException ie) { - assertFalse("Closing thread didn't complete in 10 seconds", true); - } - - try { - pubThread.join(30000); - } - catch (InterruptedException ie) { - assertFalse("Publishing thread didn't complete in 10 seconds", true); - } - - assertNull(closeException.get()); - assertNotNull(publishException.get()); - } -} - http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4368Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4368Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4368Test.java deleted file mode 100644 index ef53a0a..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4368Test.java +++ /dev/null @@ -1,256 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertTrue; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; -import org.apache.activemq.util.Wait; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ4368Test { - - private static final Logger LOG = LoggerFactory.getLogger(AMQ4368Test.class); - - private BrokerService broker; - private ActiveMQConnectionFactory connectionFactory; - private final Destination destination = new ActiveMQQueue("large_message_queue"); - private String connectionUri; - - @Before - public void setUp() throws Exception { - broker = createBroker(); - connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString(); - broker.start(); - connectionFactory = new ActiveMQConnectionFactory(connectionUri); - } - - @After - public void tearDown() throws Exception { - broker.stop(); - broker.waitUntilStopped(); - } - - protected BrokerService createBroker() throws Exception { - BrokerService broker = new BrokerService(); - - PolicyEntry policy = new PolicyEntry(); - policy.setUseCache(false); - broker.setDestinationPolicy(new PolicyMap()); - broker.getDestinationPolicy().setDefaultEntry(policy); - - KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter(); - kahadb.setCheckForCorruptJournalFiles(true); - kahadb.setCleanupInterval(1000); - - kahadb.deleteAllMessages(); - broker.setPersistenceAdapter(kahadb); - broker.getSystemUsage().getMemoryUsage().setLimit(1024 * 1024 * 100); - broker.setUseJmx(false); - - return broker; - } - - abstract class Client implements Runnable { - - private final String name; - final AtomicBoolean done = new AtomicBoolean(); - CountDownLatch startedLatch; - CountDownLatch doneLatch = new CountDownLatch(1); - Connection connection; - Session session; - final AtomicLong size = new AtomicLong(); - - Client(String name, CountDownLatch startedLatch) { - this.name = name; - this.startedLatch = startedLatch; - } - - public void start() { - LOG.info("Starting: " + name); - new Thread(this, name).start(); - } - - public void stopAsync() { - done.set(true); - } - - public void stop() throws InterruptedException { - stopAsync(); - if (!doneLatch.await(20, TimeUnit.MILLISECONDS)) { - try { - connection.close(); - doneLatch.await(); - } - catch (Exception e) { - } - } - } - - @Override - public void run() { - try { - connection = createConnection(); - connection.start(); - try { - session = createSession(); - work(); - } - finally { - try { - connection.close(); - } - catch (JMSException ignore) { - } - LOG.info("Stopped: " + name); - } - } - catch (Exception e) { - e.printStackTrace(); - done.set(true); - } - finally { - doneLatch.countDown(); - } - } - - protected Session createSession() throws JMSException { - return connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - } - - protected Connection createConnection() throws JMSException { - return connectionFactory.createConnection(); - } - - abstract protected void work() throws Exception; - } - - class ProducingClient extends Client { - - ProducingClient(String name, CountDownLatch startedLatch) { - super(name, startedLatch); - } - - private String createMessage() { - StringBuffer stringBuffer = new StringBuffer(); - for (long i = 0; i < 1000000; i++) { - stringBuffer.append("1234567890"); - } - return stringBuffer.toString(); - } - - @Override - protected void work() throws Exception { - String data = createMessage(); - MessageProducer producer = session.createProducer(destination); - startedLatch.countDown(); - while (!done.get()) { - producer.send(session.createTextMessage(data)); - long i = size.incrementAndGet(); - if ((i % 1000) == 0) { - LOG.info("produced " + i + "."); - } - } - } - } - - class ConsumingClient extends Client { - - public ConsumingClient(String name, CountDownLatch startedLatch) { - super(name, startedLatch); - } - - @Override - protected void work() throws Exception { - MessageConsumer consumer = session.createConsumer(destination); - startedLatch.countDown(); - while (!done.get()) { - Message msg = consumer.receive(100); - if (msg != null) { - size.incrementAndGet(); - } - } - } - } - - @Test - public void testENTMQ220() throws Exception { - LOG.info("Start test."); - CountDownLatch producer1Started = new CountDownLatch(1); - CountDownLatch producer2Started = new CountDownLatch(1); - CountDownLatch listener1Started = new CountDownLatch(1); - - final ProducingClient producer1 = new ProducingClient("1", producer1Started); - final ProducingClient producer2 = new ProducingClient("2", producer2Started); - final ConsumingClient listener1 = new ConsumingClient("subscriber-1", listener1Started); - final AtomicLong lastSize = new AtomicLong(); - - try { - - producer1.start(); - producer2.start(); - listener1.start(); - - producer1Started.await(15, TimeUnit.SECONDS); - producer2Started.await(15, TimeUnit.SECONDS); - listener1Started.await(15, TimeUnit.SECONDS); - - lastSize.set(listener1.size.get()); - for (int i = 0; i < 10; i++) { - Wait.waitFor(new Wait.Condition() { - - @Override - public boolean isSatisified() throws Exception { - return listener1.size.get() > lastSize.get(); - } - }); - long size = listener1.size.get(); - LOG.info("Listener 1: consumed: " + (size - lastSize.get())); - assertTrue("No messages received on iteration: " + i, size > lastSize.get()); - lastSize.set(size); - } - } - finally { - LOG.info("Stopping clients"); - producer1.stop(); - producer2.stop(); - listener1.stop(); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4407Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4407Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4407Test.java deleted file mode 100644 index 38a9398..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4407Test.java +++ /dev/null @@ -1,174 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.ArrayList; - -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.Destination; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.store.PersistenceAdapter; -import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; -import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter; -import org.apache.activemq.util.Wait; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ4407Test { - - static final Logger LOG = LoggerFactory.getLogger(AMQ4407Test.class); - private final static int maxFileLength = 1024 * 1024 * 32; - - private final static String PREFIX_DESTINATION_NAME = "queue"; - - private final static String DESTINATION_NAME = PREFIX_DESTINATION_NAME + ".test"; - private final static String DESTINATION_NAME_2 = PREFIX_DESTINATION_NAME + "2.test"; - private final static String DESTINATION_NAME_3 = PREFIX_DESTINATION_NAME + "3.test"; - - BrokerService broker; - - @Before - public void setUp() throws Exception { - prepareBrokerWithMultiStore(true); - broker.start(); - broker.waitUntilStarted(); - } - - @After - public void tearDown() throws Exception { - broker.stop(); - } - - protected BrokerService createBroker(PersistenceAdapter kaha) throws Exception { - BrokerService broker = new BrokerService(); - broker.setUseJmx(true); - broker.setBrokerName("localhost"); - broker.setPersistenceAdapter(kaha); - return broker; - } - - @Test - public void testRestartAfterQueueDelete() throws Exception { - - // Ensure we have an Admin View. - assertTrue("Broker doesn't have an Admin View.", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return (broker.getAdminView()) != null; - } - })); - - LOG.info("Adding destinations: {}, {}, {}", new Object[]{DESTINATION_NAME, DESTINATION_NAME_3, DESTINATION_NAME_3}); - sendMessage(DESTINATION_NAME, "test 1"); - sendMessage(DESTINATION_NAME_2, "test 1"); - sendMessage(DESTINATION_NAME_3, "test 1"); - - assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME))); - assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2))); - assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_3))); - - LOG.info("Removing destination: {}", DESTINATION_NAME_2); - broker.getAdminView().removeQueue(DESTINATION_NAME_2); - - LOG.info("Recreating destination: {}", DESTINATION_NAME_2); - sendMessage(DESTINATION_NAME_2, "test 1"); - - Destination destination2 = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2)); - assertNotNull(destination2); - assertEquals(1, destination2.getMessageStore().getMessageCount()); - } - - protected KahaDBPersistenceAdapter createStore(boolean delete) throws IOException { - KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter(); - kaha.setJournalMaxFileLength(maxFileLength); - kaha.setCleanupInterval(5000); - if (delete) { - kaha.deleteAllMessages(); - } - return kaha; - } - - public void prepareBrokerWithMultiStore(boolean deleteAllMessages) throws Exception { - - MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter(); - if (deleteAllMessages) { - multiKahaDBPersistenceAdapter.deleteAllMessages(); - } - ArrayList adapters = new ArrayList<>(); - - adapters.add(createFilteredKahaDBByDestinationPrefix(PREFIX_DESTINATION_NAME, deleteAllMessages)); - adapters.add(createFilteredKahaDBByDestinationPrefix(PREFIX_DESTINATION_NAME + "2", deleteAllMessages)); - adapters.add(createFilteredKahaDBByDestinationPrefix(null, deleteAllMessages)); - - multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters); - broker = createBroker(multiKahaDBPersistenceAdapter); - } - - /** - * Create filtered KahaDB adapter by destination prefix. - * - * @param destinationPrefix - * @param deleteAllMessages - * @return - * @throws IOException - */ - private FilteredKahaDBPersistenceAdapter createFilteredKahaDBByDestinationPrefix(String destinationPrefix, - boolean deleteAllMessages) throws IOException { - FilteredKahaDBPersistenceAdapter template = new FilteredKahaDBPersistenceAdapter(); - template.setPersistenceAdapter(createStore(deleteAllMessages)); - if (destinationPrefix != null) { - template.setQueue(destinationPrefix + ".>"); - } - return template; - } - - /** - * Send message to particular destination. - * - * @param destinationName - * @param message - * @throws JMSException - */ - private void sendMessage(String destinationName, String message) throws JMSException { - ActiveMQConnectionFactory f = new ActiveMQConnectionFactory("vm://localhost"); - f.setAlwaysSyncSend(true); - Connection c = f.createConnection(); - c.start(); - Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = s.createProducer(new ActiveMQQueue(destinationName)); - producer.send(s.createTextMessage(message)); - producer.close(); - s.close(); - c.stop(); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4413Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4413Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4413Test.java deleted file mode 100644 index cd3ed95..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4413Test.java +++ /dev/null @@ -1,246 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertTrue; - -import java.util.ArrayList; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.jms.TopicSubscriber; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - */ -public class AMQ4413Test { - - static final Logger LOG = LoggerFactory.getLogger(AMQ4413Test.class); - - final String brokerUrl = "tcp://localhost:0"; - private String connectionUri; - final int numMsgsTriggeringReconnection = 2; - final int numMsgs = 30; - final int numTests = 75; - final ExecutorService threadPool = Executors.newCachedThreadPool(); - - @Test - public void testDurableSubMessageLoss() throws Exception { - // start embedded broker - BrokerService brokerService = new BrokerService(); - connectionUri = brokerService.addConnector(brokerUrl).getPublishableConnectString(); - brokerService.setPersistent(false); - brokerService.setUseJmx(false); - brokerService.setKeepDurableSubsActive(true); - brokerService.setAdvisorySupport(false); - brokerService.start(); - LOG.info("##### broker started"); - - // repeat test 50 times - try { - for (int i = 0; i < numTests; ++i) { - LOG.info("##### test " + i + " started"); - test(); - } - - LOG.info("##### tests are done"); - } - catch (Exception e) { - e.printStackTrace(); - LOG.info("##### tests failed!"); - } - finally { - threadPool.shutdown(); - brokerService.stop(); - LOG.info("##### broker stopped"); - } - } - - private void test() throws Exception { - - final String topicName = "topic-" + UUID.randomUUID(); - final String clientId = "client-" + UUID.randomUUID(); - final String subName = "sub-" + UUID.randomUUID(); - - // create (and only create) subscription first - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); - factory.setWatchTopicAdvisories(false); - Connection connection = factory.createConnection(); - connection.setClientID(clientId); - connection.start(); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Topic topic = session.createTopic(topicName); - TopicSubscriber durableSubscriptionCreator = session.createDurableSubscriber(topic, subName); - - connection.stop(); - durableSubscriptionCreator.close(); - session.close(); - connection.close(); - - // publisher task - Callable publisher = new Callable() { - @Override - public Boolean call() throws Exception { - Connection connection = null; - - try { - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); - factory.setWatchTopicAdvisories(false); - connection = factory.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Topic topic = session.createTopic(topicName); - - MessageProducer producer = session.createProducer(topic); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - producer.setPriority(Message.DEFAULT_PRIORITY); - producer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE); - - for (int seq = 1; seq <= numMsgs; ++seq) { - TextMessage msg = session.createTextMessage(String.valueOf(seq)); - producer.send(msg); - LOG.info("pub sent msg: " + seq); - Thread.sleep(1L); - } - - LOG.info("pub is done"); - } - finally { - if (connection != null) { - try { - connection.close(); - } - catch (JMSException e) { - e.printStackTrace(); - } - } - } - return Boolean.TRUE; - } - }; - - // subscriber task - Callable durableSubscriber = new Callable() { - ActiveMQConnectionFactory factory; - Connection connection; - Session session; - Topic topic; - TopicSubscriber consumer; - - @Override - public Boolean call() throws Exception { - factory = new ActiveMQConnectionFactory(connectionUri); - factory.setWatchTopicAdvisories(false); - - try { - connect(); - - for (int seqExpected = 1; seqExpected <= numMsgs; ++seqExpected) { - TextMessage msg = (TextMessage) consumer.receive(3000L); - if (msg == null) { - LOG.info("expected: " + seqExpected + ", actual: timed out", msg); - return Boolean.FALSE; - } - - int seq = Integer.parseInt(msg.getText()); - - LOG.info("sub received msg: " + seq); - - if (seqExpected != seq) { - LOG.info("expected: " + seqExpected + ", actual: " + seq); - return Boolean.FALSE; - } - - if (seq % numMsgsTriggeringReconnection == 0) { - close(false); - connect(); - - LOG.info("sub reconnected"); - } - } - - LOG.info("sub is done"); - } - finally { - try { - close(true); - } - catch (Exception e) { - e.printStackTrace(); - } - } - - return Boolean.TRUE; - } - - void connect() throws Exception { - connection = factory.createConnection(); - connection.setClientID(clientId); - connection.start(); - - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - topic = session.createTopic(topicName); - consumer = session.createDurableSubscriber(topic, subName); - } - - void close(boolean unsubscribe) throws Exception { - if (connection != null) { - connection.stop(); - } - - if (consumer != null) { - consumer.close(); - } - - if (session != null) { - if (unsubscribe) { - session.unsubscribe(subName); - } - session.close(); - } - - if (connection != null) { - connection.close(); - } - } - }; - - ArrayList> results = new ArrayList<>(); - results.add(threadPool.submit(publisher)); - results.add(threadPool.submit(durableSubscriber)); - - for (Future result : results) { - assertTrue(result.get()); - } - } -}