activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [41/52] [abbrv] [partial] activemq-artemis git commit: ARTEMIS-127 Adding activemq unit test module to Artemis
Date Tue, 09 Jun 2015 16:37:12 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
new file mode 100644
index 0000000..70e15ec
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
@@ -0,0 +1,348 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
+import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ProducerFlowControlTest extends JmsTestSupport {
+    static final Logger LOG = LoggerFactory.getLogger(ProducerFlowControlTest.class);
+    ActiveMQQueue queueA = new ActiveMQQueue("QUEUE.A");
+    ActiveMQQueue queueB = new ActiveMQQueue("QUEUE.B");
+    protected TransportConnector connector;
+    protected ActiveMQConnection connection;
+    // used to test sendFailIfNoSpace on SystemUsage 
+    protected final AtomicBoolean gotResourceException = new AtomicBoolean(false);
+
+    public void test2ndPubisherWithProducerWindowSendConnectionThatIsBlocked() throws Exception {
+        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
+        factory.setProducerWindowSize(1024 * 64);
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(queueB);
+
+        // Test sending to Queue A
+        // 1 few sends should not block until the producer window is used up.
+        fillQueue(queueA);
+
+        // Test sending to Queue B it should not block since the connection
+        // should not be blocked.
+        CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
+        assertTrue(pubishDoneToQeueuB.await(2, TimeUnit.SECONDS));
+
+        TextMessage msg = (TextMessage)consumer.receive();
+        assertEquals("Message 1", msg.getText());
+        msg.acknowledge();
+
+        pubishDoneToQeueuB = asyncSendTo(queueB, "Message 2");
+        assertTrue(pubishDoneToQeueuB.await(2, TimeUnit.SECONDS));
+
+        msg = (TextMessage)consumer.receive();
+        assertEquals("Message 2", msg.getText());
+        msg.acknowledge();
+    }
+
+    public void testPubisherRecoverAfterBlock() throws Exception {
+        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        final MessageProducer producer = session.createProducer(queueA);
+        
+        final AtomicBoolean done = new AtomicBoolean(true);
+        final AtomicBoolean keepGoing = new AtomicBoolean(true);
+        
+   
+		Thread thread = new Thread("Filler") {
+		    int i;
+			@Override
+			public void run() {
+                while (keepGoing.get()) {
+                    done.set(false);
+                    try {
+						producer.send(session.createTextMessage("Test message " + ++i));
+						LOG.info("sent: " + i);
+					} catch (JMSException e) {
+					}
+                }
+			}
+		};
+		thread.start();
+        waitForBlockedOrResourceLimit(done);
+
+        // after receiveing messges, producer should continue sending messages 
+        // (done == false)
+        MessageConsumer consumer = session.createConsumer(queueA);
+        TextMessage msg;
+        for (int idx = 0; idx < 5; ++idx) {
+        	msg = (TextMessage) consumer.receive(1000);
+        	LOG.info("received: " + idx + ", msg: " + msg.getJMSMessageID());
+        	msg.acknowledge();
+        }
+        Thread.sleep(1000);
+        keepGoing.set(false);
+    	
+		assertFalse("producer has resumed", done.get());
+    }
+
+    public void testAsyncPubisherRecoverAfterBlock() throws Exception {
+        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
+        factory.setProducerWindowSize(1024 * 5);
+        factory.setUseAsyncSend(true);
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        final MessageProducer producer = session.createProducer(queueA);
+        
+        final AtomicBoolean done = new AtomicBoolean(true);
+        final AtomicBoolean keepGoing = new AtomicBoolean(true);
+        
+   
+        Thread thread = new Thread("Filler") {
+            int i;
+            @Override
+            public void run() {
+                while (keepGoing.get()) {
+                    done.set(false);
+                    try {
+                        producer.send(session.createTextMessage("Test message " + ++i));
+                        LOG.info("sent: " + i);
+                    } catch (JMSException e) {
+                    }
+                }
+            }
+        };
+        thread.start();
+        waitForBlockedOrResourceLimit(done);
+
+        // after receiveing messges, producer should continue sending messages 
+        // (done == false)
+        MessageConsumer consumer = session.createConsumer(queueA);
+        TextMessage msg;
+        for (int idx = 0; idx < 5; ++idx) {
+            msg = (TextMessage) consumer.receive(1000);
+            assertNotNull("Got a message", msg);
+            LOG.info("received: " + idx + ", msg: " + msg.getJMSMessageID());
+            msg.acknowledge();
+        }
+        Thread.sleep(1000);
+        keepGoing.set(false);
+        
+        assertFalse("producer has resumed", done.get());
+    }
+
+    public void test2ndPubisherWithSyncSendConnectionThatIsBlocked() throws Exception {
+        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
+        factory.setAlwaysSyncSend(true);
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(queueB);
+
+        // Test sending to Queue A
+        // 1st send should not block. But the rest will.
+        fillQueue(queueA);
+
+        // Test sending to Queue B it should not block.
+        CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
+        assertTrue(pubishDoneToQeueuB.await(2, TimeUnit.SECONDS));
+
+        TextMessage msg = (TextMessage)consumer.receive();
+        assertEquals("Message 1", msg.getText());
+        msg.acknowledge();
+
+        pubishDoneToQeueuB = asyncSendTo(queueB, "Message 2");
+        assertTrue(pubishDoneToQeueuB.await(2, TimeUnit.SECONDS));
+
+        msg = (TextMessage)consumer.receive();
+        assertEquals("Message 2", msg.getText());
+        msg.acknowledge();
+    }
+
+    public void testSimpleSendReceive() throws Exception {
+        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
+        factory.setAlwaysSyncSend(true);
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(queueA);
+
+        // Test sending to Queue B it should not block.
+        CountDownLatch pubishDoneToQeueuA = asyncSendTo(queueA, "Message 1");
+        assertTrue(pubishDoneToQeueuA.await(2, TimeUnit.SECONDS));
+
+        TextMessage msg = (TextMessage)consumer.receive();
+        assertEquals("Message 1", msg.getText());
+        msg.acknowledge();
+
+        pubishDoneToQeueuA = asyncSendTo(queueA, "Message 2");
+        assertTrue(pubishDoneToQeueuA.await(2, TimeUnit.SECONDS));
+
+        msg = (TextMessage)consumer.receive();
+        assertEquals("Message 2", msg.getText());
+        msg.acknowledge();
+    }
+
+    public void test2ndPubisherWithStandardConnectionThatIsBlocked() throws Exception {
+        ConnectionFactory factory = createConnectionFactory();
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        // Test sending to Queue A
+        // 1st send should not block.
+        fillQueue(queueA);
+
+        // Test sending to Queue B it should block.
+        // Since even though the it's queue limits have not been reached, the
+        // connection
+        // is blocked.
+        CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
+        assertFalse(pubishDoneToQeueuB.await(2, TimeUnit.SECONDS));
+    }
+
+    private void fillQueue(final ActiveMQQueue queue) throws JMSException, InterruptedException {
+        final AtomicBoolean done = new AtomicBoolean(true);
+        final AtomicBoolean keepGoing = new AtomicBoolean(true);
+
+        // Starts an async thread that every time it publishes it sets the done
+        // flag to false.
+        // Once the send starts to block it will not reset the done flag
+        // anymore.
+        new Thread("Fill thread.") {
+            public void run() {
+                Session session = null;
+                try {
+                    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    MessageProducer producer = session.createProducer(queue);
+                    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                    while (keepGoing.get()) {
+                        done.set(false);
+                        producer.send(session.createTextMessage("Hello World"));
+                    }
+                } catch (JMSException e) {
+                } finally {
+                    safeClose(session);
+                }
+            }
+        }.start();
+
+        waitForBlockedOrResourceLimit(done);
+        keepGoing.set(false);
+    }
+
+    protected void waitForBlockedOrResourceLimit(final AtomicBoolean done)
+            throws InterruptedException {
+        while (true) {
+            Thread.sleep(1000);
+            // the producer is blocked once the done flag stays true or there is a resource exception
+            if (done.get() || gotResourceException.get()) {
+                break;
+            }
+            done.set(true);
+        }
+    }
+
+    private CountDownLatch asyncSendTo(final ActiveMQQueue queue, final String message) throws JMSException {
+        final CountDownLatch done = new CountDownLatch(1);
+        new Thread("Send thread.") {
+            public void run() {
+                Session session = null;
+                try {
+                    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    MessageProducer producer = session.createProducer(queue);
+                    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                    producer.send(session.createTextMessage(message));
+                    done.countDown();
+                } catch (JMSException e) {
+                } finally {
+                    safeClose(session);
+                }
+            }
+        }.start();
+        return done;
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService service = new BrokerService();
+        service.setPersistent(false);
+        service.setUseJmx(false);
+
+        // Setup a destination policy where it takes only 1 message at a time.
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry policy = new PolicyEntry();
+        policy.setMemoryLimit(1);
+        policy.setPendingSubscriberPolicy(new VMPendingSubscriberMessageStoragePolicy());
+        policy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
+        policy.setProducerFlowControl(true);
+        policyMap.setDefaultEntry(policy);
+        service.setDestinationPolicy(policyMap);
+
+        connector = service.addConnector("tcp://localhost:0");
+        return service;
+    }
+
+    public void setUp() throws Exception {
+        setAutoFail(true);
+        super.setUp();
+    }
+    
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            TcpTransport t = (TcpTransport)connection.getTransport().narrow(TcpTransport.class);
+            t.getTransportListener().onException(new IOException("Disposed."));
+            connection.getTransport().stop();
+        }
+        super.tearDown();
+    }
+
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory(connector.getConnectUri());
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/QueueConsumerPriorityTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/QueueConsumerPriorityTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/QueueConsumerPriorityTest.java
new file mode 100644
index 0000000..2a61820
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/QueueConsumerPriorityTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.command.ActiveMQQueue;
+
+public class QueueConsumerPriorityTest extends TestCase {
+
+    private static final String VM_BROKER_URL = "vm://localhost?broker.persistent=false&broker.useJmx=true";
+
+    public QueueConsumerPriorityTest(String name) {
+        super(name);
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+    private Connection createConnection(final boolean start) throws JMSException {
+        ConnectionFactory cf = new ActiveMQConnectionFactory(VM_BROKER_URL);
+        Connection conn = cf.createConnection();
+        if (start) {
+            conn.start();
+        }
+        return conn;
+    }
+
+    public void testQueueConsumerPriority() throws JMSException, InterruptedException {
+        Connection conn = createConnection(true);
+
+        Session consumerLowPriority = null;
+        Session consumerHighPriority = null;
+        Session senderSession = null;
+
+        try {
+
+            consumerLowPriority = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            consumerHighPriority = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            assertNotNull(consumerHighPriority);
+            senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            String queueName = getClass().getName();
+            ActiveMQQueue low = new ActiveMQQueue(queueName+"?consumer.priority=1");
+            MessageConsumer lowConsumer = consumerLowPriority.createConsumer(low);
+
+            ActiveMQQueue high = new ActiveMQQueue(queueName+"?consumer.priority=2");
+            MessageConsumer highConsumer = consumerLowPriority.createConsumer(high);
+
+            ActiveMQQueue senderQueue = new ActiveMQQueue(queueName);
+
+            MessageProducer producer = senderSession.createProducer(senderQueue);
+
+            Message msg = senderSession.createTextMessage("test");
+            for (int i =0; i< 10000;i++) {
+                producer.send(msg);
+                assertNotNull("null on iteration: " + i, highConsumer.receive(500));
+            }
+            assertNull(lowConsumer.receive(2000));
+
+        } finally {
+            conn.close();
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ReconnectWithSameClientIDTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ReconnectWithSameClientIDTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ReconnectWithSameClientIDTest.java
new file mode 100644
index 0000000..81d13b2
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ReconnectWithSameClientIDTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.InvalidClientIDException;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+import junit.framework.Test;
+import org.apache.activemq.util.DefaultTestAppender;
+import org.apache.log4j.Appender;
+import org.apache.log4j.spi.LoggingEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReconnectWithSameClientIDTest extends EmbeddedBrokerTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(ReconnectWithSameClientIDTest.class);
+
+    protected Connection connection;
+    protected boolean transacted;
+    protected int authMode = Session.AUTO_ACKNOWLEDGE;
+    public boolean useFailover = false;
+
+    public static Test suite() {
+        return suite(ReconnectWithSameClientIDTest.class);
+    }
+
+    public void initCombosForTestReconnectMultipleTimesWithSameClientID() {
+        addCombinationValues("useFailover", new Object[]{Boolean.FALSE, Boolean.TRUE});
+    }
+
+    public void testReconnectMultipleTimesWithSameClientID() throws Exception {
+
+        org.apache.log4j.Logger log4jLogger =
+                org.apache.log4j.Logger.getLogger(org.apache.activemq.broker.jmx.ManagedTransportConnection.class);
+        final AtomicBoolean failed = new AtomicBoolean(false);
+
+        Appender appender = new DefaultTestAppender() {
+            @Override
+            public void doAppend(LoggingEvent event) {
+                if (event.getMessage().toString().startsWith("Failed to register MBean")) {
+                    LOG.info("received unexpected log message: " + event.getMessage());
+                    failed.set(true);
+                }
+            }
+        };
+        log4jLogger.addAppender(appender);
+        try {
+            connection = connectionFactory.createConnection();
+            useConnection(connection);
+
+            // now lets create another which should fail
+            for (int i = 1; i < 11; i++) {
+                Connection connection2 = connectionFactory.createConnection();
+                try {
+                    useConnection(connection2);
+                    fail("Should have thrown InvalidClientIDException on attempt" + i);
+                } catch (InvalidClientIDException e) {
+                    LOG.info("Caught expected: " + e);
+                } finally {
+                    connection2.close();
+                }
+            }
+
+            // now lets try closing the original connection and creating a new
+            // connection with the same ID
+            connection.close();
+            connection = connectionFactory.createConnection();
+            useConnection(connection);
+        } finally {
+            log4jLogger.removeAppender(appender);
+        }
+        assertFalse("failed on unexpected log event", failed.get());
+    }
+
+    @Override
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory((useFailover ? "failover:" : "") +
+                broker.getTransportConnectors().get(0).getPublishableConnectString());
+    }
+
+    protected void setUp() throws Exception {
+        bindAddress = "tcp://localhost:0";
+        super.setUp();
+    }
+
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+            connection = null;
+        }
+        super.tearDown();
+    }
+
+    protected void useConnection(Connection connection) throws JMSException {
+        connection.setClientID("foo");
+        connection.start();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
new file mode 100644
index 0000000..e2b5867
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
@@ -0,0 +1,830 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ServerSession;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import junit.framework.Test;
+
+import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.Wait;
+
+public class RedeliveryPolicyTest extends JmsTestSupport {
+
+    public static Test suite() {
+        return suite(RedeliveryPolicyTest.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+
+    public void testGetNext() throws Exception {
+
+        RedeliveryPolicy policy = new RedeliveryPolicy();
+        policy.setInitialRedeliveryDelay(0);
+        policy.setRedeliveryDelay(500);
+        policy.setBackOffMultiplier((short) 2);
+        policy.setUseExponentialBackOff(true);
+
+        long delay = policy.getNextRedeliveryDelay(0);
+        assertEquals(500, delay);
+        delay = policy.getNextRedeliveryDelay(delay);
+        assertEquals(500*2, delay);
+        delay = policy.getNextRedeliveryDelay(delay);
+        assertEquals(500*4, delay);
+
+        policy.setUseExponentialBackOff(false);
+        delay = policy.getNextRedeliveryDelay(delay);
+        assertEquals(500, delay);
+    }
+
+    /**
+     * @throws Exception
+     */
+    public void testExponentialRedeliveryPolicyDelaysDeliveryOnRollback() throws Exception {
+
+        // Receive a message with the JMS API
+        RedeliveryPolicy policy = connection.getRedeliveryPolicy();
+        policy.setInitialRedeliveryDelay(0);
+        policy.setRedeliveryDelay(500);
+        policy.setBackOffMultiplier((short) 2);
+        policy.setUseExponentialBackOff(true);
+
+        connection.start();
+        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue destination = new ActiveMQQueue(getName());
+        MessageProducer producer = session.createProducer(destination);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        // Send the messages
+        producer.send(session.createTextMessage("1st"));
+        producer.send(session.createTextMessage("2nd"));
+        session.commit();
+
+        TextMessage m;
+        m = (TextMessage)consumer.receive(1000);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+        session.rollback();
+
+        // No delay on first rollback..
+        m = (TextMessage)consumer.receive(100);
+        assertNotNull(m);
+        session.rollback();
+
+        // Show subsequent re-delivery delay is incrementing.
+        m = (TextMessage)consumer.receive(100);
+        assertNull(m);
+
+        m = (TextMessage)consumer.receive(700);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+        session.rollback();
+
+        // Show re-delivery delay is incrementing exponentially
+        m = (TextMessage)consumer.receive(100);
+        assertNull(m);
+        m = (TextMessage)consumer.receive(500);
+        assertNull(m);
+        m = (TextMessage)consumer.receive(700);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+
+    }
+
+
+    /**
+     * @throws Exception
+     */
+    public void testNornalRedeliveryPolicyDelaysDeliveryOnRollback() throws Exception {
+
+        // Receive a message with the JMS API
+        RedeliveryPolicy policy = connection.getRedeliveryPolicy();
+        policy.setInitialRedeliveryDelay(0);
+        policy.setRedeliveryDelay(500);
+
+        connection.start();
+        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue destination = new ActiveMQQueue(getName());
+        MessageProducer producer = session.createProducer(destination);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        // Send the messages
+        producer.send(session.createTextMessage("1st"));
+        producer.send(session.createTextMessage("2nd"));
+        session.commit();
+
+        TextMessage m;
+        m = (TextMessage)consumer.receive(1000);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+        session.rollback();
+
+        // No delay on first rollback..
+        m = (TextMessage)consumer.receive(100);
+        assertNotNull(m);
+        session.rollback();
+
+        // Show subsequent re-delivery delay is incrementing.
+        m = (TextMessage)consumer.receive(100);
+        assertNull(m);
+        m = (TextMessage)consumer.receive(700);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+        session.rollback();
+
+        // The message gets redelivered after 500 ms every time since
+        // we are not using exponential backoff.
+        m = (TextMessage)consumer.receive(100);
+        assertNull(m);
+        m = (TextMessage)consumer.receive(700);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+
+    }
+
+    /**
+     * @throws Exception
+     */
+    public void testDLQHandling() throws Exception {
+
+        // Receive a message with the JMS API
+        RedeliveryPolicy policy = connection.getRedeliveryPolicy();
+        policy.setInitialRedeliveryDelay(100);
+        policy.setUseExponentialBackOff(false);
+        policy.setMaximumRedeliveries(2);
+
+        connection.start();
+        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue destination = new ActiveMQQueue("TEST");
+        MessageProducer producer = session.createProducer(destination);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+        MessageConsumer dlqConsumer = session.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
+
+        // Send the messages
+        producer.send(session.createTextMessage("1st"));
+        producer.send(session.createTextMessage("2nd"));
+        session.commit();
+
+        TextMessage m;
+        m = (TextMessage)consumer.receive(1000);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+        session.rollback();
+
+        m = (TextMessage)consumer.receive(1000);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+        session.rollback();
+
+        m = (TextMessage)consumer.receive(2000);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+        session.rollback();
+
+        // The last rollback should cause the 1st message to get sent to the DLQ
+        m = (TextMessage)consumer.receive(1000);
+        assertNotNull(m);
+        assertEquals("2nd", m.getText());
+        session.commit();
+
+        // We should be able to get the message off the DLQ now.
+        m = (TextMessage)dlqConsumer.receive(1000);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+        String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
+        assertTrue("cause exception has policy ref", cause.contains("RedeliveryPolicy"));
+        session.commit();
+
+    }
+
+
+    /**
+     * @throws Exception
+     */
+    public void testInfiniteMaximumNumberOfRedeliveries() throws Exception {
+
+        // Receive a message with the JMS API
+        RedeliveryPolicy policy = connection.getRedeliveryPolicy();
+        policy.setInitialRedeliveryDelay(100);
+        policy.setUseExponentialBackOff(false);
+       //  let's set the maximum redeliveries to no maximum (ie. infinite)
+        policy.setMaximumRedeliveries(-1);
+
+
+        connection.start();
+        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue destination = new ActiveMQQueue("TEST");
+        MessageProducer producer = session.createProducer(destination);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        // Send the messages
+        producer.send(session.createTextMessage("1st"));
+        producer.send(session.createTextMessage("2nd"));
+        session.commit();
+
+        TextMessage m;
+
+        m = (TextMessage)consumer.receive(1000);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+        session.rollback();
+
+        //we should be able to get the 1st message redelivered until a session.commit is called
+        m = (TextMessage)consumer.receive(1000);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+        session.rollback();
+
+        m = (TextMessage)consumer.receive(2000);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+        session.rollback();
+
+        m = (TextMessage)consumer.receive(2000);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+        session.rollback();
+
+        m = (TextMessage)consumer.receive(2000);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+        session.rollback();
+
+        m = (TextMessage)consumer.receive(2000);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+        session.commit();
+
+        m = (TextMessage)consumer.receive(2000);
+        assertNotNull(m);
+        assertEquals("2nd", m.getText());
+        session.commit();
+
+    }
+
+    /**
+     * @throws Exception
+     */
+    public void testMaximumRedeliveryDelay() throws Exception {
+
+        // Receive a message with the JMS API
+        RedeliveryPolicy policy = connection.getRedeliveryPolicy();
+        policy.setInitialRedeliveryDelay(10);
+        policy.setUseExponentialBackOff(true);
+        policy.setMaximumRedeliveries(-1);
+        policy.setRedeliveryDelay(50);
+        policy.setMaximumRedeliveryDelay(1000);
+        policy.setBackOffMultiplier((short) 2);
+        policy.setUseExponentialBackOff(true);
+
+        connection.start();
+        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue destination = new ActiveMQQueue("TEST");
+        MessageProducer producer = session.createProducer(destination);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        // Send the messages
+        producer.send(session.createTextMessage("1st"));
+        producer.send(session.createTextMessage("2nd"));
+        session.commit();
+
+        TextMessage m;
+
+        for(int i = 0; i < 10; ++i) {
+            // we should be able to get the 1st message redelivered until a session.commit is called
+            m = (TextMessage)consumer.receive(2000);
+            assertNotNull(m);
+            assertEquals("1st", m.getText());
+            session.rollback();
+        }
+
+        m = (TextMessage)consumer.receive(2000);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+        session.commit();
+
+        m = (TextMessage)consumer.receive(2000);
+        assertNotNull(m);
+        assertEquals("2nd", m.getText());
+        session.commit();
+
+        assertTrue(policy.getNextRedeliveryDelay(Long.MAX_VALUE) == 1000 );
+    }
+
+    /**
+     * @throws Exception
+     */
+    public void testZeroMaximumNumberOfRedeliveries() throws Exception {
+
+        // Receive a message with the JMS API
+        RedeliveryPolicy policy = connection.getRedeliveryPolicy();
+        policy.setInitialRedeliveryDelay(100);
+        policy.setUseExponentialBackOff(false);
+        //let's set the maximum redeliveries to 0
+        policy.setMaximumRedeliveries(0);
+
+        connection.start();
+        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue destination = new ActiveMQQueue("TEST");
+        MessageProducer producer = session.createProducer(destination);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        // Send the messages
+        producer.send(session.createTextMessage("1st"));
+        producer.send(session.createTextMessage("2nd"));
+        session.commit();
+
+        TextMessage m;
+        m = (TextMessage)consumer.receive(1000);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+        session.rollback();
+
+        //the 1st  message should not be redelivered since maximumRedeliveries is set to 0
+        m = (TextMessage)consumer.receive(1000);
+        assertNotNull(m);
+        assertEquals("2nd", m.getText());
+        session.commit();
+
+
+
+    }
+
+    public void testRepeatedRedeliveryReceiveNoCommit() throws Exception {
+
+        connection.start();
+        Session dlqSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+        ActiveMQQueue destination = new ActiveMQQueue("TEST");
+        MessageProducer producer = dlqSession.createProducer(destination);
+
+        // Send the messages
+        producer.send(dlqSession.createTextMessage("1st"));
+
+        dlqSession.commit();
+        MessageConsumer dlqConsumer = dlqSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
+
+        final int maxRedeliveries = 4;
+        for (int i=0;i<=maxRedeliveries +1;i++) {
+
+            connection = (ActiveMQConnection)factory.createConnection(userName, password);
+            connections.add(connection);
+            // Receive a message with the JMS API
+            RedeliveryPolicy policy = connection.getRedeliveryPolicy();
+            policy.setInitialRedeliveryDelay(0);
+            policy.setUseExponentialBackOff(false);
+            policy.setMaximumRedeliveries(maxRedeliveries);
+
+            connection.start();
+            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createConsumer(destination);
+
+            ActiveMQTextMessage m = ((ActiveMQTextMessage)consumer.receive(4000));
+            if (i<=maxRedeliveries) {
+                assertEquals("1st", m.getText());
+                assertEquals(i, m.getRedeliveryCounter());
+            } else {
+                assertNull("null on exceeding redelivery count", m);
+            }
+            connection.close();
+            connections.remove(connection);
+        }
+
+        // We should be able to get the message off the DLQ now.
+        TextMessage m = (TextMessage)dlqConsumer.receive(1000);
+        assertNotNull("Got message from DLQ", m);
+        assertEquals("1st", m.getText());
+        String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
+        assertTrue("cause exception has policy ref", cause.contains("RedeliveryPolicy"));
+        dlqSession.commit();
+
+    }
+
+
+    public void testRepeatedRedeliveryOnMessageNoCommit() throws Exception {
+
+        connection.start();
+        Session dlqSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+        ActiveMQQueue destination = new ActiveMQQueue("TEST");
+        MessageProducer producer = dlqSession.createProducer(destination);
+
+        // Send the messages
+        producer.send(dlqSession.createTextMessage("1st"));
+
+        dlqSession.commit();
+        MessageConsumer dlqConsumer = dlqSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
+
+        final int maxRedeliveries = 4;
+        final AtomicInteger receivedCount = new AtomicInteger(0);
+
+        for (int i=0;i<=maxRedeliveries+1;i++) {
+
+            connection = (ActiveMQConnection)factory.createConnection(userName, password);
+            connections.add(connection);
+
+            RedeliveryPolicy policy = connection.getRedeliveryPolicy();
+            policy.setInitialRedeliveryDelay(0);
+            policy.setUseExponentialBackOff(false);
+            policy.setMaximumRedeliveries(maxRedeliveries);
+
+            connection.start();
+            final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            MessageConsumer consumer = session.createConsumer(destination);
+            final CountDownLatch done = new CountDownLatch(1);
+
+            consumer.setMessageListener(new MessageListener(){
+                @Override
+                public void onMessage(Message message) {
+                    try {
+                        ActiveMQTextMessage m = (ActiveMQTextMessage)message;
+                        assertEquals("1st", m.getText());
+                        assertEquals(receivedCount.get(), m.getRedeliveryCounter());
+                        receivedCount.incrementAndGet();
+                        done.countDown();
+                    } catch (Exception ignored) {
+                        ignored.printStackTrace();
+                    }
+                }
+            });
+
+            if (i<=maxRedeliveries) {
+                assertTrue("listener done", done.await(5, TimeUnit.SECONDS));
+            } else {
+                // final redlivery gets poisoned before dispatch
+                assertFalse("listener done", done.await(1, TimeUnit.SECONDS));
+            }
+            connection.close();
+            connections.remove(connection);
+        }
+
+        // We should be able to get the message off the DLQ now.
+        TextMessage m = (TextMessage)dlqConsumer.receive(1000);
+        assertNotNull("Got message from DLQ", m);
+        assertEquals("1st", m.getText());
+        String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
+        assertTrue("cause exception has policy ref", cause.contains("RedeliveryPolicy"));
+        dlqSession.commit();
+
+    }
+
+    public void testRepeatedRedeliveryServerSessionNoCommit() throws Exception {
+
+        connection.start();
+        Session dlqSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+        ActiveMQQueue destination = new ActiveMQQueue("TEST");
+        MessageProducer producer = dlqSession.createProducer(destination);
+
+        // Send the messages
+        producer.send(dlqSession.createTextMessage("1st"));
+
+        dlqSession.commit();
+        MessageConsumer dlqConsumer = dlqSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
+
+        final int maxRedeliveries = 4;
+        final AtomicInteger receivedCount = new AtomicInteger(0);
+
+        for (int i=0;i<=maxRedeliveries+1;i++) {
+
+            connection = (ActiveMQConnection)factory.createConnection(userName, password);
+            connections.add(connection);
+
+            RedeliveryPolicy policy = connection.getRedeliveryPolicy();
+            policy.setInitialRedeliveryDelay(0);
+            policy.setUseExponentialBackOff(false);
+            policy.setMaximumRedeliveries(maxRedeliveries);
+
+            connection.start();
+            final CountDownLatch done = new CountDownLatch(1);
+
+            final ActiveMQSession session = (ActiveMQSession) connection.createSession(true, Session.SESSION_TRANSACTED);
+            session.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                    try {
+                        ActiveMQTextMessage m = (ActiveMQTextMessage) message;
+                        assertEquals("1st", m.getText());
+                        assertEquals(receivedCount.get(), m.getRedeliveryCounter());
+                        receivedCount.incrementAndGet();
+                        done.countDown();
+                    } catch (Exception ignored) {
+                        ignored.printStackTrace();
+                    }
+                }
+            });
+
+            connection.createConnectionConsumer(
+                    destination,
+                    null,
+                    new ServerSessionPool() {
+                        @Override
+                        public ServerSession getServerSession() throws JMSException {
+                            return new ServerSession() {
+                                @Override
+                                public Session getSession() throws JMSException {
+                                    return session;
+                                }
+
+                                @Override
+                                public void start() throws JMSException {
+                                }
+                            };
+                        }
+                    },
+                    100,
+                    false);
+
+            Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    session.run();
+                    return done.await(10, TimeUnit.MILLISECONDS);
+                }
+            });
+
+            if (i<=maxRedeliveries) {
+                assertTrue("listener done @" + i, done.await(5, TimeUnit.SECONDS));
+            } else {
+                // final redlivery gets poisoned before dispatch
+                assertFalse("listener not done @" + i, done.await(1, TimeUnit.SECONDS));
+            }
+            connection.close();
+            connections.remove(connection);
+        }
+
+        // We should be able to get the message off the DLQ now.
+        TextMessage m = (TextMessage)dlqConsumer.receive(1000);
+        assertNotNull("Got message from DLQ", m);
+        assertEquals("1st", m.getText());
+        String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
+        assertTrue("cause exception has policy ref", cause.contains("RedeliveryPolicy"));
+        dlqSession.commit();
+
+    }
+
+    public void testInitialRedeliveryDelayZero() throws Exception {
+
+        // Receive a message with the JMS API
+        RedeliveryPolicy policy = connection.getRedeliveryPolicy();
+        policy.setInitialRedeliveryDelay(0);
+        policy.setUseExponentialBackOff(false);
+        policy.setMaximumRedeliveries(1);
+
+        connection.start();
+        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue destination = new ActiveMQQueue("TEST");
+        MessageProducer producer = session.createProducer(destination);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        // Send the messages
+        producer.send(session.createTextMessage("1st"));
+        producer.send(session.createTextMessage("2nd"));
+        session.commit();
+
+        TextMessage m;
+        m = (TextMessage)consumer.receive(100);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+        session.rollback();
+
+        m = (TextMessage)consumer.receive(100);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+
+        m = (TextMessage)consumer.receive(100);
+        assertNotNull(m);
+        assertEquals("2nd", m.getText());
+        session.commit();
+
+        session.commit();
+    }
+
+
+    public void testInitialRedeliveryDelayOne() throws Exception {
+
+        // Receive a message with the JMS API
+        RedeliveryPolicy policy = connection.getRedeliveryPolicy();
+        policy.setInitialRedeliveryDelay(1000);
+        policy.setUseExponentialBackOff(false);
+        policy.setMaximumRedeliveries(1);
+
+        connection.start();
+        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue destination = new ActiveMQQueue("TEST");
+        MessageProducer producer = session.createProducer(destination);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        // Send the messages
+        producer.send(session.createTextMessage("1st"));
+        producer.send(session.createTextMessage("2nd"));
+        session.commit();
+
+        TextMessage m;
+        m = (TextMessage)consumer.receive(100);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+        session.rollback();
+
+        m = (TextMessage)consumer.receive(100);
+        assertNull(m);
+
+        m = (TextMessage)consumer.receive(2000);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+
+        m = (TextMessage)consumer.receive(100);
+        assertNotNull(m);
+        assertEquals("2nd", m.getText());
+        session.commit();
+    }
+
+    public void testRedeliveryDelayOne() throws Exception {
+
+        // Receive a message with the JMS API
+        RedeliveryPolicy policy = connection.getRedeliveryPolicy();
+        policy.setInitialRedeliveryDelay(0);
+        policy.setRedeliveryDelay(1000);
+        policy.setUseExponentialBackOff(false);
+        policy.setMaximumRedeliveries(2);
+
+        connection.start();
+        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue destination = new ActiveMQQueue("TEST");
+        MessageProducer producer = session.createProducer(destination);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        // Send the messages
+        producer.send(session.createTextMessage("1st"));
+        producer.send(session.createTextMessage("2nd"));
+        session.commit();
+
+        TextMessage m;
+        m = (TextMessage)consumer.receive(100);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+        session.rollback();
+
+        m = (TextMessage)consumer.receive(100);
+        assertNotNull("first immediate redelivery", m);
+        session.rollback();
+
+        m = (TextMessage)consumer.receive(100);
+        assertNull("second delivery delayed: " + m, m);
+
+        m = (TextMessage)consumer.receive(2000);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+
+        m = (TextMessage)consumer.receive(100);
+        assertNotNull(m);
+        assertEquals("2nd", m.getText());
+        session.commit();
+    }
+
+    public void testRedeliveryPolicyPerDestination() throws Exception {
+
+        RedeliveryPolicy queuePolicy = new RedeliveryPolicy();
+        queuePolicy.setInitialRedeliveryDelay(0);
+        queuePolicy.setRedeliveryDelay(1000);
+        queuePolicy.setUseExponentialBackOff(false);
+        queuePolicy.setMaximumRedeliveries(2);
+
+        RedeliveryPolicy topicPolicy = new RedeliveryPolicy();
+        topicPolicy.setInitialRedeliveryDelay(0);
+        topicPolicy.setRedeliveryDelay(1000);
+        topicPolicy.setUseExponentialBackOff(false);
+        topicPolicy.setMaximumRedeliveries(3);
+
+        // Receive a message with the JMS API
+        RedeliveryPolicyMap map = connection.getRedeliveryPolicyMap();
+        map.put(new ActiveMQTopic(">"), topicPolicy);
+        map.put(new ActiveMQQueue(">"), queuePolicy);
+
+        connection.start();
+        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue queue = new ActiveMQQueue("TEST");
+        ActiveMQTopic topic = new ActiveMQTopic("TEST");
+
+        MessageProducer producer = session.createProducer(null);
+
+        MessageConsumer queueConsumer = session.createConsumer(queue);
+        MessageConsumer topicConsumer = session.createConsumer(topic);
+
+        // Send the messages
+        producer.send(queue, session.createTextMessage("1st"));
+        producer.send(queue, session.createTextMessage("2nd"));
+        producer.send(topic, session.createTextMessage("1st"));
+        producer.send(topic, session.createTextMessage("2nd"));
+
+        session.commit();
+
+        TextMessage m;
+        m = (TextMessage)queueConsumer.receive(100);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+        m = (TextMessage)topicConsumer.receive(100);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+        m = (TextMessage)queueConsumer.receive(100);
+        assertNotNull(m);
+        assertEquals("2nd", m.getText());
+        m = (TextMessage)topicConsumer.receive(100);
+        assertNotNull(m);
+        assertEquals("2nd", m.getText());
+        session.rollback();
+
+        m = (TextMessage)queueConsumer.receive(100);
+        assertNotNull("first immediate redelivery", m);
+        m = (TextMessage)topicConsumer.receive(100);
+        assertNotNull("first immediate redelivery", m);
+        session.rollback();
+
+        m = (TextMessage)queueConsumer.receive(100);
+        assertNull("second delivery delayed: " + m, m);
+        m = (TextMessage)topicConsumer.receive(100);
+        assertNull("second delivery delayed: " + m, m);
+
+        m = (TextMessage)queueConsumer.receive(2000);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+        m = (TextMessage)topicConsumer.receive(2000);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+
+        m = (TextMessage)queueConsumer.receive(100);
+        assertNotNull(m);
+        assertEquals("2nd", m.getText());
+        m = (TextMessage)topicConsumer.receive(100);
+        assertNotNull(m);
+        assertEquals("2nd", m.getText());
+        session.rollback();
+
+        m = (TextMessage)queueConsumer.receive(2000);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+        m = (TextMessage)topicConsumer.receive(2000);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+
+        m = (TextMessage)queueConsumer.receive(100);
+        assertNotNull(m);
+        assertEquals("2nd", m.getText());
+        m = (TextMessage)topicConsumer.receive(100);
+        assertNotNull(m);
+        assertEquals("2nd", m.getText());
+        session.rollback();
+
+        // No third attempt for the Queue consumer
+        m = (TextMessage)queueConsumer.receive(2000);
+        assertNull(m);
+        m = (TextMessage)topicConsumer.receive(2000);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+
+        m = (TextMessage)queueConsumer.receive(100);
+        assertNull(m);
+        m = (TextMessage)topicConsumer.receive(100);
+        assertNotNull(m);
+        assertEquals("2nd", m.getText());
+        session.commit();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/RemoveDestinationTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/RemoveDestinationTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/RemoveDestinationTest.java
new file mode 100644
index 0000000..009221b
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/RemoveDestinationTest.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.URI;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.management.ObjectName;
+
+import org.apache.activemq.advisory.DestinationSource;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RemoveDestinationTest {
+
+    private static final String VM_BROKER_URL = "vm://localhost?create=false";
+    private static final String BROKER_URL = "broker:vm://localhost?broker.persistent=false&broker.useJmx=true";
+
+    BrokerService broker;
+
+    @Before
+    public void setUp() throws Exception {
+        broker = BrokerFactory.createBroker(new URI(BROKER_URL));
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+        broker = null;
+    }
+
+    private Connection createConnection(final boolean start) throws JMSException {
+        ConnectionFactory cf = new ActiveMQConnectionFactory(VM_BROKER_URL);
+        Connection conn = cf.createConnection();
+        if (start) {
+            conn.start();
+        }
+        return conn;
+    }
+
+    @Test
+    public void testRemoveDestinationWithoutSubscriber() throws Exception {
+
+        ActiveMQConnection amqConnection = (ActiveMQConnection) createConnection(true);
+        DestinationSource destinationSource = amqConnection.getDestinationSource();
+        Session session = amqConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Topic topic = session.createTopic("TEST.FOO");
+        MessageProducer producer = session.createProducer(topic);
+        MessageConsumer consumer = session.createConsumer(topic);
+
+        TextMessage msg = session.createTextMessage("Hellow World");
+        producer.send(msg);
+        assertNotNull(consumer.receive(5000));
+        Thread.sleep(1000);
+
+        ActiveMQTopic amqTopic = (ActiveMQTopic) topic;
+        assertTrue(destinationSource.getTopics().contains(amqTopic));
+
+        consumer.close();
+        producer.close();
+        session.close();
+
+        Thread.sleep(3000);
+        amqConnection.destroyDestination((ActiveMQDestination) topic);
+        Thread.sleep(3000);
+        assertFalse(destinationSource.getTopics().contains(amqTopic));
+    }
+
+    @Test
+    public void testRemoveDestinationWithSubscriber() throws Exception {
+        ActiveMQConnection amqConnection = (ActiveMQConnection) createConnection(true);
+        DestinationSource destinationSource = amqConnection.getDestinationSource();
+
+        Session session = amqConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Topic topic = session.createTopic("TEST.FOO");
+        MessageProducer producer = session.createProducer(topic);
+        MessageConsumer consumer = session.createConsumer(topic);
+
+        TextMessage msg = session.createTextMessage("Hellow World");
+        producer.send(msg);
+        assertNotNull(consumer.receive(5000));
+        Thread.sleep(1000);
+
+        ActiveMQTopic amqTopic = (ActiveMQTopic) topic;
+
+        assertTrue(destinationPresentInAdminView(broker, amqTopic));
+        assertTrue(destinationSource.getTopics().contains(amqTopic));
+
+        // This line generates a broker error since the consumer is still active.
+        try {
+            amqConnection.destroyDestination((ActiveMQDestination) topic);
+            fail("expect exception on destroy if comsumer present");
+        } catch (JMSException expected) {
+            assertTrue(expected.getMessage().indexOf(amqTopic.getTopicName()) != -1);
+        }
+
+        Thread.sleep(3000);
+
+        assertTrue(destinationSource.getTopics().contains(amqTopic));
+        assertTrue(destinationPresentInAdminView(broker, amqTopic));
+
+        consumer.close();
+        producer.close();
+        session.close();
+
+        Thread.sleep(3000);
+
+        // The destination will not be removed with this call, but if you remove
+        // the call above that generates the error it will.
+        amqConnection.destroyDestination(amqTopic);
+        Thread.sleep(3000);
+        assertFalse(destinationSource.getTopics().contains(amqTopic));
+        assertFalse(destinationPresentInAdminView(broker, amqTopic));
+    }
+
+    private boolean destinationPresentInAdminView(BrokerService broker2, ActiveMQTopic amqTopic) throws Exception {
+        boolean found = false;
+        for (ObjectName name : broker.getAdminView().getTopics()) {
+
+            DestinationViewMBean proxy = (DestinationViewMBean)
+                broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true);
+
+            if (proxy.getName().equals(amqTopic.getPhysicalName())) {
+                found = true;
+                break;
+            }
+        }
+        return found;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/SpringTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/SpringTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/SpringTestSupport.java
new file mode 100644
index 0000000..f713120
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/SpringTestSupport.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import junit.framework.TestCase;
+
+import org.springframework.context.support.AbstractApplicationContext;
+
+/**
+ * A useful base class for spring based unit test cases
+ *
+ *
+ */
+public abstract class SpringTestSupport extends TestCase {
+
+    protected AbstractApplicationContext context;
+
+    @Override
+    protected void setUp() throws Exception {
+        context = createApplicationContext();
+    }
+
+    protected abstract AbstractApplicationContext createApplicationContext();;
+
+    @Override
+    protected void tearDown() throws Exception {
+        if (context != null) {
+            context.destroy();
+        }
+    }
+
+    protected Object getBean(String name) {
+        Object bean = context.getBean(name);
+        if (bean == null) {
+            fail("Should have found bean named '" + name + "' in the Spring ApplicationContext");
+        }
+        return bean;
+    }
+
+    protected void assertSetEquals(String description, Object[] expected, Set<?> actual) {
+        Set<Object> expectedSet = new HashSet<Object>();
+        expectedSet.addAll(Arrays.asList(expected));
+        assertEquals(description, expectedSet, actual);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TestSupport.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TestSupport.java
new file mode 100644
index 0000000..a762f89
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TestSupport.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.leveldb.LevelDBPersistenceAdapter;
+import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
+
+/**
+ * Useful base class for unit test cases
+ *
+ *
+ */
+public abstract class TestSupport extends CombinationTestSupport {
+
+    protected ActiveMQConnectionFactory connectionFactory;
+    protected boolean topic = true;
+    public PersistenceAdapterChoice defaultPersistenceAdapter = PersistenceAdapterChoice.KahaDB;
+
+    protected ActiveMQMessage createMessage() {
+        return new ActiveMQMessage();
+    }
+
+    protected Destination createDestination(String subject) {
+        if (topic) {
+            return new ActiveMQTopic(subject);
+        } else {
+            return new ActiveMQQueue(subject);
+        }
+    }
+
+    protected Destination createDestination() {
+        return createDestination(getDestinationString());
+    }
+
+    /**
+     * Returns the name of the destination used in this test case
+     */
+    protected String getDestinationString() {
+        return getClass().getName() + "." + getName(true);
+    }
+
+    /**
+     * @param messsage
+     * @param firstSet
+     * @param secondSet
+     */
+    protected void assertTextMessagesEqual(String messsage, Message[] firstSet, Message[] secondSet)
+        throws JMSException {
+        assertEquals("Message count does not match: " + messsage, firstSet.length, secondSet.length);
+        for (int i = 0; i < secondSet.length; i++) {
+            TextMessage m1 = (TextMessage)firstSet[i];
+            TextMessage m2 = (TextMessage)secondSet[i];
+            assertFalse("Message " + (i + 1) + " did not match : " + messsage + ": expected {" + m1
+                        + "}, but was {" + m2 + "}", m1 == null ^ m2 == null);
+            assertEquals("Message " + (i + 1) + " did not match: " + messsage + ": expected {" + m1
+                         + "}, but was {" + m2 + "}", m1.getText(), m2.getText());
+        }
+    }
+
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+    }
+
+    /**
+     * Factory method to create a new connection
+     */
+    protected Connection createConnection() throws Exception {
+        return getConnectionFactory().createConnection();
+    }
+
+    public ActiveMQConnectionFactory getConnectionFactory() throws Exception {
+        if (connectionFactory == null) {
+            connectionFactory = createConnectionFactory();
+            assertTrue("Should have created a connection factory!", connectionFactory != null);
+        }
+        return connectionFactory;
+    }
+
+    protected String getConsumerSubject() {
+        return getSubject();
+    }
+
+    protected String getProducerSubject() {
+        return getSubject();
+    }
+
+    protected String getSubject() {
+        return getName();
+    }
+
+    public static void recursiveDelete(File f) {
+        if (f.isDirectory()) {
+            File[] files = f.listFiles();
+            for (int i = 0; i < files.length; i++) {
+                recursiveDelete(files[i]);
+            }
+        }
+        f.delete();
+    }
+
+    public static void removeMessageStore() {
+        if (System.getProperty("activemq.store.dir") != null) {
+            recursiveDelete(new File(System.getProperty("activemq.store.dir")));
+        }
+        if (System.getProperty("derby.system.home") != null) {
+            recursiveDelete(new File(System.getProperty("derby.system.home")));
+        }
+    }
+
+    public static DestinationStatistics getDestinationStatistics(BrokerService broker, ActiveMQDestination destination) {
+        DestinationStatistics result = null;
+        org.apache.activemq.broker.region.Destination dest = getDestination(broker, destination);
+        if (dest != null) {
+            result = dest.getDestinationStatistics();
+        }
+        return result;
+    }
+
+    public static org.apache.activemq.broker.region.Destination getDestination(BrokerService target, ActiveMQDestination destination) {
+        org.apache.activemq.broker.region.Destination result = null;
+        for (org.apache.activemq.broker.region.Destination dest : getDestinationMap(target, destination).values()) {
+            if (dest.getName().equals(destination.getPhysicalName())) {
+                result = dest;
+                break;
+            }
+        }
+        return result;
+    }
+
+    private static Map<ActiveMQDestination, org.apache.activemq.broker.region.Destination> getDestinationMap(BrokerService target,
+            ActiveMQDestination destination) {
+        RegionBroker regionBroker = (RegionBroker) target.getRegionBroker();
+        if (destination.isTemporary()) {
+            return destination.isQueue() ? regionBroker.getTempQueueRegion().getDestinationMap() :
+                    regionBroker.getTempTopicRegion().getDestinationMap();
+        }
+        return destination.isQueue() ?
+                    regionBroker.getQueueRegion().getDestinationMap() :
+                        regionBroker.getTopicRegion().getDestinationMap();
+    }
+
+    public static enum PersistenceAdapterChoice {LevelDB, KahaDB, AMQ, JDBC, MEM };
+
+    public PersistenceAdapter setDefaultPersistenceAdapter(BrokerService broker) throws IOException {
+        return setPersistenceAdapter(broker, defaultPersistenceAdapter);
+    }
+
+    public static PersistenceAdapter setPersistenceAdapter(BrokerService broker, PersistenceAdapterChoice choice) throws IOException {
+        PersistenceAdapter adapter = null;
+        switch (choice) {
+        case JDBC:
+            JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
+            jdbcPersistenceAdapter.setUseLock(false); // rollback (at shutdown) on derby can take a long time with file io etc
+            adapter = jdbcPersistenceAdapter;
+            break;
+        case KahaDB:
+            adapter = new KahaDBPersistenceAdapter();
+            break;
+        case LevelDB:
+            adapter = new LevelDBPersistenceAdapter();
+            break;
+        case MEM:
+            adapter = new MemoryPersistenceAdapter();
+            break;
+        }
+        broker.setPersistenceAdapter(adapter);
+        adapter.setDirectory(new File(broker.getBrokerDataDirectory(), choice.name()));
+        return adapter;
+    }
+
+    public void stopBrokerWithStoreFailure(BrokerService broker, PersistenceAdapterChoice choice) throws Exception {
+        switch (choice) {
+            case KahaDB:
+                KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
+
+                // have the broker stop with an IOException on next checkpoint so it has a pending local transaction to recover
+                kahaDBPersistenceAdapter.getStore().getJournal().close();
+                break;
+            default:
+                // just stop normally by default
+                broker.stop();
+        }
+        broker.waitUntilStopped();
+    }
+
+
+    /**
+     * Test if base directory contains spaces
+     */
+    protected void assertBaseDirectoryContainsSpaces() {
+        assertFalse("Base directory cannot contain spaces.", new File(System.getProperty("basedir", ".")).getAbsoluteFile().toString().contains(" "));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TimeStampTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TimeStampTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TimeStampTest.java
new file mode 100644
index 0000000..2ec57b5
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TimeStampTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.util.UDPTraceBrokerPlugin;
+import org.apache.activemq.broker.view.ConnectionDotFilePlugin;
+
+public class TimeStampTest extends TestCase {
+    public void test() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(true);
+        broker.setPlugins(new BrokerPlugin[] {new ConnectionDotFilePlugin(), new UDPTraceBrokerPlugin()});
+        TransportConnector tcpConnector = broker.addConnector("tcp://localhost:0");
+        broker.addConnector("stomp://localhost:0");
+        broker.start();
+
+        // Create a ConnectionFactory
+        ActiveMQConnectionFactory connectionFactory =
+            new ActiveMQConnectionFactory(tcpConnector.getConnectUri());
+
+        // Create a Connection
+        Connection connection = connectionFactory.createConnection();
+        connection.start();
+
+        // Create a Session
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        // Create the destination Queue
+        Destination destination = session.createQueue("TEST.FOO");
+
+        // Create a MessageProducer from the Session to the Topic or Queue
+        MessageProducer producer = session.createProducer(destination);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+        // Create a messages
+        Message sentMessage = session.createMessage();
+
+        // Tell the producer to send the message
+        long beforeSend = System.currentTimeMillis();
+        producer.send(sentMessage);
+        long afterSend = System.currentTimeMillis();
+
+        // assert message timestamp is in window
+        assertTrue(beforeSend <= sentMessage.getJMSTimestamp() && sentMessage.getJMSTimestamp() <= afterSend);
+
+        // Create a MessageConsumer from the Session to the Topic or Queue
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        // Wait for a message
+        Message receivedMessage = consumer.receive(1000);
+
+        // assert we got the same message ID we sent
+        assertEquals(sentMessage.getJMSMessageID(), receivedMessage.getJMSMessageID());
+
+        // assert message timestamp is in window
+        assertTrue("JMS Message Timestamp should be set during the send method: \n" + "        beforeSend = " + beforeSend + "\n" + "   getJMSTimestamp = "
+                   + receivedMessage.getJMSTimestamp() + "\n" + "         afterSend = " + afterSend + "\n", beforeSend <= receivedMessage.getJMSTimestamp()
+                                                                                                            && receivedMessage.getJMSTimestamp() <= afterSend);
+
+        // assert message timestamp is unchanged
+        assertEquals("JMS Message Timestamp of recieved message should be the same as the sent message\n        ", sentMessage.getJMSTimestamp(), receivedMessage.getJMSTimestamp());
+
+        // Clean up
+        producer.close();
+        consumer.close();
+        session.close();
+        connection.close();
+        broker.stop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TransactionContextTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TransactionContextTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TransactionContextTest.java
new file mode 100644
index 0000000..5e45d52
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TransactionContextTest.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.TransactionRolledBackException;
+
+import org.apache.activemq.transaction.Synchronization;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TransactionContextTest {
+    
+    TransactionContext underTest;
+    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+    ActiveMQConnection connection;
+    
+    
+    @Before
+    public void setup() throws Exception {
+        connection = factory.createActiveMQConnection();
+        underTest = new TransactionContext(connection);
+    }
+    
+    @After
+    public void tearDown() throws Exception {
+        connection.close();
+    }
+    
+    @Test
+    public void testSyncBeforeEndCalledOnceOnRollback() throws Exception {
+        final AtomicInteger beforeEndCountA = new AtomicInteger(0);
+        final AtomicInteger beforeEndCountB = new AtomicInteger(0);
+        final AtomicInteger rollbackCountA = new AtomicInteger(0);
+        final AtomicInteger rollbackCountB = new AtomicInteger(0);
+        underTest.addSynchronization(new Synchronization() {
+            @Override
+            public void beforeEnd() throws Exception {
+                if (beforeEndCountA.getAndIncrement() == 0) {
+                    throw new TransactionRolledBackException("force rollback");
+                }
+            }
+
+            @Override
+            public void afterCommit() throws Exception {
+                fail("exepcted rollback exception");
+            }
+
+            @Override
+            public void afterRollback() throws Exception {
+                rollbackCountA.incrementAndGet();
+            }
+            
+        });
+        
+        underTest.addSynchronization(new Synchronization() {
+            @Override
+            public void beforeEnd() throws Exception {
+                beforeEndCountB.getAndIncrement();
+            }
+            
+            @Override     
+            public void afterCommit() throws Exception {
+                fail("exepcted rollback exception");
+            }
+
+            @Override
+            public void afterRollback() throws Exception {
+                rollbackCountB.incrementAndGet();
+            }
+
+        });
+        
+        
+        try {
+            underTest.commit();
+            fail("exepcted rollback exception");
+        } catch (TransactionRolledBackException expected) {
+        }
+        
+        assertEquals("beforeEnd A called once", 1, beforeEndCountA.get());
+        assertEquals("beforeEnd B called once", 1, beforeEndCountA.get());
+        assertEquals("rollbackCount B 0", 1, rollbackCountB.get());
+        assertEquals("rollbackCount A B", rollbackCountA.get(), rollbackCountB.get());
+    }
+    
+    @Test
+    public void testSyncIndexCleared() throws Exception {
+        final AtomicInteger beforeEndCountA = new AtomicInteger(0);
+        final AtomicInteger rollbackCountA = new AtomicInteger(0);
+        Synchronization sync = new Synchronization() {
+            @Override
+            public void beforeEnd() throws Exception {
+                beforeEndCountA.getAndIncrement();
+            }
+            @Override
+            public void afterCommit() throws Exception {
+                fail("exepcted rollback exception");
+            }
+            @Override
+            public void afterRollback() throws Exception {
+                rollbackCountA.incrementAndGet();
+            } 
+        };
+        
+        underTest.begin();
+        underTest.addSynchronization(sync);
+        underTest.rollback();
+        
+        assertEquals("beforeEnd", 1, beforeEndCountA.get());
+        assertEquals("rollback", 1, rollbackCountA.get());
+     
+        // do it again
+        underTest.begin();
+        underTest.addSynchronization(sync);
+        underTest.rollback();
+     
+        assertEquals("beforeEnd", 2, beforeEndCountA.get());
+        assertEquals("rollback", 2, rollbackCountA.get());
+    }
+}


Mime
View raw message