activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [21/52] [abbrv] [partial] activemq-artemis git commit: ARTEMIS-127 Adding activemq unit test module to Artemis
Date Tue, 09 Jun 2015 16:36:52 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3140Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3140Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3140Test.java
new file mode 100644
index 0000000..fd71558
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3140Test.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.IOHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AMQ3140Test {
+
+    private static final int MESSAGES_PER_THREAD = 100;
+
+    private static final int THREAD_COUNT = 10;
+
+    private BrokerService broker;
+
+    private static final String QUEUE_NAME = "test";
+
+    private static class Sender extends Thread {
+
+        private static final int DELAY = 3000;
+
+        @Override
+        public void run() {
+            try {
+                ConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
+                Connection connection = cf.createConnection();
+                connection.start();
+                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
+                Message message = session.createTextMessage("test");
+                for (int i = 0; i < MESSAGES_PER_THREAD; i++) {
+                    message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, DELAY);
+                    producer.send(message);
+                }
+                session.close();
+                connection.close();
+            } catch (JMSException e) {
+                fail(e.getMessage());
+            }
+        }
+    }
+
+    @Before
+    public void setup() throws Exception {
+        File schedulerDirectory = new File("target/test/ScheduledDB");
+
+        IOHelper.mkdirs(schedulerDirectory);
+        IOHelper.deleteChildren(schedulerDirectory);
+
+        broker = new BrokerService();
+        broker.setSchedulerSupport(true);
+        broker.setPersistent(true);
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setDataDirectory("target");
+        broker.setSchedulerDirectoryFile(schedulerDirectory);
+        broker.setUseJmx(false);
+        broker.addConnector("vm://localhost");
+
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        broker.stop();
+    }
+
+    @Test
+    public void noMessageLostOnConcurrentScheduling() throws JMSException, InterruptedException {
+
+        final AtomicLong receiveCounter = new AtomicLong();
+
+        ConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
+        Connection connection = cf.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
+        consumer.setMessageListener(new MessageListener() {
+
+            @Override
+            public void onMessage(Message message) {
+                receiveCounter.incrementAndGet();
+            }
+        });
+
+        List<Sender> senderThreads = new ArrayList<Sender>();
+        for (int i = 0; i < THREAD_COUNT; i++) {
+            Sender sender = new Sender();
+            senderThreads.add(sender);
+        }
+        for (Sender sender : senderThreads) {
+            sender.start();
+        }
+        for (Sender sender : senderThreads) {
+            sender.join();
+        }
+
+        // wait until all scheduled messages has been received
+        TimeUnit.MINUTES.sleep(2);
+
+        session.close();
+        connection.close();
+
+        assertEquals(MESSAGES_PER_THREAD * THREAD_COUNT, receiveCounter.get());
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3141Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3141Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3141Test.java
new file mode 100644
index 0000000..1209bd7
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3141Test.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.IOHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AMQ3141Test {
+
+    private static final int MAX_MESSAGES = 100;
+
+    private static final long DELAY_IN_MS = 100;
+
+    private static final String QUEUE_NAME = "target.queue";
+
+    private BrokerService broker;
+
+    private final CountDownLatch messageCountDown = new CountDownLatch(MAX_MESSAGES);
+
+    private ConnectionFactory factory;
+
+    @Before
+    public void setup() throws Exception {
+
+        broker = new BrokerService();
+        broker.setPersistent(true);
+        broker.setSchedulerSupport(true);
+        broker.setDataDirectory("target");
+        broker.setUseJmx(false);
+        broker.addConnector("vm://localhost");
+
+        File schedulerDirectory = new File("target/test/ScheduledDB");
+        IOHelper.mkdirs(schedulerDirectory);
+        IOHelper.deleteChildren(schedulerDirectory);
+        broker.setSchedulerDirectoryFile(schedulerDirectory);
+
+        broker.start();
+        broker.waitUntilStarted();
+
+        factory = new ActiveMQConnectionFactory("vm://localhost");
+    }
+
+    private void sendMessages() throws Exception {
+        Connection connection = factory.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
+        for (int i = 0; i < MAX_MESSAGES; i++) {
+            Message message = session.createTextMessage();
+            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, DELAY_IN_MS);
+            producer.send(message);
+        }
+        connection.close();
+    }
+
+    @Test
+    public void testNoMissingMessagesOnShortScheduleDelay() throws Exception {
+
+        Connection connection = factory.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
+
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                messageCountDown.countDown();
+            }
+        });
+        sendMessages();
+
+        boolean receiveComplete = messageCountDown.await(5, TimeUnit.SECONDS);
+
+        connection.close();
+
+        assertTrue("expect all messages received but " + messageCountDown.getCount() + " are missing", receiveComplete);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        broker.stop();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3145Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3145Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3145Test.java
new file mode 100644
index 0000000..81128bd
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3145Test.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ3145Test {
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ3145Test.class);
+    private final String MESSAGE_TEXT = new String(new byte[1024]);
+    BrokerService broker;
+    ConnectionFactory factory;
+    Connection connection;
+    Session session;
+    Queue queue;
+    MessageConsumer consumer;
+
+    @Before
+    public void createBroker() throws Exception {
+        createBroker(true);
+    }
+
+    public void createBroker(boolean deleteAll) throws Exception {
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(deleteAll);
+        broker.setDataDirectory("target/AMQ3145Test");
+        broker.setUseJmx(true);
+        broker.getManagementContext().setCreateConnector(false);
+        broker.addConnector("tcp://localhost:0");
+        broker.start();
+        broker.waitUntilStarted();
+        factory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri().toString());
+        connection = factory.createConnection();
+        connection.start();
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (consumer != null) {
+            consumer.close();
+        }
+        session.close();
+        connection.stop();
+        connection.close();
+        broker.stop();
+    }
+
+    @Test
+    public void testCacheDisableReEnable() throws Exception {
+        createProducerAndSendMessages(1);
+        QueueViewMBean proxy = getProxyToQueueViewMBean();
+        assertTrue("cache is enabled", proxy.isCacheEnabled());
+        tearDown();
+        createBroker(false);
+        proxy = getProxyToQueueViewMBean();
+        assertEquals("one pending message", 1, proxy.getQueueSize());
+        assertTrue("cache is disabled when there is a pending message", !proxy.isCacheEnabled());
+
+        createConsumer(1);
+        createProducerAndSendMessages(1);
+        assertTrue("cache is enabled again on next send when there are no messages", proxy.isCacheEnabled());
+    }
+
+    private QueueViewMBean getProxyToQueueViewMBean()
+            throws MalformedObjectNameException, JMSException {
+        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq"
+                + ":destinationType=Queue,destinationName=" + queue.getQueueName()
+                + ",type=Broker,brokerName=localhost");
+        QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext()
+                .newProxyInstance(queueViewMBeanName,
+                        QueueViewMBean.class, true);
+        return proxy;
+    }
+
+    private void createProducerAndSendMessages(int numToSend) throws Exception {
+        queue = session.createQueue("test1");
+        MessageProducer producer = session.createProducer(queue);
+        for (int i = 0; i < numToSend; i++) {
+            TextMessage message = session.createTextMessage(MESSAGE_TEXT + i);
+            if (i  != 0 && i % 50000 == 0) {
+                LOG.info("sent: " + i);
+            }
+            producer.send(message);
+        }
+        producer.close();
+    }
+
+    private void createConsumer(int numToConsume) throws Exception {
+        consumer = session.createConsumer(queue);
+        // wait for buffer fill out
+        for (int i = 0; i < numToConsume; ++i) {
+            Message message = consumer.receive(2000);
+            message.acknowledge();
+        }
+        consumer.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3157Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3157Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3157Test.java
new file mode 100644
index 0000000..f18af6f
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3157Test.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.ObjectName;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.broker.region.DestinationInterceptor;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.virtual.MirroredQueue;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.spring.ConsumerBean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ3157Test extends EmbeddedBrokerTestSupport {
+
+    private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3157Test.class);
+    private Connection connection;
+
+    public void testInactiveMirroredQueueIsCleanedUp() throws Exception {
+
+        if (connection == null) {
+            connection = createConnection();
+        }
+        connection.start();
+
+        ConsumerBean messageList = new ConsumerBean();
+        messageList.setVerbose(true);
+
+        ActiveMQDestination consumeDestination = createConsumeDestination();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        LOG.info("Consuming from: " + consumeDestination);
+
+        MessageConsumer c1 = session.createConsumer(consumeDestination);
+        c1.setMessageListener(messageList);
+
+        // create topic producer
+        ActiveMQQueue sendDestination = new ActiveMQQueue(getQueueName());
+        LOG.info("Sending to: " + sendDestination);
+
+        MessageProducer producer = session.createProducer(sendDestination);
+        assertNotNull(producer);
+
+        final int total = 10;
+        for (int i = 0; i < total; i++) {
+            producer.send(session.createTextMessage("message: " + i));
+        }
+
+        messageList.assertMessagesArrived(total);
+        LOG.info("Received: " + messageList);
+        messageList.flushMessages();
+
+        MessageConsumer c2 = session.createConsumer(sendDestination);
+        c2.setMessageListener(messageList);
+        messageList.assertMessagesArrived(total);
+        LOG.info("Q Received: " + messageList);
+
+        connection.close();
+
+        List<ObjectName> topics = Arrays.asList(broker.getAdminView().getTopics());
+        assertTrue(topics.contains(createObjectName(consumeDestination)));
+        List<ObjectName> queues = Arrays.asList(broker.getAdminView().getQueues());
+        assertTrue(queues.contains(createObjectName(sendDestination)));
+
+        Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+
+        topics = Arrays.asList(broker.getAdminView().getTopics());
+        if (topics != null) {
+            assertFalse("Virtual Topic Desination did not get cleaned up.",
+                        topics.contains(createObjectName(consumeDestination)));
+        }
+        queues = Arrays.asList(broker.getAdminView().getQueues());
+        if (queues != null) {
+            assertFalse("Mirrored Queue Desination did not get cleaned up.",
+                        queues.contains(createObjectName(sendDestination)));
+        }
+    }
+
+    protected ActiveMQDestination createConsumeDestination() {
+        return new ActiveMQTopic("VirtualTopic.Mirror." + getQueueName());
+    }
+
+    protected String getQueueName() {
+        return "My.Queue";
+    }
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        answer.setUseMirroredQueues(true);
+        answer.setPersistent(isPersistent());
+        answer.setSchedulePeriodForDestinationPurge(1000);
+
+        PolicyEntry entry = new PolicyEntry();
+        entry.setGcInactiveDestinations(true);
+        entry.setInactiveTimoutBeforeGC(5000);
+        entry.setProducerFlowControl(true);
+        PolicyMap map = new PolicyMap();
+        map.setDefaultEntry(entry);
+
+        MirroredQueue mirrorQ = new MirroredQueue();
+        mirrorQ.setCopyMessage(true);
+        DestinationInterceptor[] destinationInterceptors = new DestinationInterceptor[]{mirrorQ};
+        answer.setDestinationInterceptors(destinationInterceptors);
+
+        answer.setDestinationPolicy(map);
+        answer.addConnector(bindAddress);
+
+        return answer;
+    }
+
+    protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
+        String domain = "org.apache.activemq";
+        ObjectName name;
+        if (destination.isQueue()) {
+            name = new ObjectName(domain + ":BrokerName=localhost,Type=Queue,Destination=" + destination.getPhysicalName());
+        } else {
+            name = new ObjectName(domain + ":BrokerName=localhost,Type=Topic,Destination=" + destination.getPhysicalName());
+        }
+        return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class,
+                true);
+    }
+
+    protected ObjectName createObjectName(ActiveMQDestination destination) throws Exception {
+        String domain = "org.apache.activemq";
+        ObjectName name;
+        if (destination.isQueue()) {
+            name = new ObjectName(domain + ":type=Broker,brokerName=localhost," +
+                                  "destinationType=Queue,destinationName=" + destination.getPhysicalName());
+        } else {
+            name = new ObjectName(domain + ":type=Broker,brokerName=localhost," +
+                                  "destinationType=Topic,destinationName=" + destination.getPhysicalName());
+        }
+
+        return name;
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+        }
+        super.tearDown();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3167Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3167Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3167Test.java
new file mode 100644
index 0000000..7e3048a
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3167Test.java
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test the loss of messages detected during testing with ActiveMQ 5.4.1 and 5.4.2.
+ * <p/>
+ * Symptoms: - 1 record is lost "early" in the stream. - no more records lost.
+ * <p/>
+ * Test Configuration: - Broker Settings: - Destination Policy - Occurs with "Destination Policy" using Store Cursor and
+ * a memory limit - Not reproduced without "Destination Policy" defined - Persistence Adapter - Memory: Does not occur.
+ * - KahaDB: Occurs. - Messages - Occurs with TextMessage and BinaryMessage - Persistent messages.
+ * <p/>
+ * Notes: - Lower memory limits increase the rate of occurrence. - Higher memory limits may prevent the problem
+ * (probably because memory limits not reached). - Producers sending a number of messages before consumers come online
+ * increases rate of occurrence.
+ */
+
+public class AMQ3167Test {
+    protected BrokerService embeddedBroker;
+
+    protected static final int MEMORY_LIMIT = 16 * 1024;
+
+    protected static boolean Debug_f = false;
+
+    protected long Producer_stop_time = 0;
+    protected long Consumer_stop_time = 0;
+    protected long Consumer_startup_delay_ms = 2000;
+    protected boolean Stop_after_error = true;
+
+    protected Connection JMS_conn;
+    protected long Num_error = 0;
+
+    // // ////
+    // // UTILITIES ////
+    // // ////
+
+    /**
+     * Create a new, unsecured, client connection to the test broker using the given username and password. This
+     * connection bypasses all security.
+     * <p/>
+     * Don't forget to start the connection or no messages will be received by consumers even though producers will work
+     * fine.
+     *
+     * @username name of the JMS user for the connection; may be null.
+     * @password Password for the JMS user; may be null.
+     */
+
+    protected Connection createUnsecuredConnection(String username, String password) throws javax.jms.JMSException {
+        ActiveMQConnectionFactory conn_fact;
+
+        conn_fact = new ActiveMQConnectionFactory(embeddedBroker.getVmConnectorURI());
+
+        return conn_fact.createConnection(username, password);
+    }
+
+    // // ////
+    // // TEST FUNCTIONALITY ////
+    // // ////
+
+    @Before
+    public void testPrep() throws Exception {
+        embeddedBroker = new BrokerService();
+        configureBroker(embeddedBroker);
+        embeddedBroker.start();
+        embeddedBroker.waitUntilStarted();
+
+        // Prepare the connection
+        JMS_conn = createUnsecuredConnection(null, null);
+        JMS_conn.start();
+    }
+
+    @After
+    public void testCleanup() throws java.lang.Exception {
+        JMS_conn.stop();
+        embeddedBroker.stop();
+    }
+
+    protected void configureBroker(BrokerService broker_svc) throws Exception {
+
+        broker_svc.setBrokerName("testbroker1");
+
+        broker_svc.setUseJmx(false);
+        broker_svc.setPersistent(true);
+        broker_svc.setDataDirectory("target/AMQ3167Test");
+        configureDestinationPolicy(broker_svc);
+    }
+
+    /**
+     * NOTE: overrides any prior policy map defined for the broker service.
+     */
+
+    protected void configureDestinationPolicy(BrokerService broker_svc) {
+        PolicyMap pol_map;
+        PolicyEntry pol_ent;
+        ArrayList<PolicyEntry> ent_list;
+
+        ent_list = new ArrayList<PolicyEntry>();
+
+        //
+        // QUEUES
+        //
+
+        pol_ent = new PolicyEntry();
+        pol_ent.setQueue(">");
+        pol_ent.setMemoryLimit(MEMORY_LIMIT);
+        pol_ent.setProducerFlowControl(false);
+        ent_list.add(pol_ent);
+
+        //
+        // COMPLETE POLICY MAP
+        //
+
+        pol_map = new PolicyMap();
+        pol_map.setPolicyEntries(ent_list);
+
+        broker_svc.setDestinationPolicy(pol_map);
+    }
+
+    // // ////
+    // // TEST ////
+    // // ////
+
+    @Test
+    public void testQueueLostMessage() throws Exception {
+        Destination dest;
+
+        dest = ActiveMQDestination.createDestination("lostmsgtest.queue", ActiveMQDestination.QUEUE_TYPE);
+
+        // 10 seconds from now
+        Producer_stop_time = java.lang.System.nanoTime() + (10L * 1000000000L);
+
+        // 15 seconds from now
+        Consumer_stop_time = Producer_stop_time + (5L * 1000000000L);
+
+        runLostMsgTest(dest, 1000000, 1, 1, false);
+
+        // Make sure failures in the threads are thoroughly reported in the JUnit framework.
+        assertTrue(Num_error == 0);
+    }
+
+    /**
+     *
+     */
+
+    protected static void log(String msg) {
+        if (Debug_f)
+            java.lang.System.err.println(msg);
+    }
+
+    /**
+     * Main body of the lost-message test.
+     */
+
+    protected void runLostMsgTest(Destination dest, int num_msg, int num_send_per_sess, int num_recv_per_sess, boolean topic_f) throws Exception {
+        Thread prod_thread;
+        Thread cons_thread;
+        String tag;
+        Session sess;
+        MessageProducer prod;
+        MessageConsumer cons;
+        int ack_mode;
+
+        //
+        // Start the producer
+        //
+
+        tag = "prod";
+        log(">> Starting producer " + tag);
+
+        sess = JMS_conn.createSession((num_send_per_sess > 1), Session.AUTO_ACKNOWLEDGE);
+        prod = sess.createProducer(dest);
+
+        prod_thread = new producerThread(sess, prod, tag, num_msg, num_send_per_sess);
+        prod_thread.start();
+        log("Started producer " + tag);
+
+        //
+        // Delay before starting consumers
+        //
+
+        log("Waiting before starting consumers");
+        java.lang.Thread.sleep(Consumer_startup_delay_ms);
+
+        //
+        // Now create and start the consumer
+        //
+
+        tag = "cons";
+        log(">> Starting consumer");
+
+        if (num_recv_per_sess > 1)
+            ack_mode = Session.CLIENT_ACKNOWLEDGE;
+        else
+            ack_mode = Session.AUTO_ACKNOWLEDGE;
+
+        sess = JMS_conn.createSession(false, ack_mode);
+        cons = sess.createConsumer(dest);
+
+        cons_thread = new consumerThread(sess, cons, tag, num_msg, num_recv_per_sess);
+        cons_thread.start();
+        log("Started consumer " + tag);
+
+        //
+        // Wait for the producer and consumer to finish.
+        //
+
+        log("< waiting for producer.");
+        prod_thread.join();
+
+        log("< waiting for consumer.");
+        cons_thread.join();
+
+        log("Shutting down");
+    }
+
+    // // ////
+    // // INTERNAL CLASSES ////
+    // // ////
+
+    /**
+     * Producer thread - runs a single producer until the maximum number of messages is sent, the producer stop time is
+     * reached, or a test error is detected.
+     */
+
+    protected class producerThread extends Thread {
+        protected Session msgSess;
+        protected MessageProducer msgProd;
+        protected String producerTag;
+        protected int numMsg;
+        protected int numPerSess;
+        protected long producer_stop_time;
+
+        producerThread(Session sess, MessageProducer prod, String tag, int num_msg, int sess_size) {
+            super();
+
+            producer_stop_time = 0;
+            msgSess = sess;
+            msgProd = prod;
+            producerTag = tag;
+            numMsg = num_msg;
+            numPerSess = sess_size;
+        }
+
+        public void execTest() throws Exception {
+            Message msg;
+            int sess_start;
+            int cur;
+
+            sess_start = 0;
+            cur = 0;
+            while ((cur < numMsg) && (!didTimeOut()) && ((!Stop_after_error) || (Num_error == 0))) {
+                msg = msgSess.createTextMessage("test message from " + producerTag);
+                msg.setStringProperty("testprodtag", producerTag);
+                msg.setIntProperty("seq", cur);
+
+                if (msg instanceof ActiveMQMessage) {
+                    ((ActiveMQMessage) msg).setResponseRequired(true);
+                }
+
+                //
+                // Send the message.
+                //
+
+                msgProd.send(msg);
+                cur++;
+
+                //
+                // Commit if the number of messages per session has been reached, and
+                // transactions are being used (only when > 1 msg per sess).
+                //
+
+                if ((numPerSess > 1) && ((cur - sess_start) >= numPerSess)) {
+                    msgSess.commit();
+                    sess_start = cur;
+                }
+            }
+
+            // Make sure to send the final commit, if there were sends since the last commit.
+            if ((numPerSess > 1) && ((cur - sess_start) > 0))
+                msgSess.commit();
+
+            if (cur < numMsg)
+                log("* Producer " + producerTag + " timed out at " + java.lang.System.nanoTime() + " (stop time " + producer_stop_time + ")");
+        }
+
+        /**
+         * Check whether it is time for the producer to terminate.
+         */
+
+        protected boolean didTimeOut() {
+            if ((Producer_stop_time > 0) && (java.lang.System.nanoTime() >= Producer_stop_time))
+                return true;
+
+            return false;
+        }
+
+        /**
+         * Run the producer.
+         */
+
+        @Override
+        public void run() {
+            try {
+                log("- running producer " + producerTag);
+                execTest();
+                log("- finished running producer " + producerTag);
+            } catch (Throwable thrown) {
+                Num_error++;
+                fail("producer " + producerTag + " failed: " + thrown.getMessage());
+                throw new Error("producer " + producerTag + " failed", thrown);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return producerTag;
+        }
+    }
+
+    /**
+     * Producer thread - runs a single consumer until the maximum number of messages is received, the consumer stop time
+     * is reached, or a test error is detected.
+     */
+
+    protected class consumerThread extends Thread {
+        protected Session msgSess;
+        protected MessageConsumer msgCons;
+        protected String consumerTag;
+        protected int numMsg;
+        protected int numPerSess;
+
+        consumerThread(Session sess, MessageConsumer cons, String tag, int num_msg, int sess_size) {
+            super();
+
+            msgSess = sess;
+            msgCons = cons;
+            consumerTag = tag;
+            numMsg = num_msg;
+            numPerSess = sess_size;
+        }
+
+        public void execTest() throws Exception {
+            Message msg;
+            int sess_start;
+            int cur;
+
+            msg = null;
+            sess_start = 0;
+            cur = 0;
+
+            while ((cur < numMsg) && (!didTimeOut()) && ((!Stop_after_error) || (Num_error == 0))) {
+                //
+                // Use a timeout of 1 second to periodically check the consumer timeout.
+                //
+                msg = msgCons.receive(1000);
+                if (msg != null) {
+                    checkMessage(msg, cur);
+                    cur++;
+
+                    if ((numPerSess > 1) && ((cur - sess_start) >= numPerSess)) {
+                        msg.acknowledge();
+                        sess_start = cur;
+                    }
+                }
+            }
+
+            // Acknowledge the last messages, if they were not yet acknowledged.
+            if ((numPerSess > 1) && ((cur - sess_start) > 0))
+                msg.acknowledge();
+
+            if (cur < numMsg)
+                log("* Consumer " + consumerTag + " timed out");
+        }
+
+        /**
+         * Check whether it is time for the consumer to terminate.
+         */
+
+        protected boolean didTimeOut() {
+            if ((Consumer_stop_time > 0) && (java.lang.System.nanoTime() >= Consumer_stop_time))
+                return true;
+
+            return false;
+        }
+
+        /**
+         * Verify the message received. Sequence numbers are checked and are expected to exactly match the message
+         * number (starting at 0).
+         */
+
+        protected void checkMessage(Message msg, int exp_seq) throws javax.jms.JMSException {
+            int seq;
+
+            seq = msg.getIntProperty("seq");
+
+            if (exp_seq != seq) {
+                Num_error++;
+                fail("*** Consumer " + consumerTag + " expected seq " + exp_seq + "; received " + seq);
+            }
+        }
+
+        /**
+         * Run the consumer.
+         */
+
+        @Override
+        public void run() {
+            try {
+                log("- running consumer " + consumerTag);
+                execTest();
+                log("- running consumer " + consumerTag);
+            } catch (Throwable thrown) {
+                Num_error++;
+                fail("consumer " + consumerTag + " failed: " + thrown.getMessage());
+                throw new Error("consumer " + consumerTag + " failed", thrown);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return consumerTag;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java
new file mode 100644
index 0000000..48c5cbb
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java
@@ -0,0 +1,734 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertTrue;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ3274Test {
+    private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3274Test.class);
+
+    protected static int Next_broker_num = 0;
+    protected EmbeddedTcpBroker broker1;
+    protected EmbeddedTcpBroker broker2;
+
+    protected int nextEchoId = 0;
+    protected boolean testError = false;
+
+    protected int echoResponseFill = 0; // Number of "filler" response messages per request
+
+    public AMQ3274Test() throws Exception {
+        broker1 = new EmbeddedTcpBroker();
+        broker2 = new EmbeddedTcpBroker();
+
+        broker1.coreConnectTo(broker2, true);
+        broker2.coreConnectTo(broker1, true);
+    }
+
+    public void logMessage(String msg) {
+        System.out.println(msg);
+        System.out.flush();
+    }
+
+    public void testMessages(Session sess, MessageProducer req_prod, Destination resp_dest, int num_msg) throws Exception {
+        MessageConsumer resp_cons;
+        TextMessage msg;
+        MessageClient cons_client;
+        int cur;
+        int tot_expected;
+
+        resp_cons = sess.createConsumer(resp_dest);
+
+        cons_client = new MessageClient(resp_cons, num_msg);
+        cons_client.start();
+
+        cur = 0;
+        while ((cur < num_msg) && (!testError)) {
+            msg = sess.createTextMessage("MSG AAAA " + cur);
+            msg.setIntProperty("SEQ", 100 + cur);
+            msg.setStringProperty("TEST", "TOPO");
+            msg.setJMSReplyTo(resp_dest);
+
+            if (cur == (num_msg - 1))
+                msg.setBooleanProperty("end-of-response", true);
+
+            req_prod.send(msg);
+
+            cur++;
+        }
+
+        cons_client.waitShutdown(5000);
+
+        if (cons_client.shutdown()) {
+            LOG.debug("Consumer client shutdown complete");
+        } else {
+            LOG.debug("Consumer client shutdown incomplete!!!");
+        }
+
+        tot_expected = num_msg * (echoResponseFill + 1);
+
+        if (cons_client.getNumMsgReceived() == tot_expected) {
+            LOG.info("Have " + tot_expected + " messages, as-expected");
+        } else {
+            LOG.error("Have " + cons_client.getNumMsgReceived() + " messages; expected " + tot_expected);
+            testError = true;
+        }
+
+        resp_cons.close();
+    }
+
+    /**
+     * Test one destination between the given "producer broker" and
+     * "consumer broker" specified.
+     */
+    public void testOneDest(Connection conn, Session sess, Destination cons_dest, String prod_broker_url, String cons_broker_url, int num_msg) throws Exception {
+        int echo_id;
+
+        EchoService echo_svc;
+        String echo_queue_name;
+        Destination prod_dest;
+        MessageProducer msg_prod;
+
+        synchronized (this) {
+            echo_id = this.nextEchoId;
+            this.nextEchoId++;
+        }
+
+        echo_queue_name = "echo.queue." + echo_id;
+
+        LOG.trace("destroying the echo queue in case an old one exists");
+        removeQueue(conn, echo_queue_name);
+
+        echo_svc = new EchoService(echo_queue_name, prod_broker_url);
+        echo_svc.start();
+
+        LOG.trace("Creating echo queue and producer");
+        prod_dest = sess.createQueue(echo_queue_name);
+        msg_prod = sess.createProducer(prod_dest);
+
+        testMessages(sess, msg_prod, cons_dest, num_msg);
+
+        echo_svc.shutdown();
+        msg_prod.close();
+    }
+
+    /**
+     * TEST TEMPORARY TOPICS
+     */
+    public void testTempTopic(String prod_broker_url, String cons_broker_url) throws Exception {
+        Connection conn;
+        Session sess;
+        Destination cons_dest;
+        int num_msg;
+
+        num_msg = 5;
+
+        LOG.info("TESTING TEMP TOPICS " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg + " messages)");
+
+        conn = createConnection(cons_broker_url);
+        conn.start();
+        sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        LOG.trace("Creating destination");
+        cons_dest = sess.createTemporaryTopic();
+
+        testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
+
+        sess.close();
+        conn.close();
+    }
+
+    /**
+     * TEST TOPICS
+     */
+    public void testTopic(String prod_broker_url, String cons_broker_url) throws Exception {
+        int num_msg;
+
+        Connection conn;
+        Session sess;
+        String topic_name;
+
+        Destination cons_dest;
+
+        num_msg = 5;
+
+        LOG.info("TESTING TOPICS " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg + " messages)");
+
+        conn = createConnection(cons_broker_url);
+        conn.start();
+        sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        topic_name = "topotest2.perm.topic";
+        LOG.trace("Removing existing Topic");
+        removeTopic(conn, topic_name);
+        LOG.trace("Creating Topic, " + topic_name);
+        cons_dest = sess.createTopic(topic_name);
+
+        testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
+
+        removeTopic(conn, topic_name);
+        sess.close();
+        conn.close();
+    }
+
+    /**
+     * TEST TEMPORARY QUEUES
+     */
+    public void testTempQueue(String prod_broker_url, String cons_broker_url) throws Exception {
+        int num_msg;
+
+        Connection conn;
+        Session sess;
+
+        Destination cons_dest;
+
+        num_msg = 5;
+
+        LOG.info("TESTING TEMP QUEUES " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg + " messages)");
+
+        conn = createConnection(cons_broker_url);
+        conn.start();
+        sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        LOG.trace("Creating destination");
+        cons_dest = sess.createTemporaryQueue();
+
+        testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
+
+        sess.close();
+        conn.close();
+    }
+
+    /**
+     * TEST QUEUES
+     */
+    public void testQueue(String prod_broker_url, String cons_broker_url) throws Exception {
+        int num_msg;
+
+        Connection conn;
+        Session sess;
+        String queue_name;
+
+        Destination cons_dest;
+
+        num_msg = 5;
+
+        LOG.info("TESTING QUEUES " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg + " messages)");
+
+        conn = createConnection(cons_broker_url);
+        conn.start();
+        sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        queue_name = "topotest2.perm.queue";
+        LOG.trace("Removing existing Queue");
+        removeQueue(conn, queue_name);
+        LOG.trace("Creating Queue, " + queue_name);
+        cons_dest = sess.createQueue(queue_name);
+
+        testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
+
+        removeQueue(conn, queue_name);
+        sess.close();
+        conn.close();
+    }
+
+    @Test
+    public void run() throws Exception {
+        Thread start1;
+        Thread start2;
+
+        testError = false;
+
+        // Use threads to avoid startup deadlock since the first broker started waits until
+        // it knows the name of the remote broker before finishing its startup, which means
+        // the remote must already be running.
+
+        start1 = new Thread() {
+            public void run() {
+                try {
+                    broker1.start();
+                } catch (Exception ex) {
+                    LOG.error(null, ex);
+                }
+            }
+        };
+
+        start2 = new Thread() {
+            public void run() {
+                try {
+                    broker2.start();
+                } catch (Exception ex) {
+                    LOG.error(null, ex);
+                }
+            }
+        };
+
+        start1.start();
+        start2.start();
+
+        start1.join();
+        start2.join();
+
+        if (!testError) {
+            this.testTempTopic(broker1.getConnectionUrl(), broker2.getConnectionUrl());
+        }
+        if (!testError) {
+            this.testTempQueue(broker1.getConnectionUrl(), broker2.getConnectionUrl());
+        }
+        if (!testError) {
+            this.testTopic(broker1.getConnectionUrl(), broker2.getConnectionUrl());
+        }
+        if (!testError) {
+            this.testQueue(broker1.getConnectionUrl(), broker2.getConnectionUrl());
+        }
+        Thread.sleep(100);
+
+        shutdown();
+
+        assertTrue(!testError);
+    }
+
+    public void shutdown() throws Exception {
+        broker1.stop();
+        broker2.stop();
+    }
+
+    /**
+     * @param args
+     *            the command line arguments
+     */
+    public static void main(String[] args) {
+        AMQ3274Test main_obj;
+
+        try {
+            main_obj = new AMQ3274Test();
+            main_obj.run();
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            LOG.error(null, ex);
+            System.exit(0);
+        }
+    }
+
+    protected Connection createConnection(String url) throws Exception {
+        return org.apache.activemq.ActiveMQConnection.makeConnection(url);
+    }
+
+    protected static void removeQueue(Connection conn, String dest_name) throws java.lang.Exception {
+        org.apache.activemq.command.ActiveMQDestination dest;
+
+        if (conn instanceof org.apache.activemq.ActiveMQConnection) {
+            dest = org.apache.activemq.command.ActiveMQDestination.createDestination(dest_name,
+                    (byte) org.apache.activemq.command.ActiveMQDestination.QUEUE_TYPE);
+            ((org.apache.activemq.ActiveMQConnection) conn).destroyDestination(dest);
+        }
+    }
+
+    protected static void removeTopic(Connection conn, String dest_name) throws java.lang.Exception {
+        org.apache.activemq.command.ActiveMQDestination dest;
+
+        if (conn instanceof org.apache.activemq.ActiveMQConnection) {
+            dest = org.apache.activemq.command.ActiveMQDestination.createDestination(dest_name,
+                    (byte) org.apache.activemq.command.ActiveMQDestination.TOPIC_TYPE);
+            ((org.apache.activemq.ActiveMQConnection) conn).destroyDestination(dest);
+        }
+    }
+
+    @SuppressWarnings("rawtypes")
+    public static String fmtMsgInfo(Message msg) throws Exception {
+        StringBuilder msg_desc;
+        String prop;
+        Enumeration prop_enum;
+
+        msg_desc = new StringBuilder();
+        msg_desc = new StringBuilder();
+
+        if (msg instanceof TextMessage) {
+            msg_desc.append(((TextMessage) msg).getText());
+        } else {
+            msg_desc.append("[");
+            msg_desc.append(msg.getClass().getName());
+            msg_desc.append("]");
+        }
+
+        prop_enum = msg.getPropertyNames();
+        while (prop_enum.hasMoreElements()) {
+            prop = (String) prop_enum.nextElement();
+            msg_desc.append("; ");
+            msg_desc.append(prop);
+            msg_desc.append("=");
+            msg_desc.append(msg.getStringProperty(prop));
+        }
+
+        return msg_desc.toString();
+    }
+
+    // ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+    // /////////////////////////////////////////////// INTERNAL CLASSES
+    // /////////////////////////////////////////////////
+    // ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+    protected class EmbeddedTcpBroker {
+        protected BrokerService brokerSvc;
+        protected int brokerNum;
+        protected String brokerName;
+        protected String brokerId;
+        protected int port;
+        protected String tcpUrl;
+
+        public EmbeddedTcpBroker() throws Exception {
+            brokerSvc = new BrokerService();
+
+            synchronized (this.getClass()) {
+                brokerNum = Next_broker_num;
+                Next_broker_num++;
+            }
+
+            brokerName = "broker" + brokerNum;
+            brokerId = "b" + brokerNum;
+
+            brokerSvc.setBrokerName(brokerName);
+            brokerSvc.setBrokerId(brokerId);
+            brokerSvc.setPersistent(false);
+            brokerSvc.setUseJmx(false);
+            tcpUrl = brokerSvc.addConnector("tcp://localhost:0").getPublishableConnectString();
+        }
+
+        public Connection createConnection() throws URISyntaxException, JMSException {
+            Connection result;
+
+            result = org.apache.activemq.ActiveMQConnection.makeConnection(this.tcpUrl);
+
+            return result;
+        }
+
+        public String getConnectionUrl() {
+            return this.tcpUrl;
+        }
+
+        /**
+         * Create network connections to the given broker using the
+         * network-connector configuration of CORE brokers (e.g.
+         * core1.bus.dev1.coresys.tmcs)
+         *
+         * @param other
+         * @param duplex_f
+         */
+        public void coreConnectTo(EmbeddedTcpBroker other, boolean duplex_f) throws Exception {
+            this.makeConnectionTo(other, duplex_f, true);
+            this.makeConnectionTo(other, duplex_f, false);
+        }
+
+        public void start() throws Exception {
+            brokerSvc.start();
+        }
+
+        public void stop() throws Exception {
+            brokerSvc.stop();
+        }
+
+        /**
+         * Make one connection to the other embedded broker, of the specified
+         * type (queue or topic) using the standard CORE broker networking.
+         *
+         * @param other
+         * @param duplex_f
+         * @param queue_f
+         * @throws Exception
+         */
+        protected void makeConnectionTo(EmbeddedTcpBroker other, boolean duplex_f, boolean queue_f) throws Exception {
+            NetworkConnector nw_conn;
+            String prefix;
+            ActiveMQDestination excl_dest;
+            ArrayList<ActiveMQDestination> excludes;
+
+            nw_conn = new DiscoveryNetworkConnector(new URI("static:(" + other.tcpUrl + ")"));
+            nw_conn.setDuplex(duplex_f);
+
+            if (queue_f)
+                nw_conn.setConduitSubscriptions(false);
+            else
+                nw_conn.setConduitSubscriptions(true);
+
+            nw_conn.setNetworkTTL(5);
+            nw_conn.setSuppressDuplicateQueueSubscriptions(true);
+            nw_conn.setDecreaseNetworkConsumerPriority(true);
+            nw_conn.setBridgeTempDestinations(true);
+
+            if (queue_f) {
+                prefix = "queue";
+                excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.QUEUE_TYPE);
+            } else {
+                prefix = "topic";
+                excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.TOPIC_TYPE);
+            }
+
+            excludes = new ArrayList<ActiveMQDestination>();
+            excludes.add(excl_dest);
+            nw_conn.setExcludedDestinations(excludes);
+
+            if (duplex_f)
+                nw_conn.setName(this.brokerId + "<-" + prefix + "->" + other.brokerId);
+            else
+                nw_conn.setName(this.brokerId + "-" + prefix + "->" + other.brokerId);
+
+            brokerSvc.addNetworkConnector(nw_conn);
+        }
+    }
+
+    protected class MessageClient extends java.lang.Thread {
+        protected MessageConsumer msgCons;
+        protected boolean shutdownInd;
+        protected int expectedCount;
+        protected int lastSeq = 0;
+        protected int msgCount = 0;
+        protected boolean haveFirstSeq;
+        protected CountDownLatch shutdownLatch;
+
+        public MessageClient(MessageConsumer cons, int num_to_expect) {
+            msgCons = cons;
+            expectedCount = (num_to_expect * (echoResponseFill + 1));
+            shutdownLatch = new CountDownLatch(1);
+        }
+
+        public void run() {
+            CountDownLatch latch;
+
+            try {
+                synchronized (this) {
+                    latch = shutdownLatch;
+                }
+
+                shutdownInd = false;
+                processMessages();
+
+                latch.countDown();
+            } catch (Exception exc) {
+                LOG.error("message client error", exc);
+            }
+        }
+
+        public void waitShutdown(long timeout) {
+            CountDownLatch latch;
+
+            try {
+                synchronized (this) {
+                    latch = shutdownLatch;
+                }
+
+                if (latch != null)
+                    latch.await(timeout, TimeUnit.MILLISECONDS);
+                else
+                    LOG.info("echo client shutdown: client does not appear to be active");
+            } catch (InterruptedException int_exc) {
+                LOG.warn("wait for message client shutdown interrupted", int_exc);
+            }
+        }
+
+        public boolean shutdown() {
+            boolean down_ind;
+
+            if (!shutdownInd) {
+                shutdownInd = true;
+            }
+
+            waitShutdown(200);
+
+            synchronized (this) {
+                if ((shutdownLatch == null) || (shutdownLatch.getCount() == 0))
+                    down_ind = true;
+                else
+                    down_ind = false;
+            }
+
+            return down_ind;
+        }
+
+        public int getNumMsgReceived() {
+            return msgCount;
+        }
+
+        protected void processMessages() throws Exception {
+            Message in_msg;
+
+            haveFirstSeq = false;
+            while ((!shutdownInd) && (!testError)) {
+                in_msg = msgCons.receive(100);
+
+                if (in_msg != null) {
+                    msgCount++;
+                    checkMessage(in_msg);
+                }
+            }
+        }
+
+        protected void checkMessage(Message in_msg) throws Exception {
+            int seq;
+
+            LOG.debug("received message " + fmtMsgInfo(in_msg));
+
+            if (in_msg.propertyExists("SEQ")) {
+                seq = in_msg.getIntProperty("SEQ");
+
+                if ((haveFirstSeq) && (seq != (lastSeq + 1))) {
+                    LOG.error("***ERROR*** incorrect sequence number; expected " + Integer.toString(lastSeq + 1) + " but have " + Integer.toString(seq));
+
+                    testError = true;
+                }
+
+                lastSeq = seq;
+
+                if (msgCount > expectedCount) {
+                    LOG.warn("*** have more messages than expected; have " + msgCount + "; expect " + expectedCount);
+
+                    testError = true;
+                }
+            }
+
+            if (in_msg.propertyExists("end-of-response")) {
+                LOG.trace("received end-of-response message");
+                shutdownInd = true;
+            }
+        }
+    }
+
+    protected class EchoService extends java.lang.Thread {
+        protected String destName;
+        protected Connection jmsConn;
+        protected Session sess;
+        protected MessageConsumer msg_cons;
+        protected boolean Shutdown_ind;
+
+        protected Destination req_dest;
+        protected Destination resp_dest;
+        protected MessageProducer msg_prod;
+
+        protected CountDownLatch waitShutdown;
+
+        public EchoService(String dest, Connection broker_conn) throws Exception {
+            destName = dest;
+            jmsConn = broker_conn;
+
+            Shutdown_ind = false;
+
+            sess = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            req_dest = sess.createQueue(destName);
+            msg_cons = sess.createConsumer(req_dest);
+
+            jmsConn.start();
+
+            waitShutdown = new CountDownLatch(1);
+        }
+
+        public EchoService(String dest, String broker_url) throws Exception {
+            this(dest, ActiveMQConnection.makeConnection(broker_url));
+        }
+
+        public void run() {
+            Message req;
+
+            try {
+                LOG.info("STARTING ECHO SERVICE");
+
+                while (!Shutdown_ind) {
+                    req = msg_cons.receive(100);
+                    if (req != null) {
+                        if (LOG.isDebugEnabled())
+                            LOG.debug("ECHO request message " + req.toString());
+
+                        resp_dest = req.getJMSReplyTo();
+                        if (resp_dest != null) {
+                            msg_prod = sess.createProducer(resp_dest);
+                            msg_prod.send(req);
+                            msg_prod.close();
+                            msg_prod = null;
+                        } else {
+                            LOG.warn("invalid request: no reply-to destination given");
+                        }
+                    }
+                }
+            } catch (Exception ex) {
+                LOG.error(null, ex);
+            } finally {
+                LOG.info("shutting down test echo service");
+
+                try {
+                    jmsConn.stop();
+                } catch (javax.jms.JMSException jms_exc) {
+                    LOG.warn("error on shutting down JMS connection", jms_exc);
+                }
+
+                synchronized (this) {
+                    waitShutdown.countDown();
+                }
+            }
+        }
+
+        /**
+         * Shut down the service, waiting up to 3 seconds for the service to
+         * terminate.
+         */
+        public void shutdown() {
+            CountDownLatch wait_l;
+
+            synchronized (this) {
+                wait_l = waitShutdown;
+            }
+
+            Shutdown_ind = true;
+
+            try {
+                if (wait_l != null) {
+                    if (wait_l.await(3000, TimeUnit.MILLISECONDS)) {
+                        LOG.info("echo service shutdown complete");
+                    } else {
+                        LOG.warn("timeout waiting for echo service shutdown");
+                    }
+                } else {
+                    LOG.info("echo service shutdown: service does not appear to be active");
+                }
+            } catch (InterruptedException int_exc) {
+                LOG.warn("interrupted while waiting for echo service shutdown");
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java
new file mode 100644
index 0000000..a1e9b93
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.Topic;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DestinationInterceptor;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.virtual.MirroredQueue;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ3324Test {
+
+    private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3324Test.class);
+
+    private static final String bindAddress = "tcp://0.0.0.0:0";
+    private BrokerService broker;
+    private ActiveMQConnectionFactory cf;
+
+    private static final int MESSAGE_COUNT = 100;
+
+    @Before
+    public void setUp() throws Exception {
+        broker = this.createBroker();
+        String address = broker.getTransportConnectors().get(0).getPublishableConnectString();
+        broker.start();
+        broker.waitUntilStarted();
+
+        cf = new ActiveMQConnectionFactory(address);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    @Test
+    public void testTempMessageConsumedAdvisoryConnectionClose() throws Exception {
+
+        Connection connection = cf.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        final TemporaryQueue queue = session.createTemporaryQueue();
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        final Topic advisoryTopic = AdvisorySupport.getMessageConsumedAdvisoryTopic((ActiveMQDestination) queue);
+
+        MessageConsumer advisoryConsumer = session.createConsumer(advisoryTopic);
+        MessageProducer producer = session.createProducer(queue);
+
+        // send lots of messages to the tempQueue
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            BytesMessage m = session.createBytesMessage();
+            m.writeBytes(new byte[1024]);
+            producer.send(m);
+        }
+
+        // consume one message from tempQueue
+        Message msg = consumer.receive(5000);
+        assertNotNull(msg);
+
+        // check one advisory message has produced on the advisoryTopic
+        Message advCmsg = advisoryConsumer.receive(5000);
+        assertNotNull(advCmsg);
+
+        connection.close();
+        LOG.debug("Connection closed, destinations should now become inactive.");
+
+        assertTrue("The destination " + advisoryTopic + "was not removed. ", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return broker.getAdminView().getTopics().length == 0;
+            }
+        }));
+
+        assertTrue("The destination " + queue + " was not removed. ", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return broker.getAdminView().getTemporaryQueues().length == 0;
+            }
+        }));
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        answer.setUseMirroredQueues(true);
+        answer.setPersistent(false);
+        answer.setSchedulePeriodForDestinationPurge(1000);
+
+        PolicyEntry entry = new PolicyEntry();
+        entry.setGcInactiveDestinations(true);
+        entry.setInactiveTimoutBeforeGC(2000);
+        entry.setProducerFlowControl(true);
+        entry.setAdvisoryForConsumed(true);
+        entry.setAdvisoryForFastProducers(true);
+        entry.setAdvisoryForDelivery(true);
+        PolicyMap map = new PolicyMap();
+        map.setDefaultEntry(entry);
+
+        MirroredQueue mirrorQ = new MirroredQueue();
+        mirrorQ.setCopyMessage(true);
+        DestinationInterceptor[] destinationInterceptors = new DestinationInterceptor[]{mirrorQ};
+        answer.setDestinationInterceptors(destinationInterceptors);
+
+        answer.setDestinationPolicy(map);
+        answer.addConnector(bindAddress);
+
+        return answer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3352Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3352Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3352Test.java
new file mode 100644
index 0000000..aa84d2d
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3352Test.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import javax.jms.DeliveryMode;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AMQ3352Test
+{
+    TransportConnector connector;
+     BrokerService brokerService;
+
+    @Before
+    public void startBroker() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setDeleteAllMessagesOnStartup(true);
+        connector = brokerService.addConnector("tcp://0.0.0.0:0");
+        brokerService.start();
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        brokerService.stop();
+    }
+
+   @Test
+   public void verifyEnqueueLargeNumWithStateTracker() throws Exception {
+        String url = "failover:(" + connector.getPublishableConnectString() + ")?jms.useAsyncSend=true&trackMessages=true&maxCacheSize=131072";
+
+        ActiveMQConnection conn = (ActiveMQConnection)new ActiveMQConnectionFactory(url).createConnection(null, null);
+
+        Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+        MessageProducer producer = session.createProducer(session.createQueue("EVENTQ"));
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        producer.setDisableMessageID(true);
+        producer.setDisableMessageTimestamp(true);
+
+        StringBuffer buffer = new StringBuffer();
+        for (int i=0;i<1024;i++)
+        {
+            buffer.append(String.valueOf(Math.random()));
+        }
+        String payload = buffer.toString();
+
+       for (int i=0; i<10000; i++) {
+            StringBuffer buff = new StringBuffer("x");
+            buff.append(payload);
+            producer.send(session.createTextMessage(buff.toString()));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3405Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3405Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3405Test.java
new file mode 100644
index 0000000..9711d06
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3405Test.java
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ3405Test extends TestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ3405Test.class);
+
+    private Connection connection;
+    private Session session;
+    private MessageConsumer consumer;
+    private MessageProducer producer;
+    private int deliveryMode = DeliveryMode.PERSISTENT;
+    private Destination dlqDestination;
+    private MessageConsumer dlqConsumer;
+    private BrokerService broker;
+
+    private int messageCount;
+    private Destination destination;
+    private int rollbackCount;
+    private Session dlqSession;
+    private final Error[] error = new Error[1];
+    private boolean topic = true;
+    private boolean durableSubscriber = true;
+
+    public void testTransientTopicMessage() throws Exception {
+        topic = true;
+        deliveryMode = DeliveryMode.NON_PERSISTENT;
+        durableSubscriber = true;
+        doTest();
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setPersistent(false);
+        PolicyEntry policy = new PolicyEntry();
+        DeadLetterStrategy defaultDeadLetterStrategy = policy.getDeadLetterStrategy();
+        if(defaultDeadLetterStrategy!=null) {
+            defaultDeadLetterStrategy.setProcessNonPersistent(true);
+        }
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+        broker.setDestinationPolicy(pMap);
+        return broker;
+    }
+
+    protected void doTest() throws Exception {
+        messageCount = 200;
+        connection.start();
+
+        final QueueViewMBean dlqView = getProxyToDLQ();
+
+        ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
+        rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;
+        LOG.info("Will redeliver messages: " + rollbackCount + " times");
+
+        makeConsumer();
+        makeDlqConsumer();
+        dlqConsumer.close();
+
+        sendMessages();
+
+        // now lets receive and rollback N times
+        int maxRollbacks = messageCount * rollbackCount;
+
+        consumer.setMessageListener(new RollbackMessageListener(maxRollbacks, rollbackCount));
+
+        // We receive and rollback into the DLQ N times moving the DLQ messages back to their
+        // original Q to test that they are continually placed back in the DLQ.
+        for (int i = 0; i < 2; ++i) {
+
+            assertTrue("DLQ was not filled as expected", Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return dlqView.getQueueSize() == messageCount;
+                }
+            }));
+
+            connection.stop();
+
+            assertEquals("DLQ should be full now.", messageCount, dlqView.getQueueSize());
+
+            String moveTo;
+            if (topic) {
+                moveTo = "topic://" + ((Topic) getDestination()).getTopicName();
+            } else {
+                moveTo = "queue://" + ((Queue) getDestination()).getQueueName();
+            }
+
+            LOG.debug("Moving " + messageCount + " messages from ActiveMQ.DLQ to " + moveTo);
+            dlqView.moveMatchingMessagesTo("", moveTo);
+
+            assertTrue("DLQ was not emptied as expected", Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return dlqView.getQueueSize() == 0;
+                }
+            }));
+
+            connection.start();
+        }
+    }
+
+    protected void makeConsumer() throws JMSException {
+        Destination destination = getDestination();
+        LOG.info("Consuming from: " + destination);
+        if (durableSubscriber) {
+            consumer = session.createDurableSubscriber((Topic)destination, destination.toString());
+        } else {
+            consumer = session.createConsumer(destination);
+        }
+    }
+
+    protected void makeDlqConsumer() throws JMSException {
+        dlqDestination = createDlqDestination();
+
+        LOG.info("Consuming from dead letter on: " + dlqDestination);
+        dlqConsumer = dlqSession.createConsumer(dlqDestination);
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        broker = createBroker();
+        broker.start();
+        broker.waitUntilStarted();
+
+        connection = createConnection();
+        connection.setClientID(createClientId());
+
+        session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        connection.start();
+
+        dlqSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        dlqConsumer.close();
+        dlqSession.close();
+        session.close();
+
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    };
+
+    @Override
+    protected ActiveMQConnectionFactory createConnectionFactory()
+            throws Exception {
+        ActiveMQConnectionFactory answer = super.createConnectionFactory();
+        RedeliveryPolicy policy = new RedeliveryPolicy();
+        policy.setMaximumRedeliveries(3);
+        policy.setBackOffMultiplier((short) 1);
+        policy.setRedeliveryDelay(0);
+        policy.setInitialRedeliveryDelay(0);
+        policy.setUseExponentialBackOff(false);
+        answer.setRedeliveryPolicy(policy);
+        return answer;
+    }
+
+    protected void sendMessages() throws JMSException {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        producer = session.createProducer(getDestination());
+        producer.setDeliveryMode(deliveryMode);
+
+        LOG.info("Sending " + messageCount + " messages to: " + getDestination());
+        for (int i = 0; i < messageCount; i++) {
+            Message message = createMessage(session, i);
+            producer.send(message);
+        }
+    }
+
+    protected TextMessage createMessage(Session session, int i) throws JMSException {
+        return session.createTextMessage(getMessageText(i));
+    }
+
+    protected String getMessageText(int i) {
+        return "message: " + i;
+    }
+
+    protected Destination createDlqDestination() {
+        return new ActiveMQQueue("ActiveMQ.DLQ");
+    }
+
+    private QueueViewMBean getProxyToDLQ() throws MalformedObjectNameException, JMSException {
+        ObjectName queueViewMBeanName = new ObjectName(
+            "org.apache.activemq:type=Broker,brokerName=localhost," +
+            "destinationType=Queue,destinationName=ActiveMQ.DLQ");
+        QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext()
+                .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+        return proxy;
+    }
+
+    protected Destination getDestination() {
+        if (destination == null) {
+            destination = createDestination();
+        }
+        return destination;
+    }
+
+    protected String createClientId() {
+        return toString();
+    }
+
+    class RollbackMessageListener implements MessageListener {
+
+        final int maxRollbacks;
+        final int deliveryCount;
+        final AtomicInteger rollbacks = new AtomicInteger();
+
+        RollbackMessageListener(int c, int delvery) {
+            maxRollbacks = c;
+            deliveryCount = delvery;
+        }
+
+        @Override
+        public void onMessage(Message message) {
+            try {
+                int expectedMessageId = rollbacks.get() / deliveryCount;
+                LOG.info("expecting messageId: " + expectedMessageId);
+                rollbacks.incrementAndGet();
+                session.rollback();
+            } catch (Throwable e) {
+                LOG.error("unexpected exception:" + e, e);
+                // propagating assertError to execution task will cause a hang
+                // at shutdown
+                if (e instanceof Error) {
+                    error[0] = (Error) e;
+                } else {
+                    fail("unexpected exception: " + e);
+                }
+            }
+        }
+    }
+}


Mime
View raw message