activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [11/52] [abbrv] [partial] activemq-artemis git commit: ARTEMIS-127 Adding activemq unit test module to Artemis
Date Tue, 09 Jun 2015 16:36:42 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java
new file mode 100644
index 0000000..4c8527a
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import javax.jms.*;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test to determine if expired messages are being reaped if there is
+ * no active consumer connected to the broker.
+ */
+public class MessageExpirationReaperTest {
+
+    // Embedded broker plus the factory and connection that init() creates before each test.
+    private BrokerService broker;
+    private ConnectionFactory factory;
+    private ActiveMQConnection connection;
+    // Used as the queue name in one test and as the topic name in the other.
+    private final String destinationName = "TEST.Q";
+    // NOTE(review): port 0 presumably lets the OS pick a free port; init() reads the
+    // resulting connect string back from the broker instead of using this URL.
+    private final String brokerUrl = "tcp://localhost:0";
+    // Also appears in the JMX object name built by createView().
+    private final String brokerName = "testBroker";
+    // Publishable connect string of the broker's first transport connector (set in init()).
+    private String connectionUri;
+
+    /**
+     * Starts the broker and opens the client connection used by each test.
+     * <p>
+     * The statement order matters: the broker has to be created before its
+     * connector's connect string can be read, and {@code connectionUri} has to
+     * be set before {@link #createConnectionFactory()} reads it.
+     */
+    @Before
+    public void init() throws Exception {
+        createBroker();
+
+        // Read back the connect string of the broker's first transport connector.
+        connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
+        factory = createConnectionFactory();
+        connection = (ActiveMQConnection) factory.createConnection();
+        // The client ID is set before start(); testExpiredMessagesOnTopic creates a
+        // durable subscriber on this connection.
+        connection.setClientID("test-connection");
+        connection.start();
+    }
+
+    /**
+     * Releases the resources created in {@link #init()}.
+     * <p>
+     * The broker is stopped in a {@code finally} block so that a failure while
+     * closing the connection cannot leave it running. Both fields are
+     * null-checked because JUnit still runs {@code @After} methods when the
+     * {@code @Before} method failed part-way through.
+     */
+    @After
+    public void cleanUp() throws Exception {
+        try {
+            if (connection != null) {
+                connection.close();
+            }
+        } finally {
+            if (broker != null) {
+                broker.stop();
+            }
+        }
+    }
+
+    /**
+     * Builds and starts the broker under test and stores it in {@code broker}.
+     * <p>
+     * The broker deletes all messages on startup, exposes one connector on
+     * {@code brokerUrl} and uses a default destination policy with an
+     * expire-messages period of 500.
+     */
+    protected void createBroker() throws Exception {
+        // NOTE(review): the period is presumably in milliseconds - confirm against PolicyEntry.
+        PolicyEntry expiryPolicy = new PolicyEntry();
+        expiryPolicy.setExpireMessagesPeriod(500);
+        PolicyMap policies = new PolicyMap();
+        policies.setDefaultEntry(expiryPolicy);
+
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setBrokerName(brokerName);
+        broker.addConnector(brokerUrl);
+        broker.setDestinationPolicy(policies);
+
+        broker.start();
+    }
+
+    /**
+     * Creates a connection factory that targets {@code connectionUri}.
+     */
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        ConnectionFactory brokerFactory = new ActiveMQConnectionFactory(connectionUri);
+        return brokerFactory;
+    }
+
+    /**
+     * Opens a non-transacted, auto-acknowledge session on the test connection.
+     */
+    protected Session createSession() throws Exception {
+        final boolean transacted = false;
+        return connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    /**
+     * Sends two batches of short-lived messages to a queue that has no
+     * consumer. After the first batch has had time to expire, the queue must be
+     * empty with every enqueued message counted as expired. After the second
+     * batch, a queue browser must see no messages.
+     */
+    @Test
+    public void testExpiredMessageReaping() throws Exception {
+        final int batchSize = 3;
+
+        Session producerSession = createSession();
+        ActiveMQDestination destination = (ActiveMQDestination) producerSession.createQueue(destinationName);
+        MessageProducer producer = producerSession.createProducer(destination);
+        producer.setTimeToLive(1000);
+
+        // First batch, then let the messages expire
+        sendNumberedMessages(producerSession, producer, batchSize);
+        Thread.sleep(2000);
+
+        DestinationViewMBean view = createView(destination);
+
+        assertEquals("Incorrect inflight count: " + view.getInFlightCount(), 0, view.getInFlightCount());
+        assertEquals("Incorrect queue size count", 0, view.getQueueSize());
+        assertEquals("Incorrect expired size count", view.getEnqueueCount(), view.getExpiredCount());
+
+        // Second batch, again left to expire
+        sendNumberedMessages(producerSession, producer, batchSize);
+        Thread.sleep(2000);
+
+        // Simply browse the queue
+        Session browserSession = createSession();
+        QueueBrowser browser = browserSession.createBrowser((Queue) destination);
+        assertFalse("no message in the browser", browser.getEnumeration().hasMoreElements());
+
+        // The messages expire and should be reaped because of the presence of
+        // the queue browser
+        assertEquals("Wrong inFlightCount: " + view.getInFlightCount(), 0, view.getInFlightCount());
+    }
+
+    /** Sends {@code howMany} text messages whose bodies are "0", "1", and so on. */
+    private void sendNumberedMessages(Session session, MessageProducer producer, int howMany) throws JMSException {
+        for (int index = 0; index < howMany; index++) {
+            producer.send(session.createTextMessage("" + index));
+        }
+    }
+
+    /**
+     * Sends short-lived messages to a topic that has a durable subscriber,
+     * closes the subscriber, waits, and then checks that the topic is empty and
+     * that every enqueued message has been counted as expired.
+     */
+    @Test
+    public void testExpiredMessagesOnTopic() throws Exception {
+        Session session = createSession();
+
+        // use a zero prefetch so messages don't go inflight
+        ActiveMQTopic destination = new ActiveMQTopic(destinationName + "?consumer.prefetchSize=0");
+
+        MessageProducer producer = session.createProducer(destination);
+
+        // should have a durable sub because it's a little tricky to get messages to expire in
+        // non-durable subs.. with durable subs, we can just expire in the topic using the expire
+        // period.. also.. durable sub has to be "inactive" for the expire checker to actually
+        // expire the messages
+        MessageConsumer consumer = session.createDurableSubscriber(destination, "test-durable");
+
+        producer.setTimeToLive(500);
+
+        final int count = 3;
+        // Send some messages with an expiration
+        for (int i = 0; i < count; i++) {
+            TextMessage message = session.createTextMessage("" + i);
+            producer.send(message);
+        }
+
+        DestinationViewMBean view = createView(destination);
+        // not expired yet... (compare against count so the check follows the send loop)
+        assertEquals("Incorrect enqueue count", count, view.getEnqueueCount());
+
+        // close consumer so topic thinks consumer is inactive
+        consumer.close();
+
+        // Let the messages reach an expiry time
+        Thread.sleep(2000);
+
+        assertEquals("Incorrect inflight count: " + view.getInFlightCount(), 0, view.getInFlightCount());
+        assertEquals("Incorrect queue size count", 0, view.getQueueSize());
+        assertEquals("Incorrect expired size count", view.getEnqueueCount(), view.getExpiredCount());
+    }
+
+    /**
+     * Returns a JMX proxy for a destination MBean on the test broker.
+     * <p>
+     * The object name is built from the {@code brokerName} and
+     * {@code destinationName} fields; the argument only selects between the
+     * Queue and Topic destination types.
+     */
+    protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
+        String destinationType = destination.isQueue() ? "Queue" : "Topic";
+        ObjectName name = new ObjectName("org.apache.activemq"
+                + ":type=Broker,brokerName=" + brokerName
+                + ",destinationType=" + destinationType
+                + ",destinationName=" + destinationName);
+        return (DestinationViewMBean) broker.getManagementContext()
+                .newProxyInstance(name, DestinationViewMBean.class, true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MessageSender.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MessageSender.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MessageSender.java
new file mode 100644
index 0000000..f85bdba
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MessageSender.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+
+/**
+ * Test helper that owns one session and one producer on a single queue or
+ * topic and sends String payloads through it as {@link ObjectMessage}s.
+ */
+public class MessageSender {
+    private final Session session;
+    private final MessageProducer producer;
+
+    /**
+     * @param queueName            name of the destination (a topic name when {@code topic} is true)
+     * @param connection           connection on which the session is created
+     * @param useTransactedSession true for a transacted session, false for auto-acknowledge
+     * @param topic                true to produce to a topic, false to produce to a queue
+     */
+    public MessageSender(String queueName, Connection connection, boolean useTransactedSession, boolean topic) throws Exception {
+        if (useTransactedSession) {
+            session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        } else {
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        }
+
+        if (topic) {
+            producer = session.createProducer(session.createTopic(queueName));
+        } else {
+            producer = session.createProducer(session.createQueue(queueName));
+        }
+    }
+
+    /**
+     * Wraps {@code payload} in an ObjectMessage and sends it; on a transacted
+     * session the send is committed straight away.
+     */
+    public void send(String payload) throws Exception {
+        ObjectMessage message = session.createObjectMessage();
+        message.setObject(payload);
+        producer.send(message);
+
+        boolean transacted = session.getTransacted();
+        if (transacted) {
+            session.commit();
+        }
+    }
+
+    /** Returns the underlying producer. */
+    public MessageProducer getProducer() {
+        return producer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java
new file mode 100644
index 0000000..68055bb
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.usage.SystemUsage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * Try and replicate:
+ * Caused by: java.io.IOException: Could not locate data file data--188
+ *  at org.apache.activemq.kaha.impl.async.AsyncDataManager.getDataFile(AsyncDataManager.java:302)
+ *  at org.apache.activemq.kaha.impl.async.AsyncDataManager.read(AsyncDataManager.java:614)
+ *  at org.apache.activemq.store.amq.AMQPersistenceAdapter.readCommand(AMQPersistenceAdapter.java:523)
+ */
+
+public class MissingDataFileTest extends TestCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MissingDataFileTest.class);
+
+    // Number of messages each producer thread sends and each receiver is expected to see.
+    private static int counter = 500;
+
+    // Per-queue receive counters, incremented by the Receiver callbacks in the test.
+    // NOTE(review): these are plain static ints written from message-listener callbacks
+    // and read by waitForMessagesToBeDelivered() - confirm that visibility is adequate.
+    private static int hectorToHaloCtr;
+    private static int xenaToHaloCtr;
+    private static int troyToHaloCtr;
+
+    private static int haloToHectorCtr;
+    private static int haloToXenaCtr;
+    private static int haloToTroyCtr;
+
+    // Destination names: three inbound to "halo" ...
+    private final String hectorToHalo = "hectorToHalo";
+    private final String xenaToHalo = "xenaToHalo";
+    private final String troyToHalo = "troyToHalo";
+
+    // ... and three outbound from "halo".
+    private final String haloToHector = "haloToHector";
+    private final String haloToXena = "haloToXena";
+    private final String haloToTroy = "haloToTroy";
+
+
+    private BrokerService broker;
+
+    // One connection per party; closed in tearDown().
+    private Connection hectorConnection;
+    private Connection xenaConnection;
+    private Connection troyConnection;
+    private Connection haloConnection;
+
+    // Monitor the receivers notify on and waitForMessagesToBeDelivered() waits on.
+    private final Object lock = new Object();
+    final boolean useTopic = false;
+    // When true, possiblySleep() pauses on every count that is a multiple of 100.
+    final boolean useSleep = true;
+
+    // 500 zero bytes decoded into a String with the platform default charset.
+    protected static final String payload = new String(new byte[500]);
+
+    /**
+     * Opens a new connection to the broker at tcp://localhost:61616. The
+     * connection is returned without being started.
+     */
+    public Connection createConnection() throws JMSException {
+        return new ActiveMQConnectionFactory("tcp://localhost:61616").createConnection();
+    }
+
+    /**
+     * Creates a session on {@code connection}. AUTO_ACKNOWLEDGE is always
+     * passed as the acknowledge mode, whatever the value of {@code transacted}.
+     */
+    public Session createSession(Connection connection, boolean transacted) throws JMSException {
+        final int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+        return connection.createSession(transacted, acknowledgeMode);
+    }
+
+    /**
+     * Configures and starts a persistent, JMX-enabled broker listening on
+     * tcp://localhost:61616, with a 10 MiB memory limit and a KahaDB store that
+     * uses 16 KiB journal files and a cleanup interval of 500.
+     */
+    public void startBroker() throws Exception {
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setPersistent(true);
+        broker.setUseJmx(true);
+        broker.addConnector("tcp://localhost:61616").setName("Default");
+
+        SystemUsage systemUsage = new SystemUsage();
+        systemUsage.getMemoryUsage().setLimit(10 * 1024 * 1024); // Just a few messages
+        broker.setSystemUsage(systemUsage);
+
+        // NOTE(review): the tiny journal files and frequent cleanup are presumably meant
+        // to provoke the "Could not locate data file" error - confirm.
+        KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
+        kahaDBPersistenceAdapter.setJournalMaxFileLength(16 * 1024);
+        kahaDBPersistenceAdapter.setCleanupInterval(500);
+        broker.setPersistenceAdapter(kahaDBPersistenceAdapter);
+
+        // Log before starting so the message matches what is actually happening.
+        LOG.info("Starting broker..");
+        broker.start();
+    }
+
+    /**
+     * Closes the four party connections and stops the broker.
+     * <p>
+     * Connections that were never created are skipped, and the broker is
+     * stopped in a {@code finally} block so that a failing close cannot leave
+     * it running on the fixed port.
+     */
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            closeIfCreated(hectorConnection);
+            closeIfCreated(xenaConnection);
+            closeIfCreated(troyConnection);
+            closeIfCreated(haloConnection);
+        } finally {
+            if (broker != null) {
+                broker.stop();
+            }
+        }
+    }
+
+    /** Closes {@code connection} unless it is null. */
+    private static void closeIfCreated(Connection connection) throws JMSException {
+        if (connection != null) {
+            connection.close();
+        }
+    }
+
+    /**
+     * Drives a message exchange between four parties and checks that nothing
+     * is lost.
+     * <p>
+     * hector, xena and troy each send {@code counter} messages to their
+     * "...ToHalo" queue. halo consumes those queues in transacted sessions and
+     * forwards one message per received message: hector's go to haloToTroy,
+     * xena's to haloToHector and troy's to haloToXena. The test passes when all
+     * six receive counters equal {@code counter}.
+     */
+    public void testForNoDataFoundError() throws Exception {
+
+        startBroker();
+        hectorConnection = createConnection();
+        Thread hectorThread = buildProducer(hectorConnection, hectorToHalo, false, useTopic);
+        Receiver hHectorReceiver = new Receiver() {
+            @Override
+            public void receive(String s) throws Exception {
+                haloToHectorCtr++;
+                if (haloToHectorCtr >= counter) {
+                    synchronized (lock) {
+                        lock.notifyAll();
+                    }
+                }
+                possiblySleep(haloToHectorCtr);
+            }
+        };
+        buildReceiver(hectorConnection, haloToHector, false, hHectorReceiver, useTopic);
+
+        troyConnection = createConnection();
+        Thread troyThread = buildProducer(troyConnection, troyToHalo);
+        Receiver hTroyReceiver = new Receiver() {
+            @Override
+            public void receive(String s) throws Exception {
+                haloToTroyCtr++;
+                if (haloToTroyCtr >= counter) {
+                    synchronized (lock) {
+                        lock.notifyAll();
+                    }
+                }
+                possiblySleep(haloToTroyCtr);
+            }
+        };
+        // NOTE(review): this receiver is attached to hectorConnection, not troyConnection -
+        // confirm whether that is intentional.
+        buildReceiver(hectorConnection, haloToTroy, false, hTroyReceiver, false);
+
+        xenaConnection = createConnection();
+        Thread xenaThread = buildProducer(xenaConnection, xenaToHalo);
+        Receiver hXenaReceiver = new Receiver() {
+            @Override
+            public void receive(String s) throws Exception {
+                haloToXenaCtr++;
+                if (haloToXenaCtr >= counter) {
+                    synchronized (lock) {
+                        lock.notifyAll();
+                    }
+                }
+                possiblySleep(haloToXenaCtr);
+            }
+        };
+        buildReceiver(xenaConnection, haloToXena, false, hXenaReceiver, false);
+
+        haloConnection = createConnection();
+        final MessageSender hectorSender = buildTransactionalProducer(haloToHector, haloConnection, false);
+        final MessageSender troySender = buildTransactionalProducer(haloToTroy, haloConnection, false);
+        final MessageSender xenaSender = buildTransactionalProducer(haloToXena, haloConnection, false);
+        Receiver hectorReceiver = new Receiver() {
+            @Override
+            public void receive(String s) throws Exception {
+                hectorToHaloCtr++;
+                troySender.send(payload);
+                if (hectorToHaloCtr >= counter) {
+                    synchronized (lock) {
+                        lock.notifyAll();
+                    }
+                    // NOTE(review): unlike the other receivers, this one only calls
+                    // possiblySleep once the counter has reached the target.
+                    possiblySleep(hectorToHaloCtr);
+                }
+            }
+        };
+        Receiver xenaReceiver = new Receiver() {
+            @Override
+            public void receive(String s) throws Exception {
+                xenaToHaloCtr++;
+                hectorSender.send(payload);
+                if (xenaToHaloCtr >= counter) {
+                    synchronized (lock) {
+                        lock.notifyAll();
+                    }
+                }
+                possiblySleep(xenaToHaloCtr);
+            }
+        };
+        Receiver troyReceiver = new Receiver() {
+            @Override
+            public void receive(String s) throws Exception {
+                troyToHaloCtr++;
+                xenaSender.send(payload);
+                if (troyToHaloCtr >= counter) {
+                    synchronized (lock) {
+                        lock.notifyAll();
+                    }
+                }
+            }
+        };
+        buildReceiver(haloConnection, hectorToHalo, true, hectorReceiver, false);
+        buildReceiver(haloConnection, xenaToHalo, true, xenaReceiver, false);
+        buildReceiver(haloConnection, troyToHalo, true, troyReceiver, false);
+
+        // Start the forwarding side first, then each producer's connection and thread.
+        haloConnection.start();
+
+        troyConnection.start();
+        troyThread.start();
+
+        xenaConnection.start();
+        xenaThread.start();
+
+        hectorConnection.start();
+        hectorThread.start();
+        waitForMessagesToBeDelivered();
+        // number of messages received should match messages sent
+        // (expected value first, so a failure reads "expected:<500> but was:<n>")
+        assertEquals(counter, hectorToHaloCtr);
+        LOG.info("hectorToHalo received " + hectorToHaloCtr + " messages");
+        assertEquals(counter, xenaToHaloCtr);
+        LOG.info("xenaToHalo received " + xenaToHaloCtr + " messages");
+        assertEquals(counter, troyToHaloCtr);
+        LOG.info("troyToHalo received " + troyToHaloCtr + " messages");
+        assertEquals(counter, haloToHectorCtr);
+        LOG.info("haloToHector received " + haloToHectorCtr + " messages");
+        assertEquals(counter, haloToXenaCtr);
+        LOG.info("haloToXena received " + haloToXenaCtr + " messages");
+        assertEquals(counter, haloToTroyCtr);
+        LOG.info("haloToTroy received " + haloToTroyCtr + " messages");
+
+    }
+
+    /**
+     * Sleeps for 5000 ms when {@code useSleep} is set and {@code count} is a
+     * multiple of 100; otherwise returns immediately.
+     */
+    protected void possiblySleep(int count) throws InterruptedException {
+        boolean pauseNow = useSleep && count % 100 == 0;
+        if (pauseNow) {
+            Thread.sleep(5000);
+        }
+    }
+
+    /**
+     * Blocks until all six receive counters have reached {@code counter} or
+     * until {@code counter} seconds have elapsed, re-checking every 200 ms or
+     * whenever a receiver notifies {@code lock}.
+     * <p>
+     * If the waiting thread is interrupted, the interrupt flag is restored and
+     * the method returns early, leaving the caller's assertions to report any
+     * shortfall.
+     */
+    protected void waitForMessagesToBeDelivered() {
+        // let's give the listeners enough time to read all messages
+        long maxWaitTime = counter * 1000L; // long arithmetic, so a large counter cannot overflow
+        long waitTime = maxWaitTime;
+        long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis();
+
+        synchronized (lock) {
+            boolean hasMessages = true;
+            while (hasMessages && waitTime >= 0) {
+                try {
+                    lock.wait(200);
+                } catch (InterruptedException e) {
+                    LOG.error("Interrupted while waiting for messages to be delivered", e);
+                    // Restore the flag and stop: looping on would make wait() throw again at once.
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+                // check if all messages have been received
+                hasMessages = hectorToHaloCtr < counter || xenaToHaloCtr < counter || troyToHaloCtr < counter || haloToHectorCtr < counter || haloToXenaCtr < counter
+                              || haloToTroyCtr < counter;
+                waitTime = maxWaitTime - (System.currentTimeMillis() - start);
+            }
+        }
+    }
+
+    /**
+     * Creates a {@link MessageSender} for {@code queueName} that uses a
+     * transacted session on {@code connection}.
+     */
+    public MessageSender buildTransactionalProducer(String queueName, Connection connection, boolean isTopic) throws Exception {
+        final boolean transacted = true;
+        return new MessageSender(queueName, connection, transacted, isTopic);
+    }
+
+    /**
+     * Shorthand for the four-argument overload with a non-transacted session
+     * and a queue (not a topic) as the destination.
+     */
+    public Thread buildProducer(Connection connection, final String queueName) throws Exception {
+        final boolean transacted = false;
+        final boolean isTopic = false;
+        return buildProducer(connection, queueName, transacted, isTopic);
+    }
+
+    /**
+     * Creates, but does not start, a thread that sends {@code counter} copies
+     * of {@code payload} to {@code queueName} through a new
+     * {@link MessageSender}. A failed send ends the thread with a
+     * RuntimeException that names the queue.
+     */
+    public Thread buildProducer(Connection connection, final String queueName, boolean transacted, boolean isTopic) throws Exception {
+        final MessageSender sender = new MessageSender(queueName, connection, transacted, isTopic);
+        return new Thread() {
+            @Override
+            public synchronized void run() {
+                int sent = 0;
+                while (sent < counter) {
+                    try {
+                        sender.send(payload);
+                    } catch (Exception e) {
+                        throw new RuntimeException("on " + queueName + " send", e);
+                    }
+                    sent++;
+                }
+            }
+        };
+    }
+
+    /**
+     * Registers a message listener on {@code queueName} that passes each
+     * message's String payload to {@code receiver} and, when the session is
+     * transacted, commits afterwards.
+     *
+     * @param connection connection on which the consumer session is created
+     * @param queueName  destination name (a topic name when {@code isTopic} is true)
+     * @param transacted true for a transacted session, false for auto-acknowledge
+     * @param receiver   callback invoked with every received payload
+     * @param isTopic    true to consume from a topic, false to consume from a queue
+     */
+    public void buildReceiver(Connection connection, final String queueName, boolean transacted, final Receiver receiver, boolean isTopic) throws Exception {
+        final Session session = transacted ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer inputMessageConsumer = session.createConsumer(isTopic ? session.createTopic(queueName) : session.createQueue(queueName));
+        MessageListener messageListener = new MessageListener() {
+
+            @Override
+            public void onMessage(Message message) {
+                try {
+                    ObjectMessage objectMessage = (ObjectMessage)message;
+                    String s = (String)objectMessage.getObject();
+                    receiver.receive(s);
+                    if (session.getTransacted()) {
+                        session.commit();
+                    }
+
+                } catch (Exception e) {
+                    // Still best-effort, but reported through the class logger together
+                    // with the destination name rather than dumped to stderr.
+                    LOG.error("Failed to process message from " + queueName, e);
+                }
+            }
+        };
+        inputMessageConsumer.setMessageListener(messageListener);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java
new file mode 100644
index 0000000..195ccbd
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test for AMQ-3965.
+ * A consumer may be stalled in case it uses optimizeAcknowledge and receives
+ * a number of messages that expire before being dispatched to application code.
+ * See https://issues.apache.org/jira/browse/AMQ-3965 for more details.
+ *
+ */
+public class OptimizeAcknowledgeWithExpiredMsgsTest {
+
+    private final static Logger LOG = LoggerFactory.getLogger(OptimizeAcknowledgeWithExpiredMsgsTest.class);
+
+    // Broker under test: created and started in setUp(), stopped and cleared in tearDown().
+    private BrokerService broker = null;
+
+    // Publishable connect string of the broker's connector; assigned by createBroker().
+    private String connectionUri;
+
+    /**
+     * Creates a broker instance but does not start it.
+     * <p>
+     * The broker is non-persistent, deletes all messages on startup and has
+     * JMX disabled. A connector is added on {@code tcp://localhost:0} and its
+     * publishable connect string is stored in {@code connectionUri}.
+     *
+     * @return the configured BrokerService, not yet started
+     * @throws Exception if the connector cannot be added
+     */
+    protected BrokerService createBroker() throws Exception {
+        // Named "service" so the local does not shadow the broker field.
+        BrokerService service = new BrokerService();
+        service.setPersistent(false);
+        service.setDeleteAllMessagesOnStartup(true);
+        service.setUseJmx(false);
+        connectionUri = service.addConnector("tcp://localhost:0").getPublishableConnectString();
+        return service;
+    }
+
+    /**
+     * Creates the broker via {@link #createBroker()}, starts it and then calls
+     * {@code waitUntilStarted()} before the test body runs.
+     */
+    @Before
+    public void setUp() throws Exception {
+        broker = createBroker();
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    /**
+     * Stops the broker, waits for it to stop and clears the field. Does
+     * nothing when no broker was created.
+     */
+    @After
+    public void tearDown() throws Exception {
+        if (broker == null) {
+            return;
+        }
+        broker.stop();
+        broker.waitUntilStopped();
+        broker = null;
+    }
+
+    /**
+     * Tests for AMQ-3965
+     * Creates connection into broker using optimizeAcknowledge and prefetch=100
+     * Creates producer and consumer. Producer sends 45 msgs with a 100 ms
+     * time-to-live that will expire at consumer (but before being dispatched
+     * to app code).
+     * Producer then sends 60 msgs with a 60 s time-to-live.
+     *
+     * Consumer receives msgs using a MessageListener and increments a counter.
+     * Once the first batch has been given time to expire the connection is
+     * started, and the main thread waits (via Wait.waitFor) for the counter to
+     * reach 60, the number of msgs that should get dispatched to the consumer.
+     * If the counter never reaches 60 the test fails.
+     */
+    @Test
+    public void testOptimizedAckWithExpiredMsgs() throws Exception
+    {
+        ActiveMQConnectionFactory connectionFactory =
+                new ActiveMQConnectionFactory(connectionUri + "?jms.optimizeAcknowledge=true&jms.prefetchPolicy.all=100");
+
+        // Create JMS resources
+        Connection connection = connectionFactory.createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = session.createQueue("TEST.FOO");
+
+        // ***** Consumer code *****
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        final MyMessageListener listener = new MyMessageListener();
+        connection.setExceptionListener((ExceptionListener) listener);
+
+        // ***** Producer Code *****
+        MessageProducer producer = session.createProducer(destination);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+        String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
+        TextMessage message;
+
+        // Produce msgs that will expire quickly
+        for (int i=0; i<45; i++) {
+            message = session.createTextMessage(text);
+            producer.send(message,1,1,100);
+            LOG.trace("Sent message: "+ message.getJMSMessageID() +
+                " with expiry 100 msec");
+        }
+        // Produce msgs that outlive the test
+        for (int i=0; i<60; i++) {
+            message = session.createTextMessage(text);
+            producer.send(message,1,1,60000);
+            LOG.trace("Sent message: "+ message.getJMSMessageID() +
+                " with expiry 60 sec");
+        }
+        consumer.setMessageListener(listener);
+
+        sleep(1000);  // let the batch of 45 expire.
+
+        connection.start();
+
+        boolean receivedAll = Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return listener.getCounter() == 60;
+            }
+        });
+        // Build the failure message after waiting so it reports the final counter value.
+        assertTrue("Should receive all expected messages, counter at " + listener.getCounter(), receivedAll);
+
+        LOG.info("Received all expected messages with counter at: " + listener.getCounter());
+
+        // Cleanup
+        producer.close();
+        consumer.close();
+        session.close();
+        connection.close();
+    }
+
+    /**
+     * Synchronous variant of the AMQ-3965 test: 45 msgs are sent with a 10 ms
+     * time-to-live, followed by 60 msgs with a 30 s time-to-live. After a short
+     * pause the consumer must be able to receive 60 msgs one by one via
+     * {@code receive(2000)}.
+     */
+    @Test
+    public void testOptimizedAckWithExpiredMsgsSync() throws Exception
+    {
+        ActiveMQConnectionFactory connectionFactory =
+                new ActiveMQConnectionFactory(connectionUri + "?jms.optimizeAcknowledge=true&jms.prefetchPolicy.all=100");
+
+        // Create JMS resources
+        Connection connection = connectionFactory.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = session.createQueue("TEST.FOO");
+
+        // ***** Consumer code *****
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        // ***** Producer Code *****
+        MessageProducer producer = session.createProducer(destination);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+        String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
+        TextMessage message;
+
+        // Produce msgs that will expire quickly
+        for (int i=0; i<45; i++) {
+            message = session.createTextMessage(text);
+            producer.send(message,1,1,10);
+            LOG.trace("Sent message: "+ message.getJMSMessageID() +
+                " with expiry 10 msec");
+        }
+        // Produce msgs that outlive the test
+        for (int i=0; i<60; i++) {
+            message = session.createTextMessage(text);
+            producer.send(message,1,1,30000);
+            LOG.trace("Sent message: "+ message.getJMSMessageID() +
+                " with expiry 30 sec");
+        }
+        sleep(200);
+
+        final int expected = 60;
+        for (int received = 1; received <= expected; ++received) {
+            assertNotNull(consumer.receive(2000));
+            LOG.info("counter at " + received);
+        }
+        // Report the number actually received; the old loop variable had already moved on to 61 here.
+        LOG.info("Received all expected messages with counter at: " + expected);
+
+        // Cleanup
+        producer.close();
+        consumer.close();
+        session.close();
+        connection.close();
+    }
+
+    /**
+     * Variant of the synchronous optimized-ack test in which the short-lived
+     * messages sit in the middle of the stream: 56 messages with a 30 s
+     * time-to-live, then 44 with a 10 ms time-to-live, then 4 more with a 30 s
+     * time-to-live. After a brief pause the test verifies that 60 messages
+     * (56 + 4) can be consumed with synchronous receive calls.
+     *
+     * @throws Exception if any JMS operation fails
+     */
+    @Test
+    public void testOptimizedAckWithExpiredMsgsSync2() throws Exception
+    {
+        ActiveMQConnectionFactory connectionFactory =
+                new ActiveMQConnectionFactory(connectionUri + "?jms.optimizeAcknowledge=true&jms.prefetchPolicy.all=100");
+
+        // Create JMS resources
+        Connection connection = connectionFactory.createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Destination destination = session.createQueue("TEST.FOO");
+
+            // ***** Consumer code *****
+            MessageConsumer consumer = session.createConsumer(destination);
+
+            // ***** Producer Code *****
+            MessageProducer producer = session.createProducer(destination);
+            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+            String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
+            TextMessage message;
+
+            // Produce msgs that don't expire
+            for (int i = 0; i < 56; i++) {
+                message = session.createTextMessage(text);
+                producer.send(message, 1, 1, 30000);
+                LOG.trace("Sent message: " + message.getJMSMessageID() +
+                    " with expiry 30 sec");
+            }
+            // Produce msgs that will expire quickly
+            for (int i = 0; i < 44; i++) {
+                message = session.createTextMessage(text);
+                producer.send(message, 1, 1, 10);
+                LOG.trace("Sent message: " + message.getJMSMessageID() +
+                    " with expiry 10 msec");
+            }
+            // Produce some more msgs that don't expire
+            for (int i = 0; i < 4; i++) {
+                message = session.createTextMessage(text);
+                producer.send(message, 1, 1, 30000);
+                LOG.trace("Sent message: " + message.getJMSMessageID() +
+                    " with expiry 30 sec");
+            }
+
+            // Pause well beyond the 10 ms time-to-live of the middle batch.
+            sleep(200);
+
+            // Count messages actually received so the final log reports 60, not 61.
+            int received = 0;
+            while (received < 60) {
+                assertNotNull("Timed out after receiving " + received + " of 60 messages",
+                    consumer.receive(2000));
+                received++;
+                LOG.info("counter at " + received);
+            }
+            LOG.info("Received all expected messages with counter at: " + received);
+
+            // Cleanup
+            producer.close();
+            consumer.close();
+            session.close();
+        } finally {
+            // Always release the connection, even when an assertion above fails;
+            // closing it also closes any session, producer and consumer left open.
+            connection.close();
+        }
+    }
+
+    /**
+     * Pauses the calling thread for the given number of milliseconds.
+     * If the thread is interrupted while sleeping, the method returns early
+     * and re-asserts the interrupt flag so the caller can still observe it.
+     *
+     * @param milliSecondTime how long to sleep, in milliseconds
+     */
+    private void sleep(int milliSecondTime) {
+        try {
+            Thread.sleep(milliSecondTime);
+        } catch (InterruptedException ignored) {
+            // Thread.sleep clears the flag when it throws; restore it rather
+            // than silently discarding the interruption request.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Standard JMS MessageListener that counts the messages delivered to it.
+     * It also implements ExceptionListener and logs any JMS exception it is
+     * notified about.
+     */
+    private class MyMessageListener implements MessageListener, ExceptionListener {
+
+        /** Number of messages successfully handled by {@link #onMessage}. */
+        private final AtomicInteger counter = new AtomicInteger(0);
+
+        /**
+         * Traces the message id and increments the received-message counter.
+         *
+         * @param message the delivered message
+         */
+        @Override
+        public void onMessage(final Message message) {
+            try {
+                LOG.trace("Got Message " + message.getJMSMessageID());
+                LOG.info("counter at " + counter.incrementAndGet());
+            } catch (final Exception e) {
+                // A failure here means the message was not counted; report it
+                // instead of hiding it so a short count can be diagnosed.
+                LOG.error("Failed to handle received message", e);
+            }
+        }
+
+        /**
+         * @return the number of messages counted so far
+         */
+        public int getCounter() {
+            return counter.get();
+        }
+
+        /**
+         * Logs the reported JMS exception, including its stack trace.
+         *
+         * @param ex the exception reported by the JMS provider
+         */
+        @Override
+        public synchronized void onException(JMSException ex) {
+            LOG.error("JMS Exception occured.  Shutting down client.", ex);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java
new file mode 100644
index 0000000..34e3866
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Sends 30 text messages to an exclusive-consumer queue and then consumes them
+ * in three batches of 10, each batch with a freshly created consumer on a
+ * client-acknowledge session with a prefetch of 10, asserting that the message
+ * texts arrive in the order they were sent.
+ */
+public class OutOfOrderTestCase extends TestCase {
+
+    private static final Logger log = LoggerFactory.getLogger(OutOfOrderTestCase.class);
+
+    private static final String BROKER_URL = "tcp://localhost:0";
+    private static final int PREFETCH = 10;
+    private static final String CONNECTION_URL_OPTIONS = "?jms.prefetchPolicy.all=" + PREFETCH;
+
+    private static final String DESTINATION = "QUEUE?consumer.exclusive=true";
+
+    private BrokerService brokerService;
+    private Session session;
+    private Connection connection;
+    private String connectionUri;
+
+    /** Index of the next message expected by {@link #consumeBatch()}. */
+    private int seq = 0;
+
+    /**
+     * Starts an embedded broker on an ephemeral port and opens a started
+     * connection with a client-acknowledge session against it.
+     */
+    @Override
+    public void setUp() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setUseJmx(true);
+        brokerService.addConnector(BROKER_URL);
+        brokerService.deleteAllMessages();
+        brokerService.start();
+        brokerService.waitUntilStarted();
+
+        connectionUri = brokerService.getTransportConnectors().get(0).getPublishableConnectString();
+
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri + CONNECTION_URL_OPTIONS);
+        connection = connectionFactory.createConnection();
+        connection.start();
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+    }
+
+    /**
+     * Closes the session and connection and stops the broker. The broker is
+     * stopped even if closing a JMS resource throws.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            session.close();
+            connection.close();
+        } finally {
+            brokerService.stop();
+        }
+    }
+
+    /**
+     * Produces messages 0-29 and consumes them in three consecutive batches.
+     */
+    public void testOrder() throws Exception {
+
+        log.info("Producing messages 0-29 . . .");
+        Destination destination = session.createQueue(DESTINATION);
+        final MessageProducer messageProducer = session
+                .createProducer(destination);
+        try {
+            for (int i = 0; i < 30; ++i) {
+                final Message message = session
+                        .createTextMessage(createMessageText(i));
+                message.setStringProperty("JMSXGroupID", "FOO");
+
+                messageProducer.send(message);
+                log.info("sent " + toString(message));
+            }
+        } finally {
+            messageProducer.close();
+        }
+
+        log.info("Consuming messages 0-9 . . .");
+        consumeBatch();
+
+        log.info("Consuming messages 10-19 . . .");
+        consumeBatch();
+
+        log.info("Consuming messages 20-29 . . .");
+        consumeBatch();
+    }
+
+    /**
+     * Creates a consumer, receives and acknowledges 10 messages, asserting that
+     * each carries the next expected text, and closes the consumer again.
+     */
+    protected void consumeBatch() throws Exception {
+        Destination destination = session.createQueue(DESTINATION);
+        final MessageConsumer messageConsumer = session.createConsumer(destination);
+        try {
+            for (int i = 0; i < 10; ++i) {
+                final Message message = messageConsumer.receive(1000L);
+                // receive() returns null on timeout; fail with a clear message
+                // instead of a NullPointerException in the logging below.
+                assertNotNull("Timed out waiting for '" + createMessageText(seq) + "'", message);
+                log.info("received " + toString(message));
+                assertEquals("Message out of order", createMessageText(seq++), ((TextMessage) message).getText());
+                message.acknowledge();
+            }
+        } finally {
+            messageConsumer.close();
+        }
+    }
+
+    /**
+     * Describes a text message for logging: its text, its JMS message id and
+     * whether it is flagged as redelivered. The description is direction
+     * neutral because callers prepend "sent" or "received" themselves.
+     *
+     * @param message a non-null {@link TextMessage}
+     * @return the log description
+     */
+    private String toString(final Message message) throws JMSException {
+        String ret = "message '" + ((TextMessage) message).getText() + "' - " + message.getJMSMessageID();
+        if (message.getJMSRedelivered()) {
+            ret += " (redelivered)";
+        }
+        return ret;
+    }
+
+    /**
+     * @param index zero-based position of the message in the produced sequence
+     * @return the text payload used for the message at that position
+     */
+    private static String createMessageText(final int index) {
+        return "message #" + index;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java
new file mode 100644
index 0000000..80adaed
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import java.io.Serializable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.broker.BrokerService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test case demonstrating situation where messages are not delivered to
+ * consumers.
+ * <p>
+ * The master sends a single work item. Each worker that consumes a work item
+ * sends an ack to the master and, unless the shared worker counter has reached
+ * a multiple of {@code BATCH_SIZE}, submits a follow-up work item. The test
+ * expects {@code BATCH_SIZE} acks for each of two such batches.
+ */
+public class QueueWorkerPrefetchTest extends TestCase implements
+        MessageListener {
+    private static final Logger LOG = LoggerFactory
+            .getLogger(QueueWorkerPrefetchTest.class);
+    private static final int BATCH_SIZE = 10;
+    private static final long WAIT_TIMEOUT = 1000 * 10;
+
+    /** The connection URL. */
+    private static final String BROKER_BIND_ADDRESS = "tcp://localhost:0";
+
+    /**
+     * The queue prefetch size to use. A value greater than 1 seems to make
+     * things work.
+     */
+    private static final int QUEUE_PREFETCH_SIZE = 1;
+
+    /**
+     * The number of workers to use. A single worker with a prefetch of 1 works.
+     */
+    private static final int NUM_WORKERS = 2;
+
+    /** Embedded JMS broker. */
+    private BrokerService broker;
+
+    /** The master's producer object for creating work items. */
+    private MessageProducer workItemProducer;
+
+    /** The master's consumer object for consuming ack messages from workers. */
+    private MessageConsumer masterItemConsumer;
+
+    /** The number of acks received by the master. */
+    private final AtomicLong acksReceived = new AtomicLong(0);
+
+    /** Latch for the batch currently awaited; counted down once per ack. */
+    private final AtomicReference<CountDownLatch> latch = new AtomicReference<CountDownLatch>();
+
+    private String connectionUri;
+
+    /** Messages sent to the work-item queue. */
+    private static class WorkMessage implements Serializable {
+        private static final long serialVersionUID = 1L;
+        private final int id;
+
+        public WorkMessage(int id) {
+            this.id = id;
+        }
+
+        @Override
+        public String toString() {
+            return "Work: " + id;
+        }
+    }
+
+    /**
+     * The worker process. Consume messages from the work-item queue, possibly
+     * creating more messages to submit to the work-item queue. For each work
+     * item, send an ack to the master.
+     */
+    private static class Worker implements MessageListener {
+        /**
+         * Counter shared between workers to decide when new work-item messages
+         * are created.
+         */
+        private static AtomicInteger counter = new AtomicInteger(0);
+
+        /** Session to use. */
+        private Session session;
+
+        /** Producer for sending ack messages to the master. */
+        private MessageProducer masterItemProducer;
+
+        /** Producer for sending new work items to the work-items queue. */
+        private MessageProducer workItemProducer;
+
+        /**
+         * Creates the worker's producers and registers the worker as listener
+         * on the work-item queue.
+         *
+         * @param session the session this worker owns; closed by {@link #close()}
+         */
+        public Worker(Session session) throws JMSException {
+            this.session = session;
+            masterItemProducer = session.createProducer(session
+                    .createQueue("master-item"));
+            Queue workItemQueue = session.createQueue("work-item");
+            workItemProducer = session.createProducer(workItemQueue);
+            MessageConsumer workItemConsumer = session
+                    .createConsumer(workItemQueue);
+            workItemConsumer.setMessageListener(this);
+        }
+
+        @Override
+        public void onMessage(javax.jms.Message message) {
+            try {
+                WorkMessage work = (WorkMessage) ((ObjectMessage) message)
+                        .getObject();
+
+                int c = counter.incrementAndGet();
+
+                // Every BATCH_SIZE-th message ends the chain: no follow-up work
+                // item is created for it.
+                if (c % BATCH_SIZE != 0) {
+                    // Send new work item to work-item queue.
+                    workItemProducer.send(session
+                            .createObjectMessage(new WorkMessage(work.id + 1)));
+                }
+
+                // Send ack to master.
+                masterItemProducer.send(session.createObjectMessage(work));
+            } catch (JMSException e) {
+                throw new IllegalStateException("Something has gone wrong", e);
+            }
+        }
+
+        /** Close of JMS resources used by worker. */
+        public void close() throws JMSException {
+            masterItemProducer.close();
+            workItemProducer.close();
+            session.close();
+        }
+    }
+
+    /** Master message handler. Process ack messages. */
+    @Override
+    public void onMessage(javax.jms.Message message) {
+        long acks = acksReceived.incrementAndGet();
+        latch.get().countDown();
+        // Log the value obtained by this call rather than re-reading the
+        // counter, which another delivery may already have advanced.
+        LOG.info("Master now has ack count of: " + acks);
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        // Create the message broker.
+        super.setUp();
+        broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(true);
+        broker.addConnector(BROKER_BIND_ADDRESS);
+        broker.start();
+        broker.waitUntilStarted();
+
+        connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        // Shut down the message broker.
+        broker.deleteAllMessages();
+        broker.stop();
+        super.tearDown();
+    }
+
+    /**
+     * Runs two batches against {@code NUM_WORKERS} workers with a queue
+     * prefetch of {@code QUEUE_PREFETCH_SIZE} and fails if either batch does
+     * not produce {@code BATCH_SIZE} acks within {@code WAIT_TIMEOUT} ms.
+     */
+    public void testActiveMQ() throws Exception {
+        // Create the connection to the broker.
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri);
+        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
+        prefetchPolicy.setQueuePrefetch(QUEUE_PREFETCH_SIZE);
+        connectionFactory.setPrefetchPolicy(prefetchPolicy);
+        Connection connection = connectionFactory.createConnection();
+        try {
+            connection.start();
+
+            Session masterSession = connection.createSession(false,
+                    Session.AUTO_ACKNOWLEDGE);
+            workItemProducer = masterSession.createProducer(masterSession
+                    .createQueue("work-item"));
+            masterItemConsumer = masterSession.createConsumer(masterSession
+                    .createQueue("master-item"));
+            masterItemConsumer.setMessageListener(this);
+
+            // Create the workers.
+            Worker[] workers = new Worker[NUM_WORKERS];
+            for (int i = 0; i < NUM_WORKERS; i++) {
+                workers[i] = new Worker(connection.createSession(false,
+                        Session.AUTO_ACKNOWLEDGE));
+            }
+
+            // Send a message to the work queue, and wait for the BATCH_SIZE acks
+            // from the workers.
+            acksReceived.set(0);
+            latch.set(new CountDownLatch(BATCH_SIZE));
+            workItemProducer.send(masterSession
+                    .createObjectMessage(new WorkMessage(1)));
+
+            if (!latch.get().await(WAIT_TIMEOUT, TimeUnit.MILLISECONDS)) {
+                fail("First batch only received " + acksReceived + " messages");
+            }
+
+            LOG.info("First batch received");
+
+            // Send another message to the work queue, and wait for the next
+            // BATCH_SIZE acks. It is at this point where the workers never get
+            // notified of this message, as they have a large pending queue.
+            // Creating a new worker at this point however will receive this new
+            // message.
+            acksReceived.set(0);
+            latch.set(new CountDownLatch(BATCH_SIZE));
+            workItemProducer.send(masterSession
+                    .createObjectMessage(new WorkMessage(1)));
+
+            if (!latch.get().await(WAIT_TIMEOUT, TimeUnit.MILLISECONDS)) {
+                fail("Second batch only received " + acksReceived + " messages");
+            }
+
+            LOG.info("Second batch received");
+
+            // Cleanup all JMS resources.
+            for (int i = 0; i < NUM_WORKERS; i++) {
+                workers[i].close();
+            }
+            masterSession.close();
+        } finally {
+            // Release the connection (and any sessions still open on it) even
+            // when a batch times out and fail() throws.
+            connection.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RawRollbackSharedConsumerTests.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RawRollbackSharedConsumerTests.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RawRollbackSharedConsumerTests.java
new file mode 100644
index 0000000..4790e42
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RawRollbackSharedConsumerTests.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Checks that messages received in a transacted session that is then rolled
+ * back are still available on the queue afterwards. All receives within one
+ * {@code getMessages} call share a single consumer.
+ */
+public class RawRollbackSharedConsumerTests {
+
+	private static ConnectionFactory connectionFactory;
+	private static Destination queue;
+	private static BrokerService broker;
+
+	/** Starts an embedded broker and prepares the connection factory and queue. */
+	@BeforeClass
+	public static void clean() throws Exception {
+		broker = new BrokerService();
+		broker.setDeleteAllMessagesOnStartup(true);
+		broker.setUseJmx(true);
+		broker.start();
+		ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
+		factory.setBrokerURL("vm://localhost?async=false");
+		connectionFactory = factory;
+		queue = new ActiveMQQueue("queue");
+	}
+
+	@AfterClass
+	public static void close() throws Exception {
+		broker.stop();
+	}
+
+	/** Empties the queue and then places exactly two messages on it. */
+	@Before
+	public void clearData() throws Exception {
+		getMessages(false); // drain queue
+		convertAndSend("foo");
+		convertAndSend("bar");
+	}
+
+	/** After each test both messages must still be consumable from the queue. */
+	@After
+	public void checkPostConditions() throws Exception {
+
+		Thread.sleep(1000L);
+		List<String> list = getMessages(false);
+		assertEquals(2, list.size());
+
+	}
+
+	@Test
+	public void testReceiveMessages() throws Exception {
+
+		List<String> list = getMessages(true);
+		assertEquals(2, list.size());
+		assertTrue(list.contains("foo"));
+
+	}
+
+	/**
+	 * Sends one text message to the queue in its own committed transaction.
+	 *
+	 * @param msg the text of the message to send
+	 */
+	private void convertAndSend(String msg) throws Exception {
+		Connection connection = connectionFactory.createConnection();
+		try {
+			connection.start();
+			Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+			MessageProducer producer = session.createProducer(queue);
+			producer.send(session.createTextMessage(msg));
+			producer.close();
+			session.commit();
+			session.close();
+		} finally {
+			// Do not leak the connection when a JMS call above throws.
+			connection.close();
+		}
+	}
+
+	/**
+	 * Receives text messages with a single consumer until a receive times out,
+	 * then ends the transaction.
+	 *
+	 * @param rollback {@code true} to roll the transaction back, {@code false}
+	 *                 to commit it
+	 * @return the texts of the messages received, in order of receipt
+	 */
+	private List<String> getMessages(boolean rollback) throws Exception {
+		Connection connection = connectionFactory.createConnection();
+		try {
+			connection.start();
+			Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+			List<String> msgs = new ArrayList<String>();
+			MessageConsumer consumer = session.createConsumer(queue);
+			for (String next = receiveAndConvert(consumer); next != null; next = receiveAndConvert(consumer)) {
+				msgs.add(next);
+			}
+			consumer.close();
+			if (rollback) {
+				session.rollback();
+			} else {
+				session.commit();
+			}
+			session.close();
+			return msgs;
+		} finally {
+			// Do not leak the connection when a JMS call above throws.
+			connection.close();
+		}
+	}
+
+	/**
+	 * Waits up to 100 ms for a message on the given consumer.
+	 *
+	 * @param consumer the consumer to receive from
+	 * @return the text of the received message, or {@code null} on timeout
+	 */
+	private String receiveAndConvert(MessageConsumer consumer) throws Exception {
+		Message message = consumer.receive(100L);
+		if (message == null) {
+			return null;
+		}
+		return ((TextMessage) message).getText();
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RawRollbackTests.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RawRollbackTests.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RawRollbackTests.java
new file mode 100644
index 0000000..93abb28
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RawRollbackTests.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Checks that messages received in a transacted session that is then rolled
+ * back are still available on the queue afterwards. Each receive uses its own
+ * short-lived consumer, and the connection URL sets the prefetch to 0.
+ */
+public class RawRollbackTests {
+
+	private static ConnectionFactory connectionFactory;
+	private static Destination queue;
+	private static BrokerService broker;
+
+	/** Starts an embedded broker and prepares the connection factory and queue. */
+	@BeforeClass
+	public static void clean() throws Exception {
+		broker = new BrokerService();
+		broker.setDeleteAllMessagesOnStartup(true);
+		broker.setUseJmx(true);
+		broker.start();
+		ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
+		factory.setBrokerURL("vm://localhost?async=false&waitForStart=5000&jms.prefetchPolicy.all=0");
+		connectionFactory = factory;
+		queue = new ActiveMQQueue("queue");
+	}
+
+	@AfterClass
+	public static void close() throws Exception {
+		broker.stop();
+	}
+
+	/** Empties the queue and then places exactly two messages on it. */
+	@Before
+	public void clearData() throws Exception {
+		getMessages(false); // drain queue
+		convertAndSend("foo");
+		convertAndSend("bar");
+	}
+
+	/** After each test both messages must still be consumable from the queue. */
+	@After
+	public void checkPostConditions() throws Exception {
+
+		Thread.sleep(1000L);
+		List<String> list = getMessages(false);
+		assertEquals(2, list.size());
+
+	}
+
+	@Test
+	public void testReceiveMessages() throws Exception {
+
+		List<String> list = getMessages(true);
+		assertEquals(2, list.size());
+		assertTrue(list.contains("foo"));
+
+	}
+
+	/**
+	 * Sends one text message to the queue in its own committed transaction.
+	 *
+	 * @param msg the text of the message to send
+	 */
+	private void convertAndSend(String msg) throws Exception {
+		Connection connection = connectionFactory.createConnection();
+		try {
+			connection.start();
+			Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+			MessageProducer producer = session.createProducer(queue);
+			producer.send(session.createTextMessage(msg));
+			producer.close();
+			session.commit();
+			session.close();
+		} finally {
+			// Do not leak the connection when a JMS call above throws.
+			connection.close();
+		}
+	}
+
+	/**
+	 * Receives text messages until a receive times out, then ends the
+	 * transaction.
+	 *
+	 * @param rollback {@code true} to roll the transaction back, {@code false}
+	 *                 to commit it
+	 * @return the texts of the messages received, in order of receipt
+	 */
+	private List<String> getMessages(boolean rollback) throws Exception {
+		Connection connection = connectionFactory.createConnection();
+		try {
+			connection.start();
+			Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+			List<String> msgs = new ArrayList<String>();
+			for (String next = receiveAndConvert(session); next != null; next = receiveAndConvert(session)) {
+				msgs.add(next);
+			}
+			if (rollback) {
+				session.rollback();
+			} else {
+				session.commit();
+			}
+			session.close();
+			return msgs;
+		} finally {
+			// Do not leak the connection when a JMS call above throws.
+			connection.close();
+		}
+	}
+
+	/**
+	 * Creates a consumer on the given session, waits up to 100 ms for a
+	 * message, and closes the consumer again.
+	 *
+	 * @param session the session to create the consumer on
+	 * @return the text of the received message, or {@code null} on timeout
+	 */
+	private String receiveAndConvert(Session session) throws Exception {
+		MessageConsumer consumer = session.createConsumer(queue);
+		Message message;
+		try {
+			message = consumer.receive(100L);
+		} finally {
+			// Close the per-call consumer even if receive() throws.
+			consumer.close();
+		}
+		if (message == null) {
+			return null;
+		}
+		return ((TextMessage) message).getText();
+	}
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/Receiver.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/Receiver.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/Receiver.java
new file mode 100644
index 0000000..65f30e3
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/Receiver.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+/**
+ * Callback interface with a single method that accepts a string.
+ * NOTE(review): implementers and callers are not visible from this file;
+ * presumably tests in this package use it to hand received text to a handler
+ * -- confirm against the call sites.
+ */
+public interface Receiver {
+    /**
+     * Handles the given string.
+     *
+     * @param s the string to handle
+     * @throws Exception if the implementation fails to handle the string
+     */
+    void receive(String s) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RedeliveryPluginHeaderTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RedeliveryPluginHeaderTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RedeliveryPluginHeaderTest.java
new file mode 100644
index 0000000..414b70d
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RedeliveryPluginHeaderTest.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.File;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
+import org.apache.activemq.broker.util.RedeliveryPlugin;
+import org.apache.activemq.util.IOHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests whether the broker "sends" the message as expected after the redeliveryPlugin has previously
+ * redelivered it.
+ */
+
+public class RedeliveryPluginHeaderTest extends TestCase {
+
+    private static final String TEST_QUEUE_ONE = "TEST_QUEUE_ONE";
+    private static final String TEST_QUEUE_TWO = "TEST_QUEUE_TWO";
+    // Upper bound for receives on Queue1; well above the 5000 ms redelivery delay set in createBroker.
+    private static final long RECEIVE_TIMEOUT_MS = 30000L;
+    private static final Logger LOG = LoggerFactory
+            .getLogger(RedeliveryPluginHeaderTest.class);
+    private String transportURL;
+    private BrokerService broker;
+
+    /**
+     * Test
+     * - consumes message from Queue1
+     * - rolls back message to Queue1 and message is scheduled for redelivery to Queue1 by the broker's plugin
+     * - consumes message from Queue1 again
+     * - sends same message to Queue2
+     * - expects to consume message from Queue2 immediately
+     */
+    public void testSendAfterRedelivery() throws Exception {
+        broker = this.createBroker(false);
+        broker.start();
+        broker.waitUntilStarted();
+        LOG.info("***Broker started...");
+
+        Connection connection = null;
+        try {
+            //pushed message to broker
+            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+                    transportURL + "?trace=true&jms.redeliveryPolicy.maximumRedeliveries=0");
+            connection = factory.createConnection();
+            connection.start();
+
+            Session session = connection.createSession(true,
+                    Session.SESSION_TRANSACTED);
+
+            Destination destinationQ1 = session.createQueue(TEST_QUEUE_ONE);
+            Destination destinationQ2 = session.createQueue(TEST_QUEUE_TWO);
+
+            MessageProducer producerQ1 = session.createProducer(destinationQ1);
+            producerQ1.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+            Message m = session.createTextMessage("testMessage");
+            LOG.info("*** send message to broker...");
+            producerQ1.send(m);
+            session.commit();
+
+            //consume message from Q1 and rollback to get it redelivered
+            MessageConsumer consumerQ1 = session.createConsumer(destinationQ1);
+
+            LOG.info("*** consume message from Q1 and rolled back..");
+            TextMessage textMessage = (TextMessage) consumerQ1.receive(RECEIVE_TIMEOUT_MS);
+            LOG.info("got: " + textMessage);
+            assertNotNull("should have consumed a message from TEST_QUEUE_ONE", textMessage);
+            assertFalse("JMSRedelivered flag is not set", textMessage.getJMSRedelivered());
+            session.rollback();
+
+            LOG.info("*** consumed message from Q1 again and sending to Q2..");
+            TextMessage textMessage2 = (TextMessage) consumerQ1.receive(RECEIVE_TIMEOUT_MS);
+            LOG.info("got redelivered: " + textMessage2);
+            assertNotNull("should have consumed the redelivered message from TEST_QUEUE_ONE", textMessage2);
+            session.commit();
+            assertTrue("JMSRedelivered flag is set", textMessage2.getJMSRedelivered());
+
+            //send message to Q2 and consume from Q2
+            MessageConsumer consumerQ2 = session.createConsumer(destinationQ2);
+            MessageProducer producer_two = session.createProducer(destinationQ2);
+            producer_two.send(textMessage2);
+            session.commit();
+
+            //Message should be available straight away on the queue_two
+            Message textMessage3 = consumerQ2.receive(1000);
+            assertNotNull("should have consumed a message from TEST_QUEUE_TWO", textMessage3);
+            assertFalse("JMSRedelivered flag is not set", textMessage3.getJMSRedelivered());
+            session.commit();
+        } finally {
+            // Stop the broker even when closing the connection throws.
+            try {
+                if (connection != null) {
+                    connection.close();
+                }
+            } finally {
+                broker.stop();
+            }
+        }
+    }
+
+    /**
+     * Builds, without starting it, a persistent broker with scheduler support and a RedeliveryPlugin
+     * (5000 ms initial delay, up to 5 redeliveries); stores the TCP connector's URI in transportURL.
+     */
+    protected BrokerService createBroker(boolean withJMX) throws Exception {
+        File schedulerDirectory = new File("target/scheduler");
+        IOHelper.mkdirs(schedulerDirectory);
+        IOHelper.deleteChildren(schedulerDirectory);
+
+        BrokerService answer = new BrokerService();
+        answer.setAdvisorySupport(false);
+        answer.setDataDirectory("target");
+        answer.setSchedulerDirectoryFile(schedulerDirectory);
+        answer.setSchedulerSupport(true);
+        answer.setPersistent(true);
+        answer.setDeleteAllMessagesOnStartup(true);
+        answer.setUseJmx(withJMX);
+
+        RedeliveryPlugin redeliveryPlugin = new RedeliveryPlugin();
+        RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap();
+        RedeliveryPolicy defaultEntry = new RedeliveryPolicy();
+        defaultEntry.setInitialRedeliveryDelay(5000);
+        defaultEntry.setMaximumRedeliveries(5);
+        redeliveryPolicyMap.setDefaultEntry(defaultEntry);
+        redeliveryPlugin.setRedeliveryPolicyMap(redeliveryPolicyMap);
+
+        answer.setPlugins(new BrokerPlugin[] {redeliveryPlugin});
+        TransportConnector transportConnector =
+                answer.addConnector("tcp://localhost:0");
+
+        transportURL = transportConnector.getConnectUri().toASCIIString();
+
+        return answer;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SlowConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SlowConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SlowConsumerTest.java
new file mode 100644
index 0000000..a2c117e
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SlowConsumerTest.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SlowConsumerTest extends TestCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SlowConsumerTest.class);
+    private static final int MESSAGES_COUNT = 10000;
+
+    private final int messageLogFrequency = 2500;
+    private final long messageReceiveTimeout = 10000L;
+
+    private Socket stompSocket;
+    private ByteArrayOutputStream inputBuffer;
+    private int messagesCount;
+
+    /**
+     * @param args
+     * @throws Exception
+     */
+    public void testRemoveSubscriber() throws Exception {
+        final BrokerService broker = new BrokerService();
+        broker.setPersistent(true);
+        broker.setUseJmx(true);
+        broker.setDeleteAllMessagesOnStartup(true);
+
+        broker.addConnector("tcp://localhost:0").setName("Default");
+        broker.start();
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+                broker.getTransportConnectors().get(0).getPublishableConnectString());
+        final Connection connection = factory.createConnection();
+        connection.start();
+
+        Thread producingThread = new Thread("Producing thread") {
+            public void run() {
+                try {
+                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    MessageProducer producer = session.createProducer(new ActiveMQQueue(getDestinationName()));
+                    for (int idx = 0; idx < MESSAGES_COUNT; ++idx) {
+                        Message message = session.createTextMessage("" + idx);
+                        producer.send(message);
+                        LOG.debug("Sending: " + idx);
+                    }
+                    producer.close();
+                    session.close();
+                } catch (Throwable ex) {
+                    ex.printStackTrace();
+                }
+            }
+        };
+        producingThread.setPriority(Thread.MAX_PRIORITY);
+        producingThread.start();
+        Thread.sleep(1000);
+
+        Thread consumingThread = new Thread("Consuming thread") {
+
+            public void run() {
+                try {
+                    Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+                    MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(getDestinationName()));
+                    int diff = 0;
+                    while (messagesCount != MESSAGES_COUNT) {
+                        Message msg = consumer.receive(messageReceiveTimeout);
+                        if (msg == null) {
+                            LOG.warn("Got null message at count: " + messagesCount + ". Continuing...");
+                            break;
+                        }
+                        String text = ((TextMessage)msg).getText();
+                        int currentMsgIdx = Integer.parseInt(text);
+                        LOG.debug("Received: " + text + " messageCount: " + messagesCount);
+                        msg.acknowledge();
+                        if ((messagesCount + diff) != currentMsgIdx) {
+                            LOG.debug("Message(s) skipped!! Should be message no.: " + messagesCount + " but got: " + currentMsgIdx);
+                            diff = currentMsgIdx - messagesCount;
+                        }
+                        ++messagesCount;
+                        if (messagesCount % messageLogFrequency == 0) {
+                            LOG.info("Received: " + messagesCount + " messages so far");
+                        }
+                        // Thread.sleep(70);
+                    }
+                } catch (Throwable ex) {
+                    ex.printStackTrace();
+                }
+            }
+        };
+        consumingThread.start();
+        consumingThread.join();
+
+        assertEquals(MESSAGES_COUNT, messagesCount);
+
+    }
+
+    public void sendFrame(String data) throws Exception {
+        byte[] bytes = data.getBytes("UTF-8");
+        OutputStream outputStream = stompSocket.getOutputStream();
+        for (int i = 0; i < bytes.length; i++) {
+            outputStream.write(bytes[i]);
+        }
+        outputStream.flush();
+    }
+
+    public String receiveFrame(long timeOut) throws Exception {
+        stompSocket.setSoTimeout((int)timeOut);
+        InputStream is = stompSocket.getInputStream();
+        int c = 0;
+        for (;;) {
+            c = is.read();
+            if (c < 0) {
+                throw new IOException("socket closed.");
+            } else if (c == 0) {
+                c = is.read();
+                byte[] ba = inputBuffer.toByteArray();
+                inputBuffer.reset();
+                return new String(ba, "UTF-8");
+            } else {
+                inputBuffer.write(c);
+            }
+        }
+    }
+
+    protected String getDestinationName() {
+        return getClass().getName() + "." + getName();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SparseAckReplayAfterStoreCleanupLevelDBStoreTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SparseAckReplayAfterStoreCleanupLevelDBStoreTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SparseAckReplayAfterStoreCleanupLevelDBStoreTest.java
new file mode 100644
index 0000000..ba15941
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SparseAckReplayAfterStoreCleanupLevelDBStoreTest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.leveldb.LevelDBStore;
+
+
+public class SparseAckReplayAfterStoreCleanupLevelDBStoreTest extends AMQ2832Test {
+    @Override // deleteAllOnStart is not consulted by this override
+    protected void configurePersistence(BrokerService brokerService, boolean deleteAllOnStart) throws Exception {
+        final LevelDBStore levelDbStore = new LevelDBStore();
+        levelDbStore.setFlushDelay(0);
+        brokerService.setPersistenceAdapter(levelDbStore); // use LevelDB as the broker's persistence adapter
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempQueueDeleteOnCloseTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempQueueDeleteOnCloseTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempQueueDeleteOnCloseTest.java
new file mode 100644
index 0000000..44e7f5d
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempQueueDeleteOnCloseTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.junit.Test;
+
+/**
+ * Demonstrates how unmarshalled VM advisory messages for temporary queues prevent other connections from being closed.
+ */
+public class TempQueueDeleteOnCloseTest {
+    /** Closes a second VM connection while the first still owns a temporary-queue consumer. */
+    @Test
+    public void test() throws Exception {
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
+        // create a connection and session with a temporary queue
+        Connection connectionA = connectionFactory.createConnection();
+        try {
+            connectionA.setClientID("ConnectionA");
+            Session sessionA = connectionA.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Destination tempQueueA = sessionA.createTemporaryQueue();
+            MessageConsumer consumer = sessionA.createConsumer(tempQueueA);
+            connectionA.start();
+            // start and stop another connection
+            Connection connectionB = connectionFactory.createConnection();
+            connectionB.setClientID("ConnectionB");
+            connectionB.start();
+            connectionB.close();
+            consumer.close();
+        } finally {
+            connectionA.close(); // release connection A even when a step above fails
+        }
+    }
+}


Mime
View raw message