activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [12/52] [abbrv] [partial] activemq-artemis git commit: ARTEMIS-127 Adding activemq unit test module to Artemis
Date Tue, 09 Jun 2015 16:36:43 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ActiveMQSlowConsumerManualTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ActiveMQSlowConsumerManualTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ActiveMQSlowConsumerManualTest.java
new file mode 100644
index 0000000..b32c7ad
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ActiveMQSlowConsumerManualTest.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
+import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * @author James Furness
+ *         https://issues.apache.org/jira/browse/AMQ-3607
+ */
+public class ActiveMQSlowConsumerManualTest {
+    private static final int PORT = 12345;
+    private static final ActiveMQTopic TOPIC = new ActiveMQTopic("TOPIC");
+    private static final String URL = "nio://localhost:" + PORT + "?socket.tcpNoDelay=true";
+
+    @Test(timeout = 60000)
+    public void testDefaultSettings() throws Exception {
+        runTest("testDefaultSettings", 30, -1, -1, false, false, false, false);
+    }
+
+    @Test(timeout = 60000)
+    public void testDefaultSettingsWithOptimiseAcknowledge() throws Exception {
+        runTest("testDefaultSettingsWithOptimiseAcknowledge", 30, -1, -1, false, false, true, false);
+    }
+
+    @Test(timeout = 60000)
+    public void testBounded() throws Exception {
+        runTest("testBounded", 30, 5, 25, false, false, false, false);
+    }
+
+    @Test(timeout = 60000)
+    public void testBoundedWithOptimiseAcknowledge() throws Exception {
+        runTest("testBoundedWithOptimiseAcknowledge", 30, 5, 25, false, false, true, false);
+    }
+
+    public void runTest(String name, int sendMessageCount, int prefetchLimit, int messageLimit, boolean evictOldestMessage, boolean disableFlowControl, boolean optimizeAcknowledge, boolean persistent) throws Exception {
+        BrokerService broker = createBroker(persistent);
+        broker.setDestinationPolicy(buildPolicy(TOPIC, prefetchLimit, messageLimit, evictOldestMessage, disableFlowControl));
+        broker.start();
+
+        // Slow consumer
+        Session slowConsumerSession = buildSession("SlowConsumer", URL, optimizeAcknowledge);
+        final CountDownLatch blockSlowConsumer = new CountDownLatch(1);
+        final AtomicInteger slowConsumerReceiveCount = new AtomicInteger();
+        final List<Integer> slowConsumerReceived = sendMessageCount <= 1000 ? new ArrayList<Integer>() : null;
+        MessageConsumer slowConsumer = createSubscriber(slowConsumerSession,
+                new MessageListener() {
+                    @Override
+                    public void onMessage(Message message) {
+                        try {
+                            slowConsumerReceiveCount.incrementAndGet();
+                            int count = Integer.parseInt(((TextMessage) message).getText());
+                            if (slowConsumerReceived != null) slowConsumerReceived.add(count);
+                            if (count % 10000 == 0) System.out.println("SlowConsumer: Receive " + count);
+                            blockSlowConsumer.await();
+                        } catch (Exception ignored) {
+                        }
+                    }
+                }
+        );
+
+        // Fast consumer
+        Session fastConsumerSession = buildSession("FastConsumer", URL, optimizeAcknowledge);
+        final AtomicInteger fastConsumerReceiveCount = new AtomicInteger();
+        final List<Integer> fastConsumerReceived = sendMessageCount <= 1000 ? new ArrayList<Integer>() : null;
+        MessageConsumer fastConsumer = createSubscriber(fastConsumerSession,
+                new MessageListener() {
+                    @Override
+                    public void onMessage(Message message) {
+                        try {
+                            fastConsumerReceiveCount.incrementAndGet();
+                            TimeUnit.MILLISECONDS.sleep(5);
+                            int count = Integer.parseInt(((TextMessage) message).getText());
+                            if (fastConsumerReceived != null) fastConsumerReceived.add(count);
+                            if (count % 10000 == 0) System.out.println("FastConsumer: Receive " + count);
+                        } catch (Exception ignored) {
+                        }
+                    }
+                }
+        );
+
+        // Wait for consumers to connect
+        Thread.sleep(500);
+
+        // Publisher
+        AtomicInteger sentCount = new AtomicInteger();
+        List<Integer> sent = sendMessageCount <= 1000 ? new ArrayList<Integer>() : null;
+        Session publisherSession = buildSession("Publisher", URL, optimizeAcknowledge);
+        MessageProducer publisher = createPublisher(publisherSession);
+        for (int i = 0; i < sendMessageCount; i++) {
+            sentCount.incrementAndGet();
+            if (sent != null) sent.add(i);
+            if (i % 10000 == 0) System.out.println("Publisher: Send " + i);
+            publisher.send(publisherSession.createTextMessage(Integer.toString(i)));
+        }
+
+        // Wait for messages to arrive
+        Thread.sleep(500);
+
+        System.out.println(name + ": Publisher Sent: " + sentCount + " " + sent);
+        System.out.println(name + ": Whilst slow consumer blocked:");
+        System.out.println("\t\t- SlowConsumer Received: " + slowConsumerReceiveCount + " " + slowConsumerReceived);
+        System.out.println("\t\t- FastConsumer Received: " + fastConsumerReceiveCount + " " + fastConsumerReceived);
+
+        // Unblock slow consumer
+        blockSlowConsumer.countDown();
+
+        // Wait for messages to arrive
+        Thread.sleep(500);
+
+        System.out.println(name + ": After slow consumer unblocked:");
+        System.out.println("\t\t- SlowConsumer Received: " + slowConsumerReceiveCount + " " + slowConsumerReceived);
+        System.out.println("\t\t- FastConsumer Received: " + fastConsumerReceiveCount + " " + fastConsumerReceived);
+        System.out.println();
+
+        publisher.close();
+        publisherSession.close();
+        slowConsumer.close();
+        slowConsumerSession.close();
+        fastConsumer.close();
+        fastConsumerSession.close();
+        broker.stop();
+
+        Assert.assertEquals("Fast consumer missed messages whilst slow consumer was blocking", sent, fastConsumerReceived);
+        // this is too timine dependent  as sometimes there is message eviction, would need to check the dlq
+        //Assert.assertEquals("Slow consumer received incorrect message count", Math.min(sendMessageCount, prefetchLimit + (messageLimit > 0 ? messageLimit : Integer.MAX_VALUE)), slowConsumerReceived.size());
+    }
+
+    private static BrokerService createBroker(boolean persistent) throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setBrokerName("TestBroker");
+        broker.setPersistent(persistent);
+        broker.addConnector(URL);
+        return broker;
+    }
+
+    private static MessageConsumer createSubscriber(Session session, MessageListener messageListener) throws JMSException {
+        MessageConsumer consumer = session.createConsumer(TOPIC);
+        consumer.setMessageListener(messageListener);
+        return consumer;
+    }
+
+    private static MessageProducer createPublisher(Session session) throws JMSException {
+        MessageProducer producer = session.createProducer(TOPIC);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+        return producer;
+    }
+
+    private static Session buildSession(String clientId, String url, boolean optimizeAcknowledge) throws JMSException {
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
+
+        connectionFactory.setCopyMessageOnSend(false);
+        connectionFactory.setDisableTimeStampsByDefault(true);
+        connectionFactory.setOptimizeAcknowledge(optimizeAcknowledge);
+        if (optimizeAcknowledge) {
+            connectionFactory.setOptimizeAcknowledgeTimeOut(1);
+        }
+
+        Connection connection = connectionFactory.createConnection();
+        connection.setClientID(clientId);
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        connection.start();
+
+        return session;
+    }
+
+    private static PolicyMap buildPolicy(ActiveMQTopic topic, int prefetchLimit, int messageLimit, boolean evictOldestMessage, boolean disableFlowControl) {
+        PolicyMap policyMap = new PolicyMap();
+
+        PolicyEntry policyEntry = new PolicyEntry();
+
+        if (evictOldestMessage) {
+            policyEntry.setMessageEvictionStrategy(new OldestMessageEvictionStrategy());
+        }
+
+        if (disableFlowControl) {
+            policyEntry.setProducerFlowControl(false);
+        }
+
+        if (prefetchLimit > 0) {
+            policyEntry.setTopicPrefetch(prefetchLimit);
+        }
+
+        if (messageLimit > 0) {
+            ConstantPendingMessageLimitStrategy messageLimitStrategy = new ConstantPendingMessageLimitStrategy();
+            messageLimitStrategy.setLimit(messageLimit);
+            policyEntry.setPendingMessageLimitStrategy(messageLimitStrategy);
+        }
+
+        policyMap.put(topic, policyEntry);
+
+        return policyMap;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ConnectionPerMessageTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ConnectionPerMessageTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ConnectionPerMessageTest.java
new file mode 100644
index 0000000..8c580a9
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ConnectionPerMessageTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.activemq.bugs;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConnectionPerMessageTest extends EmbeddedBrokerTestSupport {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ConnectionPerMessageTest.class);
+	private static final int COUNT = 2000;
+	protected String bindAddress;
+
+	public void testConnectionPerMessage() throws Exception {
+		final String topicName = "test.topic";
+
+		LOG.info("Initializing connection factory for JMS to URL: "
+				+ bindAddress);
+		final ActiveMQConnectionFactory normalFactory = new ActiveMQConnectionFactory();
+		normalFactory.setBrokerURL(bindAddress);
+		for (int i = 0; i < COUNT; i++) {
+
+			if (i % 100 == 0) {
+				LOG.info(new Integer(i).toString());
+			}
+
+			Connection conn = null;
+			try {
+
+				conn = normalFactory.createConnection();
+				final Session session = conn.createSession(false,
+						Session.AUTO_ACKNOWLEDGE);
+				final Topic topic = session.createTopic(topicName);
+				final MessageProducer producer = session.createProducer(topic);
+				producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+				final MapMessage m = session.createMapMessage();
+				m.setInt("hey", i);
+
+				producer.send(m);
+
+			} catch (JMSException e) {
+				LOG.warn(e.getMessage(), e);
+			} finally {
+				if (conn != null)
+					try {
+						conn.close();
+					} catch (JMSException e) {
+						LOG.warn(e.getMessage(), e);
+					}
+			}
+		}
+	}
+
+	protected void setUp() throws Exception {
+		bindAddress = "vm://localhost";
+		super.setUp();
+	}
+
+	protected BrokerService createBroker() throws Exception {
+		BrokerService answer = new BrokerService();
+        answer.setDeleteAllMessagesOnStartup(true);
+		answer.setUseJmx(false);
+		answer.setPersistent(isPersistent());
+		answer.addConnector(bindAddress);
+		return answer;
+	}
+
+	protected boolean isPersistent() {
+		return true;
+	}
+
+	protected void tearDown() throws Exception {
+		super.tearDown();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java
new file mode 100644
index 0000000..f956da6
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.command.ActiveMQQueue;
+
+public class CraigsBugTest extends EmbeddedBrokerTestSupport {
+
+    private String connectionUri;
+
+    public void testConnectionFactory() throws Exception {
+        final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionUri);
+        final ActiveMQQueue queue = new ActiveMQQueue("testqueue");
+        final Connection conn = cf.createConnection();
+
+        Runnable r = new Runnable() {
+            public void run() {
+                try {
+                    Session session = conn.createSession(false, 1);
+                    MessageConsumer consumer = session.createConsumer(queue, null);
+                    consumer.receive(1000);
+                } catch (JMSException e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+        new Thread(r).start();
+        conn.start();
+
+        try {
+            synchronized (this) {
+                wait(3000);
+            }
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+    protected void setUp() throws Exception {
+        bindAddress = "tcp://localhost:0";
+        super.setUp();
+
+        connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DoubleExpireTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DoubleExpireTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DoubleExpireTest.java
new file mode 100644
index 0000000..bb66943
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DoubleExpireTest.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import java.util.concurrent.TimeoutException;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.junit.Assert;
+
+public class DoubleExpireTest extends EmbeddedBrokerTestSupport {
+
+	private static final long MESSAGE_TTL_MILLIS = 1000;
+	private static final long MAX_TEST_TIME_MILLIS = 60000;
+
+	public void setUp() throws Exception {
+		setAutoFail(true);
+		setMaxTestTime(MAX_TEST_TIME_MILLIS);
+		super.setUp();
+	}
+
+	/**
+	 * This test verifies that a message that expires can be be resent to queue
+	 * with a new expiration and that it will be processed as a new message and
+	 * allowed to re-expire.
+	 * <p>
+	 * <b>NOTE:</b> This test fails on AMQ 5.4.2 because the originalExpiration
+	 * timestamp is not cleared when the message is resent.
+	 */
+	public void testDoubleExpireWithoutMove() throws Exception {
+		// Create the default dead letter queue.
+		final ActiveMQDestination DLQ = createDestination("ActiveMQ.DLQ");
+
+		Connection conn = createConnection();
+		try {
+			conn.start();
+			Session session = conn.createSession(false,
+					Session.AUTO_ACKNOWLEDGE);
+
+			// Verify that the test queue and DLQ are empty.
+			Assert.assertEquals(0, getSize(destination));
+			Assert.assertEquals(0, getSize(DLQ));
+
+			// Enqueue a message to the test queue that will expire after 1s.
+			MessageProducer producer = session.createProducer(destination);
+			Message testMessage = session.createTextMessage("test message");
+			producer.send(testMessage, Message.DEFAULT_DELIVERY_MODE,
+					Message.DEFAULT_PRIORITY, MESSAGE_TTL_MILLIS);
+			Assert.assertEquals(1, getSize(destination));
+
+			// Wait for the message to expire.
+			waitForSize(destination, 0, MAX_TEST_TIME_MILLIS);
+			Assert.assertEquals(1, getSize(DLQ));
+
+			// Consume the message from the DLQ and re-enqueue it to the test
+			// queue so that it expires after 1s.
+			MessageConsumer consumer = session.createConsumer(DLQ);
+			Message expiredMessage = consumer.receive();
+			Assert.assertEquals(testMessage.getJMSMessageID(), expiredMessage
+					.getJMSMessageID());
+
+			producer.send(expiredMessage, Message.DEFAULT_DELIVERY_MODE,
+					Message.DEFAULT_PRIORITY, MESSAGE_TTL_MILLIS);
+			Assert.assertEquals(1, getSize(destination));
+			Assert.assertEquals(0, getSize(DLQ));
+
+			// Verify that the resent message is "different" in that it has
+			// another ID.
+			Assert.assertNotSame(testMessage.getJMSMessageID(), expiredMessage
+					.getJMSMessageID());
+
+			// Wait for the message to re-expire.
+			waitForSize(destination, 0, MAX_TEST_TIME_MILLIS);
+			Assert.assertEquals(1, getSize(DLQ));
+
+			// Re-consume the message from the DLQ.
+			Message reexpiredMessage = consumer.receive();
+			Assert.assertEquals(expiredMessage.getJMSMessageID(), reexpiredMessage
+					.getJMSMessageID());
+		} finally {
+			conn.close();
+		}
+	}
+
+	/**
+	 * A helper method that returns the embedded broker's implementation of a
+	 * JMS queue.
+	 */
+	private Queue getPhysicalDestination(ActiveMQDestination destination)
+			throws Exception {
+		return (Queue) broker.getAdminView().getBroker().getDestinationMap()
+				.get(destination);
+	}
+
+	/**
+	 * A helper method that returns the size of the specified queue/topic.
+	 */
+	private long getSize(ActiveMQDestination destination) throws Exception {
+		return getPhysicalDestination(destination) != null ? getPhysicalDestination(
+				destination).getDestinationStatistics().getMessages()
+				.getCount()
+				: 0;
+	}
+
+	/**
+	 * A helper method that waits for a destination to reach a certain size.
+	 */
+	private void waitForSize(ActiveMQDestination destination, int size,
+			long timeoutMillis) throws Exception, TimeoutException {
+		long startTimeMillis = System.currentTimeMillis();
+
+		while (getSize(destination) != size
+				&& System.currentTimeMillis() < (startTimeMillis + timeoutMillis)) {
+			Thread.sleep(250);
+		}
+
+		if (getSize(destination) != size) {
+			throw new TimeoutException("Destination "
+					+ destination.getPhysicalName() + " did not reach size "
+					+ size + " within " + timeoutMillis + "ms.");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
new file mode 100644
index 0000000..eeee82b
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
@@ -0,0 +1,463 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Vector;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import javax.management.ObjectName;
+import junit.framework.Test;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.CombinationTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *  A Test case for AMQ-1479
+ */
+public class DurableConsumerTest extends CombinationTestSupport{
+    private static final Logger LOG = LoggerFactory.getLogger(DurableConsumerTest.class);
+    private static int COUNT = 1024;
+    private static String CONSUMER_NAME = "DURABLE_TEST";
+    protected BrokerService broker;
+    
+    protected String bindAddress = "tcp://localhost:61616";
+    
+    protected byte[] payload = new byte[1024 * 32];
+    protected ConnectionFactory factory;
+    protected Vector<Exception> exceptions = new Vector<Exception>();
+    
+    private static final String TOPIC_NAME = "failoverTopic";
+    private static final String CONNECTION_URL = "failover:(tcp://localhost:61616,tcp://localhost:61617)";
+    public boolean useDedicatedTaskRunner = false;
+    
+    private class SimpleTopicSubscriber implements MessageListener,ExceptionListener{
+        
+        private TopicConnection topicConnection = null;
+        
+        public SimpleTopicSubscriber(String connectionURL,String clientId,String topicName) {
+            
+            ActiveMQConnectionFactory topicConnectionFactory = null;
+            TopicSession topicSession = null;
+            Topic topic = null;
+            TopicSubscriber topicSubscriber = null;
+            
+            topicConnectionFactory = new ActiveMQConnectionFactory(connectionURL);
+            try {
+                
+                topic = new ActiveMQTopic(topicName);
+                topicConnection = topicConnectionFactory.createTopicConnection();
+                topicConnection.setClientID((clientId));
+                topicConnection.start();
+                
+                topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+                topicSubscriber = topicSession.createDurableSubscriber(topic, (clientId));
+                topicSubscriber.setMessageListener(this);
+                
+            } catch (JMSException e) {
+                e.printStackTrace();
+            }
+        }
+        
+        public void onMessage(Message arg0){
+        }
+        
+        public void closeConnection(){
+            if (topicConnection != null) {
+                try {
+                    topicConnection.close();
+                } catch (JMSException e) {
+                }
+            }
+        }
+        
+        public void onException(JMSException exception){
+            exceptions.add(exception);
+        }
+    }
+    
+    private class MessagePublisher implements Runnable{
+        private final boolean shouldPublish = true;
+        
+        public void run(){
+            TopicConnectionFactory topicConnectionFactory = null;
+            TopicConnection topicConnection = null;
+            TopicSession topicSession = null;
+            Topic topic = null;
+            TopicPublisher topicPublisher = null;
+            Message message = null;
+            
+            topicConnectionFactory = new ActiveMQConnectionFactory(CONNECTION_URL);
+            try {
+                topic = new ActiveMQTopic(TOPIC_NAME);
+                topicConnection = topicConnectionFactory.createTopicConnection();
+                topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+                topicPublisher = topicSession.createPublisher(topic);
+                message = topicSession.createMessage();
+            } catch (Exception ex) {
+                exceptions.add(ex);
+            }
+            while (shouldPublish) {
+                try {
+                    topicPublisher.publish(message, DeliveryMode.PERSISTENT, 1, 2 * 60 * 60 * 1000);
+                } catch (JMSException ex) {
+                    exceptions.add(ex);
+                }
+                try {
+                    Thread.sleep(1);
+                } catch (Exception ex) {
+                }
+            }
+        }
+    }
+    
+    private void configurePersistence(BrokerService broker) throws Exception{
+        File dataDirFile = new File("target/" + getName());
+        KahaDBPersistenceAdapter kahaDBAdapter = new KahaDBPersistenceAdapter();
+        kahaDBAdapter.setDirectory(dataDirFile);
+        broker.setPersistenceAdapter(kahaDBAdapter);
+    }
+    
+    public void testFailover() throws Exception{
+        
+        configurePersistence(broker);
+        broker.start();
+        
+        Thread publisherThread = new Thread(new MessagePublisher());
+        publisherThread.start();
+        final int numSubs = 100;
+        final List<SimpleTopicSubscriber> list = new ArrayList<SimpleTopicSubscriber>(numSubs);
+        for (int i = 0; i < numSubs; i++) {
+            
+            final int id = i;
+            Thread thread = new Thread(new Runnable(){
+                public void run(){
+                    SimpleTopicSubscriber s =new SimpleTopicSubscriber(CONNECTION_URL, System.currentTimeMillis() + "-" + id, TOPIC_NAME);
+                    list.add(s);
+                }
+            });
+            thread.start();
+            
+        }
+
+        Wait.waitFor(new Wait.Condition(){
+            @Override
+            public boolean isSatisified() throws Exception {
+                return numSubs == list.size();
+            }
+        });
+
+        broker.stop();
+        broker = createBroker(false);
+        configurePersistence(broker);
+        broker.start();
+        Thread.sleep(10000);
+        for (SimpleTopicSubscriber s:list) {
+            s.closeConnection();
+        }
+        assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
+    }
+    
+    // makes heavy use of threads and can demonstrate https://issues.apache.org/activemq/browse/AMQ-2028
+    // with use dedicatedTaskRunner=true and produce OOM
+    public void initCombosForTestConcurrentDurableConsumer(){
+        addCombinationValues("useDedicatedTaskRunner", new Object[] { Boolean.TRUE, Boolean.FALSE });
+    }
+    
+    public void testConcurrentDurableConsumer() throws Exception{
+        
+        broker.start();
+        broker.waitUntilStarted();
+        
+        factory = createConnectionFactory();
+        final String topicName = getName();
+        final int numMessages = 500;
+        int numConsumers = 1;
+        final CountDownLatch counsumerStarted = new CountDownLatch(numConsumers);
+        final AtomicInteger receivedCount = new AtomicInteger();
+        Runnable consumer = new Runnable(){
+            public void run(){
+                final String consumerName = Thread.currentThread().getName();
+                int acked = 0;
+                int received = 0;
+                
+                try {
+                    while (acked < numMessages / 2) {
+                        // take one message and close, ack on occasion
+                        Connection consumerConnection = factory.createConnection();
+                        ((ActiveMQConnection) consumerConnection).setWatchTopicAdvisories(false);
+                        consumerConnection.setClientID(consumerName);
+                        Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+                        Topic topic = consumerSession.createTopic(topicName);
+                        consumerConnection.start();
+                        
+                        MessageConsumer consumer = consumerSession.createDurableSubscriber(topic, consumerName);
+                        
+                        counsumerStarted.countDown();
+                        Message msg = null;
+                        do {
+                            msg = consumer.receive(5000);
+                            if (msg != null) {
+                                receivedCount.incrementAndGet();
+                                if (received != 0 && received % 100 == 0) {
+                                    LOG.info("Received msg: " + msg.getJMSMessageID());
+                                }
+                                if (++received % 2 == 0) {
+                                    msg.acknowledge();
+                                    acked++;
+                                }
+                            }
+                        } while (msg == null);
+
+                        consumerConnection.close();
+                    }
+                    assertTrue(received >= acked);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    exceptions.add(e);
+                }
+            }
+        };
+        
+        ExecutorService executor = Executors.newFixedThreadPool(numConsumers);
+        
+        for (int i = 0; i < numConsumers; i++) {
+            executor.execute(consumer);
+        }
+        
+        assertTrue(counsumerStarted.await(30, TimeUnit.SECONDS));
+        
+        Connection producerConnection = factory.createConnection();
+        ((ActiveMQConnection) producerConnection).setWatchTopicAdvisories(false);
+        Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Topic topic = producerSession.createTopic(topicName);
+        MessageProducer producer = producerSession.createProducer(topic);
+        producerConnection.start();
+        for (int i = 0; i < numMessages; i++) {
+            BytesMessage msg = producerSession.createBytesMessage();
+            msg.writeBytes(payload);
+            producer.send(msg);
+            if (i != 0 && i % 100 == 0) {
+                LOG.info("Sent msg " + i);
+            }
+        }
+        
+        executor.shutdown();
+        executor.awaitTermination(30, TimeUnit.SECONDS);
+        
+        Wait.waitFor(new Wait.Condition(){
+            public boolean isSatisified() throws Exception{
+                LOG.info("receivedCount: " + receivedCount.get());
+                return receivedCount.get() == numMessages;
+            }
+        }, 360 * 1000);
+        assertEquals("got required some messages", numMessages, receivedCount.get());
+        assertTrue("no exceptions, but: " + exceptions, exceptions.isEmpty());
+    }
+    
+    public void testConsumerRecover() throws Exception{
+        doTestConsumer(true);
+    }
+    
+    public void testConsumer() throws Exception{
+        doTestConsumer(false);
+    }
+
+    public void testPrefetchViaBrokerConfig() throws Exception {
+
+        Integer prefetchVal = new Integer(150);
+        PolicyEntry policyEntry = new PolicyEntry();
+        policyEntry.setDurableTopicPrefetch(prefetchVal.intValue());
+        policyEntry.setPrioritizedMessages(true);
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.setDefaultEntry(policyEntry);
+        broker.setDestinationPolicy(policyMap);
+        broker.start();
+
+        factory = createConnectionFactory();
+        Connection consumerConnection = factory.createConnection();
+        consumerConnection.setClientID(CONSUMER_NAME);
+        Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Topic topic = consumerSession.createTopic(getClass().getName());
+        MessageConsumer consumer = consumerSession.createDurableSubscriber(topic, CONSUMER_NAME);
+        consumerConnection.start();
+
+        ObjectName activeSubscriptionObjectName = broker.getAdminView().getDurableTopicSubscribers()[0];
+        Object prefetchFromSubView = broker.getManagementContext().getAttribute(activeSubscriptionObjectName, "PrefetchSize");
+        assertEquals(prefetchVal, prefetchFromSubView);
+    }
+    
+    public void doTestConsumer(boolean forceRecover) throws Exception{
+        
+        if (forceRecover) {
+            configurePersistence(broker);
+        }
+        broker.start();
+        
+        factory = createConnectionFactory();
+        Connection consumerConnection = factory.createConnection();
+        consumerConnection.setClientID(CONSUMER_NAME);
+        Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Topic topic = consumerSession.createTopic(getClass().getName());
+        MessageConsumer consumer = consumerSession.createDurableSubscriber(topic, CONSUMER_NAME);
+        consumerConnection.start();
+        consumerConnection.close();
+        broker.stop();
+        broker = createBroker(false);
+        if (forceRecover) {
+            configurePersistence(broker);
+        }
+        broker.start();
+        
+        Connection producerConnection = factory.createConnection();
+        
+        Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        
+        MessageProducer producer = producerSession.createProducer(topic);
+        producerConnection.start();
+        for (int i = 0; i < COUNT; i++) {
+            BytesMessage msg = producerSession.createBytesMessage();
+            msg.writeBytes(payload);
+            producer.send(msg);
+            if (i != 0 && i % 1000 == 0) {
+                LOG.info("Sent msg " + i);
+            }
+        }
+        producerConnection.close();
+        broker.stop();
+        broker = createBroker(false);
+        if (forceRecover) {
+            configurePersistence(broker);
+        }
+        broker.start();
+        
+        consumerConnection = factory.createConnection();
+        consumerConnection.setClientID(CONSUMER_NAME);
+        consumerConnection.start();
+        consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        
+        consumer = consumerSession.createDurableSubscriber(topic, CONSUMER_NAME);
+        for (int i = 0; i < COUNT; i++) {
+            Message msg = consumer.receive(10000);
+            assertNotNull("Missing message: " + i, msg);
+            if (i != 0 && i % 1000 == 0) {
+                LOG.info("Received msg " + i);
+            }
+            
+        }
+        consumerConnection.close();
+        
+    }
+    
+    @Override
+    protected void setUp() throws Exception{
+        if (broker == null) {
+            broker = createBroker(true);
+        }
+        
+        super.setUp();
+    }
+    
+    @Override
+    protected void tearDown() throws Exception{
+        super.tearDown();
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+            broker = null;
+        }
+    }
+    
+    protected Topic creatTopic(Session s,String destinationName) throws JMSException{
+        return s.createTopic(destinationName);
+    }
+    
+    /**
+     * Factory method to create a new broker
+     * 
+     * @throws Exception
+     */
+    protected BrokerService createBroker(boolean deleteStore) throws Exception{
+        BrokerService answer = new BrokerService();
+        configureBroker(answer, deleteStore);
+        return answer;
+    }
+    
+    protected void configureBroker(BrokerService answer,boolean deleteStore) throws Exception{
+        answer.setDeleteAllMessagesOnStartup(deleteStore);
+        KahaDBStore kaha = new KahaDBStore();
+        //kaha.setConcurrentStoreAndDispatchTopics(false);
+        File directory = new File("target/activemq-data/kahadb");
+        if (deleteStore) {
+            IOHelper.deleteChildren(directory);
+        }
+        kaha.setDirectory(directory);
+        //kaha.setMaxAsyncJobs(10);
+        
+        answer.setPersistenceAdapter(kaha);
+        answer.addConnector(bindAddress);
+        answer.setUseShutdownHook(false);
+        answer.setAdvisorySupport(false);
+        answer.setDedicatedTaskRunner(useDedicatedTaskRunner);
+    }
+    
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception{
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(bindAddress);
+        factory.setUseDedicatedTaskRunner(useDedicatedTaskRunner);
+        return factory;
+    }
+    
+    public static Test suite(){
+        return suite(DurableConsumerTest.class);
+    }
+    
+    public static void main(String[] args){
+        junit.textui.TestRunner.run(suite());
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JMSDurableTopicNoLocalTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JMSDurableTopicNoLocalTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JMSDurableTopicNoLocalTest.java
new file mode 100644
index 0000000..80c4e9f
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JMSDurableTopicNoLocalTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.BrokerService;
+
+/**
+ * 
+ */
+public class JMSDurableTopicNoLocalTest extends EmbeddedBrokerTestSupport {
+    protected String bindAddress;
+
+    public void testConsumeNoLocal() throws Exception {
+        final String TEST_NAME = getClass().getName();
+        Connection connection = createConnection();
+        connection.setClientID(TEST_NAME);
+        
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        
+        TopicSubscriber subscriber = session.createDurableSubscriber((Topic) destination, "topicUser2", null, true);
+        
+        
+        final CountDownLatch latch = new CountDownLatch(1);
+        subscriber.setMessageListener(new MessageListener() {
+            public void onMessage(Message message) {
+                System.out.println("Receive a message " + message);
+                latch.countDown();        
+            }   
+        });
+        
+        connection.start();
+        
+        MessageProducer producer = session.createProducer(destination);
+        TextMessage message = session.createTextMessage("THIS IS A TEST");
+        producer.send(message);
+        producer.close();
+        latch.await(5,TimeUnit.SECONDS);
+        assertEquals(latch.getCount(),1);
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        bindAddress = "vm://localhost";
+        useTopic=true;
+        super.setUp();
+    }
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        answer.setUseJmx(false);
+        answer.setPersistent(true);
+        answer.setDeleteAllMessagesOnStartup(true);
+        answer.addConnector(bindAddress);
+        return answer;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java
new file mode 100644
index 0000000..05a8c1d
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.Properties;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.test.JmsTopicSendReceiveTest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ */
+public class JmsDurableTopicSlowReceiveTest extends JmsTopicSendReceiveTest {
+    
+    static final int NMSG = 200;
+    static final int MSIZE = 256000;
+    private static final transient Logger LOG = LoggerFactory.getLogger(JmsDurableTopicSlowReceiveTest.class);
+    private static final String COUNT_PROPERY_NAME = "count";
+
+    protected Connection connection2;
+    protected Session session2;
+    protected Session consumeSession2;
+    protected MessageConsumer consumer2;
+    protected MessageProducer producer2;
+    protected Destination consumerDestination2;
+    BrokerService broker;
+    private Connection connection3;
+    private Session consumeSession3;
+    private TopicSubscriber consumer3;
+
+    /**
+     * Set up a durable suscriber test.
+     * 
+     * @see junit.framework.TestCase#setUp()
+     */
+    protected void setUp() throws Exception {
+        this.durable = true;
+        broker = createBroker();
+        super.setUp();
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        broker.stop();
+    }
+
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        ActiveMQConnectionFactory result = new ActiveMQConnectionFactory("vm://localhost?async=false");
+        Properties props = new Properties();
+        props.put("prefetchPolicy.durableTopicPrefetch", "5");
+        props.put("prefetchPolicy.optimizeDurableTopicPrefetch", "5");
+        result.setProperties(props);
+        return result;
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        configureBroker(answer);
+        answer.start();
+        return answer;
+    }
+
+    protected void configureBroker(BrokerService answer) throws Exception {
+        answer.setDeleteAllMessagesOnStartup(true);
+    }
+
+    /**
+     * Test if all the messages sent are being received.
+     * 
+     * @throws Exception
+     */
+    public void testSlowReceiver() throws Exception {
+        connection2 = createConnection();
+        connection2.setClientID("test");
+        connection2.start();
+        consumeSession2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        consumerDestination2 = session2.createTopic(getConsumerSubject() + "2");
+        consumer2 = consumeSession2.createDurableSubscriber((Topic)consumerDestination2, getName());
+
+        consumer2.close();
+        connection2.close();
+        new Thread(new Runnable() {
+
+            public void run() {
+                try {
+                    int count = 0;
+                    for (int loop = 0; loop < 4; loop++) {
+                        connection2 = createConnection();
+                        connection2.start();
+                        session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                        producer2 = session2.createProducer(null);
+                        producer2.setDeliveryMode(deliveryMode);
+                        Thread.sleep(1000);
+                        for (int i = 0; i < NMSG / 4; i++) {
+                            BytesMessage message = session2.createBytesMessage();
+                            message.writeBytes(new byte[MSIZE]);
+                            message.setStringProperty("test", "test");
+                            message.setIntProperty(COUNT_PROPERY_NAME, count);
+                            message.setJMSType("test");
+                            producer2.send(consumerDestination2, message);
+                            Thread.sleep(50);
+                            if (verbose) {
+                                LOG.debug("Sent(" + loop + "): " + i);
+                            }
+                            count++;
+                        }
+                        producer2.close();
+                        connection2.stop();
+                        connection2.close();
+                    }
+                } catch (Throwable e) {
+                    e.printStackTrace();
+                }
+            }
+        }, "SENDER Thread").start();
+        connection3 = createConnection();
+        connection3.setClientID("test");
+        connection3.start();
+        consumeSession3 = connection3.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        consumer3 = consumeSession3.createDurableSubscriber((Topic)consumerDestination2, getName());
+        connection3.close();
+        int count = 0;
+        for (int loop = 0; loop < 4; ++loop) {
+            connection3 = createConnection();
+            connection3.setClientID("test");
+            connection3.start();
+            consumeSession3 = connection3.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            consumer3 = consumeSession3.createDurableSubscriber((Topic)consumerDestination2, getName());
+            Message msg = null;
+            int i;
+            for (i = 0; i < NMSG / 4; i++) {
+                msg = consumer3.receive(10000);
+                if (msg == null) {
+                    break;
+                }
+                if (verbose) {
+                    LOG.debug("Received(" + loop + "): " + i + " count = " + msg.getIntProperty(COUNT_PROPERY_NAME));
+                }
+                assertNotNull(msg);
+                assertEquals(msg.getJMSType(), "test");
+                assertEquals(msg.getStringProperty("test"), "test");
+                assertEquals("Messages received out of order", count, msg.getIntProperty(COUNT_PROPERY_NAME));
+                Thread.sleep(500);
+                msg.acknowledge();
+                count++;
+            }
+            consumer3.close();
+            assertEquals("Receiver " + loop, NMSG / 4, i);
+            connection3.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java
new file mode 100644
index 0000000..2858302
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.ResourceAllocationException;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.transport.RequestTimedOutIOException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JmsTimeoutTest extends EmbeddedBrokerTestSupport {
+
+    static final Logger LOG = LoggerFactory.getLogger(JmsTimeoutTest.class);
+
+    private final int messageSize=1024*64;
+    private final int messageCount=10000;
+    private final AtomicInteger exceptionCount = new AtomicInteger(0);
+
+    /**
+     * Test the case where the broker is blocked due to a memory limit
+     * and a producer timeout is set on the connection.
+     * @throws Exception
+     */
+    public void testBlockedProducerConnectionTimeout() throws Exception {
+        final ActiveMQConnection cx = (ActiveMQConnection)createConnection();
+        final ActiveMQDestination queue = createDestination("testqueue");
+
+        // we should not take longer than 10 seconds to return from send
+        cx.setSendTimeout(10000);
+
+        Runnable r = new Runnable() {
+            public void run() {
+                try {
+                    LOG.info("Sender thread starting");
+                    Session session = cx.createSession(false, 1);
+                    MessageProducer producer = session.createProducer(queue);
+                    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+                    TextMessage message = session.createTextMessage(createMessageText());
+                    for(int count=0; count<messageCount; count++){
+                        producer.send(message);
+                    }
+                    LOG.info("Done sending..");
+                } catch (JMSException e) {
+                    if (e.getCause() instanceof RequestTimedOutIOException) {
+                        exceptionCount.incrementAndGet();
+                    } else {
+                        e.printStackTrace();
+                    }
+                    return;
+                }
+
+            }
+        };
+        cx.start();
+        Thread producerThread = new Thread(r);
+        producerThread.start();
+        producerThread.join(30000);
+        cx.close();
+        // We should have a few timeout exceptions as memory store will fill up
+        assertTrue("No exception from the broker", exceptionCount.get() > 0);
+    }
+
+    /**
+     * Test the case where the broker is blocked due to a memory limit
+     * with a fail timeout
+     * @throws Exception
+     */
+    public void testBlockedProducerUsageSendFailTimeout() throws Exception {
+        final ActiveMQConnection cx = (ActiveMQConnection)createConnection();
+        final ActiveMQDestination queue = createDestination("testqueue");
+
+        broker.getSystemUsage().setSendFailIfNoSpaceAfterTimeout(5000);
+        Runnable r = new Runnable() {
+            public void run() {
+                try {
+                    LOG.info("Sender thread starting");
+                    Session session = cx.createSession(false, 1);
+                    MessageProducer producer = session.createProducer(queue);
+                    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+                    TextMessage message = session.createTextMessage(createMessageText());
+                    for(int count=0; count<messageCount; count++){
+                        producer.send(message);
+                    }
+                    LOG.info("Done sending..");
+                } catch (JMSException e) {
+                    if (e instanceof ResourceAllocationException || e.getCause() instanceof RequestTimedOutIOException) {
+                        exceptionCount.incrementAndGet();
+                    } else {
+                        e.printStackTrace();
+                    }
+                    return;
+                }
+            }
+        };
+        cx.start();
+        Thread producerThread = new Thread(r);
+        producerThread.start();
+        producerThread.join(30000);
+        cx.close();
+        // We should have a few timeout exceptions as memory store will fill up
+        assertTrue("No exception from the broker", exceptionCount.get() > 0);
+    }
+
+    protected void setUp() throws Exception {
+        exceptionCount.set(0);
+        bindAddress = "tcp://localhost:0";
+        broker = createBroker();
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.getSystemUsage().getMemoryUsage().setLimit(5*1024*1024);
+
+        super.setUp();
+    }
+
+    @Override
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory(
+            broker.getTransportConnectors().get(0).getPublishableConnectString());
+    }
+
+    private String createMessageText() {
+        StringBuffer buffer = new StringBuffer();
+        buffer.append("<filler>");
+        for (int i = buffer.length(); i < messageSize; i++) {
+            buffer.append('X');
+        }
+        buffer.append("</filler>");
+        return buffer.toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageBlockResumeTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageBlockResumeTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageBlockResumeTest.java
new file mode 100644
index 0000000..e8d5371
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageBlockResumeTest.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.File;
+import java.util.Vector;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.runners.BlockJUnit4ClassRunner;
+import org.junit.runner.RunWith;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.*;
+
+
+@RunWith(BlockJUnit4ClassRunner.class)
+public class MemoryUsageBlockResumeTest extends TestSupport implements Thread.UncaughtExceptionHandler {
+
+    public int deliveryMode = DeliveryMode.PERSISTENT;
+
+    private static final Logger LOG = LoggerFactory.getLogger(MemoryUsageBlockResumeTest.class);
+    private static byte[] buf = new byte[4 * 1024];
+    private static byte[] bigBuf = new byte[48 * 1024];
+
+    private BrokerService broker;
+    AtomicInteger messagesSent = new AtomicInteger(0);
+    AtomicInteger messagesConsumed = new AtomicInteger(0);
+
+    protected long messageReceiveTimeout = 10000L;
+
+    Destination destination = new ActiveMQQueue("FooTwo");
+    Destination bigDestination = new ActiveMQQueue("FooTwoBig");
+
+    private String connectionUri;
+    private final Vector<Throwable> exceptions = new Vector<Throwable>();
+
+    @Test(timeout = 60 * 1000)
+    public void testBlockByOtherResumeNoException() throws Exception {
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
+
+        // ensure more than on message can be pending when full
+        factory.setProducerWindowSize(48*1024);
+        // ensure messages are spooled to disk for this consumer
+        ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
+        prefetch.setTopicPrefetch(10);
+        factory.setPrefetchPolicy(prefetch);
+        Connection consumerConnection = factory.createConnection();
+        consumerConnection.start();
+
+        Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = consumerSession.createConsumer(bigDestination);
+
+        final Connection producerConnection = factory.createConnection();
+        producerConnection.start();
+
+        final int fillWithBigCount = 10;
+        Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(null);
+        producer.setDeliveryMode(deliveryMode);
+        for (int idx = 0; idx < fillWithBigCount; ++idx) {
+            Message message = session.createTextMessage(new String(bigBuf) + idx);
+            producer.send(bigDestination, message);
+            messagesSent.incrementAndGet();
+            LOG.info("After big: " + idx + ", System Memory Usage " + broker.getSystemUsage().getMemoryUsage().getPercentUsage());
+        }
+
+        // will block on pfc
+        final int toSend = 20;
+        Thread producingThread = new Thread("Producing thread") {
+            @Override
+            public void run() {
+                try {
+                    Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    MessageProducer producer = session.createProducer(destination);
+                    producer.setDeliveryMode(deliveryMode);
+                    for (int idx = 0; idx < toSend; ++idx) {
+                        Message message = session.createTextMessage(new String(buf) + idx);
+                        producer.send(destination, message);
+                        messagesSent.incrementAndGet();
+                        LOG.info("After little:" + idx + ", System Memory Usage " + broker.getSystemUsage().getMemoryUsage().getPercentUsage());
+                    }
+                } catch (Throwable ex) {
+                    ex.printStackTrace();
+                }
+            }
+        };
+        producingThread.start();
+
+        Thread producingThreadTwo = new Thread("Producing thread") {
+            @Override
+            public void run() {
+                try {
+                    Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    MessageProducer producer = session.createProducer(destination);
+                    producer.setDeliveryMode(deliveryMode);
+                    for (int idx = 0; idx < toSend; ++idx) {
+                        Message message = session.createTextMessage(new String(buf) + idx);
+                        producer.send(destination, message);
+                        messagesSent.incrementAndGet();
+                        LOG.info("After little:" + idx + ", System Memory Usage " + broker.getSystemUsage().getMemoryUsage().getPercentUsage());
+                    }
+                } catch (Throwable ex) {
+                    ex.printStackTrace();
+                }
+            }
+        };
+        producingThreadTwo.start();
+
+        assertTrue("producer has sent x in a reasonable time", Wait.waitFor(new Wait.Condition()
+        {
+            @Override
+            public boolean isSatisified() throws Exception {
+                 LOG.info("Checking for : X sent, System Memory Usage " + broker.getSystemUsage().getMemoryUsage().getPercentUsage() + ", sent:  " + messagesSent);
+                return messagesSent.get() > 20;
+            }
+        }));
+
+
+        LOG.info("Consuming from big q to allow delivery to smaller q from pending");
+        int count = 0;
+
+        Message m = null;
+
+        for (;count < 10; count++) {
+            assertTrue((m = consumer.receive(messageReceiveTimeout)) != null);
+            LOG.info("Recieved Message (" + count + "):" + m + ", System Memory Usage " + broker.getSystemUsage().getMemoryUsage().getPercentUsage());
+            messagesConsumed.incrementAndGet();
+        }
+        consumer.close();
+
+        producingThread.join();
+        producingThreadTwo.join();
+
+        assertEquals("Incorrect number of Messages Sent: " + messagesSent.get(), messagesSent.get(), fillWithBigCount +  toSend*2);
+
+        // consume all little messages
+        consumer = consumerSession.createConsumer(destination);
+        for (count = 0;count < toSend*2; count++) {
+            assertTrue((m = consumer.receive(messageReceiveTimeout)) != null);
+            LOG.info("Recieved Message (" + count + "):" + m + ", System Memory Usage " + broker.getSystemUsage().getMemoryUsage().getPercentUsage() );
+            messagesConsumed.incrementAndGet();
+        }
+
+        assertEquals("Incorrect number of Messages consumed: " + messagesConsumed.get(), messagesSent.get(), messagesConsumed.get());
+
+        //assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
+    }
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+
+        Thread.setDefaultUncaughtExceptionHandler(this);
+        broker = new BrokerService();
+        broker.setDataDirectory("target" + File.separator + "activemq-data");
+        broker.setPersistent(true);
+        broker.setUseJmx(false);
+        broker.setAdvisorySupport(false);
+        broker.setDeleteAllMessagesOnStartup(true);
+
+        setDefaultPersistenceAdapter(broker);
+        broker.getSystemUsage().getMemoryUsage().setLimit((30 * 16 * 1024));
+
+        PolicyEntry defaultPolicy = new PolicyEntry();
+        defaultPolicy.setOptimizedDispatch(true);
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.setDefaultEntry(defaultPolicy);
+        broker.setDestinationPolicy(policyMap);
+
+        broker.addConnector("tcp://localhost:0");
+        broker.start();
+
+        connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
+    }
+
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    @Override
+    public void uncaughtException(Thread t, Throwable e) {
+        LOG.error("Unexpected Unhandeled ex on: " + t, e);
+        exceptions.add(e);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageBrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageBrokerTest.java
new file mode 100644
index 0000000..b229e0e
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageBrokerTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerTestSupport;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.apache.activemq.util.IOHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import java.io.File;
+
+public class MemoryUsageBrokerTest extends BrokerTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(MemoryUsageBrokerTest.class);
+
+    protected void setUp() throws Exception {
+        this.setAutoFail(true);
+        super.setUp();
+    }
+
+    @Override
+    protected PolicyEntry getDefaultPolicy() {
+        PolicyEntry policy = super.getDefaultPolicy();
+        // Disable PFC and assign a large memory limit that's larger than the default broker memory limit for queues
+        policy.setProducerFlowControl(false);
+        policy.setQueue(">");
+        policy.setMemoryLimit(128 * 1024 * 1024);
+        return policy;
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        KahaDBStore kaha = new KahaDBStore();
+        File directory = new File("target/activemq-data/kahadb");
+        IOHelper.deleteChildren(directory);
+        kaha.setDirectory(directory);
+        kaha.deleteAllMessages();
+        broker.setPersistenceAdapter(kaha);
+        return broker;
+    }
+
+    protected ConnectionFactory createConnectionFactory() {
+        return new ActiveMQConnectionFactory(broker.getVmConnectorURI());
+    }
+
+    protected Connection createJmsConnection() throws JMSException {
+        return createConnectionFactory().createConnection();
+    }
+
+    public void testMemoryUsage() throws Exception {
+        Connection conn = createJmsConnection();
+        Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue("queue.a.b");
+        MessageProducer producer = session.createProducer(queue);
+        for (int i = 0; i < 100000; i++) {
+            BytesMessage bm = session.createBytesMessage();
+            bm.writeBytes(new byte[1024]);
+            producer.send(bm);
+            if ((i + 1) % 100 == 0) {
+                session.commit();
+                int memoryUsagePercent = broker.getSystemUsage().getMemoryUsage().getPercentUsage();
+                LOG.info((i + 1) + " messages have been sent; broker memory usage " + memoryUsagePercent + "%");
+                assertTrue("Used more than available broker memory", memoryUsagePercent <= 100);
+            }
+        }
+        session.commit();
+        producer.close();
+        session.close();
+        conn.close();
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageCleanupTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageCleanupTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageCleanupTest.java
new file mode 100644
index 0000000..e7feb90
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageCleanupTest.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MemoryUsageCleanupTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MemoryUsageCleanupTest.class);
+    private static final String QUEUE_NAME = MemoryUsageCleanupTest.class.getName() + "Queue";
+
+    private final String str = new String(
+        "QAa0bcLdUK2eHfJgTP8XhiFj61DOklNm9nBoI5pGqYVrs3CtSuMZvwWx4yE7zR");
+
+    private BrokerService broker;
+    private String connectionUri;
+    private ExecutorService pool;
+    private String queueName;
+    private Random r = new Random();
+
+    @Before
+    public void setUp() throws Exception {
+
+        broker = new BrokerService();
+        broker.setDataDirectory("target" + File.separator + "activemq-data");
+        broker.setPersistent(true);
+        broker.setUseJmx(true);
+        broker.setDedicatedTaskRunner(false);
+        broker.setAdvisorySupport(false);
+        broker.setDeleteAllMessagesOnStartup(true);
+
+        SharedDeadLetterStrategy strategy = new SharedDeadLetterStrategy();
+        strategy.setProcessExpired(false);
+        strategy.setProcessNonPersistent(false);
+
+        PolicyEntry defaultPolicy = new PolicyEntry();
+        defaultPolicy.setQueue(">");
+        defaultPolicy.setOptimizedDispatch(true);
+        defaultPolicy.setDeadLetterStrategy(strategy);
+        defaultPolicy.setMemoryLimit(300000000);
+
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.setDefaultEntry(defaultPolicy);
+
+        broker.setDestinationPolicy(policyMap);
+
+        broker.getSystemUsage().getMemoryUsage().setLimit(300000000L);
+
+        broker.addConnector("tcp://localhost:0").setName("Default");
+        broker.start();
+        broker.waitUntilStarted();
+
+        connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
+        pool = Executors.newFixedThreadPool(10);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+
+        if (pool != null) {
+            pool.shutdown();
+        }
+    }
+
+    @Test
+    public void testIt() throws Exception {
+
+        final int startPercentage = broker.getAdminView().getMemoryPercentUsage();
+        LOG.info("MemoryUseage at test start = " + startPercentage);
+
+        for (int i = 0; i < 2; i++) {
+            LOG.info("Started the test iteration: " + i + " using queueName = " + queueName);
+            queueName = QUEUE_NAME + i;
+            final CountDownLatch latch = new CountDownLatch(11);
+
+            pool.execute(new Runnable() {
+                @Override
+                public void run() {
+                    receiveAndDiscard100messages(latch);
+                }
+            });
+
+            for (int j = 0; j < 10; j++) {
+                pool.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        send10000messages(latch);
+                    }
+                });
+            }
+
+            LOG.info("Waiting on the send / receive latch");
+            latch.await(5, TimeUnit.MINUTES);
+            LOG.info("Resumed");
+
+            destroyQueue();
+            TimeUnit.SECONDS.sleep(2);
+        }
+
+        LOG.info("MemoryUseage before awaiting temp store cleanup = " + broker.getAdminView().getMemoryPercentUsage());
+
+        assertTrue("MemoryUsage should return to: " + startPercentage +
+                   "% but was " + broker.getAdminView().getMemoryPercentUsage() + "%", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return broker.getAdminView().getMemoryPercentUsage() <= startPercentage + 1;
+            }
+        }));
+
+        int endPercentage = broker.getAdminView().getMemoryPercentUsage();
+        LOG.info("MemoryUseage at test end = " + endPercentage);
+    }
+
+    public void destroyQueue() {
+        try {
+            Broker broker = this.broker.getBroker();
+            if (!broker.isStopped()) {
+                LOG.info("Removing: " + queueName);
+                broker.removeDestination(this.broker.getAdminConnectionContext(), new ActiveMQQueue(queueName), 10);
+            }
+        } catch (Exception e) {
+            LOG.warn("Got an error while removing the test queue", e);
+        }
+    }
+
+    private void send10000messages(CountDownLatch latch) {
+        ActiveMQConnection activeMQConnection = null;
+        try {
+            activeMQConnection = createConnection(null);
+            Session session = activeMQConnection.createSession(false,
+                    Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(session
+                    .createQueue(queueName));
+            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+            activeMQConnection.start();
+            for (int i = 0; i < 10000; i++) {
+                TextMessage textMessage = session.createTextMessage();
+                textMessage.setText(generateBody(1000));
+                textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                producer.send(textMessage);
+                try {
+                    Thread.sleep(10);
+                } catch (InterruptedException e) {
+                }
+            }
+            producer.close();
+        } catch (JMSException e) {
+            LOG.warn("Got an error while sending the messages", e);
+        } finally {
+            if (activeMQConnection != null) {
+                try {
+                    activeMQConnection.close();
+                } catch (JMSException e) {
+                }
+            }
+        }
+        latch.countDown();
+    }
+
+    private void receiveAndDiscard100messages(CountDownLatch latch) {
+        ActiveMQConnection activeMQConnection = null;
+        try {
+            activeMQConnection = createConnection(null);
+            Session session = activeMQConnection.createSession(false,
+                    Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer messageConsumer = session.createConsumer(
+                    session.createQueue(queueName));
+            activeMQConnection.start();
+            for (int i = 0; i < 100; i++) {
+                messageConsumer.receive();
+            }
+            messageConsumer.close();
+            LOG.info("Created and disconnected");
+        } catch (JMSException e) {
+            LOG.warn("Got an error while receiving the messages", e);
+        } finally {
+            if (activeMQConnection != null) {
+                try {
+                    activeMQConnection.close();
+                } catch (JMSException e) {
+                }
+            }
+        }
+        latch.countDown();
+    }
+
+    private ActiveMQConnection createConnection(String id) throws JMSException {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
+        if (id != null) {
+            factory.setClientID(id);
+        }
+
+        ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
+        return connection;
+    }
+
+    private String generateBody(int length) {
+
+        StringBuilder sb = new StringBuilder();
+        int te = 0;
+        for (int i = 1; i <= length; i++) {
+            te = r.nextInt(62);
+            sb.append(str.charAt(te));
+        }
+        return sb.toString();
+    }
+}


Mime
View raw message