activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [20/52] [abbrv] [partial] activemq-artemis git commit: ARTEMIS-127 Adding activemq unit test module to Artemis
Date Tue, 09 Jun 2015 16:36:51 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java
new file mode 100644
index 0000000..65e0783
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.net.URI;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQMessageConsumer;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Checks that messages sent with random priorities to the prioritized queue
+ * {@code TEST} reach a consumer in non-increasing priority order when that
+ * consumer (prefetch of 1) was created before any message was produced.
+ */
+public class AMQ3436Test {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(AMQ3436Test.class);
+
+    /** Upper bound, in seconds, on the wait for the listener to see every message. */
+    private static final long RECEIVE_TIMEOUT_SECONDS = 120;
+
+    private BrokerService broker;
+    private PersistenceAdapter adapter;
+    private boolean useCache = true;
+    private boolean prioritizeMessages = true;
+
+    /**
+     * Creates the KahaDB store used by the broker, with concurrent store and
+     * dispatch switched off for both queues and topics.
+     *
+     * @param delete whether messages already present in the store are removed
+     * @return the configured persistence adapter
+     * @throws Exception if the existing messages cannot be deleted
+     */
+    protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception {
+        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+        adapter.setConcurrentStoreAndDispatchQueues(false);
+        adapter.setConcurrentStoreAndDispatchTopics(false);
+        // Honour the flag; previously the store was wiped unconditionally.
+        if (delete) {
+            adapter.deleteAllMessages();
+        }
+        return adapter;
+    }
+
+    /**
+     * Starts a broker named {@code priorityTest} whose {@code TEST} queue uses
+     * the priority, cache and flow-control settings held in this class.
+     *
+     * @throws Exception if the broker fails to start
+     */
+    @Before
+    public void setUp() throws Exception {
+        broker = new BrokerService();
+        broker.setBrokerName("priorityTest");
+        broker.setAdvisorySupport(false);
+        broker.setUseJmx(false);
+        adapter = createPersistenceAdapter(true);
+        broker.setPersistenceAdapter(adapter);
+
+        PolicyEntry policy = new PolicyEntry();
+        policy.setPrioritizedMessages(prioritizeMessages);
+        policy.setUseCache(useCache);
+        policy.setProducerFlowControl(false);
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.put(new ActiveMQQueue("TEST"), policy);
+
+        broker.setDestinationPolicy(policyMap);
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    /**
+     * Stops the broker started by {@link #setUp()}. Annotated and public so
+     * that JUnit 4 actually runs it after each test.
+     *
+     * @throws Exception if the broker fails to stop
+     */
+    @org.junit.After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    /**
+     * Produces 200 persistent messages with random priorities (one commit per
+     * message) and verifies, through a message listener, that after the first
+     * prefetched message no message has a higher priority than the lowest
+     * priority seen so far.
+     *
+     * @throws Exception on any JMS or broker error
+     */
+    @Test
+    public void testPriorityWhenConsumerCreatedBeforeProduction() throws Exception {
+
+        final int messageCount = 200;
+        URI failoverUri = new URI("vm://priorityTest?jms.prefetchPolicy.all=1");
+
+        ActiveMQQueue dest = new ActiveMQQueue("TEST?consumer.dispatchAsync=false");
+
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(failoverUri);
+        cf.setDispatchAsync(false);
+
+        // Create producer
+        ActiveMQConnection producerConnection = (ActiveMQConnection) cf.createConnection();
+        producerConnection.setMessagePrioritySupported(true);
+        producerConnection.start();
+        final Session producerSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED);
+        MessageProducer producer = producerSession.createProducer(dest);
+
+        // Create consumer on separate connection
+        ActiveMQConnection consumerConnection = (ActiveMQConnection) cf.createConnection();
+        consumerConnection.setMessagePrioritySupported(true);
+        consumerConnection.start();
+        final ActiveMQSession consumerSession = (ActiveMQSession) consumerConnection.createSession(true,
+                Session.SESSION_TRANSACTED);
+        ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) consumerSession.createConsumer(dest);
+
+        // Produce X number of messages with a session commit after each message
+        Random random = new Random();
+        for (int i = 0; i < messageCount; ++i) {
+            Message message = producerSession.createTextMessage("Test message #" + i);
+            producer.send(message, DeliveryMode.PERSISTENT, random.nextInt(10), 45 * 1000);
+            producerSession.commit();
+        }
+        producer.close();
+
+        // ***************************************************
+        // If we create the consumer here instead of above,
+        // the messages will be consumed in priority order
+        // ***************************************************
+        //consumer = (ActiveMQMessageConsumer) consumerSession.createConsumer(dest);
+
+        // Consume all of the messages we produce using a listener.
+        // Don't exit until we get all the messages (or the wait times out).
+        final CountDownLatch latch = new CountDownLatch(messageCount);
+        final StringBuffer failureMessage = new StringBuffer();
+        final String lineSeparator = System.getProperty("line.separator");
+        consumer.setMessageListener(new MessageListener() {
+            int lowestPrioritySeen = 10;
+
+            boolean firstMessage = true;
+
+            @Override
+            public void onMessage(Message msg) {
+                try {
+
+                    int currentPriority = msg.getJMSPriority();
+                    LOG.debug(currentPriority + "<=" + lowestPrioritySeen);
+
+                    // Ignore the first message priority since it is prefetched
+                    // and is out of order by design
+                    if (firstMessage) {
+                        firstMessage = false;
+                        LOG.debug("Ignoring first message since it was prefetched");
+
+                    } else {
+
+                        // Verify that we never see a priority higher than the
+                        // lowest priority seen
+                        if (lowestPrioritySeen > currentPriority) {
+                            lowestPrioritySeen = currentPriority;
+                        }
+                        if (lowestPrioritySeen < currentPriority) {
+                            failureMessage.append("Incorrect priority seen (Lowest Priority = " + lowestPrioritySeen
+                                    + " Current Priority = " + currentPriority + ")"
+                                    + lineSeparator);
+                        }
+                    }
+
+                } catch (JMSException e) {
+                    // Surface the problem to the test instead of only printing it.
+                    LOG.error("Unable to read the priority of a received message", e);
+                    failureMessage.append("Unable to read message priority: " + e + lineSeparator);
+                } finally {
+                    latch.countDown();
+                    LOG.debug("Messages remaining = " + latch.getCount());
+                }
+            }
+        });
+
+        // Bounded wait: an unbounded await() would hang the build forever if
+        // any message were lost or expired before delivery.
+        boolean allReceived = latch.await(RECEIVE_TIMEOUT_SECONDS, java.util.concurrent.TimeUnit.SECONDS);
+        long undelivered = latch.getCount();
+        consumer.close();
+
+        // Cleanup producer resources
+        producerSession.close();
+        producerConnection.stop();
+        producerConnection.close();
+
+        // Cleanup consumer resources
+        consumerSession.close();
+        consumerConnection.stop();
+        consumerConnection.close();
+
+        // Report the failure if found (after cleanup so nothing is left open)
+        Assert.assertTrue("Timed out with " + undelivered + " of " + messageCount
+                + " messages still undelivered", allReceived);
+        if (failureMessage.length() > 0) {
+            Assert.fail(failureMessage.toString());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3445Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3445Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3445Test.java
new file mode 100644
index 0000000..73035e2
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3445Test.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that a broker backed by a {@link JDBCPersistenceAdapter} still knows
+ * its queue, and reports the expected queue size, after being restarted.
+ */
+public class AMQ3445Test {
+
+    private ConnectionFactory connectionFactory;
+    private BrokerService broker;
+    private String connectionUri;
+
+    private final String queueName = "Consumer.MyApp.VirtualTopic.FOO";
+    private final String topicName = "VirtualTopic.FOO";
+
+    /**
+     * Starts a fresh broker with all stored messages deleted.
+     *
+     * @throws Exception if the broker fails to start
+     */
+    @Before
+    public void startBroker() throws Exception {
+        createBroker(true);
+    }
+
+    /**
+     * Creates and starts a JDBC-backed broker on an ephemeral TCP port and
+     * points {@link #connectionFactory} at it.
+     *
+     * @param deleteMessages whether the broker deletes all messages on startup
+     * @throws Exception if the broker fails to start
+     */
+    private void createBroker(boolean deleteMessages) throws Exception {
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(deleteMessages);
+        broker.setPersistenceAdapter(new JDBCPersistenceAdapter());
+        broker.setAdvisorySupport(false);
+        broker.addConnector("tcp://0.0.0.0:0");
+        broker.start();
+        broker.waitUntilStarted();
+        connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
+        connectionFactory = new ActiveMQConnectionFactory(connectionUri);
+    }
+
+    /**
+     * Stops the current broker, if any, and waits for it to finish stopping.
+     *
+     * @throws Exception if the broker fails to stop
+     */
+    private void stopBroker() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    /**
+     * Stops the current broker and starts a new one that keeps stored messages.
+     *
+     * @throws Exception if the broker fails to stop or start
+     */
+    private void restartBroker() throws Exception {
+        stopBroker();
+        createBroker(false);
+    }
+
+    /**
+     * Stops the broker after each test.
+     *
+     * @throws Exception if the broker fails to stop
+     */
+    @After
+    public void tearDown() throws Exception {
+        stopBroker();
+    }
+
+    /**
+     * Registers the queue and topic, then restarts the broker several times,
+     * sending one message to the topic between restarts, and asserts after
+     * each restart that the queue is still listed and that its size matches
+     * the number of messages sent so far.
+     *
+     * @throws Exception on any JMS, JMX or broker error
+     */
+    @Test
+    public void testJDBCRetiansDestinationAfterRestart() throws Exception {
+
+        broker.getAdminView().addQueue(queueName);
+        broker.getAdminView().addTopic(topicName);
+
+        assertTrue(findDestination(queueName, false));
+        assertTrue(findDestination(topicName, true));
+
+        QueueViewMBean queue = getProxyToQueueViewMBean();
+        assertEquals(0, queue.getQueueSize());
+
+        restartBroker();
+
+        assertTrue(findDestination(queueName, false));
+        queue = getProxyToQueueViewMBean();
+        assertEquals(0, queue.getQueueSize());
+
+        sendMessage();
+        restartBroker();
+        assertTrue(findDestination(queueName, false));
+
+        queue = getProxyToQueueViewMBean();
+        assertEquals(1, queue.getQueueSize());
+        sendMessage();
+        assertEquals(2, queue.getQueueSize());
+
+        restartBroker();
+        assertTrue(findDestination(queueName, false));
+        queue = getProxyToQueueViewMBean();
+        assertEquals(2, queue.getQueueSize());
+    }
+
+    /**
+     * Sends one persistent text message to {@link #topicName}. The connection
+     * is closed even when creating the session or sending fails.
+     *
+     * @throws Exception on any JMS error
+     */
+    private void sendMessage() throws Exception {
+        Connection connection = connectionFactory.createConnection();
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(session.createTopic(topicName));
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+            producer.send(session.createTextMessage("Testing"));
+            producer.close();
+        } finally {
+            connection.close();
+        }
+    }
+
+    /**
+     * Builds a JMX proxy for the queue named {@link #queueName}.
+     *
+     * @return a {@link QueueViewMBean} proxy obtained from the broker's management context
+     * @throws Exception if the object name is malformed or the proxy cannot be created
+     */
+    private QueueViewMBean getProxyToQueueViewMBean() throws Exception {
+        // NOTE(review): the object name hard-codes brokerName=localhost; this
+        // presumably matches the broker's default name since none is set here.
+        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq"
+                + ":destinationType=Queue,destinationName=" + queueName
+                + ",type=Broker,brokerName=localhost");
+        return (QueueViewMBean) broker.getManagementContext()
+                .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+    }
+
+    /**
+     * Looks for a destination among those the broker's admin view reports.
+     *
+     * @param name text that must appear in the destination's object name
+     * @param topic {@code true} to search topics, {@code false} to search queues
+     * @return {@code true} if any reported object name contains {@code name}
+     * @throws Exception if the admin view cannot be queried
+     */
+    private boolean findDestination(String name, boolean topic) throws Exception {
+
+        ObjectName[] destinations = topic
+                ? broker.getAdminView().getTopics()
+                : broker.getAdminView().getQueues();
+
+        for (ObjectName destination : destinations) {
+            if (destination.toString().contains(name)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3454Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3454Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3454Test.java
new file mode 100644
index 0000000..99c12fc
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3454Test.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Measures how long it takes to send a batch of messages to one queue on a
+ * broker that was pre-populated with a large number of other destinations.
+ */
+public class AMQ3454Test extends TestCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ3454Test.class);
+
+    /** Used both as the number of pre-created queues and the number of messages sent. */
+    private static final int MESSAGES_COUNT = 10000;
+
+    /**
+     * Starts a non-persistent broker with {@code MESSAGES_COUNT} queues, sends
+     * {@code MESSAGES_COUNT} text messages to a further queue and logs the
+     * elapsed time. The connection and the broker are always shut down so
+     * that nothing outlives this test.
+     *
+     * @throws Exception on any JMS or broker error
+     */
+    public void testSendWithLotsOfDestinations() throws Exception {
+        final BrokerService broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        broker.setDeleteAllMessagesOnStartup(true);
+
+        broker.addConnector("tcp://localhost:0");
+
+        // populate a bunch of destinations, validate the impact on a call to send
+        ActiveMQDestination[] destinations = new ActiveMQDestination[MESSAGES_COUNT];
+        for (int idx = 0; idx < MESSAGES_COUNT; ++idx) {
+            destinations[idx] = new ActiveMQQueue(getDestinationName() + "-" + idx);
+        }
+        broker.setDestinations(destinations);
+        broker.start();
+
+        try {
+            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+                    broker.getTransportConnectors().get(0).getPublishableConnectString());
+            final Connection connection = factory.createConnection();
+            try {
+                connection.start();
+
+                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                MessageProducer producer = session.createProducer(new ActiveMQQueue(getDestinationName()));
+
+                long start = System.currentTimeMillis();
+                for (int idx = 0; idx < MESSAGES_COUNT; ++idx) {
+                    Message message = session.createTextMessage("" + idx);
+                    producer.send(message);
+                }
+                LOG.info("Duration: " + (System.currentTimeMillis() - start) + " millis");
+                producer.close();
+                session.close();
+            } finally {
+                // Previously the connection was never closed.
+                connection.close();
+            }
+        } finally {
+            // Previously the broker was left running after the test.
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    /**
+     * Builds the base destination name from this class name and the running test name.
+     *
+     * @return {@code <class name>.<test name>}
+     */
+    protected String getDestinationName() {
+        return getClass().getName() + "." + getName();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3465Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3465Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3465Test.java
new file mode 100644
index 0000000..bac3829
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3465Test.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XASession;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQMessageProducer;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.ActiveMQXAConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that closing or rolling back a plain JMS session on a second
+ * connection does not disturb an XA transaction that has been ended but not
+ * yet committed on an XA connection.
+ */
+public class AMQ3465Test {
+
+    private final String xaDestinationName = "DestinationXA";
+    private final String destinationName = "Destination";
+    private BrokerService broker;
+    private String connectionUri;
+    private long txGenerator = System.currentTimeMillis();
+
+    private XAConnectionFactory xaConnectionFactory;
+    private ConnectionFactory connectionFactory;
+
+    /**
+     * Starts a non-persistent broker on an ephemeral TCP port and creates the
+     * plain and XA connection factories that target it.
+     *
+     * @throws Exception if the broker fails to start
+     */
+    @Before
+    public void startBroker() throws Exception {
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        broker.addConnector("tcp://0.0.0.0:0");
+        broker.start();
+        broker.waitUntilStarted();
+
+        connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
+        connectionFactory = new ActiveMQConnectionFactory(connectionUri);
+        xaConnectionFactory = new ActiveMQXAConnectionFactory(connectionUri);
+    }
+
+    /**
+     * Stops the broker after each test.
+     *
+     * @throws Exception if the broker fails to stop
+     */
+    @After
+    public void stopBroker() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+    /**
+     * Receives a message inside an XA transaction, then opens and closes a
+     * non-transacted session on another connection before committing the XA
+     * transaction.
+     *
+     * @throws Exception on any JMS or XA error
+     */
+    @Test
+    public void testMixedXAandNonXAorTXSessions() throws Exception {
+
+        XAConnection xaConnection = xaConnectionFactory.createXAConnection();
+        try {
+            xaConnection.start();
+            Destination dest = new ActiveMQQueue(xaDestinationName);
+
+            // publish a message
+            publishInXaTransaction(xaConnection, dest);
+
+            XASession session = xaConnection.createXASession();
+            MessageConsumer consumer = session.createConsumer(dest);
+            Xid tid = createXid();
+            XAResource resource = session.getXAResource();
+            resource.start(tid, XAResource.TMNOFLAGS);
+            TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
+            assertNotNull(receivedMessage);
+            assertEquals("Some Text", receivedMessage.getText());
+            resource.end(tid, XAResource.TMSUCCESS);
+
+            // Test that a normal session doesn't operate on XASession state.
+            Connection connection2 = connectionFactory.createConnection();
+            try {
+                connection2.start();
+                ActiveMQSession session2 =
+                        (ActiveMQSession) connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+                if (session2.isTransacted()) {
+                    session2.rollback();
+                }
+
+                session2.close();
+
+                // Commit while connection2 is still open, as before.
+                resource.commit(tid, true);
+            } finally {
+                connection2.close();
+            }
+        } finally {
+            xaConnection.close();
+        }
+    }
+
+    /**
+     * Receives a message inside an XA transaction, then sends a message in a
+     * locally transacted session on another connection and rolls it back
+     * before committing the XA transaction.
+     *
+     * @throws Exception on any JMS or XA error
+     */
+    @Test
+    public void testMixedXAandNonXALocalTXSessions() throws Exception {
+
+        XAConnection xaConnection = xaConnectionFactory.createXAConnection();
+        try {
+            xaConnection.start();
+            Destination dest = new ActiveMQQueue(xaDestinationName);
+
+            // publish a message
+            publishInXaTransaction(xaConnection, dest);
+
+            XASession session = xaConnection.createXASession();
+            MessageConsumer consumer = session.createConsumer(dest);
+            Xid tid = createXid();
+            XAResource resource = session.getXAResource();
+            resource.start(tid, XAResource.TMNOFLAGS);
+            TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
+            assertNotNull(receivedMessage);
+            assertEquals("Some Text", receivedMessage.getText());
+            resource.end(tid, XAResource.TMSUCCESS);
+
+            // Test that a normal session doesn't operate on XASession state.
+            Connection connection2 = connectionFactory.createConnection();
+            try {
+                connection2.start();
+                ActiveMQSession session2 =
+                        (ActiveMQSession) connection2.createSession(true, Session.AUTO_ACKNOWLEDGE);
+                Destination destination = new ActiveMQQueue(destinationName);
+                ActiveMQMessageProducer producer2 =
+                        (ActiveMQMessageProducer) session2.createProducer(destination);
+                producer2.send(session2.createTextMessage("Local-TX"));
+
+                if (session2.isTransacted()) {
+                    session2.rollback();
+                }
+
+                session2.close();
+
+                // Commit while connection2 is still open, as before.
+                resource.commit(tid, true);
+            } finally {
+                connection2.close();
+            }
+        } finally {
+            xaConnection.close();
+        }
+    }
+
+    /**
+     * Sends one "Some Text" message to {@code dest} inside its own XA
+     * transaction, commits it in one phase and closes the session used.
+     *
+     * @param xaConnection started XA connection to create the session on
+     * @param dest destination the message is sent to
+     * @throws Exception on any JMS or XA error
+     */
+    private void publishInXaTransaction(XAConnection xaConnection, Destination dest) throws Exception {
+        XASession session = xaConnection.createXASession();
+        XAResource resource = session.getXAResource();
+
+        Xid tid = createXid();
+        resource.start(tid, XAResource.TMNOFLAGS);
+        MessageProducer producer = session.createProducer(dest);
+        ActiveMQTextMessage message = new ActiveMQTextMessage();
+        message.setText("Some Text");
+        producer.send(message);
+        resource.end(tid, XAResource.TMSUCCESS);
+        resource.commit(tid, true);
+        session.close();
+    }
+
+    /**
+     * Creates a new transaction id from an incrementing counter. The same
+     * eight counter bytes serve as both the global transaction id and the
+     * branch qualifier; the format id is always 86.
+     *
+     * @return a fresh {@link Xid}
+     * @throws IOException if the counter cannot be serialised
+     */
+    public Xid createXid() throws IOException {
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream os = new DataOutputStream(baos);
+        os.writeLong(++txGenerator);
+        os.close();
+        final byte[] bs = baos.toByteArray();
+
+        return new Xid() {
+            @Override
+            public int getFormatId() {
+                return 86;
+            }
+
+            @Override
+            public byte[] getGlobalTransactionId() {
+                return bs;
+            }
+
+            @Override
+            public byte[] getBranchQualifier() {
+                return bs;
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3529Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3529Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3529Test.java
new file mode 100644
index 0000000..7e8b9d0
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3529Test.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.activemq.bugs;
+
+import java.util.Properties;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Interrupts a client thread that is blocked in {@code consumer.receive} and
+ * checks that the consumer, session, connection and JNDI context can still be
+ * closed, and that no live non-daemon thread remains in the client's thread
+ * group.
+ */
+public class AMQ3529Test {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ3529Test.class);
+
+    private ConnectionFactory connectionFactory;
+    private Connection connection;
+    private Session session;
+    private BrokerService broker;
+    private String connectionUri;
+    private MessageConsumer consumer;
+    private Context ctx = null;
+
+    /**
+     * Starts a non-persistent broker on an ephemeral TCP port and creates a
+     * connection factory that targets it.
+     *
+     * @throws Exception if the broker fails to start
+     */
+    @Before
+    public void startBroker() throws Exception {
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        broker.addConnector("tcp://0.0.0.0:0");
+        broker.start();
+        broker.waitUntilStarted();
+
+        connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
+        connectionFactory = new ActiveMQConnectionFactory(connectionUri);
+    }
+
+    /**
+     * Stops the broker after each test.
+     *
+     * @throws Exception if the broker fails to stop
+     */
+    @After
+    public void stopBroker() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+    /**
+     * Runs the client in its own thread group, interrupts it after five
+     * seconds, and fails if any close call on the client thread threw or if a
+     * live non-daemon thread is left in the group.
+     *
+     * @throws Exception if the test thread is interrupted while waiting
+     */
+    @Test(timeout = 60000)
+    public void testInterruptionAffects() throws Exception {
+        ThreadGroup tg = new ThreadGroup("tg");
+
+        assertEquals(0, tg.activeCount());
+
+        // fail() on the client thread would only kill that thread, so close
+        // failures are collected here and asserted on the test thread.
+        final StringBuffer clientFailures = new StringBuffer();
+
+        Thread client = new Thread(tg, "client") {
+
+            @Override
+            public void run() {
+                try {
+                    connection = connectionFactory.createConnection();
+                    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    assertNotNull(session);
+
+                    Properties props = new Properties();
+                    props.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
+                    props.setProperty(Context.PROVIDER_URL, "tcp://0.0.0.0:0");
+                    ctx = null;
+                    try {
+                        ctx = new InitialContext(props);
+                    } catch (NoClassDefFoundError e) {
+                        throw new NamingException(e.toString());
+                    } catch (Exception e) {
+                        throw new NamingException(e.toString());
+                    }
+                    Destination destination = (Destination) ctx.lookup("dynamicTopics/example.C");
+                    consumer = session.createConsumer(destination);
+                    consumer.receive(10000);
+                } catch (Exception e) {
+                    // Expect an exception here from the interrupt.
+                    LOG.debug("Client thread stopped with an exception", e);
+                } finally {
+                    // Closing after the interrupt is the nature of the test.
+                    // Each resource is closed independently so that one
+                    // failure does not skip the remaining closes.
+                    if (consumer != null) {
+                        try {
+                            consumer.close();
+                        } catch (JMSException e) {
+                            clientFailures.append("Consumer Close failed with " + e.getMessage() + "\n");
+                        }
+                    }
+                    if (session != null) {
+                        try {
+                            session.close();
+                        } catch (JMSException e) {
+                            clientFailures.append("Session Close failed with " + e.getMessage() + "\n");
+                        }
+                    }
+                    if (connection != null) {
+                        try {
+                            connection.close();
+                        } catch (JMSException e) {
+                            clientFailures.append("Connection Close failed with " + e.getMessage() + "\n");
+                        }
+                    }
+                    if (ctx != null) {
+                        try {
+                            ctx.close();
+                        } catch (Exception e) {
+                            clientFailures.append("Context Close failed with " + e.getMessage() + "\n");
+                        }
+                    }
+                }
+            }
+        };
+        client.start();
+        Thread.sleep(5000);
+        client.interrupt();
+        client.join();
+
+        if (clientFailures.length() > 0) {
+            fail(clientFailures.toString());
+        }
+
+        Thread.sleep(2000);
+        Thread[] remainThreads = new Thread[tg.activeCount()];
+        tg.enumerate(remainThreads);
+        for (Thread t : remainThreads) {
+            // enumerate() may fill fewer slots than activeCount() estimated.
+            if (t != null && t.isAlive() && !t.isDaemon()) {
+                fail("Remaining thread: " + t.toString());
+            }
+        }
+
+        // Walk up from the current group; starting at its parent would throw
+        // a NullPointerException if the test already ran in the root group.
+        ThreadGroup root = Thread.currentThread().getThreadGroup();
+        while (root.getParent() != null) {
+            root = root.getParent();
+        }
+        visit(root, 0);
+    }
+
+    /**
+     * Recursively visits all thread groups under {@code group} and logs, at
+     * debug level, the name of every thread enumerated directly in each group.
+     *
+     * @param group thread group to start from
+     * @param level nesting depth of {@code group}; only passed on to recursive calls
+     */
+    public static void visit(ThreadGroup group, int level) {
+        // Get threads in `group'; the array is oversized because
+        // activeCount() is only an estimate.
+        int numThreads = group.activeCount();
+        Thread[] threads = new Thread[numThreads * 2];
+        numThreads = group.enumerate(threads, false);
+
+        // Enumerate each thread in `group'
+        for (int i = 0; i < numThreads; i++) {
+            Thread thread = threads[i];
+            LOG.debug("Thread:" + thread.getName() + " is still running");
+        }
+
+        // Get thread subgroups of `group'
+        int numGroups = group.activeGroupCount();
+        ThreadGroup[] groups = new ThreadGroup[numGroups * 2];
+        numGroups = group.enumerate(groups, false);
+
+        // Recursively visit each subgroup
+        for (int i = 0; i < numGroups; i++) {
+            visit(groups[i], level + 1);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3537Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3537Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3537Test.java
new file mode 100644
index 0000000..fe8e3fd
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3537Test.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.util.ClassLoadingAwareObjectInputStream;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Quick port to java to support AMQ build.
+ *
+ * This test demonstrates the classloader problem in the
+ * ClassLoadingAwareObjectInputStream impl. If the first interface in the proxy
+ * interfaces list is JDK and there are any subsequent interfaces that are NOT
+ * JDK interfaces the ClassLoadingAwareObjectInputStream will ignore their
+ * respective classloaders and cause the Proxy to throw an
+ * IllegalArgumentException because the core JDK classloader can't load the
+ * interfaces that are not JDK interfaces.
+ *
+ * See AMQ-3537
+ *
+ * @author jason.yankus
+ *
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class AMQ3537Test implements InvocationHandler, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * If the first and second element in this array are swapped, the test will
+     * fail.
+     */
+    public static final Class[] TEST_CLASSES = new Class[] { List.class, NonJDKList.class, Serializable.class };
+
+    /** Underlying list */
+    private final List l = new ArrayList<String>();
+
+    /**
+     * Seeds the underlying list with the single element the test later reads
+     * back through the deserialized proxy.
+     */
+    @Before
+    public void setUp() throws Exception {
+        l.add("foo");
+    }
+
+    /**
+     * Serializes a dynamic proxy implementing {@link #TEST_CLASSES}, reads it
+     * back through {@link ClassLoadingAwareObjectInputStream} and checks that
+     * a call on the deserialized proxy still reaches the underlying list.
+     */
+    @Test
+    public void testDeserializeProxy() throws Exception {
+        // create the proxy
+        List proxy = (List) java.lang.reflect.Proxy.newProxyInstance(this.getClass().getClassLoader(), TEST_CLASSES, this);
+
+        // serialize it; the stream is closed (and thereby flushed) even when
+        // writeObject fails, and the bytes are only taken after the close
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(baos);
+        try {
+            oos.writeObject(proxy);
+        } finally {
+            oos.close();
+        }
+        byte[] serializedProxy = baos.toByteArray();
+
+        // deserialize the proxy
+        ClassLoadingAwareObjectInputStream claois =
+            new ClassLoadingAwareObjectInputStream(new ByteArrayInputStream(serializedProxy));
+
+        List deserializedProxy;
+        try {
+            // this is where it fails due to the rudimentary classloader selection
+            // in ClassLoadingAwareObjectInputStream
+            deserializedProxy = (List) claois.readObject();
+        } finally {
+            // close the stream even when readObject throws
+            claois.close();
+        }
+
+        // assert the invocation worked
+        assertEquals("foo", deserializedProxy.get(0));
+    }
+
+    /**
+     * Forwards every call made on the proxy to the underlying list.
+     *
+     * @param proxy  the proxy instance the call was made on (unused)
+     * @param method the method that was called
+     * @param args   the call arguments, passed through unchanged
+     * @return whatever the underlying list returns for {@code method}
+     * @throws Throwable anything raised by the reflective call
+     */
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+        return method.invoke(l, args);
+    }
+
+    /** Interface declared in this test, used as the non-JDK entry of {@link #TEST_CLASSES}. */
+    public interface NonJDKList {
+        int size();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3567Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3567Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3567Test.java
new file mode 100644
index 0000000..b4ce82f
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3567Test.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.DefaultTestAppender;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Level;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author Claudio Corsi
+ *
+ */
+public class AMQ3567Test {
+
+    private static final Logger logger = LoggerFactory.getLogger(AMQ3567Test.class);
+
+    private ActiveMQConnectionFactory factory;
+    private Connection connection;
+    private Session sessionWithListener, session;
+    private Queue destination;
+    private MessageConsumer consumer;
+    private Thread thread;
+    private BrokerService broker;
+    private String connectionUri;
+
+    /**
+     * Starts the broker, then creates and starts the consumers used by the test.
+     *
+     * @throws java.lang.Exception
+     */
+    @Before
+    public void setUp() throws Exception {
+        startBroker();
+        initializeConsumer();
+        startConsumer();
+    }
+
+    /**
+     * Sends one message, then stops the consumer and the broker while an
+     * appender watches the ServiceSupport logger. The test fails when an
+     * InterruptedException whose message starts with
+     * "Could not stop service:" was logged during the shutdown.
+     */
+    @Test
+    public void runTest() throws Exception {
+        produceSingleMessage();
+        org.apache.log4j.Logger log4jLogger = org.apache.log4j.Logger.getLogger("org.apache.activemq.util.ServiceSupport");
+        final AtomicBoolean failed = new AtomicBoolean(false);
+
+        Appender appender = new DefaultTestAppender() {
+            @Override
+            public void doAppend(LoggingEvent event) {
+                if (event.getThrowableInformation() == null) {
+                    return;
+                }
+                Throwable logged = event.getThrowableInformation().getThrowable();
+                if (logged instanceof InterruptedException) {
+                    InterruptedException ie = (InterruptedException) logged;
+                    // getMessage() may be null; guard so the appender itself
+                    // never throws a NullPointerException into the logging call.
+                    String text = ie.getMessage();
+                    if (text != null && text.startsWith("Could not stop service:")) {
+                        logger.info("Received an interrupted exception : ", ie);
+                        failed.set(true);
+                    }
+                }
+            }
+        };
+        log4jLogger.addAppender(appender);
+
+        Level level = log4jLogger.getLevel();
+        log4jLogger.setLevel(Level.DEBUG);
+
+        try {
+            stopConsumer();
+            stopBroker();
+            if (failed.get()) {
+                fail("An Interrupt exception was generated");
+            }
+
+        } finally {
+            // always restore the logger to the state found before the test
+            log4jLogger.setLevel(level);
+            log4jLogger.removeAppender(appender);
+        }
+    }
+
+    /** Creates a broker with one TCP connector on an ephemeral port and waits until it is started. */
+    private void startBroker() throws Exception {
+        broker = new BrokerService();
+        broker.setDataDirectory("target/data");
+        connectionUri = broker.addConnector("tcp://localhost:0?wireFormat.maxInactivityDuration=30000&transport.closeAsync=false&transport.threadName&soTimeout=60000&transport.keepAlive=false&transport.useInactivityMonitor=false").getPublishableConnectString();
+        broker.start(true);
+        broker.waitUntilStarted();
+    }
+
+    /** Stops the broker and blocks until it reports being stopped. */
+    private void stopBroker() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+    /**
+     * Opens a failover connection to the broker and creates the two sessions
+     * and the queue used by {@link #startConsumer()}.
+     */
+    private void initializeConsumer() throws JMSException {
+        logger.info("Initializing the consumer messagor that will just not do anything....");
+        factory = new ActiveMQConnectionFactory();
+        factory.setBrokerURL("failover:("+connectionUri+"?wireFormat.maxInactivityDuration=30000&keepAlive=true&soTimeout=60000)?jms.watchTopicAdvisories=false&jms.useAsyncSend=false&jms.dispatchAsync=true&jms.producerWindowSize=10485760&jms.copyMessageOnSend=false&jms.disableTimeStampsByDefault=true&InitialReconnectDelay=1000&maxReconnectDelay=10000&maxReconnectAttempts=400&useExponentialBackOff=true");
+        connection = factory.createConnection();
+        connection.start();
+        sessionWithListener = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = sessionWithListener.createQueue("EMPTY.QUEUE");
+    }
+
+    /**
+     * Registers a logging message listener on the first session and starts a
+     * background thread that performs two blocking receives on the second
+     * session and then closes that session.
+     */
+    private void startConsumer() throws Exception {
+        logger.info("Starting the consumer");
+        consumer = sessionWithListener.createConsumer(destination);
+        consumer.setMessageListener(new MessageListener() {
+
+            @Override
+            public void onMessage(Message message) {
+                logger.info("Received a message: " + message);
+            }
+
+        });
+
+        // Hand the session to the worker through a final local instead of a
+        // setter on the anonymous Runnable.
+        final Session receiverSession = session;
+        thread = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    // Local queue handle: the worker does not write to the
+                    // test's shared 'destination' field from another thread.
+                    Queue queue = receiverSession.createQueue("EMPTY.QUEUE");
+                    MessageConsumer receiver = receiverSession.createConsumer(queue);
+                    for (int cnt = 0; cnt < 2; cnt++) {
+                        Message message = receiver.receive(50000);
+                        logger.info("Received message: " + message);
+                    }
+                } catch (JMSException e) {
+                    logger.debug("Received an exception while processing messages", e);
+                } finally {
+                    try {
+                        receiverSession.close();
+                    } catch (JMSException e) {
+                        logger.debug("Received an exception while closing session", e);
+                    }
+                }
+            }
+
+        });
+        // Start only after the 'thread' field has been assigned.
+        thread.start();
+    }
+
+    /**
+     * Waits for the background receiver thread to finish, then closes the
+     * listener session and stops the connection.
+     */
+    private void stopConsumer() throws JMSException {
+        logger.info("Stopping the consumer");
+        try {
+            thread.join();
+        } catch (InterruptedException e) {
+            // Deliberately not re-interrupting: the broker shutdown that
+            // follows is the code under test and must not start interrupted.
+            logger.debug("Received an exception while waiting for thread to complete", e);
+        }
+        if (sessionWithListener != null) {
+            sessionWithListener.close();
+        }
+        if (connection != null) {
+            connection.stop();
+        }
+    }
+
+    /** Sends a single text message to EMPTY.QUEUE over a short-lived connection. */
+    private void produceSingleMessage() throws JMSException {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
+        factory.setBrokerURL(connectionUri);
+        Connection connection = factory.createConnection();
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Destination destination = session.createQueue("EMPTY.QUEUE");
+            MessageProducer producer = session.createProducer(destination);
+            producer.send(session.createTextMessage("Single Message"));
+            producer.close();
+            session.close();
+        } finally {
+            // release the connection even when the send fails
+            connection.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3622Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3622Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3622Test.java
new file mode 100644
index 0000000..e08279c
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3622Test.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.FilePendingSubscriberMessageStoragePolicy;
+import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.transport.stomp.Stomp;
+import org.apache.activemq.transport.stomp.StompConnection;
+import org.apache.activemq.util.DefaultTestAppender;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AMQ3622Test {
+
+    protected BrokerService broker;
+    protected AtomicBoolean failed = new AtomicBoolean(false);
+    protected String connectionUri;
+    protected Appender appender = new DefaultTestAppender() {
+
+        @Override
+        public void doAppend(LoggingEvent event) {
+            System.err.println(event.getMessage());
+            if (event.getThrowableInformation() != null) {
+                if (event.getThrowableInformation().getThrowable() instanceof NullPointerException) {
+                    failed.set(true);
+                }
+            }
+        }
+    };
+
+    @Before
+    public void before() throws Exception {
+        Logger.getRootLogger().addAppender(appender);
+
+        broker = new BrokerService();
+        broker.setDataDirectory("target" + File.separator + "activemq-data");
+        broker.setPersistent(true);
+        broker.setDeleteAllMessagesOnStartup(true);
+        PolicyEntry policy = new PolicyEntry();
+        policy.setTopic(">");
+        policy.setProducerFlowControl(false);
+        policy.setMemoryLimit(1 * 1024 * 1024);
+        policy.setPendingSubscriberPolicy(new FilePendingSubscriberMessageStoragePolicy());
+        policy.setSubscriptionRecoveryPolicy(new LastImageSubscriptionRecoveryPolicy());
+        policy.setExpireMessagesPeriod(500);
+        List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
+
+        entries.add(policy);
+        PolicyMap pMap = new PolicyMap();
+        pMap.setPolicyEntries(entries);
+        broker.setDestinationPolicy(pMap);
+
+        connectionUri = broker.addConnector("stomp://localhost:0").getPublishableConnectString();
+
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    @After
+    public void after() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+        Logger.getRootLogger().removeAppender(appender);
+    }
+
+    @Test
+    public void go() throws Exception {
+        StompConnection connection = new StompConnection();
+        Integer port = Integer.parseInt(connectionUri.split(":")[2]);
+        connection.open("localhost", port);        
+        connection.connect("", "");
+        connection.subscribe("/topic/foobar", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
+        connection.disconnect();
+        Thread.sleep(1000);
+
+        if (failed.get()) {
+            fail("Received NullPointerException");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3625Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3625Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3625Test.java
new file mode 100644
index 0000000..a386202
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3625Test.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.DefaultTestAppender;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ *
+ */
+
+public class AMQ3625Test {
+
+    protected BrokerService broker1;
+    protected BrokerService broker2;
+
+    protected AtomicBoolean authenticationFailed = new AtomicBoolean(false);
+    protected AtomicBoolean gotNPE = new AtomicBoolean(false);
+
+    protected String java_security_auth_login_config = "java.security.auth.login.config";
+    protected String xbean = "xbean:";
+    protected String base = "src/test/resources/org/apache/activemq/bugs/amq3625";
+    protected String conf = "conf";
+    protected String keys = "keys";
+    protected String JaasStompSSLBroker1_xml = "JaasStompSSLBroker1.xml";
+    protected String JaasStompSSLBroker2_xml = "JaasStompSSLBroker2.xml";
+
+    /** Value of the login-config system property before the test, or null if it was unset. */
+    protected String oldLoginConf = null;
+
+    /**
+     * Remembers the current JAAS login-config system property, points it at
+     * the test's login.config and starts both brokers from their XML files.
+     */
+    @Before
+    public void before() throws Exception {
+        oldLoginConf = System.getProperty(java_security_auth_login_config);
+        System.setProperty(java_security_auth_login_config, base + "/" + conf + "/" + "login.config");
+        broker1 = BrokerFactory.createBroker(xbean + base + "/" + conf + "/" + JaasStompSSLBroker1_xml);
+        broker2 = BrokerFactory.createBroker(xbean + base + "/" + conf + "/" + JaasStompSSLBroker2_xml);
+
+        broker1.start();
+        broker1.waitUntilStarted();
+        broker2.start();
+        broker2.waitUntilStarted();
+    }
+
+    /**
+     * Stops both brokers and puts the login-config system property back to
+     * what it was before the test. Each step runs even if the previous one
+     * throws, and the property is cleared when it was originally unset so the
+     * test's value does not leak into later tests.
+     */
+    @After
+    public void after() throws Exception {
+        try {
+            try {
+                if (broker1 != null) {
+                    broker1.stop();
+                }
+            } finally {
+                if (broker2 != null) {
+                    broker2.stop();
+                }
+            }
+        } finally {
+            if (oldLoginConf != null) {
+                System.setProperty(java_security_auth_login_config, oldLoginConf);
+            } else {
+                System.clearProperty(java_security_auth_login_config);
+            }
+        }
+    }
+
+    /**
+     * Bridges broker2 to broker1's "openwire" connector, waits ten seconds and
+     * then expects that a SecurityException was logged and that no
+     * NullPointerException was logged.
+     */
+    @Test
+    public void go() throws Exception {
+        Appender appender = new DefaultTestAppender() {
+            @Override
+            public void doAppend(LoggingEvent event) {
+                if (event.getThrowableInformation() != null) {
+                    Throwable t = event.getThrowableInformation().getThrowable();
+                    if (t instanceof SecurityException) {
+                        authenticationFailed.set(true);
+                    }
+                    if (t instanceof NullPointerException) {
+                        gotNPE.set(true);
+                    }
+                }
+            }
+        };
+        Logger.getRootLogger().addAppender(appender);
+
+        try {
+            String connectURI = broker1.getConnectorByName("openwire").getConnectUri().toString();
+            connectURI = connectURI.replace("?needClientAuth=true", "");
+            broker2.addNetworkConnector("static:(" + connectURI + ")").start();
+
+            // leave time for the network connector to attempt its connection
+            Thread.sleep(10 * 1000);
+        } finally {
+            // never leave the appender on the root logger, even on failure
+            Logger.getRootLogger().removeAppender(appender);
+        }
+
+        assertTrue(authenticationFailed.get());
+        assertFalse(gotNPE.get());
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3674Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3674Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3674Test.java
new file mode 100644
index 0000000..47ab754
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3674Test.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.*;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TopicSubscriber;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.jmx.BrokerView;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ3674Test {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ3674Test.class);
+
+    private final static int deliveryMode = DeliveryMode.NON_PERSISTENT;
+    private final static ActiveMQTopic destination = new ActiveMQTopic("XYZ");
+
+    private ActiveMQConnectionFactory factory;
+    private BrokerService broker;
+
+    /**
+     * Checks that destroying a durable subscription through the broker view
+     * throws while its consumer is open, and is possible once the consumer has
+     * been closed and the subscription is reported as inactive.
+     */
+    @Test
+    public void removeSubscription() throws Exception {
+
+        final Connection producerConnection = factory.createConnection();
+        Connection consumerConnection = null;
+        try {
+            producerConnection.start();
+            consumerConnection = factory.createConnection();
+
+            consumerConnection.setClientID("subscriber1");
+            Session consumerMQSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            TopicSubscriber activeConsumer = consumerMQSession.createDurableSubscriber(destination, "myTopic");
+            consumerConnection.start();
+
+            Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(destination);
+            producer.setDeliveryMode(deliveryMode);
+
+            final BrokerView brokerView = broker.getAdminView();
+
+            assertEquals(1, brokerView.getDurableTopicSubscribers().length);
+
+            LOG.info("Current Durable Topic Subscriptions: " + brokerView.getDurableTopicSubscribers().length);
+
+            try {
+                brokerView.destroyDurableSubscriber("subscriber1", "myTopic");
+                // fail() throws an AssertionError, which the catch below does not swallow
+                fail("Expected Exception for Durable consumer is in use");
+            } catch(Exception e) {
+                LOG.info("Recieved expected exception: " + e.getMessage());
+            }
+
+            LOG.info("Current Durable Topic Subscriptions: " + brokerView.getDurableTopicSubscribers().length);
+
+            assertEquals(1, brokerView.getDurableTopicSubscribers().length);
+
+            activeConsumer.close();
+            consumerConnection.stop();
+
+            assertTrue("The subscription should be in the inactive state.", Wait.waitFor(new Wait.Condition() {
+
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return brokerView.getInactiveDurableTopicSubscribers().length == 1;
+                }
+            }));
+
+            brokerView.destroyDurableSubscriber("subscriber1", "myTopic");
+            producer.close();
+        } finally {
+            // Close both connections whatever happened above, so a failed
+            // assertion does not leave client connections open.
+            try {
+                if (consumerConnection != null) {
+                    consumerConnection.close();
+                }
+            } finally {
+                producerConnection.close();
+            }
+        }
+    }
+
+    /**
+     * Starts a non-persistent, JMX-enabled broker on an ephemeral TCP port and
+     * builds a connection factory for it with sync sends and sync dispatch.
+     */
+    @Before
+    public void setUp() throws Exception {
+        broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(true);
+        broker.setDeleteAllMessagesOnStartup(true);
+        TransportConnector connector = broker.addConnector("tcp://localhost:0");
+        broker.start();
+
+        factory = new ActiveMQConnectionFactory(connector.getPublishableConnectString());
+        factory.setAlwaysSyncSend(true);
+        factory.setDispatchAsync(false);
+    }
+
+    /** Stops the broker started in {@link #setUp()}. */
+    @After
+    public void tearDown() throws Exception {
+        broker.stop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3675Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3675Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3675Test.java
new file mode 100644
index 0000000..c8e4bf4
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3675Test.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.*;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TopicSubscriber;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.jmx.BrokerView;
+import org.apache.activemq.broker.jmx.TopicViewMBean;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ3675Test {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ3675Test.class);
+
+    private final static int deliveryMode = DeliveryMode.NON_PERSISTENT;
+    private final static ActiveMQTopic destination = new ActiveMQTopic("XYZ");
+
+    private ActiveMQConnectionFactory factory;
+    private BrokerService broker;
+
+    /**
+     * Returns a management proxy for the first topic listed by the broker's
+     * admin view.
+     */
+    public TopicViewMBean getTopicView() throws Exception {
+        ObjectName destinationName = broker.getAdminView().getTopics()[0];
+        TopicViewMBean topicView = (TopicViewMBean) broker.getManagementContext().newProxyInstance(destinationName, TopicViewMBean.class, true);
+        return topicView;
+    }
+
+    /**
+     * Creates a durable subscriber, closes it, removes the topic and the
+     * durable subscription through the broker view, then re-creates the
+     * subscriber and checks the counts reported by the broker and topic views
+     * after each step.
+     */
+    @Test
+    public void countConsumers() throws Exception {
+
+        final Connection producerConnection = factory.createConnection();
+        Connection consumerConnection = null;
+        try {
+            producerConnection.start();
+            consumerConnection = factory.createConnection();
+
+            consumerConnection.setClientID("subscriber1");
+            Session consumerMQSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            TopicSubscriber consumer = consumerMQSession.createDurableSubscriber(destination, "myTopic");
+            consumerConnection.start();
+
+            Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(destination);
+            producer.setDeliveryMode(deliveryMode);
+
+            final BrokerView brokerView = broker.getAdminView();
+            final TopicViewMBean topicView = getTopicView();
+
+            assertTrue("Should have one consumer on topic: ", Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return topicView.getConsumerCount() == 1;
+                }
+            }));
+
+            consumer.close();
+
+            assertTrue("Durable consumer should now be inactive.", Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return brokerView.getInactiveDurableTopicSubscribers().length == 1;
+                }
+            }));
+
+            try {
+                brokerView.removeTopic(destination.getTopicName());
+            } catch (Exception e1) {
+                // keep the cause visible in the failure message
+                fail("Unable to remove destination:" + destination.getPhysicalName() + ", cause: " + e1);
+            }
+
+            assertTrue("Should have no topics on the broker", Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return brokerView.getTopics().length == 0;
+                }
+            }));
+
+            try {
+                brokerView.destroyDurableSubscriber("subscriber1", "myTopic");
+            } catch(Exception e) {
+                // keep the cause visible in the failure message
+                fail("Exception not expected when attempting to delete Durable consumer." + " Cause: " + e);
+            }
+
+            assertTrue("Should be no durable consumers active or inactive.", Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return brokerView.getInactiveDurableTopicSubscribers().length == 0 &&
+                           brokerView.getDurableTopicSubscribers().length == 0;
+                }
+            }));
+
+            consumer = consumerMQSession.createDurableSubscriber(destination, "myTopic");
+
+            consumer.close();
+
+            assertTrue("Should be one consumer on the Topic.", Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    LOG.info("Number of inactive consumers: " + brokerView.getInactiveDurableTopicSubscribers().length);
+                    return brokerView.getInactiveDurableTopicSubscribers().length == 1;
+                }
+            }));
+
+            final TopicViewMBean recreatedTopicView = getTopicView();
+
+            assertTrue("Should have one consumer on topic: ", Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return recreatedTopicView.getConsumerCount() == 1;
+                }
+            }));
+        } finally {
+            // Close both connections whatever happened above, so they are not
+            // left open when the test ends or an assertion fails.
+            try {
+                if (consumerConnection != null) {
+                    consumerConnection.close();
+                }
+            } finally {
+                producerConnection.close();
+            }
+        }
+    }
+
+    /**
+     * Starts a non-persistent, JMX-enabled broker without advisory support on
+     * an ephemeral TCP port and builds a connection factory for it with sync
+     * sends and sync dispatch.
+     */
+    @Before
+    public void setUp() throws Exception {
+        broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(true);
+        broker.setAdvisorySupport(false);
+        broker.setDeleteAllMessagesOnStartup(true);
+        TransportConnector connector = broker.addConnector("tcp://localhost:0");
+        broker.start();
+
+        factory = new ActiveMQConnectionFactory(connector.getPublishableConnectString());
+        factory.setAlwaysSyncSend(true);
+        factory.setDispatchAsync(false);
+    }
+
+    /** Stops the broker started in {@link #setUp()}. */
+    @After
+    public void tearDown() throws Exception {
+        broker.stop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3678Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3678Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3678Test.java
new file mode 100644
index 0000000..3c79fcf
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3678Test.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.net.ServerSocket;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQTopicSubscriber;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.ManagementContext;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.fail;
+
+
+/**
+ * Test named after issue AMQ-3678: a durable subscriber listens on topic
+ * {@code XYZ} while a background producer sends messages, and the topic is
+ * removed through the broker admin view once a number of messages have been
+ * sent.
+ *
+ * The test passes when, within the timeout, the producer has sent
+ * {@link #TOTAL_MESSAGES} messages and the listener has received the same
+ * number. Each of those two events counts the shared latch down once.
+ */
+public class AMQ3678Test implements MessageListener {
+
+    /** Number of messages the producer sends (and the listener must receive). */
+    private static final int TOTAL_MESSAGES = 100;
+
+    /** Count of sent messages at which the producer thread removes the topic. */
+    private static final int REMOVE_TOPIC_AFTER = 50;
+
+    public int deliveryMode = DeliveryMode.NON_PERSISTENT;
+
+    private BrokerService broker;
+
+    AtomicInteger messagesSent = new AtomicInteger(0);
+    AtomicInteger messagesReceived = new AtomicInteger(0);
+
+    ActiveMQTopic destination = new ActiveMQTopic("XYZ");
+
+    int port;
+    int jmxport;
+
+    /** Counted down once by the producer and once by the listener. */
+    final CountDownLatch latch = new CountDownLatch(2);
+
+    /** Failure recorded by the producer thread; reported from the test thread. */
+    private volatile String producerFailure;
+
+    /** Set when the test method is finishing so the producer loop terminates. */
+    private volatile boolean stopProducer;
+
+    /**
+     * Empty entry point, kept so the public surface of the class is unchanged.
+     */
+    public static void main(String[] args) throws Exception {
+
+    }
+
+    /**
+     * Finds a TCP port that was free at the time of the call by briefly binding
+     * a server socket to port 0 and reading back the port it was given.
+     *
+     * The socket is closed before returning, so another process may grab the
+     * port before the caller binds it.
+     *
+     * @return a port number that was free when probed
+     * @throws IOException if no socket could be opened
+     */
+    public static int findFreePort() throws IOException {
+        ServerSocket socket = null;
+
+        try {
+            // 0 is open a socket on any free port
+            socket = new ServerSocket(0);
+            return socket.getLocalPort();
+        } finally {
+            if (socket != null) {
+                socket.close();
+            }
+        }
+    }
+
+    /**
+     * Runs the producer/durable-subscriber scenario and waits up to ten seconds
+     * for both sides to finish.
+     *
+     * Both connections are closed and the producer thread is told to stop when
+     * the method exits, whether it passes or fails.
+     *
+     * @throws JMSException if a connection, session, subscriber or producer
+     *                      cannot be created
+     */
+    @Test
+    public void countConsumers() throws JMSException {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:" + port);
+        factory.setAlwaysSyncSend(true);
+        factory.setDispatchAsync(false);
+
+        Connection producerConnection = null;
+        Connection consumerConnection = null;
+        try {
+            producerConnection = factory.createConnection();
+            producerConnection.start();
+
+            consumerConnection = factory.createConnection();
+            consumerConnection.setClientID("subscriber1");
+            Session consumerMQSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+            ActiveMQTopicSubscriber activeConsumer = (ActiveMQTopicSubscriber) consumerMQSession.createDurableSubscriber(destination, "myTopic?consumer.prefetchSize=1");
+            activeConsumer.setMessageListener(this);
+            consumerConnection.start();
+
+            final Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final MessageProducer producer = producerSession.createProducer(destination);
+            producer.setDeliveryMode(deliveryMode);
+
+            Thread t = new Thread(new Runnable() {
+
+                private boolean done = false;
+
+                @Override
+                public void run() {
+                    while (!done && !stopProducer) {
+                        if (messagesSent.get() == REMOVE_TOPIC_AFTER) {
+                            try {
+                                broker.getAdminView().removeTopic(destination.getTopicName());
+                            } catch (Exception e1) {
+                                e1.printStackTrace();
+                                System.err.flush();
+                                // fail() here would only kill this thread; hand the
+                                // failure to the test thread instead.
+                                abortProducer("Unable to remove destination:"
+                                        + destination.getPhysicalName() + " (" + e1 + ")");
+                                return;
+                            }
+                        }
+
+                        try {
+                            producer.send(producerSession.createTextMessage());
+                            int val = messagesSent.incrementAndGet();
+
+                            System.out.println("sent message (" + val + ")");
+                            System.out.flush();
+
+                            if (val == TOTAL_MESSAGES) {
+                                done = true;
+                                latch.countDown();
+                                producer.close();
+                                producerSession.close();
+                            }
+                        } catch (JMSException e) {
+                            // A failed send is retried on the next iteration; the
+                            // loop still ends once the test requests a stop.
+                            e.printStackTrace();
+                        }
+                    }
+                }
+            });
+
+            t.start();
+
+            boolean completed = false;
+            try {
+                completed = latch.await(10, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                // Restore the flag and log before failing; fail() never returns.
+                Thread.currentThread().interrupt();
+                e.printStackTrace();
+                fail("did not receive all the messages, exception waiting for latch");
+            }
+
+            if (producerFailure != null) {
+                fail(producerFailure);
+            }
+            if (!completed) {
+                fail("did not receive all the messages");
+            }
+        } finally {
+            stopProducer = true;
+            closeQuietly(producerConnection);
+            closeQuietly(consumerConnection);
+        }
+    }
+
+    /**
+     * Records a producer-side failure and releases the test thread waiting on
+     * the latch so the failure is reported without waiting for the timeout.
+     */
+    private void abortProducer(String reason) {
+        producerFailure = reason;
+        while (latch.getCount() > 0) {
+            latch.countDown();
+        }
+    }
+
+    /**
+     * Closes a connection during cleanup, logging rather than propagating any
+     * JMS error so it cannot mask the test's own outcome.
+     */
+    private static void closeQuietly(Connection connection) {
+        if (connection == null) {
+            return;
+        }
+        try {
+            connection.close();
+        } catch (JMSException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Picks free ports for the transport connector and the JMX connector, then
+     * starts a non-persistent broker with JMX enabled.
+     *
+     * @throws Exception if the broker cannot be configured or started
+     */
+    @Before
+    public void setUp() throws Exception {
+
+        try {
+            port = findFreePort();
+            jmxport = findFreePort();
+        } catch (Exception e) {
+            fail("Unable to obtain a free port on which to start the broker: " + e);
+        }
+
+        System.out.println("Starting broker");
+        System.out.flush();
+        broker = new BrokerService();
+        broker.setPersistent(false);
+        ManagementContext ctx = new ManagementContext(ManagementFactory.getPlatformMBeanServer());
+        ctx.setConnectorPort(jmxport);
+        broker.setManagementContext(ctx);
+        broker.setUseJmx(true);
+
+        broker.addConnector("tcp://localhost:" + port).setName("Default");
+        broker.start();
+
+
+        System.out.println("End of Broker Setup");
+        System.out.flush();
+    }
+
+    /**
+     * Stops the broker if {@link #setUp()} got far enough to create it.
+     *
+     * @throws Exception if the broker fails to stop cleanly
+     */
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    /**
+     * Acknowledges each delivered message, counts it, and counts the latch down
+     * once when the {@link #TOTAL_MESSAGES}-th message arrives.
+     */
+    @Override
+    public void onMessage(Message message) {
+        try {
+            message.acknowledge();
+            int val = messagesReceived.incrementAndGet();
+            System.out.println("received message (" + val + ")");
+            System.out.flush();
+            // Compare the value just obtained rather than re-reading the counter.
+            if (val == TOTAL_MESSAGES) {
+                latch.countDown();
+            }
+        } catch (JMSException e) {
+            e.printStackTrace();
+        }
+
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3732Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3732Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3732Test.java
new file mode 100644
index 0000000..601901b
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3732Test.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Random;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.BrokerService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test named after issue AMQ-3732. Two consumers created from the same
+ * session (prefetch 0, individual acknowledge mode) pull messages from one
+ * queue on separate threads, one with {@code receiveNoWait()} and one with
+ * {@code receive(50)}, and hand them to a work queue. A third thread
+ * produces the messages and a fourth acknowledges them. The test passes when
+ * every produced message has been acknowledged.
+ */
+public class AMQ3732Test {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ3732Test.class);
+
+    /** Number of messages produced, and the count the acking thread must reach. */
+    private static final long NUM_MESSAGES = 25000;
+
+    private ActiveMQConnectionFactory connectionFactory;
+    private Connection connection;
+    private Session session;
+    private BrokerService broker;
+    private String connectionUri;
+
+    /** Source of the random pause (0-9 ms) between sends. */
+    private final Random pause = new Random();
+    private final AtomicLong totalConsumed = new AtomicLong();
+
+    /**
+     * Starts a non-persistent broker without JMX and builds a connection
+     * factory for it with every prefetch limit set to zero.
+     *
+     * @throws Exception if the broker cannot be started
+     */
+    @Before
+    public void startBroker() throws Exception {
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        broker.addConnector("tcp://0.0.0.0:0");
+        broker.start();
+        broker.waitUntilStarted();
+
+        connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
+        connectionFactory = new ActiveMQConnectionFactory(connectionUri);
+        connectionFactory.getPrefetchPolicy().setAll(0);
+    }
+
+    /**
+     * Closes the test connection, if one was opened, and always stops the
+     * broker afterwards, even when closing the connection fails.
+     *
+     * @throws Exception if closing the connection or stopping the broker fails
+     */
+    @After
+    public void stopBroker() throws Exception {
+        try {
+            // The connection is only created inside the test method, so it is
+            // still null when the test failed before reaching that point.
+            if (connection != null) {
+                connection.close();
+            }
+        } finally {
+            if (broker != null) {
+                broker.stop();
+                broker.waitUntilStopped();
+            }
+        }
+    }
+
+    /**
+     * Runs the four worker threads to completion and checks that exactly
+     * {@link #NUM_MESSAGES} messages were acknowledged.
+     *
+     * @throws Exception if JMS setup fails or the test thread is interrupted
+     *                   while joining the workers
+     */
+    @Test(timeout = 1200000)
+    public void testInterruptionAffects() throws Exception {
+
+        connection = connectionFactory.createConnection();
+        connection.start();
+        session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+
+        Queue queue = session.createQueue("AMQ3732Test");
+
+        // Hand-off between the receiving threads and the acknowledging thread.
+        final LinkedBlockingQueue<Message> workQueue = new LinkedBlockingQueue<Message>();
+
+        final MessageConsumer consumer1 = session.createConsumer(queue);
+        final MessageConsumer consumer2 = session.createConsumer(queue);
+        final MessageProducer producer = session.createProducer(queue);
+
+        // Polls without blocking until all messages have been acknowledged.
+        Thread consumer1Thread = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    while (totalConsumed.get() < NUM_MESSAGES) {
+                        Message message = consumer1.receiveNoWait();
+                        if (message != null) {
+                            workQueue.add(message);
+                        }
+                    }
+                } catch(Exception e) {
+                    LOG.error("Caught an unexpected error: ", e);
+                }
+            }
+        });
+        consumer1Thread.start();
+
+        // Same as above, but blocks for up to 50 ms per receive.
+        Thread consumer2Thread = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    while (totalConsumed.get() < NUM_MESSAGES) {
+                        Message message = consumer2.receive(50);
+                        if (message != null) {
+                            workQueue.add(message);
+                        }
+                    }
+                } catch(Exception e) {
+                    LOG.error("Caught an unexpected error: ", e);
+                }
+            }
+        });
+        consumer2Thread.start();
+
+        Thread producerThread = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    for (int i = 0; i < NUM_MESSAGES; ++i) {
+                        producer.send(session.createTextMessage("TEST"));
+                        TimeUnit.MILLISECONDS.sleep(pause.nextInt(10));
+                    }
+                } catch(Exception e) {
+                    LOG.error("Caught an unexpected error: ", e);
+                }
+            }
+        });
+        producerThread.start();
+
+        Thread ackingThread = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    while (totalConsumed.get() < NUM_MESSAGES) {
+                        Message message = workQueue.take();
+                        message.acknowledge();
+                        // Use the value returned by the increment so the progress
+                        // check and the logged number refer to the same count.
+                        long consumed = totalConsumed.incrementAndGet();
+                        if ((consumed % 100) == 0) {
+                            LOG.info("Consumed {} messages so far.", consumed);
+                        }
+                    }
+                } catch(Exception e) {
+                    LOG.error("Caught an unexpected error: ", e);
+                }
+            }
+        });
+        ackingThread.start();
+
+        producerThread.join();
+        consumer1Thread.join();
+        consumer2Thread.join();
+        ackingThread.join();
+
+        assertEquals(NUM_MESSAGES, totalConsumed.get());
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3779Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3779Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3779Test.java
new file mode 100644
index 0000000..5a410e8
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3779Test.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.AutoFailTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.util.LoggingBrokerPlugin;
+import org.apache.activemq.util.DefaultTestAppender;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+
+/**
+ * Test named after issue AMQ-3779: with the logging broker plugin set to use
+ * a per-destination logger, sending a message to a queue must produce at
+ * least one log event whose logger name contains the queue name.
+ */
+public class AMQ3779Test extends AutoFailTestSupport {
+
+    private static final Logger logger = Logger.getLogger(AMQ3779Test.class);
+    private static final String qName = "QNameToFind";
+
+    /**
+     * Starts a broker with the logging plugin, sends one persistent message
+     * to {@code qName}, and asserts that an appender attached to the root
+     * logger saw an event from a logger whose name contains the queue name.
+     *
+     * The connection, broker and appender are all released in the finally
+     * chain so nothing leaks into later tests.
+     *
+     * @throws Exception if the broker or the JMS objects cannot be created
+     */
+    public void testLogPerDest() throws Exception {
+
+        final AtomicBoolean ok = new AtomicBoolean(false);
+        Appender appender = new DefaultTestAppender() {
+            @Override
+            public void doAppend(LoggingEvent event) {
+                if (event.getLoggerName().contains(qName)) {
+                    ok.set(true);
+                }
+            }
+        };
+
+        // Add and remove the appender on the same logger. The original added
+        // it to the root logger but removed it from the class logger, which
+        // left it attached to the root logger.
+        final Logger rootLogger = Logger.getRootLogger();
+        rootLogger.addAppender(appender);
+
+        BrokerService broker = null;
+        Connection connection = null;
+        try {
+
+            broker = new BrokerService();
+            LoggingBrokerPlugin loggingBrokerPlugin = new LoggingBrokerPlugin();
+            loggingBrokerPlugin.setPerDestinationLogger(true);
+            loggingBrokerPlugin.setLogAll(true);
+            broker.setPlugins(new LoggingBrokerPlugin[]{loggingBrokerPlugin});
+            broker.start();
+
+
+            connection = new ActiveMQConnectionFactory(broker.getVmConnectorURI()).createConnection();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer messageProducer = session.createProducer(session.createQueue(qName));
+            messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
+            connection.start();
+
+            messageProducer.send(session.createTextMessage("Hi"));
+            connection.close();
+            connection = null;
+
+            assertTrue("got expected log message", ok.get());
+        } finally {
+            try {
+                // Only non-null when the test failed before the normal close.
+                if (connection != null) {
+                    connection.close();
+                }
+            } finally {
+                try {
+                    if (broker != null) {
+                        broker.stop();
+                    }
+                } finally {
+                    rootLogger.removeAppender(appender);
+                }
+            }
+        }
+    }
+}


Mime
View raw message