activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [24/52] [abbrv] [partial] activemq-artemis git commit: ARTEMIS-127 Adding activemq unit test module to Artemis
Date Tue, 09 Jun 2015 16:36:55 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java
new file mode 100644
index 0000000..8a952fd
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import junit.framework.Test;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.BrokerTestSupport;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.usecases.MyObject;
+
+public class AMQ2103Test extends BrokerTestSupport {
+    static PolicyEntry reduceMemoryFootprint = new PolicyEntry();
+    static {
+        reduceMemoryFootprint.setReduceMemoryFootprint(true);
+    }
+
+    public PolicyEntry defaultPolicy = reduceMemoryFootprint;
+
+    @Override
+    protected PolicyEntry getDefaultPolicy() {
+        return defaultPolicy;
+    }
+
+    public void initCombosForTestVerifyMarshalledStateIsCleared() throws Exception {
+        addCombinationValues("defaultPolicy", new Object[]{defaultPolicy, null});    
+    }
+
+    public static Test suite() {
+        return suite(AMQ2103Test.class);
+    }
+
+    /**
+     * use mem persistence so no marshaling,
+     * reduceMemoryFootprint on/off that will reduce memory by whacking the marshaled state
+     * With vm transport and deferred serialisation and no persistence (mem persistence),
+     * we see the message as sent by the client so we can validate the contents against
+     * the policy
+     * @throws Exception
+     */
+    public void testVerifyMarshalledStateIsCleared() throws Exception {
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+        factory.setOptimizedMessageDispatch(true);
+        factory.setObjectMessageSerializationDefered(true);
+        factory.setCopyMessageOnSend(false);
+
+        Connection connection = factory.createConnection();
+        Session session = (ActiveMQSession)connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQDestination destination = new ActiveMQQueue("testQ");
+		MessageConsumer consumer = session.createConsumer(destination);
+		connection.start();
+
+        MessageProducer producer = session.createProducer(destination);
+        final MyObject obj = new MyObject("A message");
+        ActiveMQObjectMessage m1 = (ActiveMQObjectMessage)session.createObjectMessage();
+        m1.setObject(obj);
+        producer.send(m1);
+
+        ActiveMQTextMessage m2 = new ActiveMQTextMessage();
+        m2.setText("Test Message Payload.");
+        producer.send(m2);
+
+        ActiveMQMapMessage m3 = new ActiveMQMapMessage();
+        m3.setString("text", "my message");
+        producer.send(m3);
+
+        Message m = consumer.receive(maxWait);
+        assertNotNull(m);
+        assertEquals(m1.getMessageId().toString(), m.getJMSMessageID());
+        assertTrue(m instanceof ActiveMQObjectMessage);
+
+        if (getDefaultPolicy() != null) {
+            assertNull("object data cleared by reduceMemoryFootprint (and never marshalled as using mem persistence)",
+                ((ActiveMQObjectMessage)m).getObject());
+        }
+
+        // verify no serialisation via vm transport
+        assertEquals("writeObject called", 0, obj.getWriteObjectCalled());
+        assertEquals("readObject called", 0, obj.getReadObjectCalled());
+        assertEquals("readObjectNoData called", 0, obj.getReadObjectNoDataCalled());
+
+        m = consumer.receive(maxWait);
+        assertNotNull(m);
+        assertEquals(m2.getMessageId().toString(), m.getJMSMessageID());
+        assertTrue(m instanceof ActiveMQTextMessage);
+
+        if (getDefaultPolicy() != null) {
+            assertNull("text cleared by reduceMemoryFootprint (and never marshalled as using mem persistence)",
+                ((ActiveMQTextMessage)m).getText());
+        }
+
+        m = consumer.receive(maxWait);
+        assertNotNull(m);
+        assertEquals(m3.getMessageId().toString(), m.getJMSMessageID());
+        assertTrue(m instanceof ActiveMQMapMessage);
+
+        if (getDefaultPolicy() != null) {
+            assertNull("text cleared by reduceMemoryFootprint (and never marshalled as using mem persistence)",
+                ((ActiveMQMapMessage)m).getStringProperty("text"));
+        }
+
+        connection.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149LevelDBTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149LevelDBTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149LevelDBTest.java
new file mode 100644
index 0000000..1ad8b68
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149LevelDBTest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.leveldb.LevelDBStore;
+
+public class AMQ2149LevelDBTest extends AMQ2149Test {
+
+    @Override
+    protected void configurePersistenceAdapter(BrokerService brokerService) throws Exception {
+        LevelDBStore persistenceFactory = new LevelDBStore();
+        persistenceFactory.setDirectory(dataDirFile);
+        brokerService.setPersistenceAdapter(persistenceFactory);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
new file mode 100644
index 0000000..b2eba61
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
@@ -0,0 +1,584 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import java.io.File;
+import java.lang.IllegalStateException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.Vector;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.jms.*;
+
+import org.apache.activemq.AutoFailTestSupport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.util.LoggingBrokerPlugin;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.*;
+
+interface Configurer {
+    public void configure(BrokerService broker) throws Exception;
+}
+
+public class AMQ2149Test {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ2149Test.class);
+    @Rule
+    public TestName testName = new TestName();
+
+    private static final String BROKER_CONNECTOR = "tcp://localhost:61617";
+    private static final String DEFAULT_BROKER_URL = "failover:("+ BROKER_CONNECTOR
+        +")?maxReconnectDelay=1000&useExponentialBackOff=false";
+        
+    private final String SEQ_NUM_PROPERTY = "seqNum";
+
+    final int MESSAGE_LENGTH_BYTES = 75 * 1024;
+    final long SLEEP_BETWEEN_SEND_MS = 25;
+    final int NUM_SENDERS_AND_RECEIVERS = 10;
+    final Object brokerLock = new Object();
+    
+    private static final long DEFAULT_BROKER_STOP_PERIOD = 10 * 1000;
+    private static final long DEFAULT_NUM_TO_SEND = 1400;
+    
+    long brokerStopPeriod = DEFAULT_BROKER_STOP_PERIOD;
+    long numtoSend = DEFAULT_NUM_TO_SEND;
+    long sleepBetweenSend = SLEEP_BETWEEN_SEND_MS;
+    String brokerURL = DEFAULT_BROKER_URL;
+    
+    int numBrokerRestarts = 0;
+    final static int MAX_BROKER_RESTARTS = 4;
+    BrokerService broker;
+    Vector<Throwable> exceptions = new Vector<Throwable>();
+
+    protected File dataDirFile;
+    final LoggingBrokerPlugin[] plugins = new LoggingBrokerPlugin[]{new LoggingBrokerPlugin()};
+    
+    
+    public void createBroker(Configurer configurer) throws Exception {
+        broker = new BrokerService();
+        configurePersistenceAdapter(broker);
+        
+        broker.getSystemUsage().getMemoryUsage().setLimit(MESSAGE_LENGTH_BYTES * 200 * NUM_SENDERS_AND_RECEIVERS);
+
+        broker.addConnector(BROKER_CONNECTOR);        
+        broker.setBrokerName(testName.getMethodName());
+        broker.setDataDirectoryFile(dataDirFile);
+        if (configurer != null) {
+            configurer.configure(broker);
+        }
+        broker.start();
+    }
+    
+    protected void configurePersistenceAdapter(BrokerService brokerService) throws Exception {
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        LOG.debug("Starting test {}", testName.getMethodName());
+        dataDirFile = new File("target/"+ testName.getMethodName());
+        numtoSend = DEFAULT_NUM_TO_SEND;
+        brokerStopPeriod = DEFAULT_BROKER_STOP_PERIOD;
+        sleepBetweenSend = SLEEP_BETWEEN_SEND_MS;
+        brokerURL = DEFAULT_BROKER_URL;
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        Future<Boolean> future = executor.submit(new TeardownTask(brokerLock, broker));
+        try {
+            LOG.debug("Teardown started.");
+            long start = System.currentTimeMillis();
+            Boolean result =  future.get(30, TimeUnit.SECONDS);
+            long finish = System.currentTimeMillis();
+            LOG.debug("Result of teardown: {} after {} ms ", result, (finish - start));
+        } catch (TimeoutException e) {
+            fail("Teardown timed out");
+            AutoFailTestSupport.dumpAllThreads(testName.getMethodName());
+        }
+        executor.shutdownNow();
+        exceptions.clear();
+    }
+    
+    private String buildLongString() {
+        final StringBuilder stringBuilder = new StringBuilder(
+                MESSAGE_LENGTH_BYTES);
+        for (int i = 0; i < MESSAGE_LENGTH_BYTES; ++i) {
+            stringBuilder.append((int) (Math.random() * 10));
+        }
+        return stringBuilder.toString();
+    }
+
+    HashSet<Connection> connections = new HashSet<Connection>();
+    private class Receiver implements MessageListener {
+
+        private final javax.jms.Destination dest;
+
+        private final Connection connection;
+
+        private final Session session;
+
+        private final MessageConsumer messageConsumer;
+
+        private volatile long nextExpectedSeqNum = 0;
+                
+        private final boolean transactional;
+
+        private String lastId = null;
+
+        public Receiver(javax.jms.Destination dest, boolean transactional) throws JMSException {
+            this.dest = dest;
+            this.transactional = transactional;
+            connection = new ActiveMQConnectionFactory(brokerURL)
+                    .createConnection();
+            connection.setClientID(dest.toString());
+            session = connection.createSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+            if (ActiveMQDestination.transform(dest).isTopic()) {
+                messageConsumer = session.createDurableSubscriber((Topic) dest, dest.toString());
+            } else {
+                messageConsumer = session.createConsumer(dest);
+            }
+            messageConsumer.setMessageListener(this);
+            connection.start();
+            connections.add(connection);
+        }
+
+        public void close() throws JMSException {
+            connection.close();
+        }
+        
+        public long getNextExpectedSeqNo() {
+            return nextExpectedSeqNum;
+        }
+        
+        final int TRANSACITON_BATCH = 500;
+        boolean resumeOnNextOrPreviousIsOk = false;
+        public void onMessage(Message message) {
+            try {
+                final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY);
+                if ((seqNum % TRANSACITON_BATCH) == 0) {
+                    LOG.info(dest + " received " + seqNum);
+                    
+                    if (transactional) {
+                        LOG.info("committing..");
+                        session.commit();
+                    }
+                }
+                if (resumeOnNextOrPreviousIsOk) {
+                    // after an indoubt commit we need to accept what we get (within reason)
+                    if (seqNum != nextExpectedSeqNum) {
+                        if (seqNum == nextExpectedSeqNum - (TRANSACITON_BATCH -1)) {
+                            nextExpectedSeqNum -= (TRANSACITON_BATCH -1);
+                            LOG.info("In doubt commit failed, getting replay at:" +  nextExpectedSeqNum);
+                        }
+                    }
+                    resumeOnNextOrPreviousIsOk = false;
+                }
+                if (seqNum != nextExpectedSeqNum) {
+                    LOG.warn(dest + " received " + seqNum
+                            + " in msg: " + message.getJMSMessageID()
+                            + " expected "
+                            + nextExpectedSeqNum
+                            + ", lastId: " + lastId 
+                            + ", message:" + message);
+                    fail(dest + " received " + seqNum + " expected "
+                            + nextExpectedSeqNum);
+                }
+                ++nextExpectedSeqNum;
+                lastId = message.getJMSMessageID();
+            } catch (TransactionRolledBackException expectedSometimesOnFailoverRecovery) {
+                LOG.info("got rollback: " + expectedSometimesOnFailoverRecovery);
+                if (expectedSometimesOnFailoverRecovery.getMessage().contains("completion in doubt")) {
+                    // in doubt - either commit command or reply missing
+                    // don't know if we will get a replay
+                    resumeOnNextOrPreviousIsOk = true;
+                    nextExpectedSeqNum++;
+                    LOG.info("in doubt transaction completion: ok to get next or previous batch. next:" + nextExpectedSeqNum);
+                } else {
+                    resumeOnNextOrPreviousIsOk = false;
+                    // batch will be replayed
+                    nextExpectedSeqNum -= (TRANSACITON_BATCH -1);
+                }
+
+            } catch (Throwable e) {
+                LOG.error(dest + " onMessage error", e);
+                exceptions.add(e);
+            }
+        }
+
+    }
+
+    private class Sender implements Runnable {
+
+        private final javax.jms.Destination dest;
+
+        private final Connection connection;
+
+        private final Session session;
+
+        private final MessageProducer messageProducer;
+
+        private volatile long nextSequenceNumber = 0;
+
+        public Sender(javax.jms.Destination dest) throws JMSException {
+            this.dest = dest;
+            connection = new ActiveMQConnectionFactory(brokerURL)
+                    .createConnection();
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            messageProducer = session.createProducer(dest);
+            messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
+            connection.start();
+            connections.add(connection);
+        }
+
+        public void run() {
+            final String longString = buildLongString();
+            while (nextSequenceNumber < numtoSend) {
+                try {
+                    final Message message = session
+                            .createTextMessage(longString);
+                    message.setLongProperty(SEQ_NUM_PROPERTY,
+                            nextSequenceNumber);
+                    ++nextSequenceNumber;
+                    messageProducer.send(message);
+                    
+                    if ((nextSequenceNumber % 500) == 0) {
+                        LOG.info(dest + " sent " + nextSequenceNumber);
+                    }
+
+                } catch (javax.jms.IllegalStateException e) {
+                    LOG.error(dest + " bailing on send error", e);
+                    exceptions.add(e);
+                    break;
+                } catch (Exception e) {
+                    LOG.error(dest + " send error", e);
+                    exceptions.add(e);
+                }
+                if (sleepBetweenSend > 0) {
+                    try {
+                        Thread.sleep(sleepBetweenSend);
+                    } catch (InterruptedException e) {
+                        LOG.warn(dest + " sleep interrupted", e);
+                    }
+                }
+            }
+            try {
+                connection.close();
+            } catch (JMSException ignored) {
+            }
+        }
+    }
+
+    // attempt to simply replicate leveldb failure. no joy yet
+    public void x_testRestartReReceive() throws Exception {
+        createBroker(new Configurer() {
+            public void configure(BrokerService broker) throws Exception {
+                broker.deleteAllMessages();
+            }
+        });
+
+        final javax.jms.Destination destination =
+                ActiveMQDestination.createDestination("test.dest.X", ActiveMQDestination.QUEUE_TYPE);
+        Thread thread = new Thread(new Sender(destination));
+        thread.start();
+        thread.join();
+
+        Connection connection = new ActiveMQConnectionFactory(brokerURL).createConnection();
+        connection.setClientID(destination.toString());
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        MessageConsumer messageConsumer = session.createConsumer(destination);
+        connection.start();
+
+        int batch = 200;
+        long expectedSeq;
+
+        final TimerTask restartTask = schedualRestartTask(null, new Configurer() {
+            public void configure(BrokerService broker) throws Exception {
+            }
+        });
+
+        expectedSeq = 0;
+        for (int s = 0; s < 4; s++) {
+            for (int i = 0; i < batch; i++) {
+                Message message = messageConsumer.receive(20000);
+                assertNotNull("s:" + s + ", i:" + i, message);
+                final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY);
+                assertEquals("expected order s:" + s, expectedSeq++, seqNum);
+
+                if (i > 0 && i%600 == 0) {
+                    LOG.info("Commit on %5");
+                //    session.commit();
+                }
+            }
+            restartTask.run();
+        }
+
+    }
+
+    // no need to run this unless there are some issues with the others
+    public void vanilaVerify_testOrder() throws Exception {
+        
+        createBroker(new Configurer() {
+            public void configure(BrokerService broker) throws Exception {
+               broker.deleteAllMessages();            
+            }
+        });
+        
+        verifyOrderedMessageReceipt();
+        verifyStats(false);
+    }
+
+    @Test(timeout = 5 * 60 * 1000)
+    public void testOrderWithRestart() throws Exception {
+        createBroker(new Configurer() {
+            public void configure(BrokerService broker) throws Exception {
+                broker.deleteAllMessages();     
+            }
+        });
+        
+        final Timer timer = new Timer();
+        schedualRestartTask(timer, new Configurer() {
+            public void configure(BrokerService broker) throws Exception {    
+            }
+        });
+        
+        try {
+            verifyOrderedMessageReceipt();
+        } finally {
+            timer.cancel();
+        }
+        
+        verifyStats(true);
+    }
+
+    @Test(timeout = 5 * 60 * 1000)
+    public void testTopicOrderWithRestart() throws Exception {
+        createBroker(new Configurer() {
+            public void configure(BrokerService broker) throws Exception {
+                broker.deleteAllMessages();
+            }
+        });
+        
+        final Timer timer = new Timer();
+        schedualRestartTask(timer, null);
+        
+        try {
+            verifyOrderedMessageReceipt(ActiveMQDestination.TOPIC_TYPE);
+        } finally {
+            timer.cancel();
+        }
+        
+        verifyStats(true);
+    }
+
+    @Test(timeout = 5 * 60 * 1000)
+    public void testQueueTransactionalOrderWithRestart() throws Exception {
+        doTestTransactionalOrderWithRestart(ActiveMQDestination.QUEUE_TYPE);
+    }
+
+    @Test(timeout = 5 * 60 * 1000)
+    public void testTopicTransactionalOrderWithRestart() throws Exception {
+        doTestTransactionalOrderWithRestart(ActiveMQDestination.TOPIC_TYPE);
+    }
+    
+    public void doTestTransactionalOrderWithRestart(byte destinationType) throws Exception {
+        numtoSend = 10000;
+        sleepBetweenSend = 3;
+        brokerStopPeriod = 10 * 1000;
+              
+        createBroker(new Configurer() {
+            public void configure(BrokerService broker) throws Exception {
+                broker.deleteAllMessages();
+            }
+        });
+        
+        final Timer timer = new Timer();
+        schedualRestartTask(timer, null);
+        
+        try {
+            verifyOrderedMessageReceipt(destinationType, 1, true);
+        } finally {
+            timer.cancel();
+        }
+        
+        verifyStats(true);
+    }
+
+    private void verifyStats(boolean brokerRestarts) throws Exception {
+        RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
+        
+        for (Destination dest : regionBroker.getQueueRegion().getDestinationMap().values()) {
+            DestinationStatistics stats = dest.getDestinationStatistics();
+            if (brokerRestarts) {
+                // all bets are off w.r.t stats as there may be duplicate sends and duplicate
+                // dispatches, all of which will be suppressed - either by the reference store
+                // not allowing duplicate references or consumers acking duplicates
+                LOG.info("with restart: not asserting qneue/dequeue stat match for: " + dest.getName()
+                        + " " + stats.getEnqueues().getCount() + " <= " +stats.getDequeues().getCount());
+            } else {
+                assertEquals("qneue/dequeue match for: " + dest.getName(),
+                        stats.getEnqueues().getCount(), stats.getDequeues().getCount());   
+            }
+        }
+    }
+
+    private TimerTask schedualRestartTask(final Timer timer, final Configurer configurer) {
+        class RestartTask extends TimerTask {
+            public void run() {
+                synchronized (brokerLock) {
+                    LOG.info("stopping broker..");
+                    try {
+                        broker.stop();
+                        broker.waitUntilStopped();
+                    } catch (Exception e) {
+                        LOG.error("ex on broker stop", e);
+                        exceptions.add(e);
+                    }
+                    LOG.info("restarting broker");
+                    try {
+                        createBroker(configurer);
+                        broker.waitUntilStarted();
+                    } catch (Exception e) {
+                        LOG.error("ex on broker restart", e);
+                        exceptions.add(e);
+                    }
+                }
+                if (++numBrokerRestarts < MAX_BROKER_RESTARTS && timer != null) {
+                    // do it again
+                    try {
+                        timer.schedule(new RestartTask(), brokerStopPeriod);
+                    } catch (IllegalStateException ignore_alreadyCancelled) {
+                    }
+                } else {
+                    LOG.info("no longer stopping broker on reaching Max restarts: " + MAX_BROKER_RESTARTS);
+                }
+            }
+        }
+        RestartTask task = new RestartTask();
+        if (timer != null) {
+            timer.schedule(task, brokerStopPeriod);
+        }
+        return task;
+    }
+    
+    private void verifyOrderedMessageReceipt(byte destinationType) throws Exception {
+        verifyOrderedMessageReceipt(destinationType, NUM_SENDERS_AND_RECEIVERS, false);
+    }
+    
+    private void verifyOrderedMessageReceipt() throws Exception {
+        verifyOrderedMessageReceipt(ActiveMQDestination.QUEUE_TYPE, NUM_SENDERS_AND_RECEIVERS, false);
+    }
+    
+    private void verifyOrderedMessageReceipt(byte destinationType, int concurrentPairs, boolean transactional) throws Exception {
+
+        Vector<Thread> threads = new Vector<Thread>();
+        Vector<Receiver> receivers = new Vector<Receiver>();
+        
+        for (int i = 0; i < concurrentPairs; ++i) {
+            final javax.jms.Destination destination =
+                    ActiveMQDestination.createDestination("test.dest." + i, destinationType);
+            receivers.add(new Receiver(destination, transactional));
+            Thread thread = new Thread(new Sender(destination));
+            thread.start();
+            threads.add(thread);
+        }
+        
+        final long expiry = System.currentTimeMillis() + 1000 * 60 * 4;
+        while(!threads.isEmpty() && exceptions.isEmpty() && System.currentTimeMillis() < expiry) {
+            Thread sendThread = threads.firstElement();
+            sendThread.join(1000*30);
+            if (!sendThread.isAlive()) {
+                threads.remove(sendThread);
+            } else {
+                AutoFailTestSupport.dumpAllThreads("Send blocked");
+            }
+        }
+        LOG.info("senders done..." + threads);
+
+        while(!receivers.isEmpty() && System.currentTimeMillis() < expiry) {
+            Receiver receiver = receivers.firstElement();
+            if (receiver.getNextExpectedSeqNo() >= numtoSend || !exceptions.isEmpty()) {
+                receiver.close();
+                receivers.remove(receiver);
+            }
+        }
+
+        for (Connection connection : connections) {
+            try {
+                connection.close();
+            } catch (Exception ignored) {}
+        }
+        connections.clear();
+
+        assertTrue("No timeout waiting for senders/receivers to complete", System.currentTimeMillis() < expiry);
+        if (!exceptions.isEmpty()) {
+            exceptions.get(0).printStackTrace();
+        }
+
+        LOG.info("Dangling threads: " + threads);
+        for (Thread dangling : threads) {
+            dangling.interrupt();
+            dangling.join(10*1000);
+        }
+
+        assertTrue("No exceptions", exceptions.isEmpty());
+    }
+
+}
+
+class TeardownTask implements Callable<Boolean> {
+    private Object brokerLock;
+    private BrokerService broker;
+
+    public TeardownTask(Object brokerLock, BrokerService broker) {
+        this.brokerLock = brokerLock;
+        this.broker = broker;
+    }
+
+    @Override
+    public Boolean call() throws Exception {
+        synchronized(brokerLock) {
+            if (broker!= null) {
+                broker.stop();
+                broker.waitUntilStopped();
+            }
+        }
+        return Boolean.TRUE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2171Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2171Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2171Test.java
new file mode 100644
index 0000000..f23f758
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2171Test.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.*;
+import java.util.concurrent.CopyOnWriteArrayList;
+import javax.jms.*;
+import javax.jms.Queue;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class AMQ2171Test implements Thread.UncaughtExceptionHandler {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ2171Test.class);
+    private static final String BROKER_URL = "tcp://localhost:0";
+    private static final int QUEUE_SIZE = 100;
+
+    private static BrokerService brokerService;
+    private static Queue destination;
+
+    private String brokerUri;
+    private String brokerUriNoPrefetch;
+    private Collection<Throwable> exceptions = new CopyOnWriteArrayList<Throwable>();
+
+    @Before
+    public void setUp() throws Exception {
+        // Start an embedded broker up.
+        brokerService = new BrokerService();
+        brokerService.setDeleteAllMessagesOnStartup(true);
+        brokerService.addConnector(BROKER_URL);
+        brokerService.start();
+
+        brokerUri = brokerService.getTransportConnectors().get(0).getPublishableConnectString().toString();
+        brokerUriNoPrefetch = brokerUri + "?jms.prefetchPolicy.all=0";
+
+        destination = new ActiveMQQueue("Test");
+        produce(brokerUri, QUEUE_SIZE);
+    }
+
+    @Before
+    public void addHandler() {
+        Thread.setDefaultUncaughtExceptionHandler(this);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        brokerService.stop();
+    }
+
+    @Test(timeout = 10000)
+    public void testBrowsePrefetch() throws Exception {
+        runTest(brokerUri);
+    }
+
+    @Test(timeout = 10000)
+    public void testBrowseNoPrefetch() throws Exception {
+        runTest(brokerUriNoPrefetch);
+    }
+
+    private void runTest(String brokerURL) throws Exception {
+
+        Connection connection = new ActiveMQConnectionFactory(brokerURL).createConnection();
+
+        try {
+            connection.start();
+
+            Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+            @SuppressWarnings("unchecked")
+            Enumeration<Message> unread = (Enumeration<Message>) session.createBrowser(destination).getEnumeration();
+
+            int count = 0;
+            while (unread.hasMoreElements()) {
+                unread.nextElement();
+                count++;
+            }
+
+            assertEquals(QUEUE_SIZE, count);
+            assertTrue(exceptions.isEmpty());
+        } finally {
+            try {
+                connection.close();
+            } catch (JMSException e) {
+                exceptions.add(e);
+            }
+        }
+    }
+
+    private static void produce(String brokerURL, int count) throws Exception {
+        Connection connection = null;
+
+        try {
+
+            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL);
+            connection = factory.createConnection();
+            Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(destination);
+            producer.setTimeToLive(0);
+            connection.start();
+
+            for (int i = 0; i < count; i++) {
+                int id = i + 1;
+                TextMessage message = session.createTextMessage("Message " + id);
+                message.setIntProperty("MsgNumber", id);
+                producer.send(message);
+
+                if (id % 500 == 0) {
+                    LOG.info("sent " + id + ", ith " + message);
+                }
+            }
+        } finally {
+            try {
+                if (connection != null) {
+                    connection.close();
+                }
+            } catch (Throwable e) {
+            }
+        }
+    }
+
+    public void uncaughtException(Thread t, Throwable e) {
+        exceptions.add(e);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2200Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2200Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2200Test.java
new file mode 100644
index 0000000..0903e56
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2200Test.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.*;
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.TopicSubscriptionViewMBean;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AMQ2200Test {
+
+    private static final String bindAddress = "tcp://0.0.0.0:0";
+    private BrokerService broker;
+    private ActiveMQConnectionFactory cf;
+
+    @Before
+    public void setUp() throws Exception {
+        broker = new BrokerService();
+        broker.setDataDirectory("target" + File.separator + "activemq-data");
+        broker.setPersistent(true);
+        broker.setUseJmx(true);
+        broker.setAdvisorySupport(false);
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.addConnector(bindAddress);
+        String address = broker.getTransportConnectors().get(0).getPublishableConnectString();
+        broker.start();
+        broker.waitUntilStarted();
+
+        cf = new ActiveMQConnectionFactory(address);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    @Test
+    public void testTopicSubscriptionView() throws Exception {
+    	TopicConnection connection = cf.createTopicConnection();
+    	TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+
+    	Topic destination = session.createTopic("TopicViewTestTopic");
+    	MessageConsumer consumer = session.createConsumer(destination);
+    	assertNotNull(consumer);
+    	TimeUnit.SECONDS.sleep(1);
+
+    	ObjectName subscriptionNames[] = broker.getAdminView().getTopicSubscribers();
+    	assertTrue(subscriptionNames.length > 0);
+
+    	boolean fail = true;
+    	for(ObjectName name : subscriptionNames) {
+    		if (name.toString().contains("TopicViewTestTopic")) {
+                TopicSubscriptionViewMBean sub = (TopicSubscriptionViewMBean)
+                	broker.getManagementContext().newProxyInstance(name, TopicSubscriptionViewMBean.class, true);
+                assertNotNull(sub);
+                assertTrue(sub.getSessionId() != -1);
+                // Check that its the default value then configure something new.
+                assertTrue(sub.getMaximumPendingQueueSize() == -1);
+                sub.setMaximumPendingQueueSize(1000);
+                assertTrue(sub.getMaximumPendingQueueSize() != -1);
+                fail = false;
+    		}
+    	}
+
+    	if (fail) {
+    		fail("Didn't find the TopicSubscriptionView");
+    	}
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2213Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2213Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2213Test.java
new file mode 100644
index 0000000..f267a99
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2213Test.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AMQ2213Test
+{
+    BrokerService broker;
+    ConnectionFactory factory;
+    Connection connection;
+    Session session;
+    Queue queue;
+    MessageConsumer consumer;
+
+    public void createBroker(boolean deleteAll) throws Exception {
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(deleteAll);
+        broker.setDataDirectory("target/AMQ3145Test");
+        broker.setUseJmx(true);
+        broker.getManagementContext().setCreateConnector(false);
+        broker.addConnector("tcp://localhost:0");
+        broker.start();
+        broker.waitUntilStarted();
+        factory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri().toString());
+        connection = factory.createConnection();
+        connection.start();
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+    }
+
+    @Before
+    public void createBroker() throws Exception {
+        createBroker(true);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (consumer != null) {
+            consumer.close();
+        }
+        session.close();
+        connection.stop();
+        connection.close();
+        broker.stop();
+    }
+
+    @Test
+    public void testEqualsGenericSession() throws JMSException
+    {
+        assertNotNull(this.connection);
+        Session sess = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertTrue(sess.equals(sess));
+    }
+
+    @Test
+    public void testEqualsTopicSession() throws JMSException
+    {
+        assertNotNull(this.connection);
+        assertTrue(this.connection instanceof TopicConnection);
+        TopicSession sess = ((TopicConnection)this.connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertTrue(sess.equals(sess));
+    }
+
+    @Test
+    public void testEqualsQueueSession() throws JMSException
+    {
+        assertNotNull(this.connection);
+        assertTrue(this.connection instanceof QueueConnection);
+        QueueSession sess = ((QueueConnection)this.connection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertTrue(sess.equals(sess));
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2314Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2314Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2314Test.java
new file mode 100644
index 0000000..369385c
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2314Test.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.File;
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.Test;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.CombinationTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ2314Test extends CombinationTestSupport {
+
+    public boolean consumeAll = false;
+    public int deliveryMode = DeliveryMode.NON_PERSISTENT;
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ2314Test.class);
+    private static final int MESSAGES_COUNT = 30000;
+    private static byte[]  buf = new byte[1024];
+    private BrokerService broker;
+    private String connectionUri;
+
+    private static final long messageReceiveTimeout = 500L;
+
+    Destination destination = new ActiveMQTopic("FooTwo");
+
+    public void testRemoveSlowSubscriberWhacksTempStore() throws Exception {
+        runProducerWithHungConsumer();
+    }
+
+    public void testMemoryUsageReleasedOnAllConsumed() throws Exception {
+        consumeAll = true;
+        runProducerWithHungConsumer();
+        // do it again to ensure memory limits are decreased
+        runProducerWithHungConsumer();
+    }
+
+    public void runProducerWithHungConsumer() throws Exception {
+
+        final CountDownLatch consumerContinue = new CountDownLatch(1);
+        final CountDownLatch consumerReady = new CountDownLatch(1);
+
+        final long origTempUsage = broker.getSystemUsage().getTempUsage().getUsage();
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
+        factory.setAlwaysSyncSend(true);
+
+        // ensure messages are spooled to disk for this consumer
+        ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
+        prefetch.setTopicPrefetch(500);
+        factory.setPrefetchPolicy(prefetch);
+        final Connection connection = factory.createConnection();
+        connection.start();
+
+        Thread producingThread = new Thread("Producing thread") {
+            public void run() {
+                try {
+                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    MessageProducer producer = session.createProducer(destination);
+                    producer.setDeliveryMode(deliveryMode);
+                    for (int idx = 0; idx < MESSAGES_COUNT; ++idx) {
+                        Message message = session.createTextMessage(new String(buf) + idx);
+                        producer.send(message);
+                    }
+                    producer.close();
+                    session.close();
+                } catch (Throwable ex) {
+                    ex.printStackTrace();
+                }
+            }
+        };
+
+        Thread consumingThread = new Thread("Consuming thread") {
+            public void run() {
+                try {
+                    int count = 0;
+                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    MessageConsumer consumer = session.createConsumer(destination);
+
+                    while (consumer.receive(messageReceiveTimeout) == null) {
+                        consumerReady.countDown();
+                    }
+                    count++;
+                    LOG.info("Received one... waiting");
+                    consumerContinue.await();
+                    if (consumeAll) {
+                        LOG.info("Consuming the rest of the messages...");
+                        while (consumer.receive(messageReceiveTimeout) != null) {
+                            count++;
+                        }
+                    }
+                    LOG.info("consumer session closing: consumed count: " + count);
+                    session.close();
+                } catch (Throwable ex) {
+                    ex.printStackTrace();
+                }
+            }
+        };
+        consumingThread.start();
+        consumerReady.await();
+
+        producingThread.start();
+        producingThread.join();
+
+        final long tempUsageBySubscription = broker.getSystemUsage().getTempUsage().getUsage();
+        LOG.info("Orig Usage: " + origTempUsage + ", currentUsage: " + tempUsageBySubscription);
+        assertTrue("some temp store has been used", tempUsageBySubscription != origTempUsage);
+        consumerContinue.countDown();
+        consumingThread.join();
+        connection.close();
+
+        LOG.info("Subscription Usage: " + tempUsageBySubscription + ", endUsage: "
+                + broker.getSystemUsage().getTempUsage().getUsage());
+
+        assertTrue("temp usage decreased with removed sub", Wait.waitFor(new Wait.Condition(){
+            public boolean isSatisified() throws Exception {
+                return broker.getSystemUsage().getTempUsage().getUsage()  < tempUsageBySubscription;
+            }
+        }));
+    }
+
+    public void setUp() throws Exception {
+        super.setAutoFail(true);
+        super.setUp();
+        broker = new BrokerService();
+        broker.setDataDirectory("target" + File.separator + "activemq-data");
+        broker.setPersistent(true);
+        broker.setUseJmx(true);
+        broker.setAdvisorySupport(false);
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.getSystemUsage().getMemoryUsage().setLimit(1024l*1024*64);
+
+        broker.addConnector("tcp://localhost:0").setName("Default");
+        broker.start();
+
+        connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
+    }
+
+    public void tearDown() throws Exception {
+        broker.stop();
+    }
+
+    public static Test suite() {
+        return suite(AMQ2314Test.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2356Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2356Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2356Test.java
new file mode 100644
index 0000000..283dd92
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2356Test.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.File;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+
+/*
+ A AMQ2356Test
+ We have an environment where we have a very large number of destinations.
+ In an effort to reduce the number of threads I have set the options
+ -Dorg.apache.activemq.UseDedicatedTaskRunner=false
+
+ and
+
+ <policyEntry queue=">" optimizedDispatch="true"/>
+
+ Unfortunately this very quickly leads to deadlocked queues.
+
+ My environment is:
+
+ ActiveMQ 5.2 Ubunty Jaunty kernel 2.6.28-14-generic #47-Ubuntu SMP (although only a single core on my system)
+ TCP transportConnector
+
+ To reproduce the bug (which I can do 100% of the time) I connect 5 consumers (AUTO_ACK) to 5 different queues.
+ Then I start 5 producers and pair them up with a consumer on a queue, and they start sending PERSISTENT messages.
+ I've set the producer to send 100 messages and disconnect, and the consumer to receive 100 messages and disconnect.
+ The first pair usually gets through their 100 messages and disconnect, at which point all the other pairs have
+ deadlocked at less than 30 messages each.
+ */
+public class AMQ2356Test extends TestCase {
+    protected static final int MESSAGE_COUNT = 1000;
+    protected static final int NUMBER_OF_PAIRS = 10;
+    protected BrokerService broker;
+    protected String brokerURL = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
+    protected int destinationCount;
+
+    public void testScenario() throws Exception {
+        for (int i = 0; i < NUMBER_OF_PAIRS; i++) {
+            ActiveMQQueue queue = new ActiveMQQueue(getClass().getName() + ":" + i);
+            ProducerConsumerPair cp = new ProducerConsumerPair();
+            cp.start(this.brokerURL, queue, MESSAGE_COUNT);
+            cp.testRun();
+            cp.stop();
+        }
+    }
+
+    protected Destination getDestination(Session session) throws JMSException {
+        String destinationName = getClass().getName() + "." + destinationCount++;
+        return session.createQueue(destinationName);
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        if (broker == null) {
+            broker = createBroker();
+        }
+        super.setUp();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        configureBroker(answer);
+        answer.start();
+        return answer;
+    }
+
+    protected void configureBroker(BrokerService answer) throws Exception {
+        File dataFileDir = new File("target/test-amq-data/bugs/AMQ2356/kahadb");
+        KahaDBStore kaha = new KahaDBStore();
+        kaha.setDirectory(dataFileDir);
+        answer.setUseJmx(false);
+        // Setup a destination policy where it takes only 1 message at a time.
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry policy = new PolicyEntry();
+        policy.setOptimizedDispatch(true);
+        policyMap.setDefaultEntry(policy);
+        answer.setDestinationPolicy(policyMap);
+
+        answer.setAdvisorySupport(false);
+        answer.setEnableStatistics(false);
+        answer.setDeleteAllMessagesOnStartup(true);
+        answer.addConnector(brokerURL);
+
+    }
+
+    static class ProducerConsumerPair {
+        private Destination destination;
+        private MessageProducer producer;
+        private MessageConsumer consumer;
+        private Connection producerConnection;
+        private Connection consumerConnection;
+        private int numberOfMessages;
+
+        ProducerConsumerPair() {
+
+        }
+
+        void start(String brokerURL, final Destination dest, int msgNum) throws Exception {
+            this.destination = dest;
+            this.numberOfMessages = msgNum;
+            ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerURL);
+            this.producerConnection = cf.createConnection();
+            this.producerConnection.start();
+            this.consumerConnection = cf.createConnection();
+            this.consumerConnection.start();
+            this.producer = createProducer(this.producerConnection);
+            this.consumer = createConsumer(this.consumerConnection);
+        }
+
+        void testRun() throws Exception {
+
+            Session s = this.producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            for (int i = 0; i < this.numberOfMessages; i++) {
+                BytesMessage msg = s.createBytesMessage();
+                msg.writeBytes(new byte[1024]);
+                this.producer.send(msg);
+            }
+            int received = 0;
+            for (int i = 0; i < this.numberOfMessages; i++) {
+                Message msg = this.consumer.receive();
+                assertNotNull(msg);
+                received++;
+            }
+            assertEquals("Messages received on " + this.destination, this.numberOfMessages, received);
+
+        }
+
+        void stop() throws Exception {
+            if (this.producerConnection != null) {
+                this.producerConnection.close();
+            }
+            if (this.consumerConnection != null) {
+                this.consumerConnection.close();
+            }
+        }
+
+        private MessageProducer createProducer(Connection connection) throws Exception {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer result = session.createProducer(this.destination);
+            return result;
+        }
+
+        private MessageConsumer createConsumer(Connection connection) throws Exception {
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer result = session.createConsumer(this.destination);
+            return result;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2364Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2364Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2364Test.java
new file mode 100644
index 0000000..15d24d5
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2364Test.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+//package org.apache.activemq.transport.failover;
+
+import static org.junit.Assert.assertEquals;
+
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.state.ConnectionState;
+import org.apache.activemq.state.ConnectionStateTracker;
+import org.apache.activemq.state.TransactionState;
+import org.apache.activemq.transport.MutexTransport;
+import org.apache.activemq.transport.ResponseCorrelator;
+import org.apache.activemq.transport.failover.FailoverTransport;
+import org.junit.Test;
+
+
+public class AMQ2364Test {
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testRollbackLeak() throws Exception {
+
+        int messageCount = 1000;
+        URI failoverUri = new URI("failover:(vm://localhost)?jms.redeliveryPolicy.maximumRedeliveries=0");
+
+        Destination dest = new ActiveMQQueue("Failover.Leak");
+
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(failoverUri);
+        ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
+        connection.start();
+        final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+        MessageProducer producer = session.createProducer(dest);
+
+        for (int i = 0; i < messageCount; ++i)
+            producer.send(session.createTextMessage("Test message #" + i));
+        producer.close();
+        session.commit();
+
+        MessageConsumer consumer = session.createConsumer(dest);
+
+        final CountDownLatch latch = new CountDownLatch(messageCount);
+        consumer.setMessageListener(new MessageListener() {
+
+            @Override
+            public void onMessage(Message msg) {
+                try {
+                    session.rollback();
+                } catch (JMSException e) {
+                    e.printStackTrace();
+                } finally {
+                    latch.countDown();
+                }
+            }
+        });
+
+        latch.await();
+        consumer.close();
+        session.close();
+
+        ResponseCorrelator respCorr = (ResponseCorrelator) connection.getTransport();
+        MutexTransport mutexTrans = (MutexTransport) respCorr.getNext();
+        FailoverTransport failoverTrans = (FailoverTransport) mutexTrans.getNext();
+        Field stateTrackerField = FailoverTransport.class.getDeclaredField("stateTracker");
+        stateTrackerField.setAccessible(true);
+        ConnectionStateTracker stateTracker = (ConnectionStateTracker) stateTrackerField.get(failoverTrans);
+        Field statesField = ConnectionStateTracker.class.getDeclaredField("connectionStates");
+        statesField.setAccessible(true);
+        ConcurrentHashMap<ConnectionId, ConnectionState> states =
+                (ConcurrentHashMap<ConnectionId, ConnectionState>) statesField.get(stateTracker);
+
+        ConnectionState state = states.get(connection.getConnectionInfo().getConnectionId());
+
+        Collection<TransactionState> transactionStates = state.getTransactionStates();
+
+        connection.stop();
+        connection.close();
+
+        assertEquals("Transaction states not cleaned up", 0,transactionStates.size());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2383Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2383Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2383Test.java
new file mode 100644
index 0000000..49c2366
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2383Test.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+
+import static org.junit.Assert.*;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.Test;
+
+public class AMQ2383Test {
+
+    @Test
+    public void activeMQTest() throws Exception {
+        Destination dest = ActiveMQQueue.createDestination("testQueue", ActiveMQQueue.QUEUE_TYPE);
+        ConnectionFactory factory = new ActiveMQConnectionFactory(
+                "vm://localhost?broker.useJmx=false&broker.persistent=false");
+        Connection producerConnection = factory.createConnection();
+        producerConnection.start();
+        Connection consumerConnection = factory.createConnection();
+        consumerConnection.start();
+
+        Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = producerSession.createProducer(dest);
+        TextMessage sentMsg = producerSession.createTextMessage("test...");
+        producer.send(sentMsg);
+        producerSession.close();
+
+        Session consumerSession = consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
+        MessageConsumer consumer = consumerSession.createConsumer(dest);
+        TextMessage receivedMsg = (TextMessage)consumer.receive();
+        consumerSession.rollback();
+        consumerSession.close();
+
+        assertEquals(sentMsg, receivedMsg);
+
+        producerConnection.close();
+        consumerConnection.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2401Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2401Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2401Test.java
new file mode 100644
index 0000000..74f920f
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2401Test.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An AMQ-2401 Test
+ */
+public class AMQ2401Test extends TestCase implements MessageListener {
+    private BrokerService broker;
+    private ActiveMQConnectionFactory factory;
+    private static final int SEND_COUNT = 500;
+    private static final int CONSUMER_COUNT = 50;
+    private static final int PRODUCER_COUNT = 1;
+    private static final int LOG_INTERVAL = 10;
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ2401Test.class);
+
+    private final ArrayList<Service> services = new ArrayList<Service>(CONSUMER_COUNT + PRODUCER_COUNT);
+    private int count = 0;
+    private CountDownLatch latch;
+
+    @Override
+    protected void setUp() throws Exception {
+        broker = new BrokerService();
+        broker.setDataDirectory("target" + File.separator + "test-data" + File.separator + "AMQ2401Test");
+        broker.setDeleteAllMessagesOnStartup(true);
+        String connectionUri = broker.addConnector("tcp://0.0.0.0:0").getPublishableConnectString();
+        PolicyMap policies = new PolicyMap();
+        PolicyEntry entry = new PolicyEntry();
+        entry.setMemoryLimit(1024 * 100);
+        entry.setProducerFlowControl(true);
+        entry.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
+        entry.setQueue(">");
+        policies.setDefaultEntry(entry);
+        broker.setDestinationPolicy(policies);
+        broker.setUseJmx(false);
+        broker.start();
+        broker.waitUntilStarted();
+
+        factory = new ActiveMQConnectionFactory(connectionUri);
+        super.setUp();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+    public void testDupsOk() throws Exception {
+
+        TestProducer p = null;
+        TestConsumer c = null;
+        try {
+            latch = new CountDownLatch(SEND_COUNT);
+
+            for (int i = 0; i < CONSUMER_COUNT; i++) {
+                TestConsumer consumer = new TestConsumer();
+                consumer.start();
+                services.add(consumer);
+            }
+            for (int i = 0; i < PRODUCER_COUNT; i++) {
+                TestProducer producer = new TestProducer();
+                producer.start();
+                services.add(producer);
+            }
+
+            waitForMessageReceipt(TimeUnit.SECONDS.toMillis(30));
+        } finally {
+            if (p != null) {
+                p.close();
+            }
+
+            if (c != null) {
+                c.close();
+            }
+        }
+    }
+
+    @Override
+    public void onMessage(Message message) {
+        latch.countDown();
+        if (++count % LOG_INTERVAL == 0) {
+            LOG.debug("Received message " + count);
+        }
+
+        try {
+            Thread.sleep(1);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * @throws InterruptedException
+     * @throws TimeoutException
+     */
+    private void waitForMessageReceipt(long timeout) throws InterruptedException, TimeoutException {
+        if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
+            throw new TimeoutException(String.format("Consumner didn't receive expected # of messages, %d of %d received.", latch.getCount(), SEND_COUNT));
+        }
+    }
+
+    private interface Service {
+        public void start() throws Exception;
+        public void close();
+    }
+
+    private class TestProducer implements Runnable, Service {
+        Thread thread;
+        BytesMessage message;
+
+        Connection connection;
+        Session session;
+        MessageProducer producer;
+
+        TestProducer() throws Exception {
+            thread = new Thread(this, "TestProducer");
+            connection = factory.createConnection();
+            connection.start();
+            session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+            producer = session.createProducer(session.createQueue("AMQ2401Test"));
+        }
+
+        @Override
+        public void start() {
+            thread.start();
+        }
+
+        @Override
+        public void run() {
+
+            int count = SEND_COUNT / PRODUCER_COUNT;
+            for (int i = 1; i <= count; i++) {
+                try {
+                    if ((i % LOG_INTERVAL) == 0) {
+                        LOG.debug("Sending: " + i);
+                    }
+                    message = session.createBytesMessage();
+                    message.writeBytes(new byte[1024]);
+                    producer.send(message);
+                } catch (JMSException jmse) {
+                    jmse.printStackTrace();
+                    break;
+                }
+            }
+        }
+
+        @Override
+        public void close() {
+            try {
+                connection.close();
+            } catch (JMSException e) {
+            }
+        }
+    }
+
+    private class TestConsumer implements Runnable, Service {
+        ActiveMQConnection connection;
+        Session session;
+        MessageConsumer consumer;
+
+        TestConsumer() throws Exception {
+            factory.setOptimizeAcknowledge(false);
+            connection = (ActiveMQConnection) factory.createConnection();
+
+            session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+            consumer = session.createConsumer(session.createQueue("AMQ2401Test"));
+
+            consumer.setMessageListener(AMQ2401Test.this);
+        }
+
+        @Override
+        public void start() throws Exception {
+            connection.start();
+        }
+
+        @Override
+        public void close() {
+            try {
+                connection.close();
+            } catch (JMSException e) {
+            }
+        }
+
+        @Override
+        public void run() {
+            while (latch.getCount() > 0) {
+                try {
+                    onMessage(consumer.receive());
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+}


Mime
View raw message