Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D90EB18575 for ; Tue, 9 Jun 2015 16:36:33 +0000 (UTC) Received: (qmail 26464 invoked by uid 500); 9 Jun 2015 16:36:33 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 26193 invoked by uid 500); 9 Jun 2015 16:36:33 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 25505 invoked by uid 99); 9 Jun 2015 16:36:32 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 09 Jun 2015 16:36:32 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id AED56E0278; Tue, 9 Jun 2015 16:36:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Tue, 09 Jun 2015 16:36:43 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [12/52] [abbrv] [partial] activemq-artemis git commit: ARTEMIS-127 Adding activemq unit test module to Artemis http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ActiveMQSlowConsumerManualTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ActiveMQSlowConsumerManualTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ActiveMQSlowConsumerManualTest.java new file mode 100644 index 
0000000..b32c7ad --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ActiveMQSlowConsumerManualTest.java @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy; +import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQTopic; +import org.junit.Assert; +import org.junit.Test; + +/** + * @author James 
Furness + * https://issues.apache.org/jira/browse/AMQ-3607 + */ +public class ActiveMQSlowConsumerManualTest { + private static final int PORT = 12345; + private static final ActiveMQTopic TOPIC = new ActiveMQTopic("TOPIC"); + private static final String URL = "nio://localhost:" + PORT + "?socket.tcpNoDelay=true"; + + @Test(timeout = 60000) + public void testDefaultSettings() throws Exception { + runTest("testDefaultSettings", 30, -1, -1, false, false, false, false); + } + + @Test(timeout = 60000) + public void testDefaultSettingsWithOptimiseAcknowledge() throws Exception { + runTest("testDefaultSettingsWithOptimiseAcknowledge", 30, -1, -1, false, false, true, false); + } + + @Test(timeout = 60000) + public void testBounded() throws Exception { + runTest("testBounded", 30, 5, 25, false, false, false, false); + } + + @Test(timeout = 60000) + public void testBoundedWithOptimiseAcknowledge() throws Exception { + runTest("testBoundedWithOptimiseAcknowledge", 30, 5, 25, false, false, true, false); + } + + public void runTest(String name, int sendMessageCount, int prefetchLimit, int messageLimit, boolean evictOldestMessage, boolean disableFlowControl, boolean optimizeAcknowledge, boolean persistent) throws Exception { + BrokerService broker = createBroker(persistent); + broker.setDestinationPolicy(buildPolicy(TOPIC, prefetchLimit, messageLimit, evictOldestMessage, disableFlowControl)); + broker.start(); + + // Slow consumer + Session slowConsumerSession = buildSession("SlowConsumer", URL, optimizeAcknowledge); + final CountDownLatch blockSlowConsumer = new CountDownLatch(1); + final AtomicInteger slowConsumerReceiveCount = new AtomicInteger(); + final List slowConsumerReceived = sendMessageCount <= 1000 ? 
new ArrayList() : null; + MessageConsumer slowConsumer = createSubscriber(slowConsumerSession, + new MessageListener() { + @Override + public void onMessage(Message message) { + try { + slowConsumerReceiveCount.incrementAndGet(); + int count = Integer.parseInt(((TextMessage) message).getText()); + if (slowConsumerReceived != null) slowConsumerReceived.add(count); + if (count % 10000 == 0) System.out.println("SlowConsumer: Receive " + count); + blockSlowConsumer.await(); + } catch (Exception ignored) { + } + } + } + ); + + // Fast consumer + Session fastConsumerSession = buildSession("FastConsumer", URL, optimizeAcknowledge); + final AtomicInteger fastConsumerReceiveCount = new AtomicInteger(); + final List fastConsumerReceived = sendMessageCount <= 1000 ? new ArrayList() : null; + MessageConsumer fastConsumer = createSubscriber(fastConsumerSession, + new MessageListener() { + @Override + public void onMessage(Message message) { + try { + fastConsumerReceiveCount.incrementAndGet(); + TimeUnit.MILLISECONDS.sleep(5); + int count = Integer.parseInt(((TextMessage) message).getText()); + if (fastConsumerReceived != null) fastConsumerReceived.add(count); + if (count % 10000 == 0) System.out.println("FastConsumer: Receive " + count); + } catch (Exception ignored) { + } + } + } + ); + + // Wait for consumers to connect + Thread.sleep(500); + + // Publisher + AtomicInteger sentCount = new AtomicInteger(); + List sent = sendMessageCount <= 1000 ? 
new ArrayList() : null; + Session publisherSession = buildSession("Publisher", URL, optimizeAcknowledge); + MessageProducer publisher = createPublisher(publisherSession); + for (int i = 0; i < sendMessageCount; i++) { + sentCount.incrementAndGet(); + if (sent != null) sent.add(i); + if (i % 10000 == 0) System.out.println("Publisher: Send " + i); + publisher.send(publisherSession.createTextMessage(Integer.toString(i))); + } + + // Wait for messages to arrive + Thread.sleep(500); + + System.out.println(name + ": Publisher Sent: " + sentCount + " " + sent); + System.out.println(name + ": Whilst slow consumer blocked:"); + System.out.println("\t\t- SlowConsumer Received: " + slowConsumerReceiveCount + " " + slowConsumerReceived); + System.out.println("\t\t- FastConsumer Received: " + fastConsumerReceiveCount + " " + fastConsumerReceived); + + // Unblock slow consumer + blockSlowConsumer.countDown(); + + // Wait for messages to arrive + Thread.sleep(500); + + System.out.println(name + ": After slow consumer unblocked:"); + System.out.println("\t\t- SlowConsumer Received: " + slowConsumerReceiveCount + " " + slowConsumerReceived); + System.out.println("\t\t- FastConsumer Received: " + fastConsumerReceiveCount + " " + fastConsumerReceived); + System.out.println(); + + publisher.close(); + publisherSession.close(); + slowConsumer.close(); + slowConsumerSession.close(); + fastConsumer.close(); + fastConsumerSession.close(); + broker.stop(); + + Assert.assertEquals("Fast consumer missed messages whilst slow consumer was blocking", sent, fastConsumerReceived); + // this is too timine dependent as sometimes there is message eviction, would need to check the dlq + //Assert.assertEquals("Slow consumer received incorrect message count", Math.min(sendMessageCount, prefetchLimit + (messageLimit > 0 ? 
messageLimit : Integer.MAX_VALUE)), slowConsumerReceived.size()); + } + + private static BrokerService createBroker(boolean persistent) throws Exception { + BrokerService broker = new BrokerService(); + broker.setBrokerName("TestBroker"); + broker.setPersistent(persistent); + broker.addConnector(URL); + return broker; + } + + private static MessageConsumer createSubscriber(Session session, MessageListener messageListener) throws JMSException { + MessageConsumer consumer = session.createConsumer(TOPIC); + consumer.setMessageListener(messageListener); + return consumer; + } + + private static MessageProducer createPublisher(Session session) throws JMSException { + MessageProducer producer = session.createProducer(TOPIC); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + return producer; + } + + private static Session buildSession(String clientId, String url, boolean optimizeAcknowledge) throws JMSException { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); + + connectionFactory.setCopyMessageOnSend(false); + connectionFactory.setDisableTimeStampsByDefault(true); + connectionFactory.setOptimizeAcknowledge(optimizeAcknowledge); + if (optimizeAcknowledge) { + connectionFactory.setOptimizeAcknowledgeTimeOut(1); + } + + Connection connection = connectionFactory.createConnection(); + connection.setClientID(clientId); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + connection.start(); + + return session; + } + + private static PolicyMap buildPolicy(ActiveMQTopic topic, int prefetchLimit, int messageLimit, boolean evictOldestMessage, boolean disableFlowControl) { + PolicyMap policyMap = new PolicyMap(); + + PolicyEntry policyEntry = new PolicyEntry(); + + if (evictOldestMessage) { + policyEntry.setMessageEvictionStrategy(new OldestMessageEvictionStrategy()); + } + + if (disableFlowControl) { + policyEntry.setProducerFlowControl(false); + } + + if (prefetchLimit > 0) { + 
policyEntry.setTopicPrefetch(prefetchLimit); + } + + if (messageLimit > 0) { + ConstantPendingMessageLimitStrategy messageLimitStrategy = new ConstantPendingMessageLimitStrategy(); + messageLimitStrategy.setLimit(messageLimit); + policyEntry.setPendingMessageLimitStrategy(messageLimitStrategy); + } + + policyMap.put(topic, policyEntry); + + return policyMap; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ConnectionPerMessageTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ConnectionPerMessageTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ConnectionPerMessageTest.java new file mode 100644 index 0000000..8c580a9 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ConnectionPerMessageTest.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + + +package org.apache.activemq.bugs; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConnectionPerMessageTest extends EmbeddedBrokerTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(ConnectionPerMessageTest.class); + private static final int COUNT = 2000; + protected String bindAddress; + + public void testConnectionPerMessage() throws Exception { + final String topicName = "test.topic"; + + LOG.info("Initializing connection factory for JMS to URL: " + + bindAddress); + final ActiveMQConnectionFactory normalFactory = new ActiveMQConnectionFactory(); + normalFactory.setBrokerURL(bindAddress); + for (int i = 0; i < COUNT; i++) { + + if (i % 100 == 0) { + LOG.info(new Integer(i).toString()); + } + + Connection conn = null; + try { + + conn = normalFactory.createConnection(); + final Session session = conn.createSession(false, + Session.AUTO_ACKNOWLEDGE); + final Topic topic = session.createTopic(topicName); + final MessageProducer producer = session.createProducer(topic); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + final MapMessage m = session.createMapMessage(); + m.setInt("hey", i); + + producer.send(m); + + } catch (JMSException e) { + LOG.warn(e.getMessage(), e); + } finally { + if (conn != null) + try { + conn.close(); + } catch (JMSException e) { + LOG.warn(e.getMessage(), e); + } + } + } + } + + protected void setUp() throws Exception { + bindAddress = "vm://localhost"; + super.setUp(); + } + + protected BrokerService createBroker() throws Exception { + BrokerService answer = new BrokerService(); + 
answer.setDeleteAllMessagesOnStartup(true); + answer.setUseJmx(false); + answer.setPersistent(isPersistent()); + answer.addConnector(bindAddress); + return answer; + } + + protected boolean isPersistent() { + return true; + } + + protected void tearDown() throws Exception { + super.tearDown(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java new file mode 100644 index 0000000..f956da6 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.command.ActiveMQQueue; + +public class CraigsBugTest extends EmbeddedBrokerTestSupport { + + private String connectionUri; + + public void testConnectionFactory() throws Exception { + final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionUri); + final ActiveMQQueue queue = new ActiveMQQueue("testqueue"); + final Connection conn = cf.createConnection(); + + Runnable r = new Runnable() { + public void run() { + try { + Session session = conn.createSession(false, 1); + MessageConsumer consumer = session.createConsumer(queue, null); + consumer.receive(1000); + } catch (JMSException e) { + e.printStackTrace(); + } + } + }; + new Thread(r).start(); + conn.start(); + + try { + synchronized (this) { + wait(3000); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + protected void setUp() throws Exception { + bindAddress = "tcp://localhost:0"; + super.setUp(); + + connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DoubleExpireTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DoubleExpireTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DoubleExpireTest.java new file mode 100644 index 0000000..bb66943 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DoubleExpireTest.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.bugs; + +import java.util.concurrent.TimeoutException; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.broker.region.Queue; +import org.apache.activemq.command.ActiveMQDestination; +import org.junit.Assert; + +public class DoubleExpireTest extends EmbeddedBrokerTestSupport { + + private static final long MESSAGE_TTL_MILLIS = 1000; + private static final long MAX_TEST_TIME_MILLIS = 60000; + + public void setUp() throws Exception { + setAutoFail(true); + setMaxTestTime(MAX_TEST_TIME_MILLIS); + super.setUp(); + } + + /** + * This test verifies that a message that expires can be be resent to queue + * with a new expiration and that it will be processed as a new message and + * allowed to re-expire. + *

+ * NOTE: This test fails on AMQ 5.4.2 because the originalExpiration + * timestamp is not cleared when the message is resent. + */ + public void testDoubleExpireWithoutMove() throws Exception { + // Create the default dead letter queue. + final ActiveMQDestination DLQ = createDestination("ActiveMQ.DLQ"); + + Connection conn = createConnection(); + try { + conn.start(); + Session session = conn.createSession(false, + Session.AUTO_ACKNOWLEDGE); + + // Verify that the test queue and DLQ are empty. + Assert.assertEquals(0, getSize(destination)); + Assert.assertEquals(0, getSize(DLQ)); + + // Enqueue a message to the test queue that will expire after 1s. + MessageProducer producer = session.createProducer(destination); + Message testMessage = session.createTextMessage("test message"); + producer.send(testMessage, Message.DEFAULT_DELIVERY_MODE, + Message.DEFAULT_PRIORITY, MESSAGE_TTL_MILLIS); + Assert.assertEquals(1, getSize(destination)); + + // Wait for the message to expire. + waitForSize(destination, 0, MAX_TEST_TIME_MILLIS); + Assert.assertEquals(1, getSize(DLQ)); + + // Consume the message from the DLQ and re-enqueue it to the test + // queue so that it expires after 1s. + MessageConsumer consumer = session.createConsumer(DLQ); + Message expiredMessage = consumer.receive(); + Assert.assertEquals(testMessage.getJMSMessageID(), expiredMessage + .getJMSMessageID()); + + producer.send(expiredMessage, Message.DEFAULT_DELIVERY_MODE, + Message.DEFAULT_PRIORITY, MESSAGE_TTL_MILLIS); + Assert.assertEquals(1, getSize(destination)); + Assert.assertEquals(0, getSize(DLQ)); + + // Verify that the resent message is "different" in that it has + // another ID. + Assert.assertNotSame(testMessage.getJMSMessageID(), expiredMessage + .getJMSMessageID()); + + // Wait for the message to re-expire. + waitForSize(destination, 0, MAX_TEST_TIME_MILLIS); + Assert.assertEquals(1, getSize(DLQ)); + + // Re-consume the message from the DLQ. 
+ Message reexpiredMessage = consumer.receive(); + Assert.assertEquals(expiredMessage.getJMSMessageID(), reexpiredMessage + .getJMSMessageID()); + } finally { + conn.close(); + } + } + + /** + * A helper method that returns the embedded broker's implementation of a + * JMS queue. + */ + private Queue getPhysicalDestination(ActiveMQDestination destination) + throws Exception { + return (Queue) broker.getAdminView().getBroker().getDestinationMap() + .get(destination); + } + + /** + * A helper method that returns the size of the specified queue/topic. + */ + private long getSize(ActiveMQDestination destination) throws Exception { + return getPhysicalDestination(destination) != null ? getPhysicalDestination( + destination).getDestinationStatistics().getMessages() + .getCount() + : 0; + } + + /** + * A helper method that waits for a destination to reach a certain size. + */ + private void waitForSize(ActiveMQDestination destination, int size, + long timeoutMillis) throws Exception, TimeoutException { + long startTimeMillis = System.currentTimeMillis(); + + while (getSize(destination) != size + && System.currentTimeMillis() < (startTimeMillis + timeoutMillis)) { + Thread.sleep(250); + } + + if (getSize(destination) != size) { + throw new TimeoutException("Destination " + + destination.getPhysicalName() + " did not reach size " + + size + " within " + timeoutMillis + "ms."); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java new file mode 100644 index 0000000..eeee82b --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java @@ -0,0 
+1,463 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.Vector; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.TopicConnection; +import javax.jms.TopicConnectionFactory; +import javax.jms.TopicPublisher; +import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; +import javax.management.ObjectName; +import junit.framework.Test; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.CombinationTestSupport; +import 
org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.KahaDBStore; +import org.apache.activemq.util.IOHelper; +import org.apache.activemq.util.Wait; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A Test case for AMQ-1479 + */ +public class DurableConsumerTest extends CombinationTestSupport{ + private static final Logger LOG = LoggerFactory.getLogger(DurableConsumerTest.class); + private static int COUNT = 1024; + private static String CONSUMER_NAME = "DURABLE_TEST"; + protected BrokerService broker; + + protected String bindAddress = "tcp://localhost:61616"; + + protected byte[] payload = new byte[1024 * 32]; + protected ConnectionFactory factory; + protected Vector exceptions = new Vector(); + + private static final String TOPIC_NAME = "failoverTopic"; + private static final String CONNECTION_URL = "failover:(tcp://localhost:61616,tcp://localhost:61617)"; + public boolean useDedicatedTaskRunner = false; + + private class SimpleTopicSubscriber implements MessageListener,ExceptionListener{ + + private TopicConnection topicConnection = null; + + public SimpleTopicSubscriber(String connectionURL,String clientId,String topicName) { + + ActiveMQConnectionFactory topicConnectionFactory = null; + TopicSession topicSession = null; + Topic topic = null; + TopicSubscriber topicSubscriber = null; + + topicConnectionFactory = new ActiveMQConnectionFactory(connectionURL); + try { + + topic = new ActiveMQTopic(topicName); + topicConnection = topicConnectionFactory.createTopicConnection(); + topicConnection.setClientID((clientId)); + topicConnection.start(); + + topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + topicSubscriber = 
topicSession.createDurableSubscriber(topic, (clientId)); + topicSubscriber.setMessageListener(this); + + } catch (JMSException e) { + e.printStackTrace(); + } + } + + public void onMessage(Message arg0){ + } + + public void closeConnection(){ + if (topicConnection != null) { + try { + topicConnection.close(); + } catch (JMSException e) { + } + } + } + + public void onException(JMSException exception){ + exceptions.add(exception); + } + } + + private class MessagePublisher implements Runnable{ + private final boolean shouldPublish = true; + + public void run(){ + TopicConnectionFactory topicConnectionFactory = null; + TopicConnection topicConnection = null; + TopicSession topicSession = null; + Topic topic = null; + TopicPublisher topicPublisher = null; + Message message = null; + + topicConnectionFactory = new ActiveMQConnectionFactory(CONNECTION_URL); + try { + topic = new ActiveMQTopic(TOPIC_NAME); + topicConnection = topicConnectionFactory.createTopicConnection(); + topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + topicPublisher = topicSession.createPublisher(topic); + message = topicSession.createMessage(); + } catch (Exception ex) { + exceptions.add(ex); + } + while (shouldPublish) { + try { + topicPublisher.publish(message, DeliveryMode.PERSISTENT, 1, 2 * 60 * 60 * 1000); + } catch (JMSException ex) { + exceptions.add(ex); + } + try { + Thread.sleep(1); + } catch (Exception ex) { + } + } + } + } + + private void configurePersistence(BrokerService broker) throws Exception{ + File dataDirFile = new File("target/" + getName()); + KahaDBPersistenceAdapter kahaDBAdapter = new KahaDBPersistenceAdapter(); + kahaDBAdapter.setDirectory(dataDirFile); + broker.setPersistenceAdapter(kahaDBAdapter); + } + + public void testFailover() throws Exception{ + + configurePersistence(broker); + broker.start(); + + Thread publisherThread = new Thread(new MessagePublisher()); + publisherThread.start(); + final int numSubs = 100; + final List 
list = new ArrayList(numSubs); + for (int i = 0; i < numSubs; i++) { + + final int id = i; + Thread thread = new Thread(new Runnable(){ + public void run(){ + SimpleTopicSubscriber s =new SimpleTopicSubscriber(CONNECTION_URL, System.currentTimeMillis() + "-" + id, TOPIC_NAME); + list.add(s); + } + }); + thread.start(); + + } + + Wait.waitFor(new Wait.Condition(){ + @Override + public boolean isSatisified() throws Exception { + return numSubs == list.size(); + } + }); + + broker.stop(); + broker = createBroker(false); + configurePersistence(broker); + broker.start(); + Thread.sleep(10000); + for (SimpleTopicSubscriber s:list) { + s.closeConnection(); + } + assertTrue("no exceptions: " + exceptions, exceptions.isEmpty()); + } + + // makes heavy use of threads and can demonstrate https://issues.apache.org/activemq/browse/AMQ-2028 + // with use dedicatedTaskRunner=true and produce OOM + public void initCombosForTestConcurrentDurableConsumer(){ + addCombinationValues("useDedicatedTaskRunner", new Object[] { Boolean.TRUE, Boolean.FALSE }); + } + + public void testConcurrentDurableConsumer() throws Exception{ + + broker.start(); + broker.waitUntilStarted(); + + factory = createConnectionFactory(); + final String topicName = getName(); + final int numMessages = 500; + int numConsumers = 1; + final CountDownLatch counsumerStarted = new CountDownLatch(numConsumers); + final AtomicInteger receivedCount = new AtomicInteger(); + Runnable consumer = new Runnable(){ + public void run(){ + final String consumerName = Thread.currentThread().getName(); + int acked = 0; + int received = 0; + + try { + while (acked < numMessages / 2) { + // take one message and close, ack on occasion + Connection consumerConnection = factory.createConnection(); + ((ActiveMQConnection) consumerConnection).setWatchTopicAdvisories(false); + consumerConnection.setClientID(consumerName); + Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Topic topic = 
consumerSession.createTopic(topicName); + consumerConnection.start(); + + MessageConsumer consumer = consumerSession.createDurableSubscriber(topic, consumerName); + + counsumerStarted.countDown(); + Message msg = null; + do { + msg = consumer.receive(5000); + if (msg != null) { + receivedCount.incrementAndGet(); + if (received != 0 && received % 100 == 0) { + LOG.info("Received msg: " + msg.getJMSMessageID()); + } + if (++received % 2 == 0) { + msg.acknowledge(); + acked++; + } + } + } while (msg == null); + + consumerConnection.close(); + } + assertTrue(received >= acked); + } catch (Exception e) { + e.printStackTrace(); + exceptions.add(e); + } + } + }; + + ExecutorService executor = Executors.newFixedThreadPool(numConsumers); + + for (int i = 0; i < numConsumers; i++) { + executor.execute(consumer); + } + + assertTrue(counsumerStarted.await(30, TimeUnit.SECONDS)); + + Connection producerConnection = factory.createConnection(); + ((ActiveMQConnection) producerConnection).setWatchTopicAdvisories(false); + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = producerSession.createTopic(topicName); + MessageProducer producer = producerSession.createProducer(topic); + producerConnection.start(); + for (int i = 0; i < numMessages; i++) { + BytesMessage msg = producerSession.createBytesMessage(); + msg.writeBytes(payload); + producer.send(msg); + if (i != 0 && i % 100 == 0) { + LOG.info("Sent msg " + i); + } + } + + executor.shutdown(); + executor.awaitTermination(30, TimeUnit.SECONDS); + + Wait.waitFor(new Wait.Condition(){ + public boolean isSatisified() throws Exception{ + LOG.info("receivedCount: " + receivedCount.get()); + return receivedCount.get() == numMessages; + } + }, 360 * 1000); + assertEquals("got required some messages", numMessages, receivedCount.get()); + assertTrue("no exceptions, but: " + exceptions, exceptions.isEmpty()); + } + + public void testConsumerRecover() throws Exception{ + 
doTestConsumer(true); + } + + public void testConsumer() throws Exception{ + doTestConsumer(false); + } + + public void testPrefetchViaBrokerConfig() throws Exception { + + Integer prefetchVal = new Integer(150); + PolicyEntry policyEntry = new PolicyEntry(); + policyEntry.setDurableTopicPrefetch(prefetchVal.intValue()); + policyEntry.setPrioritizedMessages(true); + PolicyMap policyMap = new PolicyMap(); + policyMap.setDefaultEntry(policyEntry); + broker.setDestinationPolicy(policyMap); + broker.start(); + + factory = createConnectionFactory(); + Connection consumerConnection = factory.createConnection(); + consumerConnection.setClientID(CONSUMER_NAME); + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = consumerSession.createTopic(getClass().getName()); + MessageConsumer consumer = consumerSession.createDurableSubscriber(topic, CONSUMER_NAME); + consumerConnection.start(); + + ObjectName activeSubscriptionObjectName = broker.getAdminView().getDurableTopicSubscribers()[0]; + Object prefetchFromSubView = broker.getManagementContext().getAttribute(activeSubscriptionObjectName, "PrefetchSize"); + assertEquals(prefetchVal, prefetchFromSubView); + } + + public void doTestConsumer(boolean forceRecover) throws Exception{ + + if (forceRecover) { + configurePersistence(broker); + } + broker.start(); + + factory = createConnectionFactory(); + Connection consumerConnection = factory.createConnection(); + consumerConnection.setClientID(CONSUMER_NAME); + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = consumerSession.createTopic(getClass().getName()); + MessageConsumer consumer = consumerSession.createDurableSubscriber(topic, CONSUMER_NAME); + consumerConnection.start(); + consumerConnection.close(); + broker.stop(); + broker = createBroker(false); + if (forceRecover) { + configurePersistence(broker); + } + broker.start(); + + Connection producerConnection = 
factory.createConnection(); + + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = producerSession.createProducer(topic); + producerConnection.start(); + for (int i = 0; i < COUNT; i++) { + BytesMessage msg = producerSession.createBytesMessage(); + msg.writeBytes(payload); + producer.send(msg); + if (i != 0 && i % 1000 == 0) { + LOG.info("Sent msg " + i); + } + } + producerConnection.close(); + broker.stop(); + broker = createBroker(false); + if (forceRecover) { + configurePersistence(broker); + } + broker.start(); + + consumerConnection = factory.createConnection(); + consumerConnection.setClientID(CONSUMER_NAME); + consumerConnection.start(); + consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + consumer = consumerSession.createDurableSubscriber(topic, CONSUMER_NAME); + for (int i = 0; i < COUNT; i++) { + Message msg = consumer.receive(10000); + assertNotNull("Missing message: " + i, msg); + if (i != 0 && i % 1000 == 0) { + LOG.info("Received msg " + i); + } + + } + consumerConnection.close(); + + } + + @Override + protected void setUp() throws Exception{ + if (broker == null) { + broker = createBroker(true); + } + + super.setUp(); + } + + @Override + protected void tearDown() throws Exception{ + super.tearDown(); + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + broker = null; + } + } + + protected Topic creatTopic(Session s,String destinationName) throws JMSException{ + return s.createTopic(destinationName); + } + + /** + * Factory method to create a new broker + * + * @throws Exception + */ + protected BrokerService createBroker(boolean deleteStore) throws Exception{ + BrokerService answer = new BrokerService(); + configureBroker(answer, deleteStore); + return answer; + } + + protected void configureBroker(BrokerService answer,boolean deleteStore) throws Exception{ + answer.setDeleteAllMessagesOnStartup(deleteStore); + KahaDBStore 
kaha = new KahaDBStore(); + //kaha.setConcurrentStoreAndDispatchTopics(false); + File directory = new File("target/activemq-data/kahadb"); + if (deleteStore) { + IOHelper.deleteChildren(directory); + } + kaha.setDirectory(directory); + //kaha.setMaxAsyncJobs(10); + + answer.setPersistenceAdapter(kaha); + answer.addConnector(bindAddress); + answer.setUseShutdownHook(false); + answer.setAdvisorySupport(false); + answer.setDedicatedTaskRunner(useDedicatedTaskRunner); + } + + protected ActiveMQConnectionFactory createConnectionFactory() throws Exception{ + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(bindAddress); + factory.setUseDedicatedTaskRunner(useDedicatedTaskRunner); + return factory; + } + + public static Test suite(){ + return suite(DurableConsumerTest.class); + } + + public static void main(String[] args){ + junit.textui.TestRunner.run(suite()); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JMSDurableTopicNoLocalTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JMSDurableTopicNoLocalTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JMSDurableTopicNoLocalTest.java new file mode 100644 index 0000000..80c4e9f --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JMSDurableTopicNoLocalTest.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicSubscriber; +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.broker.BrokerService; + +/** + * + */ +public class JMSDurableTopicNoLocalTest extends EmbeddedBrokerTestSupport { + protected String bindAddress; + + public void testConsumeNoLocal() throws Exception { + final String TEST_NAME = getClass().getName(); + Connection connection = createConnection(); + connection.setClientID(TEST_NAME); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + TopicSubscriber subscriber = session.createDurableSubscriber((Topic) destination, "topicUser2", null, true); + + + final CountDownLatch latch = new CountDownLatch(1); + subscriber.setMessageListener(new MessageListener() { + public void onMessage(Message message) { + System.out.println("Receive a message " + message); + latch.countDown(); + } + }); + + connection.start(); + + MessageProducer producer = session.createProducer(destination); + TextMessage message = session.createTextMessage("THIS IS A TEST"); + producer.send(message); + producer.close(); + latch.await(5,TimeUnit.SECONDS); + assertEquals(latch.getCount(),1); + } + + @Override + protected void setUp() throws Exception { + bindAddress 
= "vm://localhost"; + useTopic=true; + super.setUp(); + } + + @Override + protected BrokerService createBroker() throws Exception { + BrokerService answer = new BrokerService(); + answer.setUseJmx(false); + answer.setPersistent(true); + answer.setDeleteAllMessagesOnStartup(true); + answer.addConnector(bindAddress); + return answer; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java new file mode 100644 index 0000000..05a8c1d --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import java.util.Properties; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.TopicSubscriber; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.test.JmsTopicSendReceiveTest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class JmsDurableTopicSlowReceiveTest extends JmsTopicSendReceiveTest { + + static final int NMSG = 200; + static final int MSIZE = 256000; + private static final transient Logger LOG = LoggerFactory.getLogger(JmsDurableTopicSlowReceiveTest.class); + private static final String COUNT_PROPERY_NAME = "count"; + + protected Connection connection2; + protected Session session2; + protected Session consumeSession2; + protected MessageConsumer consumer2; + protected MessageProducer producer2; + protected Destination consumerDestination2; + BrokerService broker; + private Connection connection3; + private Session consumeSession3; + private TopicSubscriber consumer3; + + /** + * Set up a durable suscriber test. 
+ * + * @see junit.framework.TestCase#setUp() + */ + protected void setUp() throws Exception { + this.durable = true; + broker = createBroker(); + super.setUp(); + } + + protected void tearDown() throws Exception { + super.tearDown(); + broker.stop(); + } + + protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { + ActiveMQConnectionFactory result = new ActiveMQConnectionFactory("vm://localhost?async=false"); + Properties props = new Properties(); + props.put("prefetchPolicy.durableTopicPrefetch", "5"); + props.put("prefetchPolicy.optimizeDurableTopicPrefetch", "5"); + result.setProperties(props); + return result; + } + + protected BrokerService createBroker() throws Exception { + BrokerService answer = new BrokerService(); + configureBroker(answer); + answer.start(); + return answer; + } + + protected void configureBroker(BrokerService answer) throws Exception { + answer.setDeleteAllMessagesOnStartup(true); + } + + /** + * Test if all the messages sent are being received. 
+ * + * @throws Exception + */ + public void testSlowReceiver() throws Exception { + connection2 = createConnection(); + connection2.setClientID("test"); + connection2.start(); + consumeSession2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + consumerDestination2 = session2.createTopic(getConsumerSubject() + "2"); + consumer2 = consumeSession2.createDurableSubscriber((Topic)consumerDestination2, getName()); + + consumer2.close(); + connection2.close(); + new Thread(new Runnable() { + + public void run() { + try { + int count = 0; + for (int loop = 0; loop < 4; loop++) { + connection2 = createConnection(); + connection2.start(); + session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + producer2 = session2.createProducer(null); + producer2.setDeliveryMode(deliveryMode); + Thread.sleep(1000); + for (int i = 0; i < NMSG / 4; i++) { + BytesMessage message = session2.createBytesMessage(); + message.writeBytes(new byte[MSIZE]); + message.setStringProperty("test", "test"); + message.setIntProperty(COUNT_PROPERY_NAME, count); + message.setJMSType("test"); + producer2.send(consumerDestination2, message); + Thread.sleep(50); + if (verbose) { + LOG.debug("Sent(" + loop + "): " + i); + } + count++; + } + producer2.close(); + connection2.stop(); + connection2.close(); + } + } catch (Throwable e) { + e.printStackTrace(); + } + } + }, "SENDER Thread").start(); + connection3 = createConnection(); + connection3.setClientID("test"); + connection3.start(); + consumeSession3 = connection3.createSession(false, Session.CLIENT_ACKNOWLEDGE); + consumer3 = consumeSession3.createDurableSubscriber((Topic)consumerDestination2, getName()); + connection3.close(); + int count = 0; + for (int loop = 0; loop < 4; ++loop) { + connection3 = createConnection(); + connection3.setClientID("test"); + connection3.start(); + consumeSession3 = connection3.createSession(false, 
Session.CLIENT_ACKNOWLEDGE); + consumer3 = consumeSession3.createDurableSubscriber((Topic)consumerDestination2, getName()); + Message msg = null; + int i; + for (i = 0; i < NMSG / 4; i++) { + msg = consumer3.receive(10000); + if (msg == null) { + break; + } + if (verbose) { + LOG.debug("Received(" + loop + "): " + i + " count = " + msg.getIntProperty(COUNT_PROPERY_NAME)); + } + assertNotNull(msg); + assertEquals(msg.getJMSType(), "test"); + assertEquals(msg.getStringProperty("test"), "test"); + assertEquals("Messages received out of order", count, msg.getIntProperty(COUNT_PROPERY_NAME)); + Thread.sleep(500); + msg.acknowledge(); + count++; + } + consumer3.close(); + assertEquals("Receiver " + loop, NMSG / 4, i); + connection3.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java new file mode 100644 index 0000000..2858302 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.ResourceAllocationException; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.transport.RequestTimedOutIOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JmsTimeoutTest extends EmbeddedBrokerTestSupport { + + static final Logger LOG = LoggerFactory.getLogger(JmsTimeoutTest.class); + + private final int messageSize=1024*64; + private final int messageCount=10000; + private final AtomicInteger exceptionCount = new AtomicInteger(0); + + /** + * Test the case where the broker is blocked due to a memory limit + * and a producer timeout is set on the connection. 
+ * @throws Exception + */ + public void testBlockedProducerConnectionTimeout() throws Exception { + final ActiveMQConnection cx = (ActiveMQConnection)createConnection(); + final ActiveMQDestination queue = createDestination("testqueue"); + + // we should not take longer than 10 seconds to return from send + cx.setSendTimeout(10000); + + Runnable r = new Runnable() { + public void run() { + try { + LOG.info("Sender thread starting"); + Session session = cx.createSession(false, 1); + MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + TextMessage message = session.createTextMessage(createMessageText()); + for(int count=0; count 0); + } + + /** + * Test the case where the broker is blocked due to a memory limit + * with a fail timeout + * @throws Exception + */ + public void testBlockedProducerUsageSendFailTimeout() throws Exception { + final ActiveMQConnection cx = (ActiveMQConnection)createConnection(); + final ActiveMQDestination queue = createDestination("testqueue"); + + broker.getSystemUsage().setSendFailIfNoSpaceAfterTimeout(5000); + Runnable r = new Runnable() { + public void run() { + try { + LOG.info("Sender thread starting"); + Session session = cx.createSession(false, 1); + MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + TextMessage message = session.createTextMessage(createMessageText()); + for(int count=0; count 0); + } + + protected void setUp() throws Exception { + exceptionCount.set(0); + bindAddress = "tcp://localhost:0"; + broker = createBroker(); + broker.setDeleteAllMessagesOnStartup(true); + broker.getSystemUsage().getMemoryUsage().setLimit(5*1024*1024); + + super.setUp(); + } + + @Override + protected ConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory( + broker.getTransportConnectors().get(0).getPublishableConnectString()); + } + + private String createMessageText() 
{ + StringBuffer buffer = new StringBuffer(); + buffer.append(""); + for (int i = buffer.length(); i < messageSize; i++) { + buffer.append('X'); + } + buffer.append(""); + return buffer.toString(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageBlockResumeTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageBlockResumeTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageBlockResumeTest.java new file mode 100644 index 0000000..e8d5371 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageBlockResumeTest.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import java.io.File; +import java.util.Vector; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQPrefetchPolicy; +import org.apache.activemq.TestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.runners.BlockJUnit4ClassRunner; +import org.junit.runner.RunWith; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.*; + + +@RunWith(BlockJUnit4ClassRunner.class) +public class MemoryUsageBlockResumeTest extends TestSupport implements Thread.UncaughtExceptionHandler { + + public int deliveryMode = DeliveryMode.PERSISTENT; + + private static final Logger LOG = LoggerFactory.getLogger(MemoryUsageBlockResumeTest.class); + private static byte[] buf = new byte[4 * 1024]; + private static byte[] bigBuf = new byte[48 * 1024]; + + private BrokerService broker; + AtomicInteger messagesSent = new AtomicInteger(0); + AtomicInteger messagesConsumed = new AtomicInteger(0); + + protected long messageReceiveTimeout = 10000L; + + Destination destination = new ActiveMQQueue("FooTwo"); + Destination bigDestination = new ActiveMQQueue("FooTwoBig"); + + private String connectionUri; + private final Vector exceptions = new Vector(); + + @Test(timeout = 60 * 1000) + public void testBlockByOtherResumeNoException() throws Exception { + + ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory(connectionUri); + + // ensure more than on message can be pending when full + factory.setProducerWindowSize(48*1024); + // ensure messages are spooled to disk for this consumer + ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy(); + prefetch.setTopicPrefetch(10); + factory.setPrefetchPolicy(prefetch); + Connection consumerConnection = factory.createConnection(); + consumerConnection.start(); + + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = consumerSession.createConsumer(bigDestination); + + final Connection producerConnection = factory.createConnection(); + producerConnection.start(); + + final int fillWithBigCount = 10; + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(null); + producer.setDeliveryMode(deliveryMode); + for (int idx = 0; idx < fillWithBigCount; ++idx) { + Message message = session.createTextMessage(new String(bigBuf) + idx); + producer.send(bigDestination, message); + messagesSent.incrementAndGet(); + LOG.info("After big: " + idx + ", System Memory Usage " + broker.getSystemUsage().getMemoryUsage().getPercentUsage()); + } + + // will block on pfc + final int toSend = 20; + Thread producingThread = new Thread("Producing thread") { + @Override + public void run() { + try { + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(deliveryMode); + for (int idx = 0; idx < toSend; ++idx) { + Message message = session.createTextMessage(new String(buf) + idx); + producer.send(destination, message); + messagesSent.incrementAndGet(); + LOG.info("After little:" + idx + ", System Memory Usage " + broker.getSystemUsage().getMemoryUsage().getPercentUsage()); + } + } catch (Throwable ex) { + ex.printStackTrace(); + } + } + }; + 
producingThread.start(); + + Thread producingThreadTwo = new Thread("Producing thread") { + @Override + public void run() { + try { + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(deliveryMode); + for (int idx = 0; idx < toSend; ++idx) { + Message message = session.createTextMessage(new String(buf) + idx); + producer.send(destination, message); + messagesSent.incrementAndGet(); + LOG.info("After little:" + idx + ", System Memory Usage " + broker.getSystemUsage().getMemoryUsage().getPercentUsage()); + } + } catch (Throwable ex) { + ex.printStackTrace(); + } + } + }; + producingThreadTwo.start(); + + assertTrue("producer has sent x in a reasonable time", Wait.waitFor(new Wait.Condition() + { + @Override + public boolean isSatisified() throws Exception { + LOG.info("Checking for : X sent, System Memory Usage " + broker.getSystemUsage().getMemoryUsage().getPercentUsage() + ", sent: " + messagesSent); + return messagesSent.get() > 20; + } + })); + + + LOG.info("Consuming from big q to allow delivery to smaller q from pending"); + int count = 0; + + Message m = null; + + for (;count < 10; count++) { + assertTrue((m = consumer.receive(messageReceiveTimeout)) != null); + LOG.info("Recieved Message (" + count + "):" + m + ", System Memory Usage " + broker.getSystemUsage().getMemoryUsage().getPercentUsage()); + messagesConsumed.incrementAndGet(); + } + consumer.close(); + + producingThread.join(); + producingThreadTwo.join(); + + assertEquals("Incorrect number of Messages Sent: " + messagesSent.get(), messagesSent.get(), fillWithBigCount + toSend*2); + + // consume all little messages + consumer = consumerSession.createConsumer(destination); + for (count = 0;count < toSend*2; count++) { + assertTrue((m = consumer.receive(messageReceiveTimeout)) != null); + LOG.info("Recieved Message (" + count + "):" + m + ", System Memory Usage " + 
broker.getSystemUsage().getMemoryUsage().getPercentUsage() ); + messagesConsumed.incrementAndGet(); + } + + assertEquals("Incorrect number of Messages consumed: " + messagesConsumed.get(), messagesSent.get(), messagesConsumed.get()); + + //assertTrue("no exceptions: " + exceptions, exceptions.isEmpty()); + } + + @Override + @Before + public void setUp() throws Exception { + + Thread.setDefaultUncaughtExceptionHandler(this); + broker = new BrokerService(); + broker.setDataDirectory("target" + File.separator + "activemq-data"); + broker.setPersistent(true); + broker.setUseJmx(false); + broker.setAdvisorySupport(false); + broker.setDeleteAllMessagesOnStartup(true); + + setDefaultPersistenceAdapter(broker); + broker.getSystemUsage().getMemoryUsage().setLimit((30 * 16 * 1024)); + + PolicyEntry defaultPolicy = new PolicyEntry(); + defaultPolicy.setOptimizedDispatch(true); + PolicyMap policyMap = new PolicyMap(); + policyMap.setDefaultEntry(defaultPolicy); + broker.setDestinationPolicy(policyMap); + + broker.addConnector("tcp://localhost:0"); + broker.start(); + + connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); + } + + @Override + @After + public void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + } + } + + @Override + public void uncaughtException(Thread t, Throwable e) { + LOG.error("Unexpected Unhandeled ex on: " + t, e); + exceptions.add(e); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageBrokerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageBrokerTest.java new file mode 100644 index 0000000..b229e0e --- /dev/null +++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageBrokerTest.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerTestSupport; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.store.kahadb.KahaDBStore; +import org.apache.activemq.util.IOHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.*; +import java.io.File; + +public class MemoryUsageBrokerTest extends BrokerTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(MemoryUsageBrokerTest.class); + + protected void setUp() throws Exception { + this.setAutoFail(true); + super.setUp(); + } + + @Override + protected PolicyEntry getDefaultPolicy() { + PolicyEntry policy = super.getDefaultPolicy(); + // Disable PFC and assign a large memory limit that's larger than the default broker memory limit for queues + policy.setProducerFlowControl(false); + policy.setQueue(">"); + policy.setMemoryLimit(128 * 1024 * 1024); + return policy; + } + + protected 
BrokerService createBroker() throws Exception { + BrokerService broker = new BrokerService(); + KahaDBStore kaha = new KahaDBStore(); + File directory = new File("target/activemq-data/kahadb"); + IOHelper.deleteChildren(directory); + kaha.setDirectory(directory); + kaha.deleteAllMessages(); + broker.setPersistenceAdapter(kaha); + return broker; + } + + protected ConnectionFactory createConnectionFactory() { + return new ActiveMQConnectionFactory(broker.getVmConnectorURI()); + } + + protected Connection createJmsConnection() throws JMSException { + return createConnectionFactory().createConnection(); + } + + public void testMemoryUsage() throws Exception { + Connection conn = createJmsConnection(); + Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("queue.a.b"); + MessageProducer producer = session.createProducer(queue); + for (int i = 0; i < 100000; i++) { + BytesMessage bm = session.createBytesMessage(); + bm.writeBytes(new byte[1024]); + producer.send(bm); + if ((i + 1) % 100 == 0) { + session.commit(); + int memoryUsagePercent = broker.getSystemUsage().getMemoryUsage().getPercentUsage(); + LOG.info((i + 1) + " messages have been sent; broker memory usage " + memoryUsagePercent + "%"); + assertTrue("Used more than available broker memory", memoryUsagePercent <= 100); + } + } + session.commit(); + producer.close(); + session.close(); + conn.close(); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageCleanupTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageCleanupTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageCleanupTest.java new file mode 100644 index 0000000..e7feb90 --- /dev/null +++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageCleanupTest.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Runs two rounds of ten producers and one consumer against a fresh queue, removes the queue
+ * after each round, and then checks that the broker's reported memory usage falls back to
+ * within one percentage point of its value at the start of the test.
+ */
+public class MemoryUsageCleanupTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MemoryUsageCleanupTest.class);
+    private static final String QUEUE_NAME = MemoryUsageCleanupTest.class.getName() + "Queue";
+
+    /** Characters that random message bodies are drawn from. */
+    private static final String ALPHABET =
+        "QAa0bcLdUK2eHfJgTP8XhiFj61DOklNm9nBoI5pGqYVrs3CtSuMZvwWx4yE7zR";
+
+    /** Limit in bytes applied to both the broker's memory usage and the default queue policy. */
+    private static final long MEMORY_LIMIT_BYTES = 300000000L;
+
+    /** Number of producer tasks started in each round. */
+    private static final int PRODUCER_COUNT = 10;
+
+    private final Random r = new Random();
+
+    private BrokerService broker;
+    private String connectionUri;
+    private ExecutorService pool;
+    private String queueName;
+
+    /** Starts a persistent broker with a connector on tcp://localhost:0 and creates the worker pool. */
+    @Before
+    public void setUp() throws Exception {
+        broker = new BrokerService();
+        broker.setDataDirectory("target" + File.separator + "activemq-data");
+        broker.setPersistent(true);
+        broker.setUseJmx(true);
+        broker.setDedicatedTaskRunner(false);
+        broker.setAdvisorySupport(false);
+        broker.setDeleteAllMessagesOnStartup(true);
+
+        SharedDeadLetterStrategy strategy = new SharedDeadLetterStrategy();
+        strategy.setProcessExpired(false);
+        strategy.setProcessNonPersistent(false);
+
+        PolicyEntry defaultPolicy = new PolicyEntry();
+        defaultPolicy.setQueue(">");
+        defaultPolicy.setOptimizedDispatch(true);
+        defaultPolicy.setDeadLetterStrategy(strategy);
+        defaultPolicy.setMemoryLimit(MEMORY_LIMIT_BYTES);
+
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.setDefaultEntry(defaultPolicy);
+        broker.setDestinationPolicy(policyMap);
+
+        broker.getSystemUsage().getMemoryUsage().setLimit(MEMORY_LIMIT_BYTES);
+
+        broker.addConnector("tcp://localhost:0").setName("Default");
+        broker.start();
+        broker.waitUntilStarted();
+
+        connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
+        pool = Executors.newFixedThreadPool(10);
+    }
+
+    /** Stops the broker and shuts the worker pool down. */
+    @After
+    public void tearDown() throws Exception {
+        try {
+            if (broker != null) {
+                broker.stop();
+                broker.waitUntilStopped();
+            }
+        } finally {
+            // Shut the pool down even when stopping the broker throws, so its threads are released.
+            if (pool != null) {
+                pool.shutdown();
+            }
+        }
+    }
+
+    @Test
+    public void testIt() throws Exception {
+        final int startPercentage = broker.getAdminView().getMemoryPercentUsage();
+        LOG.info("MemoryUseage at test start = " + startPercentage);
+
+        for (int i = 0; i < 2; i++) {
+            // Assign the name before logging it; workers read this field when they run.
+            queueName = QUEUE_NAME + i;
+            LOG.info("Started the test iteration: " + i + " using queueName = " + queueName);
+            final CountDownLatch latch = new CountDownLatch(PRODUCER_COUNT + 1);
+
+            pool.execute(new Runnable() {
+                @Override
+                public void run() {
+                    receiveAndDiscard100messages(latch);
+                }
+            });
+
+            for (int j = 0; j < PRODUCER_COUNT; j++) {
+                pool.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        send10000messages(latch);
+                    }
+                });
+            }
+
+            LOG.info("Waiting on the send / receive latch");
+            // Fail fast instead of silently carrying on when the workers did not finish.
+            assertTrue("Senders and receiver did not finish within 5 minutes",
+                latch.await(5, TimeUnit.MINUTES));
+            LOG.info("Resumed");
+
+            destroyQueue();
+            TimeUnit.SECONDS.sleep(2);
+        }
+
+        LOG.info("MemoryUseage before awaiting temp store cleanup = " + broker.getAdminView().getMemoryPercentUsage());
+
+        // Wait before building the failure message so that it reports the usage seen after
+        // the wait rather than the value sampled before it started.
+        boolean backToStart = Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return broker.getAdminView().getMemoryPercentUsage() <= startPercentage + 1;
+            }
+        });
+        assertTrue("MemoryUsage should return to: " + startPercentage
+            + "% but was " + broker.getAdminView().getMemoryPercentUsage() + "%", backToStart);
+
+        int endPercentage = broker.getAdminView().getMemoryPercentUsage();
+        LOG.info("MemoryUseage at test end = " + endPercentage);
+    }
+
+    /** Removes the current round's queue from the broker; failures are logged, not thrown. */
+    public void destroyQueue() {
+        try {
+            Broker regionBroker = this.broker.getBroker();
+            if (!regionBroker.isStopped()) {
+                LOG.info("Removing: " + queueName);
+                regionBroker.removeDestination(this.broker.getAdminConnectionContext(), new ActiveMQQueue(queueName), 10);
+            }
+        } catch (Exception e) {
+            LOG.warn("Got an error while removing the test queue", e);
+        }
+    }
+
+    /**
+     * Sends 10000 non-persistent text messages of 1000 random characters to the current queue,
+     * pausing 10 ms after each one, then counts the latch down.
+     */
+    private void send10000messages(CountDownLatch latch) {
+        ActiveMQConnection activeMQConnection = null;
+        try {
+            activeMQConnection = createConnection(null);
+            Session session = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(session.createQueue(queueName));
+            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+            activeMQConnection.start();
+            for (int i = 0; i < 10000; i++) {
+                TextMessage textMessage = session.createTextMessage();
+                textMessage.setText(generateBody(1000));
+                textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                producer.send(textMessage);
+                try {
+                    Thread.sleep(10);
+                } catch (InterruptedException e) {
+                    // Keep the interrupt visible to the pool and stop sending.
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }
+            producer.close();
+        } catch (JMSException e) {
+            LOG.warn("Got an error while sending the messages", e);
+        } finally {
+            closeAndCountDown(activeMQConnection, latch);
+        }
+    }
+
+    /** Receives 100 messages from the current queue, discards them and counts the latch down. */
+    private void receiveAndDiscard100messages(CountDownLatch latch) {
+        ActiveMQConnection activeMQConnection = null;
+        try {
+            activeMQConnection = createConnection(null);
+            Session session = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer messageConsumer = session.createConsumer(session.createQueue(queueName));
+            activeMQConnection.start();
+            for (int i = 0; i < 100; i++) {
+                messageConsumer.receive();
+            }
+            messageConsumer.close();
+            LOG.info("Created and disconnected");
+        } catch (JMSException e) {
+            LOG.warn("Got an error while receiving the messages", e);
+        } finally {
+            closeAndCountDown(activeMQConnection, latch);
+        }
+    }
+
+    /**
+     * Closes {@code connection} if it was opened and then counts {@code latch} down. The count
+     * down sits in a {@code finally} so the waiting test is released even if the worker or the
+     * close fails unexpectedly.
+     */
+    private void closeAndCountDown(ActiveMQConnection connection, CountDownLatch latch) {
+        try {
+            if (connection != null) {
+                connection.close();
+            }
+        } catch (JMSException e) {
+            LOG.debug("Ignoring error while closing the connection", e);
+        } finally {
+            latch.countDown();
+        }
+    }
+
+    /** Creates a connection to the test broker, setting the client ID when {@code id} is non-null. */
+    private ActiveMQConnection createConnection(String id) throws JMSException {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
+        if (id != null) {
+            factory.setClientID(id);
+        }
+        return (ActiveMQConnection) factory.createConnection();
+    }
+
+    /** Returns a string of {@code length} characters picked at random from {@link #ALPHABET}. */
+    private String generateBody(int length) {
+        StringBuilder sb = new StringBuilder(Math.max(length, 0));
+        for (int i = 0; i < length; i++) {
+            sb.append(ALPHABET.charAt(r.nextInt(ALPHABET.length())));
+        }
+        return sb.toString();
+    }
+}