Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 836EB18808 for ; Tue, 23 Feb 2016 19:39:00 +0000 (UTC) Received: (qmail 46309 invoked by uid 500); 23 Feb 2016 19:38:59 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 46115 invoked by uid 500); 23 Feb 2016 19:38:59 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 43580 invoked by uid 99); 23 Feb 2016 19:38:57 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 23 Feb 2016 19:38:57 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C6D7EE8E84; Tue, 23 Feb 2016 19:38:57 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Tue, 23 Feb 2016 19:39:26 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [30/44] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f76f4d1f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2580Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2580Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2580Test.java deleted file mode 100644 index 9d79a8e..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2580Test.java +++ /dev/null @@ -1,195 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import junit.framework.Test; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.ActiveMQPrefetchPolicy; -import org.apache.activemq.TestSupport; -import org.apache.activemq.broker.BrokerService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jms.ConnectionFactory; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.jms.TopicConnection; -import javax.jms.TopicSession; - -public class AMQ2580Test extends TestSupport { - - private static final Logger LOG = LoggerFactory.getLogger(AMQ2580Test.class); - - private static final String TOPIC_NAME = "topicName"; - private static final String CLIENT_ID = "client_id"; - private static final String textOfSelectedMsg = "good_message"; - - protected TopicConnection connection; - - private Topic topic; - private Session session; - private MessageProducer producer; - private ConnectionFactory connectionFactory; - private BrokerService service; - - public static Test suite() { - return suite(AMQ2580Test.class); - } - - @Override - protected void setUp() throws Exception { - super.setUp(); - initDurableBroker(); - initConnectionFactory(); - initTopic(); - } - - @Override - protected void tearDown() throws Exception { - shutdownClient(); - service.stop(); - super.tearDown(); - } - - private void initConnection() throws JMSException { - if (connection == null) { - LOG.info("Initializing connection"); - - connection = (TopicConnection) connectionFactory.createConnection(); - connection.start(); - } - } - - public void initCombosForTestTopicIsDurableSmokeTest() throws Exception { - addCombinationValues("defaultPersistenceAdapter", PersistenceAdapterChoice.values()); - } - - public void testTopicIsDurableSmokeTest() throws Exception { - - initClient(); - MessageConsumer consumer = createMessageConsumer(); - LOG.info("Consuming message"); - assertNull(consumer.receive(1)); - shutdownClient(); - consumer.close(); - - sendMessages(); - shutdownClient(); - - initClient(); - consumer = createMessageConsumer(); - - LOG.info("Consuming message"); - TextMessage answer1 = (TextMessage) consumer.receive(1000); - assertNotNull("we got our message", answer1); - - consumer.close(); - } - - private MessageConsumer createMessageConsumer() throws JMSException { - LOG.info("creating durable subscriber"); - return session.createDurableSubscriber(topic, TOPIC_NAME, "name='value'", false); - } - - private void initClient() throws JMSException { - LOG.info("Initializing client"); - - initConnection(); - initSession(); - } - - private void shutdownClient() throws JMSException { - LOG.info("Closing session and connection"); - session.close(); - connection.close(); - session = null; - connection = null; - } - - private void sendMessages() throws JMSException { - initConnection(); - - initSession(); - - LOG.info("Creating producer"); - producer = session.createProducer(topic); - - sendMessageThatFailsSelection(); - - sendMessage(textOfSelectedMsg, "value"); - } - - private void initSession() throws JMSException { - LOG.info("Initializing session"); - session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - } - - private void sendMessageThatFailsSelection() throws JMSException { - for (int i = 0; i < 5; i++) { - String textOfNotSelectedMsg = "Msg_" + i; - sendMessage(textOfNotSelectedMsg, "not_value"); - LOG.info("#"); - } - } - - private void sendMessage(String msgText, String propertyValue) throws JMSException { - LOG.info("Creating message: " + msgText); - TextMessage messageToSelect = session.createTextMessage(msgText); - messageToSelect.setStringProperty("name", propertyValue); - LOG.info("Sending message"); - producer.send(messageToSelect); - } - - protected void initConnectionFactory() throws Exception { - ActiveMQConnectionFactory activeMqConnectionFactory = createActiveMqConnectionFactory(); - connectionFactory = activeMqConnectionFactory; - } - - private ActiveMQConnectionFactory createActiveMqConnectionFactory() throws Exception { - ActiveMQConnectionFactory activeMqConnectionFactory = new ActiveMQConnectionFactory("failover:" + service.getTransportConnectors().get(0).getConnectUri().toString()); - activeMqConnectionFactory.setWatchTopicAdvisories(false); - ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); - prefetchPolicy.setDurableTopicPrefetch(2); - prefetchPolicy.setOptimizeDurableTopicPrefetch(2); - activeMqConnectionFactory.setPrefetchPolicy(prefetchPolicy); - activeMqConnectionFactory.setClientID(CLIENT_ID); - return activeMqConnectionFactory; - } - - private void initDurableBroker() throws Exception { - service = new BrokerService(); - setDefaultPersistenceAdapter(service); - service.setDeleteAllMessagesOnStartup(true); - service.setAdvisorySupport(false); - service.setTransportConnectorURIs(new String[]{"tcp://localhost:0"}); - service.setPersistent(true); - service.setUseJmx(false); - service.start(); - - } - - private void initTopic() throws JMSException { - initConnection(); - TopicSession topicSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - topic = topicSession.createTopic(TOPIC_NAME); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f76f4d1f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java deleted file mode 100644 index 3b7a11b..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java +++ /dev/null @@ -1,268 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.io.File; -import java.io.FilenameFilter; -import java.util.Arrays; -import java.util.Properties; -import java.util.Vector; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TopicSubscriber; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.jmx.BrokerView; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.store.PersistenceAdapter; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; -import org.apache.activemq.util.IntrospectionSupport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -// variation on AMQ2584 where the DLQ consumer works in parallel to producer so -// that some dups are not suppressed as they are already acked by the consumer -// the audit needs to be disabled to allow these dupes to be consumed -public class AMQ2584ConcurrentDlqTest extends org.apache.activemq.TestSupport { - - static final Logger LOG = LoggerFactory.getLogger(AMQ2584ConcurrentDlqTest.class); - BrokerService broker = null; - ActiveMQTopic topic; - - ActiveMQConnection consumerConnection = null, producerConnection = null, dlqConnection = null; - Session consumerSession; - Session producerSession; - MessageProducer producer; - Vector duralbeSubs = new Vector<>(); - final int numMessages = 1000; - final int numDurableSubs = 2; - - String data; - private long dlqConsumerLastReceivedTimeStamp; - private AtomicLong dlqReceivedCount = new AtomicLong(0); - - // 2 deliveries of each message, 3 producers - CountDownLatch redeliveryConsumerLatch = new CountDownLatch(((2 * numMessages) * numDurableSubs) - 1); - // should get at least numMessages, possibly more - CountDownLatch dlqConsumerLatch = new CountDownLatch((numMessages - 1)); - - public void testSize() throws Exception { - openConsumer(redeliveryConsumerLatch); - openDlqConsumer(dlqConsumerLatch); - - assertEquals(0, broker.getAdminView().getStorePercentUsage()); - - for (int i = 0; i < numMessages; i++) { - sendMessage(false); - } - - final BrokerView brokerView = broker.getAdminView(); - - broker.getSystemUsage().getStoreUsage().isFull(); - LOG.info("store percent usage: " + brokerView.getStorePercentUsage()); - assertTrue("redelivery consumer got all it needs, remaining: " + redeliveryConsumerLatch.getCount(), redeliveryConsumerLatch.await(60, TimeUnit.SECONDS)); - assertTrue("dql consumer got all it needs", dlqConsumerLatch.await(60, TimeUnit.SECONDS)); - closeConsumer(); - - LOG.info("Giving dlq a chance to clear down once topic consumer is closed"); - - // consumer all of the duplicates that arrived after the first ack - closeDlqConsumer(); - - //get broker a chance to clean obsolete messages, wait 2*cleanupInterval - Thread.sleep(5000); - - FilenameFilter justLogFiles = new FilenameFilter() { - @Override - public boolean accept(File file, String s) { - return s.endsWith(".log"); - } - }; - int numFiles = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getDirectory().list(justLogFiles).length; - if (numFiles > 2) { - LOG.info(Arrays.toString(((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getDirectory().list(justLogFiles))); - } - LOG.info("num files: " + numFiles); - assertEquals("kahaDB dir should contain 1 db file,is: " + numFiles, 1, numFiles); - } - - private void openConsumer(final CountDownLatch latch) throws Exception { - consumerConnection = (ActiveMQConnection) createConnection(); - consumerConnection.setClientID("cliID"); - consumerConnection.start(); - consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - MessageListener listener = new MessageListener() { - @Override - public void onMessage(Message message) { - latch.countDown(); - try { - consumerSession.recover(); - } - catch (Exception ignored) { - ignored.printStackTrace(); - } - } - }; - - for (int i = 1; i <= numDurableSubs; i++) { - TopicSubscriber sub = consumerSession.createDurableSubscriber(topic, "subName" + i); - sub.setMessageListener(listener); - duralbeSubs.add(sub); - } - } - - private void openDlqConsumer(final CountDownLatch received) throws Exception { - - dlqConnection = (ActiveMQConnection) createConnection(); - Session dlqSession = dlqConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer dlqConsumer = dlqSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ")); - dlqConsumer.setMessageListener(new MessageListener() { - @Override - public void onMessage(Message message) { - if (received.getCount() > 0 && received.getCount() % 200 == 0) { - LOG.info("remaining on DLQ: " + received.getCount()); - } - received.countDown(); - dlqConsumerLastReceivedTimeStamp = System.currentTimeMillis(); - dlqReceivedCount.incrementAndGet(); - } - }); - dlqConnection.start(); - } - - private void closeConsumer() throws JMSException { - for (TopicSubscriber sub : duralbeSubs) { - sub.close(); - } - if (consumerSession != null) { - for (int i = 1; i <= numDurableSubs; i++) { - consumerSession.unsubscribe("subName" + i); - } - } - if (consumerConnection != null) { - consumerConnection.close(); - consumerConnection = null; - } - } - - private void closeDlqConsumer() throws JMSException, InterruptedException { - final long limit = System.currentTimeMillis() + 30 * 1000; - if (dlqConsumerLastReceivedTimeStamp > 0) { - while (System.currentTimeMillis() < dlqConsumerLastReceivedTimeStamp + 5000 && System.currentTimeMillis() < limit) { - LOG.info("waiting for DLQ do drain, receivedCount: " + dlqReceivedCount); - TimeUnit.SECONDS.sleep(1); - } - } - if (dlqConnection != null) { - dlqConnection.close(); - dlqConnection = null; - } - } - - private void sendMessage(boolean filter) throws Exception { - if (producerConnection == null) { - producerConnection = (ActiveMQConnection) createConnection(); - producerConnection.start(); - producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - producer = producerSession.createProducer(topic); - } - - Message message = producerSession.createMessage(); - message.setStringProperty("data", data); - producer.send(message); - } - - private void startBroker(boolean deleteMessages) throws Exception { - broker = new BrokerService(); - broker.setAdvisorySupport(false); - broker.setBrokerName("testStoreSize"); - - PolicyMap map = new PolicyMap(); - PolicyEntry entry = new PolicyEntry(); - entry.setEnableAudit(false); - map.setDefaultEntry(entry); - broker.setDestinationPolicy(map); - - if (deleteMessages) { - broker.setDeleteAllMessagesOnStartup(true); - } - configurePersistenceAdapter(broker.getPersistenceAdapter()); - broker.getSystemUsage().getStoreUsage().setLimit(200 * 1000 * 1000); - broker.start(); - } - - private void configurePersistenceAdapter(PersistenceAdapter persistenceAdapter) { - Properties properties = new Properties(); - String maxFileLengthVal = String.valueOf(2 * 1024 * 1024); - properties.put("journalMaxFileLength", maxFileLengthVal); - properties.put("maxFileLength", maxFileLengthVal); - properties.put("cleanupInterval", "2000"); - properties.put("checkpointInterval", "2000"); - // there are problems with duplicate dispatch in the cursor, which maintain - // a map of messages. A dup dispatch can be dropped. - // see: org.apache.activemq.broker.region.cursors.OrderedPendingList - // Adding duplicate detection to the default DLQ strategy removes the problem - // which means we can leave the default for concurrent store and dispatch q - //properties.put("concurrentStoreAndDispatchQueues", "false"); - - IntrospectionSupport.setProperties(persistenceAdapter, properties); - } - - private void stopBroker() throws Exception { - if (broker != null) - broker.stop(); - broker = null; - } - - @Override - protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { - return new ActiveMQConnectionFactory("vm://testStoreSize?jms.watchTopicAdvisories=false&jms.redeliveryPolicy.maximumRedeliveries=1&jms.redeliveryPolicy.initialRedeliveryDelay=0&waitForStart=5000&create=false"); - } - - @Override - protected void setUp() throws Exception { - super.setUp(); - - StringBuilder sb = new StringBuilder(5000); - for (int i = 0; i < 5000; i++) { - sb.append('a'); - } - data = sb.toString(); - - startBroker(true); - topic = (ActiveMQTopic) createDestination(); - } - - @Override - protected void tearDown() throws Exception { - stopBroker(); - super.tearDown(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f76f4d1f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584Test.java deleted file mode 100644 index 14760d9..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584Test.java +++ /dev/null @@ -1,233 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.TestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.jmx.BrokerView; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.store.PersistenceAdapter; -import org.apache.activemq.util.IntrospectionSupport; -import org.apache.activemq.util.Wait; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@RunWith(value = Parameterized.class) -public class AMQ2584Test extends org.apache.activemq.TestSupport { - - static final Logger LOG = LoggerFactory.getLogger(AMQ2584Test.class); - BrokerService broker = null; - ActiveMQTopic topic; - - ActiveMQConnection consumerConnection = null, producerConnection = null; - Session producerSession; - MessageProducer producer; - final int minPercentUsageForStore = 3; - String data; - - private final TestSupport.PersistenceAdapterChoice persistenceAdapterChoice; - - @Parameterized.Parameters(name = "{0}") - public static Collection getTestParameters() { - TestSupport.PersistenceAdapterChoice[] kahaDb = {TestSupport.PersistenceAdapterChoice.KahaDB}; - TestSupport.PersistenceAdapterChoice[] levelDb = {TestSupport.PersistenceAdapterChoice.LevelDB}; - List choices = new ArrayList<>(); - choices.add(kahaDb); - choices.add(levelDb); - - return choices; - } - - public AMQ2584Test(TestSupport.PersistenceAdapterChoice choice) { - this.persistenceAdapterChoice = choice; - } - - @Test(timeout = 120000) - public void testSize() throws Exception { - int messages = 1000; - CountDownLatch redeliveryConsumerLatch = new CountDownLatch((messages * 3)); - openConsumer(redeliveryConsumerLatch); - - assertEquals(0, broker.getAdminView().getStorePercentUsage()); - - for (int i = 0; i < messages; i++) { - sendMessage(false); - } - - final BrokerView brokerView = broker.getAdminView(); - - broker.getSystemUsage().getStoreUsage().isFull(); - LOG.info("store percent usage: " + brokerView.getStorePercentUsage()); - int storePercentUsage = broker.getAdminView().getStorePercentUsage(); - assertTrue("some store in use", storePercentUsage > minPercentUsageForStore); - - assertTrue("redelivery consumer got all it needs", redeliveryConsumerLatch.await(60, TimeUnit.SECONDS)); - closeConsumer(); - - // consume from DLQ - final CountDownLatch received = new CountDownLatch(messages); - consumerConnection = (ActiveMQConnection) createConnection(); - Session dlqSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer dlqConsumer = dlqSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ")); - dlqConsumer.setMessageListener(new MessageListener() { - @Override - public void onMessage(Message message) { - if (received.getCount() % 500 == 0) { - LOG.info("remaining on DLQ: " + received.getCount()); - } - received.countDown(); - } - }); - consumerConnection.start(); - - assertTrue("Not all messages reached the DLQ", received.await(60, TimeUnit.SECONDS)); - - assertTrue("Store usage exceeds expected usage", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - broker.getSystemUsage().getStoreUsage().isFull(); - LOG.info("store precent usage: " + brokerView.getStorePercentUsage()); - return broker.getAdminView().getStorePercentUsage() < minPercentUsageForStore; - } - })); - - closeConsumer(); - - } - - private void openConsumer(final CountDownLatch latch) throws Exception { - consumerConnection = (ActiveMQConnection) createConnection(); - consumerConnection.setClientID("cliID"); - consumerConnection.start(); - final Session session = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - MessageListener listener = new MessageListener() { - @Override - public void onMessage(Message message) { - latch.countDown(); - try { - session.recover(); - } - catch (Exception ignored) { - ignored.printStackTrace(); - } - - } - }; - - session.createDurableSubscriber(topic, "subName1").setMessageListener(listener); - session.createDurableSubscriber(topic, "subName2").setMessageListener(listener); - session.createDurableSubscriber(topic, "subName3").setMessageListener(listener); - } - - private void closeConsumer() throws JMSException { - if (consumerConnection != null) - consumerConnection.close(); - consumerConnection = null; - } - - private void sendMessage(boolean filter) throws Exception { - if (producerConnection == null) { - producerConnection = (ActiveMQConnection) createConnection(); - producerConnection.start(); - producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - producer = producerSession.createProducer(topic); - } - - Message message = producerSession.createMessage(); - message.setStringProperty("data", data); - producer.send(message); - } - - private void startBroker(boolean deleteMessages) throws Exception { - broker = new BrokerService(); - broker.setAdvisorySupport(false); - broker.setBrokerName("testStoreSize"); - - if (deleteMessages) { - broker.setDeleteAllMessagesOnStartup(true); - } - LOG.info("Starting broker with persistenceAdapterChoice " + persistenceAdapterChoice.toString()); - setPersistenceAdapter(broker, persistenceAdapterChoice); - configurePersistenceAdapter(broker.getPersistenceAdapter()); - broker.getSystemUsage().getStoreUsage().setLimit(200 * 1000 * 1000); - broker.start(); - } - - private void configurePersistenceAdapter(PersistenceAdapter persistenceAdapter) { - Properties properties = new Properties(); - String maxFileLengthVal = String.valueOf(1 * 1024 * 1024); - properties.put("journalMaxFileLength", maxFileLengthVal); - properties.put("maxFileLength", maxFileLengthVal); - properties.put("cleanupInterval", "2000"); - properties.put("checkpointInterval", "2000"); - - IntrospectionSupport.setProperties(persistenceAdapter, properties); - } - - private void stopBroker() throws Exception { - if (broker != null) - broker.stop(); - broker = null; - } - - @Override - protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { - return new ActiveMQConnectionFactory("vm://testStoreSize?jms.watchTopicAdvisories=false&jms.redeliveryPolicy.maximumRedeliveries=0&jms.closeTimeout=60000&waitForStart=5000&create=false"); - } - - @Override - @Before - public void setUp() throws Exception { - StringBuilder sb = new StringBuilder(5000); - for (int i = 0; i < 5000; i++) { - sb.append('a'); - } - data = sb.toString(); - - startBroker(true); - topic = (ActiveMQTopic) createDestination(); - } - - @Override - @After - public void tearDown() throws Exception { - stopBroker(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f76f4d1f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2585Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2585Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2585Test.java deleted file mode 100644 index 71cb2a8..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2585Test.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.bugs; - -import javax.jms.Destination; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.activemq.EmbeddedBrokerAndConnectionTestSupport; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTextMessage; -import org.apache.activemq.command.Message; -import org.apache.activemq.spring.ConsumerBean; - -public class AMQ2585Test extends EmbeddedBrokerAndConnectionTestSupport { - - private final Destination destination = new ActiveMQQueue("MyQueue"); - final static String LENGTH10STRING = "1234567890"; - private Session session; - private MessageProducer producer; - private ConsumerBean messageList; - - public void testOneMessageWithProperties() throws Exception { - TextMessage message = session.createTextMessage(LENGTH10STRING); - message.setStringProperty(LENGTH10STRING, LENGTH10STRING); - producer.send(message); - - messageList.assertMessagesArrived(1); - - ActiveMQTextMessage received = ((ActiveMQTextMessage) messageList.flushMessages().get(0)); - - assertEquals(LENGTH10STRING, received.getText()); - assertTrue(received.getProperties().size() > 0); - assertTrue(received.propertyExists(LENGTH10STRING)); - assertEquals(LENGTH10STRING, received.getStringProperty(LENGTH10STRING)); - - /** - * As specified by getSize(), the size (memory usage) of the body should - * be length of text * 2. Unsure of how memory usage is calculated for - * properties, but should probably not be less than the sum of (string) - * lengths for the key name and value. - */ - - final int sizeShouldBeNoLessThan = LENGTH10STRING.length() * 4 + Message.DEFAULT_MINIMUM_MESSAGE_SIZE; - assertTrue("Message size was smaller than expected: " + received.getSize(), received.getSize() >= sizeShouldBeNoLessThan); - assertFalse(LENGTH10STRING.length() * 2 == received.getSize()); - } - - @Override - protected void setUp() throws Exception { - bindAddress = bindAddress + "?marshal=true"; - super.setUp(); - messageList = new ConsumerBean(); - messageList.setVerbose(true); - - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - MessageConsumer messageConsumer = session.createConsumer(destination); - - messageConsumer.setMessageListener(messageList); - - producer = session.createProducer(destination); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f76f4d1f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java deleted file mode 100644 index f22ff48..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java +++ /dev/null @@ -1,118 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.io.File; -import java.util.ArrayList; -import java.util.concurrent.atomic.AtomicBoolean; -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; - -import junit.framework.TestCase; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; -import org.apache.activemq.util.IOHelper; - -public class AMQ2616Test extends TestCase { - - private static final int NUMBER = 2000; - private BrokerService brokerService; - private final ArrayList threads = new ArrayList<>(); - private final String ACTIVEMQ_BROKER_BIND = "tcp://0.0.0.0:0"; - private final AtomicBoolean shutdown = new AtomicBoolean(); - - private String connectionUri; - - public void testQueueResourcesReleased() throws Exception { - ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(connectionUri); - Connection tempConnection = fac.createConnection(); - tempConnection.start(); - Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue tempQueue = tempSession.createTemporaryQueue(); - - Connection testConnection = fac.createConnection(); - long startUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage(); - Session testSession = testConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer testProducer = testSession.createProducer(tempQueue); - byte[] payload = new byte[1024 * 4]; - for (int i = 0; i < NUMBER; i++) { - BytesMessage msg = testSession.createBytesMessage(); - msg.writeBytes(payload); - testProducer.send(msg); - } - long endUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage(); - assertFalse(startUsage == endUsage); - tempConnection.close(); - Thread.sleep(1000); - endUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage(); - assertEquals(startUsage, endUsage); - } - - @Override - protected void setUp() throws Exception { - // Start an embedded broker up. - brokerService = new BrokerService(); - - KahaDBPersistenceAdapter adaptor = new KahaDBPersistenceAdapter(); - adaptor.setEnableJournalDiskSyncs(false); - File file = new File("target/AMQ2616Test"); - IOHelper.mkdirs(file); - IOHelper.deleteChildren(file); - adaptor.setDirectory(file); - brokerService.setPersistenceAdapter(adaptor); - - PolicyMap policyMap = new PolicyMap(); - PolicyEntry pe = new PolicyEntry(); - pe.setMemoryLimit(10 * 1024 * 1024); - pe.setOptimizedDispatch(true); - pe.setProducerFlowControl(false); - pe.setExpireMessagesPeriod(1000); - pe.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy()); - policyMap.put(new ActiveMQQueue(">"), pe); - brokerService.setDestinationPolicy(policyMap); - brokerService.getSystemUsage().getMemoryUsage().setLimit(20 * 1024 * 1024); - brokerService.getSystemUsage().getTempUsage().setLimit(200 * 1024 * 1024); - brokerService.addConnector(ACTIVEMQ_BROKER_BIND); - brokerService.start(); - brokerService.waitUntilStarted(); - - connectionUri = brokerService.getTransportConnectors().get(0).getPublishableConnectString(); - - new ActiveMQQueue(getName()); - } - - @Override - protected void tearDown() throws Exception { - // Stop any running threads. - shutdown.set(true); - for (Thread t : threads) { - t.interrupt(); - t.join(); - } - brokerService.stop(); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f76f4d1f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2645Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2645Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2645Test.java deleted file mode 100644 index 61a5d1e..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2645Test.java +++ /dev/null @@ -1,112 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.DeliveryMode; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.EmbeddedBrokerTestSupport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ2645Test extends EmbeddedBrokerTestSupport { - - private static final Logger LOG = LoggerFactory.getLogger(AMQ2645Test.class); - private final static String QUEUE_NAME = "test.daroo.q"; - - public void testWaitForTransportInterruptionProcessingHang() throws Exception { - final ConnectionFactory fac = new ActiveMQConnectionFactory("failover:(" + this.bindAddress + ")"); - final Connection connection = fac.createConnection(); - try { - final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final Queue queue = session.createQueue(QUEUE_NAME); - final MessageProducer producer = session.createProducer(queue); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - connection.start(); - - producer.send(session.createTextMessage("test")); - - final CountDownLatch afterRestart = new CountDownLatch(1); - final CountDownLatch twoNewMessages = new CountDownLatch(1); - final CountDownLatch thirdMessageReceived = new CountDownLatch(1); - - final MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME)); - consumer.setMessageListener(new MessageListener() { - @Override - public void onMessage(Message message) { - try { - afterRestart.await(); - - final TextMessage txtMsg = (TextMessage) message; - if (txtMsg.getText().equals("test")) { - producer.send(session.createTextMessage("test 1")); - TimeUnit.SECONDS.sleep(5); - // THIS SECOND send() WILL CAUSE CONSUMER DEADLOCK - producer.send(session.createTextMessage("test 2")); - LOG.info("Two new messages produced."); - twoNewMessages.countDown(); - } - else if (txtMsg.getText().equals("test 3")) { - thirdMessageReceived.countDown(); - } - } - catch (Exception e) { - LOG.error(e.toString()); - throw new RuntimeException(e); - } - } - }); - - LOG.info("Stopping broker...."); - broker.stop(); - - LOG.info("Creating new broker..."); - broker = createBroker(); - startBroker(); - broker.waitUntilStarted(); - - afterRestart.countDown(); - assertTrue("Consumer is deadlocked!", twoNewMessages.await(60, TimeUnit.SECONDS)); - - producer.send(session.createTextMessage("test 3")); - assertTrue("Consumer got third message after block", thirdMessageReceived.await(60, TimeUnit.SECONDS)); - - } - finally { - broker.stop(); - } - - } - - @Override - protected void setUp() throws Exception { - bindAddress = "tcp://0.0.0.0:61617"; - super.setUp(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f76f4d1f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java deleted file mode 100644 index 533b827..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import javax.jms.Connection; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; -import org.apache.activemq.store.kahadb.KahaDBStore; -import org.apache.activemq.util.DefaultIOExceptionHandler; -import org.junit.After; -import org.junit.Test; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; - -public class AMQ2736Test { - - BrokerService broker; - - @Test - public void testRollbackOnRecover() throws Exception { - broker = createAndStartBroker(true); - DefaultIOExceptionHandler ignoreAllExceptionsIOExHandler = new DefaultIOExceptionHandler(); - ignoreAllExceptionsIOExHandler.setIgnoreAllErrors(true); - broker.setIoExceptionHandler(ignoreAllExceptionsIOExHandler); - - ActiveMQConnectionFactory f = new ActiveMQConnectionFactory("vm://localhost?async=false"); - f.setAlwaysSyncSend(true); - Connection c = f.createConnection(); - c.start(); - Session s = c.createSession(true, Session.SESSION_TRANSACTED); - MessageProducer p = s.createProducer(new ActiveMQQueue("Tx")); - p.send(s.createTextMessage("aa")); - - // kill journal without commit - KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); - KahaDBStore store = pa.getStore(); - - assertNotNull("last tx location is present " + store.getInProgressTxLocationRange()[1]); - - // test hack, close the journal to ensure no further journal updates when broker stops - // mimic kill -9 in terms of no normal shutdown sequence - store.getJournal().close(); - try { - store.close(); - } - catch (Exception expectedLotsAsJournalBorked) { - } - - broker.stop(); - broker.waitUntilStopped(); - - // restart with recovery - broker = createAndStartBroker(false); - - pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); - store = pa.getStore(); - - // inflight non xa tx should be rolledback on recovery - assertNull("in progress tx location is present ", store.getInProgressTxLocationRange()[0]); - - } - - @After - public void stopBroker() throws Exception { - if (broker != null) { - broker.stop(); - } - } - - private BrokerService createAndStartBroker(boolean deleteAll) throws Exception { - BrokerService broker = new BrokerService(); - broker.setDeleteAllMessagesOnStartup(deleteAll); - broker.setUseJmx(false); - broker.getManagementContext().setCreateConnector(false); - broker.start(); - return broker; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f76f4d1f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2751Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2751Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2751Test.java deleted file mode 100644 index 539354c..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2751Test.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.EmbeddedBrokerTestSupport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ2751Test extends EmbeddedBrokerTestSupport { - - private static final Logger LOG = LoggerFactory.getLogger(AMQ2751Test.class); - - private static String clientIdPrefix = "consumer"; - private static String queueName = "FOO"; - - public void testRecoverRedelivery() throws Exception { - - final CountDownLatch redelivery = new CountDownLatch(6); - final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(" + broker.getTransportConnectors().get(0).getConnectUri() + ")"); - try { - - Connection connection = factory.createConnection(); - String clientId = clientIdPrefix; - connection.setClientID(clientId); - - final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - - Queue queue = session.createQueue(queueName); - - MessageConsumer consumer = session.createConsumer(queue); - - consumer.setMessageListener(new MessageListener() { - @Override - public void onMessage(Message message) { - try { - LOG.info("Got message: " + message.getJMSMessageID()); - if (message.getJMSRedelivered()) { - LOG.info("It's a redelivery."); - redelivery.countDown(); - } - LOG.info("calling recover() on the session to force redelivery."); - session.recover(); - } - catch (JMSException e) { - e.printStackTrace(); - } - } - }); - - System.out.println("Created queue consumer with clientId " + clientId); - connection.start(); - - MessageProducer producer = session.createProducer(queue); - producer.send(session.createTextMessage("test")); - - assertTrue("we got 6 redeliveries", redelivery.await(20, TimeUnit.SECONDS)); - - } - finally { - broker.stop(); - } - - } - - @Override - protected void setUp() throws Exception { - bindAddress = "tcp://localhost:0"; - super.setUp(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f76f4d1f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2801Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2801Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2801Test.java deleted file mode 100644 index 43394dc..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2801Test.java +++ /dev/null @@ -1,199 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.bugs; - -import static org.junit.Assert.*; - -import java.io.File; -import java.util.ArrayList; -import java.util.List; - -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.MessageConsumer; -import javax.jms.Session; -import javax.jms.Topic; -import javax.jms.TopicConnection; -import javax.jms.TopicPublisher; -import javax.jms.TopicSession; -import javax.management.ObjectName; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean; -import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.usage.SystemUsage; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ2801Test { - - private static final Logger LOG = LoggerFactory.getLogger(AMQ2801Test.class); - - private static final String TOPICNAME = "InvalidPendingQueueTest"; - private static final String SELECTOR1 = "JMS_ID" + " = '" + "TEST" + "'"; - private static final String SELECTOR2 = "JMS_ID" + " = '" + "TEST2" + "'"; - private static final String SUBSCRIPTION1 = "InvalidPendingQueueTest_1"; - private static final String SUBSCRIPTION2 = "InvalidPendingQueueTest_2"; - private static final int MSG_COUNT = 2500; - private Session session1; - private Connection conn1; - private Topic topic1; - private MessageConsumer consumer1; - private Session session2; - private Connection conn2; - private Topic topic2; - private MessageConsumer consumer2; - private BrokerService broker; - private String connectionUri; - - @Before - public void setUp() throws Exception { - broker = new BrokerService(); - broker.setDataDirectory("target" + File.separator + "activemq-data"); - broker.setPersistent(true); - broker.setUseJmx(true); - broker.setAdvisorySupport(false); - broker.setDeleteAllMessagesOnStartup(true); - broker.addConnector("tcp://localhost:0").setName("Default"); - applyMemoryLimitPolicy(broker); - broker.start(); - - connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); - } - - private void applyMemoryLimitPolicy(BrokerService broker) { - final SystemUsage memoryManager = new SystemUsage(); - memoryManager.getMemoryUsage().setLimit(5818230784L); - memoryManager.getStoreUsage().setLimit(6442450944L); - memoryManager.getTempUsage().setLimit(3221225472L); - broker.setSystemUsage(memoryManager); - - final List policyEntries = new ArrayList<>(); - final PolicyEntry entry = new PolicyEntry(); - entry.setQueue(">"); - entry.setProducerFlowControl(false); - entry.setMemoryLimit(504857608); - entry.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy()); - policyEntries.add(entry); - - final PolicyMap policyMap = new PolicyMap(); - policyMap.setPolicyEntries(policyEntries); - broker.setDestinationPolicy(policyMap); - } - - @After - public void tearDown() throws Exception { - conn1.close(); - conn2.close(); - if (broker != null) { - broker.stop(); - } - } - - private void produceMessages() throws Exception { - TopicConnection connection = createConnection(); - TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - Topic topic = session.createTopic(TOPICNAME); - TopicPublisher producer = session.createPublisher(topic); - connection.start(); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - long tStamp = System.currentTimeMillis(); - BytesMessage message = session2.createBytesMessage(); - for (int i = 1; i <= MSG_COUNT; i++) { - message.setStringProperty("JMS_ID", "TEST"); - message.setIntProperty("Type", i); - producer.publish(message); - if (i % 100 == 0) { - LOG.info("sent: " + i + " @ " + ((System.currentTimeMillis() - tStamp) / 100) + "m/ms"); - tStamp = System.currentTimeMillis(); - } - } - } - - private void activeateSubscribers() throws Exception { - // First consumer - conn1 = createConnection(); - conn1.setClientID(SUBSCRIPTION1); - session1 = conn1.createSession(true, Session.SESSION_TRANSACTED); - topic1 = session1.createTopic(TOPICNAME); - consumer1 = session1.createDurableSubscriber(topic1, SUBSCRIPTION1, SELECTOR1, false); - conn1.start(); - - // Second consumer that just exists - conn2 = createConnection(); - conn2.setClientID(SUBSCRIPTION2); - session2 = conn2.createSession(true, Session.SESSION_TRANSACTED); - topic2 = session2.createTopic(TOPICNAME); - consumer2 = session2.createDurableSubscriber(topic2, SUBSCRIPTION2, SELECTOR2, false); - conn2.start(); - } - - @Test - public void testInvalidPendingQueue() throws Exception { - - activeateSubscribers(); - - assertNotNull(consumer1); - assertNotNull(consumer2); - - produceMessages(); - LOG.debug("Sent messages to a single subscriber"); - Thread.sleep(2000); - - LOG.debug("Closing durable subscriber connections"); - conn1.close(); - conn2.close(); - LOG.debug("Closed durable subscriber connections"); - - Thread.sleep(2000); - LOG.debug("Re-starting durable subscriber connections"); - - activeateSubscribers(); - LOG.debug("Started up durable subscriber connections - now view activemq console to see pending queue size on the other subscriber"); - - ObjectName[] subs = broker.getAdminView().getDurableTopicSubscribers(); - - for (int i = 0; i < subs.length; i++) { - ObjectName subName = subs[i]; - DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean) broker.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true); - - LOG.info(sub.getSubscriptionName() + ": pending = " + sub.getPendingQueueSize() + ", dispatched: " + sub.getDispatchedQueueSize()); - if (sub.getSubscriptionName().equals(SUBSCRIPTION1)) { - assertEquals("Incorrect number of pending messages", MSG_COUNT, sub.getPendingQueueSize() + sub.getDispatchedQueueSize()); - } - else { - assertEquals("Incorrect number of pending messages", 0, sub.getPendingQueueSize()); - } - } - } - - private TopicConnection createConnection() throws Exception { - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); - connectionFactory.setBrokerURL(connectionUri); - TopicConnection conn = connectionFactory.createTopicConnection(); - return conn; - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f76f4d1f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java deleted file mode 100644 index f089941..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java +++ /dev/null @@ -1,379 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.Collection; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.Topic; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.ActiveMQSession; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.leveldb.LevelDBStore; -import org.apache.activemq.store.PersistenceAdapter; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; -import org.apache.activemq.store.kahadb.disk.journal.DataFile; -import org.apache.activemq.util.Wait; -import org.junit.After; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ2832Test { - - private static final Logger LOG = LoggerFactory.getLogger(AMQ2832Test.class); - - BrokerService broker = null; - private ActiveMQConnectionFactory cf; - private final Destination destination = new ActiveMQQueue("AMQ2832Test"); - private String connectionUri; - - protected void startBroker() throws Exception { - doStartBroker(true, false); - } - - protected void restartBroker() throws Exception { - if (broker != null) { - broker.stop(); - broker.waitUntilStopped(); - } - doStartBroker(false, false); - } - - protected void recoverBroker() throws Exception { - if (broker != null) { - broker.stop(); - broker.waitUntilStopped(); - } - doStartBroker(false, true); - } - - private void doStartBroker(boolean delete, boolean recover) throws Exception { - broker = new BrokerService(); - broker.setDeleteAllMessagesOnStartup(delete); - broker.setPersistent(true); - broker.setUseJmx(true); - broker.addConnector("tcp://localhost:0"); - - configurePersistence(broker, recover); - - connectionUri = "vm://localhost?create=false"; - cf = new ActiveMQConnectionFactory(connectionUri); - - broker.start(); - LOG.info("Starting broker.."); - } - - protected void configurePersistence(BrokerService brokerService, boolean recover) throws Exception { - KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter(); - - // ensure there are a bunch of data files but multiple entries in each - adapter.setJournalMaxFileLength(1024 * 20); - - // speed up the test case, checkpoint and cleanup early and often - adapter.setCheckpointInterval(5000); - adapter.setCleanupInterval(5000); - - if (recover) { - adapter.setForceRecoverIndex(true); - } - } - - @After - public void tearDown() throws Exception { - if (broker != null) { - broker.stop(); - broker.waitUntilStopped(); - } - } - - /** - * Scenario: - * db-1.log has an unacknowledged message, - * db-2.log contains acks for the messages from db-1.log, - * db-3.log contains acks for the messages from db-2.log - * - * Expected behavior: since db-1.log is blocked, db-2.log and db-3.log should not be removed during the cleanup. - * Current situation on 5.10.0, 5.10.1 is that db-3.log is removed causing all messages from db-2.log, whose acks were in db-3.log, to be replayed. - * - * @throws Exception - */ - @Test - public void testAckChain() throws Exception { - startBroker(); - - StagedConsumer consumer = new StagedConsumer(); - // file #1 - produceMessagesToConsumeMultipleDataFiles(5); - // acknowledge first 2 messages and leave the 3rd one unacknowledged blocking db-1.log - consumer.receive(3); - - // send messages by consuming and acknowledging every message right after sent in order to get KahadbAdd and Remove command to be saved together - // this is necessary in order to get KahaAddMessageCommand to be saved in one db file and the corresponding KahaRemoveMessageCommand in the next one - produceAndConsumeImmediately(20, consumer); - consumer.receive(2).acknowledge(); // consume and ack the last 2 unconsumed - - // now we have 3 files written and started with #4 - consumer.close(); - - broker.stop(); - broker.waitUntilStopped(); - - recoverBroker(); - - consumer = new StagedConsumer(); - Message message = consumer.receive(1); - assertNotNull("One message stays unacked from db-1.log", message); - message.acknowledge(); - message = consumer.receive(1); - assertNull("There should not be any unconsumed messages any more", message); - consumer.close(); - } - - private void produceAndConsumeImmediately(int numOfMsgs, StagedConsumer consumer) throws Exception { - for (int i = 0; i < numOfMsgs; i++) { - produceMessagesToConsumeMultipleDataFiles(1); - consumer.receive(1).acknowledge(); - } - } - - @Test - public void testAckRemovedMessageReplayedAfterRecovery() throws Exception { - - startBroker(); - - StagedConsumer consumer = new StagedConsumer(); - int numMessagesAvailable = produceMessagesToConsumeMultipleDataFiles(20); - // this will block the reclaiming of one data file - Message firstUnacked = consumer.receive(10); - LOG.info("first unacked: " + firstUnacked.getJMSMessageID()); - Message secondUnacked = consumer.receive(1); - LOG.info("second unacked: " + secondUnacked.getJMSMessageID()); - numMessagesAvailable -= 11; - - numMessagesAvailable += produceMessagesToConsumeMultipleDataFiles(10); - // ensure ack is another data file - LOG.info("Acking firstUnacked: " + firstUnacked.getJMSMessageID()); - firstUnacked.acknowledge(); - - numMessagesAvailable += produceMessagesToConsumeMultipleDataFiles(10); - - consumer.receive(numMessagesAvailable).acknowledge(); - - // second unacked should keep first data file available but journal with the first ack - // may get whacked - consumer.close(); - - broker.stop(); - broker.waitUntilStopped(); - - recoverBroker(); - - consumer = new StagedConsumer(); - // need to force recovery? - - Message msg = consumer.receive(1, 5); - assertNotNull("One messages left after recovery", msg); - msg.acknowledge(); - - // should be no more messages - msg = consumer.receive(1, 5); - assertEquals("Only one messages left after recovery: " + msg, null, msg); - consumer.close(); - } - - @Test - public void testAlternateLossScenario() throws Exception { - - startBroker(); - PersistenceAdapter pa = broker.getPersistenceAdapter(); - if (pa instanceof LevelDBStore) { - return; - } - - ActiveMQQueue queue = new ActiveMQQueue("MyQueue"); - ActiveMQQueue disposable = new ActiveMQQueue("MyDisposableQueue"); - ActiveMQTopic topic = new ActiveMQTopic("MyDurableTopic"); - - // This ensure that data file 1 never goes away. - createInactiveDurableSub(topic); - assertEquals(1, getNumberOfJournalFiles()); - - // One Queue Message that will be acked in another data file. - produceMessages(queue, 1); - assertEquals(1, getNumberOfJournalFiles()); - - // Add some messages to consume space - produceMessages(disposable, 50); - - int dataFilesCount = getNumberOfJournalFiles(); - assertTrue(dataFilesCount > 1); - - // Create an ack for the single message on this queue - drainQueue(queue); - - // Add some more messages to consume space beyond tha data file with the ack - produceMessages(disposable, 50); - - assertTrue(dataFilesCount < getNumberOfJournalFiles()); - dataFilesCount = getNumberOfJournalFiles(); - - restartBroker(); - - // Clear out all queue data - broker.getAdminView().removeQueue(disposable.getQueueName()); - - // Once this becomes true our ack could be lost. - assertTrue("Less than three journal file expected, was " + getNumberOfJournalFiles(), Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return getNumberOfJournalFiles() <= 3; - } - }, TimeUnit.MINUTES.toMillis(3))); - - // Recover and the Message should not be replayed but if the old MessageAck is lost - // then it could be. - recoverBroker(); - - assertTrue(drainQueue(queue) == 0); - } - - private int getNumberOfJournalFiles() throws IOException { - - Collection files = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values(); - int reality = 0; - for (DataFile file : files) { - if (file != null) { - reality++; - } - } - - return reality; - } - - private void createInactiveDurableSub(Topic topic) throws Exception { - Connection connection = cf.createConnection(); - connection.setClientID("Inactive"); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive"); - consumer.close(); - connection.close(); - produceMessages(topic, 1); - } - - private int drainQueue(Queue queue) throws Exception { - Connection connection = cf.createConnection(); - connection.setClientID("Inactive"); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(queue); - int count = 0; - while (consumer.receive(5000) != null) { - count++; - } - consumer.close(); - connection.close(); - return count; - } - - private int produceMessages(Destination destination, int numToSend) throws Exception { - int sent = 0; - Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri()).createConnection(); - connection.start(); - try { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(destination); - for (int i = 0; i < numToSend; i++) { - producer.send(createMessage(session, i)); - sent++; - } - } - finally { - connection.close(); - } - - return sent; - } - - private int produceMessagesToConsumeMultipleDataFiles(int numToSend) throws Exception { - return produceMessages(destination, numToSend); - } - - final String payload = new String(new byte[1024]); - - private Message createMessage(Session session, int i) throws Exception { - return session.createTextMessage(payload + "::" + i); - } - - private class StagedConsumer { - - Connection connection; - MessageConsumer consumer; - - StagedConsumer() throws Exception { - connection = new ActiveMQConnectionFactory("failover://" + broker.getTransportConnectors().get(0).getConnectUri().toString()).createConnection(); - connection.start(); - consumer = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE).createConsumer(destination); - } - - public Message receive(int numToReceive) throws Exception { - return receive(numToReceive, 2); - } - - public Message receive(int numToReceive, int timeoutInSeconds) throws Exception { - Message msg = null; - for (; numToReceive > 0; numToReceive--) { - - do { - msg = consumer.receive(1 * 1000); - } while (msg == null && --timeoutInSeconds > 0); - - if (numToReceive > 1) { - msg.acknowledge(); - } - - if (msg != null) { - LOG.debug("received: " + msg.getJMSMessageID()); - } - } - // last message, unacked - return msg; - } - - void close() throws JMSException { - consumer.close(); - connection.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f76f4d1f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2870Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2870Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2870Test.java deleted file mode 100644 index b4f0a33..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2870Test.java +++ /dev/null @@ -1,227 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Properties; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TopicSubscriber; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.jmx.BrokerView; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.store.PersistenceAdapter; -import org.apache.activemq.util.IntrospectionSupport; -import org.apache.activemq.util.Wait; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@RunWith(value = Parameterized.class) -public class AMQ2870Test extends org.apache.activemq.TestSupport { - - static final Logger LOG = LoggerFactory.getLogger(AMQ2870Test.class); - BrokerService broker = null; - ActiveMQTopic topic; - - ActiveMQConnection consumerConnection = null, producerConnection = null; - Session producerSession; - MessageProducer producer; - final int minPercentUsageForStore = 10; - String data; - - private final PersistenceAdapterChoice persistenceAdapterChoice; - - @Parameterized.Parameters - public static Collection getTestParameters() { - String osName = System.getProperty("os.name"); - LOG.info("Running on [" + osName + "]"); - PersistenceAdapterChoice[] kahaDb = {PersistenceAdapterChoice.KahaDB}; - PersistenceAdapterChoice[] levelDb = {PersistenceAdapterChoice.LevelDB}; - List choices = new ArrayList<>(); - choices.add(kahaDb); - if (!osName.equalsIgnoreCase("AIX") && !osName.equalsIgnoreCase("SunOS")) { - choices.add(levelDb); - } - - return choices; - } - - public AMQ2870Test(PersistenceAdapterChoice choice) { - this.persistenceAdapterChoice = choice; - } - - @Test(timeout = 300000) - public void testSize() throws Exception { - openConsumer(); - - assertEquals(0, broker.getAdminView().getStorePercentUsage()); - - for (int i = 0; i < 5000; i++) { - sendMessage(false); - } - - final BrokerView brokerView = broker.getAdminView(); - - // wait for reclaim - assertTrue("in range with consumer", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - // usage percent updated only on send check for isFull so once - // sends complete it is no longer updated till next send via a call to isFull - // this is optimal as it is only used to block producers - broker.getSystemUsage().getStoreUsage().isFull(); - LOG.info("store percent usage: " + brokerView.getStorePercentUsage()); - return broker.getAdminView().getStorePercentUsage() < minPercentUsageForStore; - } - })); - - closeConsumer(); - - assertTrue("in range with closed consumer", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - broker.getSystemUsage().getStoreUsage().isFull(); - LOG.info("store precent usage: " + brokerView.getStorePercentUsage()); - return broker.getAdminView().getStorePercentUsage() < minPercentUsageForStore; - } - })); - - for (int i = 0; i < 5000; i++) { - sendMessage(false); - } - - // What if i drop the subscription? - broker.getAdminView().destroyDurableSubscriber("cliID", "subName"); - - assertTrue("in range after send with consumer", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - broker.getSystemUsage().getStoreUsage().isFull(); - LOG.info("store precent usage: " + brokerView.getStorePercentUsage()); - return broker.getAdminView().getStorePercentUsage() < minPercentUsageForStore; - } - })); - } - - private void openConsumer() throws Exception { - consumerConnection = (ActiveMQConnection) createConnection(); - consumerConnection.setClientID("cliID"); - consumerConnection.start(); - Session session = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - TopicSubscriber subscriber = session.createDurableSubscriber(topic, "subName", "filter=true", false); - - subscriber.setMessageListener(new MessageListener() { - @Override - public void onMessage(Message message) { - // received++; - } - }); - } - - private void closeConsumer() throws JMSException { - if (consumerConnection != null) - consumerConnection.close(); - consumerConnection = null; - } - - private void sendMessage(boolean filter) throws Exception { - if (producerConnection == null) { - producerConnection = (ActiveMQConnection) createConnection(); - producerConnection.start(); - producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - producer = producerSession.createProducer(topic); - } - - Message message = producerSession.createMessage(); - message.setBooleanProperty("filter", filter); - message.setStringProperty("data", data); - producer.send(message); - } - - private void startBroker(boolean deleteMessages) throws Exception { - broker = new BrokerService(); - broker.setAdvisorySupport(false); - broker.setBrokerName("testStoreSize"); - - if (deleteMessages) { - broker.setDeleteAllMessagesOnStartup(true); - } - LOG.info("Starting broker with persistenceAdapterChoice " + persistenceAdapterChoice.toString()); - setPersistenceAdapter(broker, persistenceAdapterChoice); - configurePersistenceAdapter(broker.getPersistenceAdapter()); - broker.getSystemUsage().getStoreUsage().setLimit(100 * 1000 * 1000); - broker.start(); - } - - private void configurePersistenceAdapter(PersistenceAdapter persistenceAdapter) { - Properties properties = new Properties(); - String maxFileLengthVal = String.valueOf(2 * 1024 * 1024); - properties.put("journalMaxFileLength", maxFileLengthVal); - properties.put("maxFileLength", maxFileLengthVal); - properties.put("cleanupInterval", "2000"); - properties.put("checkpointInterval", "2000"); - - // leveldb - properties.put("logSize", maxFileLengthVal); - - IntrospectionSupport.setProperties(persistenceAdapter, properties); - } - - private void stopBroker() throws Exception { - if (broker != null) - broker.stop(); - broker = null; - } - - @Override - protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { - return new ActiveMQConnectionFactory("vm://testStoreSize?jms.watchTopicAdvisories=false&waitForStart=5000&create=false"); - } - - @Override - @Before - public void setUp() throws Exception { - StringBuilder sb = new StringBuilder(5000); - for (int i = 0; i < 5000; i++) { - sb.append('a'); - } - data = sb.toString(); - - startBroker(true); - topic = (ActiveMQTopic) createDestination(); - } - - @Override - @After - public void tearDown() throws Exception { - stopBroker(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f76f4d1f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2902Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2902Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2902Test.java deleted file mode 100644 index 798d32f..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2902Test.java +++ /dev/null @@ -1,96 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.JMSException; - -import junit.framework.TestCase; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.TransportConnection; -import org.apache.activemq.transport.TransportDisposedIOException; -import org.apache.activemq.util.DefaultTestAppender; -import org.apache.log4j.Appender; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.apache.log4j.spi.LoggingEvent; -import org.slf4j.LoggerFactory; - -public class AMQ2902Test extends TestCase { - - private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(AMQ2580Test.class); - - final AtomicBoolean gotExceptionInLog = new AtomicBoolean(Boolean.FALSE); - final AtomicBoolean failedToFindMDC = new AtomicBoolean(Boolean.FALSE); - - Appender appender = new DefaultTestAppender() { - @Override - public void doAppend(LoggingEvent event) { - if (event.getThrowableInformation() != null && event.getThrowableInformation().getThrowable() instanceof TransportDisposedIOException) { - - // Prevent StackOverflowException so we can see a sane stack trace. - if (gotExceptionInLog.get()) { - return; - } - - gotExceptionInLog.set(Boolean.TRUE); - LOG.error("got event: " + event + ", ex:" + event.getThrowableInformation().getThrowable(), event.getThrowableInformation().getThrowable()); - LOG.error("Event source: ", new Throwable("Here")); - } - if (!"Loaded the Bouncy Castle security provider.".equals(event.getMessage())) { - if (event.getMDC("activemq.broker") == null) { - failedToFindMDC.set(Boolean.TRUE); - } - } - return; - } - }; - - public void testNoExceptionOnClosewithStartStop() throws JMSException { - ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); - Connection connection = connectionFactory.createConnection(); - connection.start(); - connection.stop(); - connection.close(); - } - - public void testNoExceptionOnClose() throws JMSException { - ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); - Connection connection = connectionFactory.createConnection(); - connection.close(); - } - - @Override - public void setUp() throws Exception { - gotExceptionInLog.set(Boolean.FALSE); - failedToFindMDC.set(Boolean.FALSE); - Logger.getRootLogger().addAppender(appender); - Logger.getLogger(TransportConnection.class.getName() + ".Transport").setLevel(Level.DEBUG); - Logger.getLogger(TransportConnection.class.getName()).setLevel(Level.DEBUG); - } - - @Override - public void tearDown() throws Exception { - Logger.getRootLogger().removeAppender(appender); - assertFalse("got unexpected ex in log on graceful close", gotExceptionInLog.get()); - assertFalse("MDC is there", failedToFindMDC.get()); - } -}