activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [44/52] [abbrv] [partial] activemq-artemis git commit: ARTEMIS-127 Adding activemq unit test module to Artemis
Date Tue, 09 Jun 2015 16:37:15 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueWildcardSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueWildcardSendReceiveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueWildcardSendReceiveTest.java
new file mode 100644
index 0000000..b689664
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueWildcardSendReceiveTest.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.test.JmsTopicSendReceiveTest;
+
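+/**
+ * Sends text messages to concrete queues and checks that consumers created
+ * on wildcard queue names ("TEST.ONE.*", "TEST.ONE.&gt;", "TEST.*.ONE")
+ * receive them.
+ */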
+public class JmsQueueWildcardSendReceiveTest extends JmsTopicSendReceiveTest {
+
+    // Concrete queue names the tests send to; each message body is the name
+    // of the queue it was sent to, which is what the tests assert on.
+    private String destination1String = "TEST.ONE.ONE";
+    private String destination2String = "TEST.ONE.ONE.ONE";
+    private String destination3String = "TEST.ONE.TWO";
+    private String destination4String = "TEST.TWO.ONE";
+
+    /**
+     * Selects a queue destination and non-persistent delivery, then runs the
+     * parent set-up. Both flags are assigned before {@code super.setUp()} is
+     * called (presumably the parent reads them while initialising).
+     *
+     * @throws Exception if the parent set-up fails
+     * @see junit.framework.TestCase#setUp()
+     */
+    @Override
+    protected void setUp() throws Exception {
+        topic = false;
+        deliveryMode = DeliveryMode.NON_PERSISTENT;
+        super.setUp();
+    }
+
+    /**
+     * Returns the subject used on the consuming side: the wildcard name
+     * {@code FOO.>}.
+     *
+     * @return String - consumer subject
+     * @see org.apache.activemq.test.TestSupport#getConsumerSubject()
+     */
+    @Override
+    protected String getConsumerSubject() {
+        return "FOO.>";
+    }
+
+    /**
+     * Returns the subject used on the producing side: the concrete name
+     * {@code FOO.BAR.HUMBUG}.
+     *
+     * @return String - producer subject
+     * @see org.apache.activemq.test.TestSupport#getProducerSubject()
+     */
+    @Override
+    protected String getProducerSubject() {
+        return "FOO.BAR.HUMBUG";
+    }
+
+    public void testReceiveWildcardQueueEndAsterisk() throws Exception {
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        ActiveMQDestination destination1 = (ActiveMQDestination)session.createQueue(destination1String);
+        ActiveMQDestination destination3 = (ActiveMQDestination)session.createQueue(destination3String);
+
+        Message m = null;
+        MessageConsumer consumer = null;
+        String text = null;
+
+        sendMessage(session, destination1, destination1String);
+        sendMessage(session, destination3, destination3String);
+        ActiveMQDestination destination6 = (ActiveMQDestination)session.createQueue("TEST.ONE.*");
+        consumer = session.createConsumer(destination6);
+        m = consumer.receive(1000);
+        assertNotNull(m);
+        text = ((TextMessage)m).getText();
+        if (!(text.equals(destination1String) || text.equals(destination3String))) {
+            fail("unexpected message:" + text);
+        }
+        m = consumer.receive(1000);
+        assertNotNull(m);
+        text = ((TextMessage)m).getText();
+        if (!(text.equals(destination1String) || text.equals(destination3String))) {
+            fail("unexpected message:" + text);
+        }
+        assertNull(consumer.receiveNoWait());
+    }
+
+    /**
+     * Sends one message each to {@code TEST.ONE.ONE}, {@code TEST.ONE.ONE.ONE}
+     * and {@code TEST.ONE.TWO}, then consumes from the wildcard queue
+     * {@code TEST.ONE.>}. Three messages must arrive, each carrying one of
+     * those names as its text, and no fourth message may be available.
+     *
+     * @throws Exception if any JMS operation fails
+     */
+    public void testReceiveWildcardQueueEndGreaterThan() throws Exception {
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        ActiveMQDestination destination1 = (ActiveMQDestination)session.createQueue(destination1String);
+        ActiveMQDestination destination2 = (ActiveMQDestination)session.createQueue(destination2String);
+        ActiveMQDestination destination3 = (ActiveMQDestination)session.createQueue(destination3String);
+
+        sendMessage(session, destination1, destination1String);
+        sendMessage(session, destination2, destination2String);
+        sendMessage(session, destination3, destination3String);
+
+        ActiveMQDestination destination7 = (ActiveMQDestination)session.createQueue("TEST.ONE.>");
+        MessageConsumer consumer = session.createConsumer(destination7);
+
+        for (int i = 0; i < 3; i++) {
+            Message m = consumer.receive(1000);
+            assertNotNull(m);
+            // Read the text of *this* message before validating it, so every
+            // delivery is checked rather than the first body being re-checked.
+            String text = ((TextMessage)m).getText();
+            if (!(text.equals(destination1String) || text.equals(destination2String) || text.equals(destination3String))) {
+                fail("unexpected message:" + text);
+            }
+        }
+        assertNull(consumer.receiveNoWait());
+    }
+
+    public void testReceiveWildcardQueueMidAsterisk() throws Exception {
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        ActiveMQDestination destination1 = (ActiveMQDestination)session.createQueue(destination1String);
+        ActiveMQDestination destination4 = (ActiveMQDestination)session.createQueue(destination4String);
+
+        Message m = null;
+        MessageConsumer consumer = null;
+        String text = null;
+
+        sendMessage(session, destination1, destination1String);
+        sendMessage(session, destination4, destination4String);
+        ActiveMQDestination destination8 = (ActiveMQDestination)session.createQueue("TEST.*.ONE");
+        consumer = session.createConsumer(destination8);
+        m = consumer.receive(1000);
+        assertNotNull(m);
+        text = ((TextMessage)m).getText();
+        if (!(text.equals(destination1String) || text.equals(destination4String))) {
+            fail("unexpected message:" + text);
+        }
+        m = consumer.receive(1000);
+        assertNotNull(m);
+        text = ((TextMessage)m).getText();
+        if (!(text.equals(destination1String) || text.equals(destination4String))) {
+            fail("unexpected message:" + text);
+        }
+        assertNull(consumer.receiveNoWait());
+
+    }
+
+    /**
+     * Sends a single text message to the given destination through a
+     * producer that lives only for this call.
+     *
+     * @param session session used to create the producer and the message
+     * @param destination where the message is sent
+     * @param text body of the text message
+     * @throws JMSException if creating the producer, sending, or closing fails
+     */
+    private void sendMessage(Session session, Destination destination, String text) throws JMSException {
+        MessageProducer producer = session.createProducer(destination);
+        try {
+            producer.send(session.createTextMessage(text));
+        } finally {
+            // Release the producer even when the send throws.
+            producer.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java
new file mode 100644
index 0000000..e5d90d6
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java
@@ -0,0 +1,562 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.concurrent.TimeUnit;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+import org.apache.activemq.transport.vm.VMTransport;
+import org.apache.activemq.util.Wait;
+
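+/**
+ * Checks when a received message carries the JMSRedelivered flag: after a
+ * session close, a session recover or a transaction rollback it must be set;
+ * when a consumer goes away without having received the message it must not.
+ * The nested {@code PersistentCase} and {@code TransientCase} classes run
+ * the tests with persistent and non-persistent delivery respectively.
+ */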
+public class JmsRedeliveredTest extends TestCase {
+
+    // Connection shared by a single test; assigned in setUp() and closed in tearDown().
+    private Connection connection;
+
+    /**
+     * Creates the connection used by the test via {@link #createConnection()}.
+     *
+     * @throws Exception if the connection cannot be created
+     * @see junit.framework.TestCase#setUp()
+     */
+    @Override
+    protected void setUp() throws Exception {
+        connection = createConnection();
+    }
+
+    /**
+     * Closes the test connection if one was created and clears the field.
+     *
+     * @throws Exception if closing the connection fails
+     * @see junit.framework.TestCase#tearDown()
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+            connection = null;
+        }
+    }
+
+    /**
+     * Builds a connection from an {@link ActiveMQConnectionFactory} configured
+     * with the URI {@code vm://localhost?broker.persistent=false}. The
+     * connection is returned as created; callers start it themselves.
+     *
+     * @return a new connection
+     * @throws Exception if the factory cannot create the connection
+     */
+    protected Connection createConnection() throws Exception {
+        String brokerUri = "vm://localhost?broker.persistent=false";
+        return new ActiveMQConnectionFactory(brokerUri).createConnection();
+    }
+
+    /**
+     * Tests that an unacknowledged queue message is delivered again, flagged
+     * as redelivered, after its session is closed and a consumer is created
+     * on a new session. Both sessions use client acknowledgement.
+     *
+     * @throws JMSException if any JMS operation fails
+     */
+    public void testQueueSessionCloseMarksMessageRedelivered() throws JMSException {
+        connection.start();
+
+        Session firstSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = firstSession.createQueue("queue-" + getName());
+        createProducer(firstSession, queue).send(createTextMessage(firstSession));
+
+        // First delivery: must be fresh, and is deliberately left unacknowledged.
+        Message firstDelivery = firstSession.createConsumer(queue).receive(1000);
+        assertNotNull(firstDelivery);
+        assertFalse("Message should not be redelivered.", firstDelivery.getJMSRedelivered());
+
+        // Closing the session hands the unacked message back for redelivery.
+        firstSession.close();
+        Session secondSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+        Message secondDelivery = secondSession.createConsumer(queue).receive(2000);
+        assertNotNull(secondDelivery);
+        assertTrue("Message should be redelivered.", secondDelivery.getJMSRedelivered());
+        secondDelivery.acknowledge();
+
+        secondSession.close();
+    }
+
+    
+
+    /**
+     * Sends the texts "1" and "2" to a queue, acknowledges only "1", closes
+     * the session, and expects "2" to arrive on a new session flagged as
+     * redelivered.
+     *
+     * @throws JMSException if any JMS operation fails
+     */
+    public void testQueueSessionCloseMarksUnAckedMessageRedelivered() throws JMSException {
+        connection.start();
+
+        Session firstSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = firstSession.createQueue("queue-" + getName());
+        MessageProducer producer = createProducer(firstSession, queue);
+        producer.send(createTextMessage(firstSession, "1"));
+        producer.send(createTextMessage(firstSession, "2"));
+
+        MessageConsumer firstConsumer = firstSession.createConsumer(queue);
+
+        // "1" arrives fresh and is acknowledged.
+        Message first = firstConsumer.receive(1000);
+        assertNotNull(first);
+        assertFalse("Message should not be redelivered.", first.getJMSRedelivered());
+        assertEquals("1", ((TextMessage)first).getText());
+        first.acknowledge();
+
+        // "2" arrives fresh and is left unacknowledged.
+        Message second = firstConsumer.receive(1000);
+        assertNotNull(second);
+        assertFalse("Message should not be redelivered.", second.getJMSRedelivered());
+        assertEquals("2", ((TextMessage)second).getText());
+
+        // Closing the session hands the unacked message back for redelivery.
+        firstSession.close();
+        Session secondSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+        Message redelivered = secondSession.createConsumer(queue).receive(2000);
+        assertNotNull(redelivered);
+        assertEquals("2", ((TextMessage)redelivered).getText());
+        assertTrue("Message should be redelivered.", redelivered.getJMSRedelivered());
+        redelivered.acknowledge();
+
+        secondSession.close();
+    }
+
+    /**
+     * Tests that {@code Session.recover()} causes an unacknowledged queue
+     * message to be delivered again to the same consumer, flagged as
+     * redelivered. The session uses client acknowledgement.
+     *
+     * @throws JMSException if any JMS operation fails
+     */
+    public void testQueueRecoverMarksMessageRedelivered() throws JMSException {
+        connection.start();
+
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue("queue-" + getName());
+        createProducer(session, queue).send(createTextMessage(session));
+
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        // First delivery: must be fresh, and is deliberately left unacknowledged.
+        Message firstDelivery = consumer.receive(1000);
+        assertNotNull(firstDelivery);
+        assertFalse("Message should not be redelivered.", firstDelivery.getJMSRedelivered());
+
+        // Recovering the session restarts delivery from the unacked message.
+        session.recover();
+
+        Message secondDelivery = consumer.receive(2000);
+        assertNotNull(secondDelivery);
+        assertTrue("Message should be redelivered.", secondDelivery.getJMSRedelivered());
+        secondDelivery.acknowledge();
+
+        session.close();
+    }
+
+    /**
+     * Tests that rolling back a transacted session causes a queue message
+     * received in that transaction to be delivered again, flagged as
+     * redelivered. The session is created with {@code transacted = true}.
+     *
+     * @throws JMSException if any JMS operation fails
+     */
+    public void testQueueRollbackMarksMessageRedelivered() throws JMSException {
+        connection.start();
+
+        Session txSession = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = txSession.createQueue("queue-" + getName());
+        createProducer(txSession, queue).send(createTextMessage(txSession));
+        // Commit so the send becomes visible to the consumer.
+        txSession.commit();
+
+        MessageConsumer consumer = txSession.createConsumer(queue);
+
+        // First delivery inside the new transaction: must be fresh.
+        Message firstDelivery = consumer.receive(1000);
+        assertNotNull(firstDelivery);
+        assertFalse("Message should not be redelivered.", firstDelivery.getJMSRedelivered());
+
+        // Undo the receive; the message must come back.
+        txSession.rollback();
+
+        Message secondDelivery = consumer.receive(2000);
+        assertNotNull(secondDelivery);
+        assertTrue("Message should be redelivered.", secondDelivery.getJMSRedelivered());
+
+        txSession.commit();
+        txSession.close();
+    }
+
+    /**
+     * Tests that an unacknowledged message held by a durable topic subscriber
+     * is delivered again, flagged as redelivered, after the session is closed
+     * and the durable subscription "sub1" is re-opened on a new session. Both
+     * sessions use client acknowledgement.
+     *
+     * @throws JMSException if any JMS operation fails
+     */
+    public void testDurableTopicSessionCloseMarksMessageRedelivered() throws JMSException {
+        connection.setClientID(getName());
+        connection.start();
+
+        Session firstSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Topic topic = firstSession.createTopic("topic-" + getName());
+        MessageConsumer subscriber = firstSession.createDurableSubscriber(topic, "sub1");
+
+        // The message is always sent persistent here, regardless of
+        // getDeliveryMode(): transient messages are dropped when the
+        // consumer goes offline, so the case only works with persistent ones.
+        MessageProducer producer = firstSession.createProducer(topic);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        producer.send(createTextMessage(firstSession));
+
+        // First delivery: must be fresh, and is deliberately left unacknowledged.
+        Message firstDelivery = subscriber.receive(1000);
+        assertNotNull(firstDelivery);
+        assertFalse("Message should not be re-delivered.", firstDelivery.getJMSRedelivered());
+
+        // Closing the session hands the unacked message back for redelivery.
+        firstSession.close();
+        Session secondSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+        Message secondDelivery = secondSession.createDurableSubscriber(topic, "sub1").receive(2000);
+        assertNotNull(secondDelivery);
+        assertTrue("Message should be redelivered.", secondDelivery.getJMSRedelivered());
+        secondDelivery.acknowledge();
+
+        secondSession.close();
+    }
+
+    /**
+     * Tests that {@code Session.recover()} causes an unacknowledged message
+     * to be delivered again to a durable topic subscriber, flagged as
+     * redelivered. The session uses client acknowledgement.
+     *
+     * @throws JMSException if any JMS operation fails
+     */
+    public void testDurableTopicRecoverMarksMessageRedelivered() throws JMSException {
+        connection.setClientID(getName());
+        connection.start();
+
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Topic topic = session.createTopic("topic-" + getName());
+        MessageConsumer subscriber = session.createDurableSubscriber(topic, "sub1");
+
+        createProducer(session, topic).send(createTextMessage(session));
+
+        // First delivery: must be fresh, and is deliberately left unacknowledged.
+        Message firstDelivery = subscriber.receive(1000);
+        assertNotNull(firstDelivery);
+        assertFalse("Message should not be redelivered.", firstDelivery.getJMSRedelivered());
+
+        // Recovering the session restarts delivery from the unacked message.
+        session.recover();
+
+        Message secondDelivery = subscriber.receive(2000);
+        assertNotNull(secondDelivery);
+        assertTrue("Message should be redelivered.", secondDelivery.getJMSRedelivered());
+        secondDelivery.acknowledge();
+
+        session.close();
+    }
+
+    /**
+     * Tests that rolling back a transacted session causes a message received
+     * by a durable topic subscriber to be delivered again, flagged as
+     * redelivered. The session is created with {@code transacted = true}.
+     *
+     * @throws JMSException if any JMS operation fails
+     */
+    public void testDurableTopicRollbackMarksMessageRedelivered() throws JMSException {
+        connection.setClientID(getName());
+        connection.start();
+
+        Session txSession = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+        Topic topic = txSession.createTopic("topic-" + getName());
+        MessageConsumer subscriber = txSession.createDurableSubscriber(topic, "sub1");
+
+        createProducer(txSession, topic).send(createTextMessage(txSession));
+        // Commit so the send becomes visible to the subscriber.
+        txSession.commit();
+
+        // First delivery inside the new transaction: must be fresh.
+        Message firstDelivery = subscriber.receive(1000);
+        assertNotNull(firstDelivery);
+        assertFalse("Message should not be redelivered.", firstDelivery.getJMSRedelivered());
+
+        // Undo the receive; the message must come back.
+        txSession.rollback();
+
+        Message secondDelivery = subscriber.receive(2000);
+        assertNotNull(secondDelivery);
+        assertTrue("Message should be redelivered.", secondDelivery.getJMSRedelivered());
+
+        txSession.commit();
+        txSession.close();
+    }
+
+    /**
+     * Tests that {@code Session.recover()} causes an unacknowledged message
+     * to be delivered again to a plain (non-durable) topic consumer, flagged
+     * as redelivered. The session uses client acknowledgement.
+     *
+     * @throws JMSException if any JMS operation fails
+     */
+    public void testTopicRecoverMarksMessageRedelivered() throws JMSException {
+        connection.setClientID(getName());
+        connection.start();
+
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Topic topic = session.createTopic("topic-" + getName());
+        // The consumer is created before the send.
+        MessageConsumer topicConsumer = session.createConsumer(topic);
+
+        createProducer(session, topic).send(createTextMessage(session));
+
+        // First delivery: must be fresh, and is deliberately left unacknowledged.
+        Message firstDelivery = topicConsumer.receive(1000);
+        assertNotNull(firstDelivery);
+        assertFalse("Message should not be redelivered.", firstDelivery.getJMSRedelivered());
+
+        // Recovering the session restarts delivery from the unacked message.
+        session.recover();
+
+        Message secondDelivery = topicConsumer.receive(2000);
+        assertNotNull(secondDelivery);
+        assertTrue("Message should be redelivered.", secondDelivery.getJMSRedelivered());
+        secondDelivery.acknowledge();
+
+        session.close();
+    }
+
+    /**
+     * Tests that rolling back a transacted session causes a message received
+     * by a plain (non-durable) topic consumer to be delivered again, flagged
+     * as redelivered. The session is created with {@code transacted = true}.
+     *
+     * @throws JMSException if any JMS operation fails
+     */
+    public void testTopicRollbackMarksMessageRedelivered() throws JMSException {
+        connection.setClientID(getName());
+        connection.start();
+
+        Session txSession = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+        Topic topic = txSession.createTopic("topic-" + getName());
+        // The consumer is created before the send.
+        MessageConsumer topicConsumer = txSession.createConsumer(topic);
+
+        createProducer(txSession, topic).send(createTextMessage(txSession));
+        // Commit so the send becomes visible to the consumer.
+        txSession.commit();
+
+        // First delivery inside the new transaction: must be fresh.
+        Message firstDelivery = topicConsumer.receive(1000);
+        assertNotNull(firstDelivery);
+        assertFalse("Message should not be redelivered.", firstDelivery.getJMSRedelivered());
+
+        // Undo the receive; the message must come back.
+        txSession.rollback();
+
+        Message secondDelivery = topicConsumer.receive(2000);
+        assertNotNull(secondDelivery);
+        assertTrue("Message should be redelivered.", secondDelivery.getJMSRedelivered());
+
+        txSession.commit();
+        txSession.close();
+    }
+
+    /**
+     * Tests that a message which reached a consumer but was never received by
+     * the application is not flagged as redelivered once that consumer's
+     * transport is stopped and another connection consumes the message.
+     * <p>
+     * A second connection is opened for the duration of the test (by its name
+     * it keeps the broker alive after the first transport is stopped) and is
+     * always closed, even when a step or assertion fails.
+     *
+     * @throws Exception if any JMS operation or the wait fails
+     */
+    public void testNoReceiveConsumerDisconnectDoesNotIncrementRedelivery() throws Exception {
+        connection.setClientID(getName());
+        connection.start();
+
+        Connection keepBrokerAliveConnection = createConnection();
+        try {
+            keepBrokerAliveConnection.start();
+
+            Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+            Queue queue = session.createQueue("queue-" + getName());
+            final MessageConsumer consumer = session.createConsumer(queue);
+
+            MessageProducer producer = createProducer(session, queue);
+            producer.send(createTextMessage(session));
+            session.commit();
+
+            // Wait for the consumer to report one message; the outcome of the
+            // wait is not checked, the test simply proceeds afterwards.
+            Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return ((ActiveMQMessageConsumer)consumer).getMessageSize() == 1;
+                }
+            });
+
+            // whack the connection - like a rebalance or tcp drop
+            ((ActiveMQConnection)connection).getTransport().narrow(VMTransport.class).stop();
+
+            session = keepBrokerAliveConnection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+            Message msg = messageConsumer.receive(1000);
+            assertNotNull(msg);
+            msg.acknowledge();
+
+            assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
+            session.close();
+        } finally {
+            // Do not leak the extra connection when the test fails part-way.
+            keepBrokerAliveConnection.close();
+        }
+    }
+
+    /**
+     * Tests that closing a queue consumer which never called receive does not
+     * cause the message to be flagged as redelivered for the next consumer on
+     * the same transacted session.
+     *
+     * @throws Exception if any JMS operation fails or the sleep is interrupted
+     */
+    public void testNoReceiveConsumerDoesNotIncrementRedelivery() throws Exception {
+        connection.setClientID(getName());
+        connection.start();
+
+        Session txSession = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = txSession.createQueue("queue-" + getName());
+        MessageConsumer idleConsumer = txSession.createConsumer(queue);
+
+        createProducer(txSession, queue).send(createTextMessage(txSession));
+        txSession.commit();
+
+        // Pause for a second while the first consumer stays idle, then close
+        // it without ever calling receive.
+        TimeUnit.SECONDS.sleep(1);
+        idleConsumer.close();
+
+        MessageConsumer activeConsumer = txSession.createConsumer(queue);
+        Message delivered = activeConsumer.receive(1000);
+        assertNotNull(delivered);
+
+        assertFalse("Message should not be redelivered.", delivered.getJMSRedelivered());
+        txSession.close();
+    }
+
+    /**
+     * Tests that closing a durable topic subscriber which never called
+     * receive does not cause the message to be flagged as redelivered when
+     * the subscription "sub" is re-opened on the same transacted session.
+     *
+     * @throws Exception if any JMS operation fails or the sleep is interrupted
+     */
+    public void testNoReceiveDurableConsumerDoesNotIncrementRedelivery() throws Exception {
+        connection.setClientID(getName());
+        connection.start();
+
+        Session txSession = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+        Topic topic = txSession.createTopic("topic-" + getName());
+        MessageConsumer idleSubscriber = txSession.createDurableSubscriber(topic, "sub");
+
+        createProducer(txSession, topic).send(createTextMessage(txSession));
+        txSession.commit();
+
+        // Pause for a second while the first subscriber stays idle, then
+        // close it without ever calling receive.
+        TimeUnit.SECONDS.sleep(1);
+        idleSubscriber.close();
+
+        MessageConsumer activeSubscriber = txSession.createDurableSubscriber(topic, "sub");
+        Message delivered = activeSubscriber.receive(1000);
+        assertNotNull(delivered);
+
+        assertFalse("Message should not be redelivered.", delivered.getJMSRedelivered());
+        txSession.close();
+    }
+
+    /**
+     * Creates a text message with the fixed body "Hello" by delegating to
+     * the two-argument overload.
+     *
+     * @param session session used to create the message
+     * @return TextMessage with body "Hello"
+     * @throws JMSException if the session cannot create the message
+     */
+    private TextMessage createTextMessage(Session session) throws JMSException {
+        return createTextMessage(session, "Hello");
+    }
+
+    /**
+     * Creates a text message with the given body.
+     *
+     * @param session session used to create the message
+     * @param txt body of the message
+     * @return TextMessage carrying {@code txt}
+     * @throws JMSException if the session cannot create the message
+     */
+    private TextMessage createTextMessage(Session session, String txt) throws JMSException {
+        return session.createTextMessage(txt);
+    }
+    
+    /**
+     * Creates a producer.
+     * 
+     * @param session
+     * @param queue - destination.
+     * @return MessageProducer
+     * @throws JMSException
+     */
+    private MessageProducer createProducer(Session session, Destination queue) throws JMSException {
+        MessageProducer producer = session.createProducer(queue);
+        producer.setDeliveryMode(getDeliveryMode());
+        return producer;
+    }
+
+    /**
+     * Returns the delivery mode applied to producers built by
+     * createProducer(). The nested PersistentCase and TransientCase classes
+     * override this to select the mode explicitly.
+     *
+     * @return int - persistent delivery mode.
+     */
+    protected int getDeliveryMode() {
+        return DeliveryMode.PERSISTENT;
+    }
+
+    /**
+     * Runs the JmsRedeliveredTest tests with the delivery mode set to
+     * persistent.
+     */
+    public static final class PersistentCase extends JmsRedeliveredTest {
+
+        /**
+         * Returns delivery mode.
+         *
+         * @return int - persistent delivery mode.
+         */
+        @Override
+        protected int getDeliveryMode() {
+            return DeliveryMode.PERSISTENT;
+        }
+    }
+
+    /**
+     * Runs the JmsRedeliveredTest tests with the delivery mode set to
+     * non-persistent.
+     */
+    public static final class TransientCase extends JmsRedeliveredTest {
+
+        /**
+         * Returns delivery mode.
+         *
+         * @return int - non-persistent delivery mode.
+         */
+        @Override
+        protected int getDeliveryMode() {
+            return DeliveryMode.NON_PERSISTENT;
+        }
+    }
+
+    /**
+     * Builds the suite for this class: the tests of {@link PersistentCase}
+     * followed by the tests of {@link TransientCase}.
+     *
+     * @return suite containing both delivery-mode variants
+     */
+    public static Test suite() {
+        final TestSuite bothModes = new TestSuite();
+        bothModes.addTestSuite(PersistentCase.class);
+        bothModes.addTestSuite(TransientCase.class);
+        return bothModes;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
new file mode 100644
index 0000000..91e29c0
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.broker.BrokerService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.*;
+
+public class JmsRollbackRedeliveryTest {
+    @Rule
+    public TestName testName = new TestName();
+
+    protected static final Logger LOG = LoggerFactory.getLogger(JmsRollbackRedeliveryTest.class);
+    final int nbMessages = 10;
+    final String destinationName = "Destination";
+    final String brokerUrl = "vm://localhost?create=false";
+    boolean consumerClose = true;
+    boolean rollback = true;
+    BrokerService broker;
+
+    /**
+     * Starts a non-persistent broker with JMX disabled before each test and
+     * blocks until it reports that it has started.
+     */
+    @Before
+    public void setUp() throws Exception {
+        LOG.debug("Starting " + testName.getMethodName());
+
+        final BrokerService service = new BrokerService();
+        broker = service;
+        service.setPersistent(false);
+        service.setUseJmx(false);
+        service.start();
+        service.waitUntilStarted();
+    }
+
+    /**
+     * Stops the broker (when one was created), waits for it to stop, then
+     * pauses briefly before the next test.
+     */
+    @After
+    public void tearDown() throws Exception {
+        final BrokerService running = broker;
+        if (running != null) {
+            running.stop();
+            running.waitUntilStopped();
+        }
+
+        LOG.debug("Finishing " + testName.getMethodName());
+        Thread.sleep(100);
+    }
+    
+    /** Runs the rollback/redelivery scenario with the queue filled by a single producer. */
+    @Test
+    public void testRedelivery() throws Exception {
+        final boolean interleaveProducer = false;
+        doTestRedelivery(brokerUrl, interleaveProducer);
+    }
+
+    /** Runs the rollback/redelivery scenario with the queue filled by two alternating producers. */
+    @Test
+    public void testRedeliveryWithInterleavedProducer() throws Exception {
+        final boolean interleaveProducer = true;
+        doTestRedelivery(brokerUrl, interleaveProducer);
+    }
+
+
+    /**
+     * Runs the interleaved-producer redelivery scenario with the queue
+     * prefetch set to 0 through the connection URL.
+     */
+    @Test
+    public void testRedeliveryWithPrefetch0() throws Exception {
+        // brokerUrl already carries a query string ("?create=false"), so further
+        // options must be appended with '&'; a second '?' would fold the option
+        // into the value of "create" and the prefetch setting would be lost.
+        doTestRedelivery(brokerUrl + "&jms.prefetchPolicy.queuePrefetch=0", true);
+    }
+
+    /**
+     * Runs the interleaved-producer redelivery scenario with the queue
+     * prefetch set to 1 through the connection URL.
+     */
+    @Test
+    public void testRedeliveryWithPrefetch1() throws Exception {
+        // brokerUrl already carries a query string ("?create=false"), so further
+        // options must be appended with '&'; a second '?' would fold the option
+        // into the value of "create" and the prefetch setting would be lost.
+        doTestRedelivery(brokerUrl + "&jms.prefetchPolicy.queuePrefetch=1", true);
+    }
+    
+    /**
+     * Fills the queue, then repeatedly opens a fresh transacted session and
+     * consumer to receive one message at a time. The first time a message text
+     * is seen the transaction is rolled back; the second time the message must
+     * be flagged as redelivered with a delivery count of 2 and is committed.
+     * The loop ends once {@code nbMessages} messages have been committed.
+     *
+     * @param brokerUrl URL handed to the connection factory
+     * @param interleaveProducer whether the queue is filled by two alternating
+     *        producers instead of a single one
+     * @throws Exception on any JMS failure
+     */
+    public void doTestRedelivery(String brokerUrl, boolean interleaveProducer) throws Exception {
+        LOG.debug("entering doTestRedelivery interleaveProducer is " + interleaveProducer);
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
+
+        Connection connection = connectionFactory.createConnection();
+        try {
+            connection.start();
+
+            if (interleaveProducer) {
+                populateDestinationWithInterleavedProducer(nbMessages, destinationName, connection);
+            } else {
+                populateDestination(nbMessages, destinationName, connection);
+            }
+
+            // Consume messages and rollback transactions
+            AtomicInteger received = new AtomicInteger();
+            Map<String, Boolean> rolledback = new ConcurrentHashMap<String, Boolean>();
+            while (received.get() < nbMessages) {
+                Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+                Destination destination = session.createQueue(destinationName);
+                MessageConsumer consumer = session.createConsumer(destination);
+                TextMessage msg = (TextMessage) consumer.receive(6000000);
+                if (msg != null) {
+                    // put() returns the previous value, so non-null means this text was rolled back before
+                    if (rolledback.put(msg.getText(), Boolean.TRUE) != null) {
+                        LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID());
+                        assertTrue(msg.getJMSRedelivered());
+                        assertEquals(2, msg.getLongProperty("JMSXDeliveryCount"));
+                        session.commit();
+                    } else {
+                        LOG.info("Rollback message " + msg.getText() + " id: " +  msg.getJMSMessageID());
+                        assertFalse("should not have redelivery flag set, id: " + msg.getJMSMessageID(), msg.getJMSRedelivered());
+                        session.rollback();
+                    }
+                }
+                consumer.close();
+                session.close();
+            }
+        } finally {
+            // Release the connection even when an assertion fails.
+            connection.close();
+        }
+    }
+
+    /**
+     * Verifies rollback redelivery when a single transacted session and a
+     * single consumer are reused for every receive. Each message is rolled
+     * back the first time its text is seen and must carry the redelivered flag
+     * when it is seen again, at which point it is committed.
+     */
+    @Test
+    public void testRedeliveryOnSingleConsumer() throws Exception {
+
+        ConnectionFactory connectionFactory =
+            new ActiveMQConnectionFactory(brokerUrl);
+        Connection connection = connectionFactory.createConnection();
+        try {
+            connection.start();
+
+            populateDestinationWithInterleavedProducer(nbMessages, destinationName, connection);
+
+            // Consume messages and rollback transactions
+            AtomicInteger received = new AtomicInteger();
+            Map<String, Boolean> rolledback = new ConcurrentHashMap<String, Boolean>();
+            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+            Destination destination = session.createQueue(destinationName);
+            MessageConsumer consumer = session.createConsumer(destination);
+            while (received.get() < nbMessages) {
+                TextMessage msg = (TextMessage) consumer.receive(6000000);
+                if (msg != null) {
+                    // put() returns the previous value, so non-null means this text was rolled back before
+                    if (rolledback.put(msg.getText(), Boolean.TRUE) != null) {
+                        LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID());
+                        assertTrue(msg.getJMSRedelivered());
+                        session.commit();
+                    } else {
+                        LOG.info("Rollback message " + msg.getText() + " id: " +  msg.getJMSMessageID());
+                        session.rollback();
+                    }
+                }
+            }
+            consumer.close();
+            session.close();
+        } finally {
+            // Release the connection even when an assertion fails.
+            connection.close();
+        }
+    }
+
+
+    /**
+     * Verifies rollback redelivery when a single transacted session is reused
+     * but a new consumer is created for every receive. Each message is rolled
+     * back the first time its text is seen and must carry the redelivered flag
+     * when it is seen again, at which point it is committed.
+     */
+    @Test
+    public void testRedeliveryOnSingleSession() throws Exception {
+
+        ConnectionFactory connectionFactory =
+            new ActiveMQConnectionFactory(brokerUrl);
+        Connection connection = connectionFactory.createConnection();
+        try {
+            connection.start();
+
+            populateDestination(nbMessages, destinationName, connection);
+
+            // Consume messages and rollback transactions
+            AtomicInteger received = new AtomicInteger();
+            Map<String, Boolean> rolledback = new ConcurrentHashMap<String, Boolean>();
+            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+            Destination destination = session.createQueue(destinationName);
+            while (received.get() < nbMessages) {
+                MessageConsumer consumer = session.createConsumer(destination);
+                TextMessage msg = (TextMessage) consumer.receive(6000000);
+                if (msg != null) {
+                    // put() returns the previous value, so non-null means this text was rolled back before
+                    if (rolledback.put(msg.getText(), Boolean.TRUE) != null) {
+                        LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID());
+                        assertTrue(msg.getJMSRedelivered());
+                        session.commit();
+                    } else {
+                        LOG.info("Rollback message " + msg.getText() + " id: " +  msg.getJMSMessageID());
+                        session.rollback();
+                    }
+                }
+                consumer.close();
+            }
+            session.close();
+        } finally {
+            // Release the connection even when an assertion fails.
+            connection.close();
+        }
+    }
+    
+    /**
+     * AMQ-1593: sends one message and rolls it back on a fresh transacted
+     * session until as many receptions as the default
+     * {@code RedeliveryPolicy} maximum redeliveries have been counted,
+     * asserting that {@code JMSXDeliveryCount} matches the number of
+     * receptions so far. The message is then consumed once more with an
+     * expected delivery count of {@code maxRetries + 1}.
+     */
+    @Test
+    public void testValidateRedeliveryCountOnRollback() throws Exception {
+
+        final int numMessages = 1;
+        ConnectionFactory connectionFactory =
+            new ActiveMQConnectionFactory(brokerUrl);
+        Connection connection = connectionFactory.createConnection();
+        try {
+            connection.start();
+
+            populateDestination(numMessages, destinationName, connection);
+
+            AtomicInteger received = new AtomicInteger();
+            final int maxRetries = new RedeliveryPolicy().getMaximumRedeliveries();
+            while (received.get() < maxRetries) {
+                Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+                Destination destination = session.createQueue(destinationName);
+
+                MessageConsumer consumer = session.createConsumer(destination);
+                TextMessage msg = (TextMessage) consumer.receive(1000);
+                if (msg != null) {
+                    // the counter is incremented while logging, so received.get() below is the 1-based delivery number
+                    LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID());
+                    assertEquals("redelivery property matches deliveries", received.get(), msg.getLongProperty("JMSXDeliveryCount"));
+                    session.rollback();
+                }
+                session.close();
+            }
+            consumeMessage(connection, maxRetries + 1);
+        } finally {
+            // Release the connection even when an assertion fails.
+            connection.close();
+        }
+    }
+    
+    /**
+     * AMQ-1593: same scenario as the rollback delivery-count validation, but
+     * with the queue prefetch set to 0 through the connection URL. One message
+     * is rolled back repeatedly while {@code JMSXDeliveryCount} is checked,
+     * then consumed once more with an expected count of {@code maxRetries + 1}.
+     */
+    @Test
+    public void testValidateRedeliveryCountOnRollbackWithPrefetch0() throws Exception {
+
+        final int numMessages = 1;
+        // brokerUrl already carries a query string ("?create=false"), so the
+        // prefetch option must be appended with '&'; a second '?' would fold it
+        // into the value of "create" and the prefetch setting would be lost.
+        ConnectionFactory connectionFactory =
+            new ActiveMQConnectionFactory(brokerUrl + "&jms.prefetchPolicy.queuePrefetch=0");
+        Connection connection = connectionFactory.createConnection();
+        try {
+            connection.start();
+
+            populateDestination(numMessages, destinationName, connection);
+
+            AtomicInteger received = new AtomicInteger();
+            final int maxRetries = new RedeliveryPolicy().getMaximumRedeliveries();
+            while (received.get() < maxRetries) {
+                Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+                Destination destination = session.createQueue(destinationName);
+
+                MessageConsumer consumer = session.createConsumer(destination);
+                TextMessage msg = (TextMessage) consumer.receive(1000);
+                if (msg != null) {
+                    // the counter is incremented while logging, so received.get() below is the 1-based delivery number
+                    LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID());
+                    assertEquals("redelivery property matches deliveries", received.get(), msg.getLongProperty("JMSXDeliveryCount"));
+                    session.rollback();
+                }
+                session.close();
+            }
+
+            consumeMessage(connection, maxRetries + 1);
+        } finally {
+            // Release the connection even when an assertion fails.
+            connection.close();
+        }
+    }
+
+
+    private void consumeMessage(Connection connection, final int deliveryCount)
+            throws JMSException {
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        Destination destination = session.createQueue(destinationName);
+        MessageConsumer consumer = session.createConsumer(destination);            
+        TextMessage msg = (TextMessage) consumer.receive(1000);
+        assertNotNull(msg);
+        assertEquals("redelivery property matches deliveries", deliveryCount, msg.getLongProperty("JMSXDeliveryCount"));
+        session.commit();
+        session.close();
+    }
+
+    /**
+     * Sends one message, then repeatedly opens a new connection and transacted
+     * session, receives the message and closes everything without committing
+     * or rolling back. After each reception {@code JMSXDeliveryCount} must
+     * equal the number of receptions so far. Finally the message is consumed
+     * and committed with an expected delivery count of {@code maxRetries + 1}.
+     */
+    @Test
+    public void testRedeliveryPropertyWithNoRollback() throws Exception {
+        final int numMessages = 1;
+        ConnectionFactory connectionFactory =
+            new ActiveMQConnectionFactory(brokerUrl);
+        Connection connection = connectionFactory.createConnection();
+        try {
+            connection.start();
+            populateDestination(numMessages, destinationName, connection);
+        } finally {
+            connection.close();
+        }
+
+        AtomicInteger received = new AtomicInteger();
+        final int maxRetries = new RedeliveryPolicy().getMaximumRedeliveries();
+        while (received.get() < maxRetries) {
+            connection = connectionFactory.createConnection();
+            try {
+                connection.start();
+                Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+                Destination destination = session.createQueue(destinationName);
+
+                MessageConsumer consumer = session.createConsumer(destination);
+                TextMessage msg = (TextMessage) consumer.receive(2000);
+                if (msg != null) {
+                    // the counter is incremented while logging, so received.get() below is the 1-based delivery number
+                    LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID());
+                    assertEquals("redelivery property matches deliveries", received.get(), msg.getLongProperty("JMSXDeliveryCount"));
+                }
+                session.close();
+            } finally {
+                // Close the per-iteration connection even when the assertion fails.
+                connection.close();
+            }
+        }
+
+        connection = connectionFactory.createConnection();
+        try {
+            connection.start();
+            consumeMessage(connection, maxRetries + 1);
+        } finally {
+            // The final connection was previously never closed.
+            connection.close();
+        }
+    }
+    
+    private void populateDestination(final int nbMessages,
+            final String destinationName, Connection connection)
+            throws JMSException {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = session.createQueue(destinationName);
+        MessageProducer producer = session.createProducer(destination);
+        for (int i = 1; i <= nbMessages; i++) {
+            producer.send(session.createTextMessage("<hello id='" + i + "'/>"));
+        }
+        producer.close();
+        session.close();
+    }
+
+    
+    private void populateDestinationWithInterleavedProducer(final int nbMessages,
+            final String destinationName, Connection connection)
+            throws JMSException {
+        Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination1 = session1.createQueue(destinationName);
+        MessageProducer producer1 = session1.createProducer(destination1);
+        Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination2 = session2.createQueue(destinationName);
+        MessageProducer producer2 = session2.createProducer(destination2);
+        
+        for (int i = 1; i <= nbMessages; i++) {
+            if (i%2 == 0) {
+                producer1.send(session1.createTextMessage("<hello id='" + i + "'/>"));
+            } else {
+                producer2.send(session2.createTextMessage("<hello id='" + i + "'/>"));
+            }
+        }
+        producer1.close();
+        session1.close();
+        producer2.close();
+        session2.close();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveTestSupport.java
new file mode 100644
index 0000000..d852f54
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveTestSupport.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ */
+public class JmsSendReceiveTestSupport extends TestSupport implements MessageListener {
+    private static final Logger LOG = LoggerFactory.getLogger(JmsSendReceiveTestSupport.class);
+
+    protected int messageCount = 100;
+    protected String[] data;
+    protected Session session;
+    protected MessageConsumer consumer;
+    protected MessageProducer producer;
+    protected Destination consumerDestination;
+    protected Destination producerDestination;
+    protected List<Message> messages = createConcurrentList();
+    protected boolean topic = true;
+    protected boolean durable;
+    protected int deliveryMode = DeliveryMode.PERSISTENT;
+    protected final Object lock = new Object();
+    protected boolean verbose;
+
+    /*
+     * @see junit.framework.TestCase#setUp()
+     */
+    protected void setUp() throws Exception {
+        super.setUp();
+        String temp = System.getProperty("messageCount");
+
+        if (temp != null) {
+            int i = Integer.parseInt(temp);
+            if (i > 0) {
+                messageCount = i;
+            }
+        }
+
+        LOG.info("Message count for test case is: " + messageCount);
+        data = new String[messageCount];
+
+        for (int i = 0; i < messageCount; i++) {
+            data[i] = "Text for message: " + i + " at " + new Date();
+        }
+    }
+
+    /**
+     * Sends one text message per payload in {@code data}, tagging each with a
+     * string property (the payload) and an int property (its index), then
+     * asserts that all of them were received.
+     *
+     * @throws Exception on any send or verification failure
+     */
+    public void testSendReceive() throws Exception {
+        messages.clear();
+
+        for (int index = 0; index < data.length; index++) {
+            final String text = data[index];
+            final Message outgoing = session.createTextMessage(text);
+            outgoing.setStringProperty("stringProperty", text);
+            outgoing.setIntProperty("intProperty", index);
+
+            if (verbose && LOG.isDebugEnabled()) {
+                LOG.debug("About to send a message: " + outgoing + " with text: " + text);
+            }
+
+            sendToProducer(producer, producerDestination, outgoing);
+            messageSent();
+        }
+
+        assertMessagesAreReceived();
+        LOG.info("" + data.length + " messages(s) received, closing down connections");
+    }
+
+    /**
+     * Sends {@code message} to {@code producerDestination} through the
+     * supplied producer. Kept as a separate protected method so the send step
+     * can be overridden.
+     *
+     * @param producer producer that performs the send
+     * @param producerDestination destination passed to the send call
+     * @param message message to send
+     * @throws JMSException if the send fails
+     */
+    protected void sendToProducer(MessageProducer producer,
+            Destination producerDestination, Message message) throws JMSException {
+        producer.send(producerDestination, message);
+    }
+
+    /**
+     * Asserts messages are received.
+     * 
+     * @throws JMSException
+     */
+    protected void assertMessagesAreReceived() throws JMSException {
+        waitForMessagesToBeDelivered();
+        assertMessagesReceivedAreValid(messages);
+    }
+
+    /**
+     * Checks that the received messages match the payloads in {@code data}:
+     * same count, same order, and for each message the text, the
+     * {@code stringProperty} and the {@code intProperty} must equal the
+     * payload, the payload and the index respectively. When the counts differ,
+     * every received text is logged before the count assertion fails.
+     *
+     * @param receivedMessages - list of received messages.
+     * @throws JMSException if a message body or property cannot be read
+     */
+    protected void assertMessagesReceivedAreValid(List<Message> receivedMessages) throws JMSException {
+        // Dump a snapshot so the diagnostic loop is not affected by late arrivals.
+        List<Object> copyOfMessages = Arrays.asList(receivedMessages.toArray());
+
+        if (data.length != copyOfMessages.size()) {
+            int counter = 0;
+            for (Object entry : copyOfMessages) {
+                TextMessage message = (TextMessage) entry;
+                if (LOG.isInfoEnabled()) {
+                    LOG.info("<== " + counter++ + " = " + message.getText());
+                }
+            }
+        }
+
+        assertEquals("Not enough messages received", data.length, receivedMessages.size());
+
+        for (int i = 0; i < data.length; i++) {
+            TextMessage received = (TextMessage) receivedMessages.get(i);
+            String text = received.getText();
+            String stringProperty = received.getStringProperty("stringProperty");
+            int intProperty = received.getIntProperty("intProperty");
+
+            // Log at the level the guard checks; the guard tested DEBUG but the
+            // statement logged at INFO, unlike the send path which uses DEBUG.
+            if (verbose && LOG.isDebugEnabled()) {
+                LOG.debug("Received Text: " + text);
+            }
+
+            assertEquals("Message: " + i, data[i], text);
+            assertEquals(data[i], stringProperty);
+            assertEquals(i, intProperty);
+        }
+    }
+
+    /**
+     * Waits for messages to be delivered.
+     */
+    protected void waitForMessagesToBeDelivered() {
+        long maxWaitTime = 60000;
+        long waitTime = maxWaitTime;
+        long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis();
+
+        synchronized (lock) {
+            while (messages.size() < data.length && waitTime >= 0) {
+                try {
+                    lock.wait(200);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+
+                waitTime = maxWaitTime - (System.currentTimeMillis() - start);
+            }
+        }
+    }
+
+    /*
+     * Listener callback: records the incoming message in the shared list.
+     *
+     * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
+     */
+    public synchronized void onMessage(Message message) {
+        final List<Message> sink = messages;
+        consumeMessage(message, sink);
+    }
+
+    /**
+     * Appends the message to the given list and, once the list holds at least
+     * as many messages as there are payloads in {@code data}, wakes every
+     * thread waiting on {@code lock}.
+     *
+     * @param message - message to be consumed.
+     * @param messageList - list of consumed messages.
+     */
+    protected void consumeMessage(Message message, List<Message> messageList) {
+        // Log at the level the guard checks; the guard tested DEBUG but the
+        // statement logged at INFO.
+        if (verbose && LOG.isDebugEnabled()) {
+            LOG.debug("Received message: " + message);
+        }
+
+        messageList.add(message);
+
+        if (messageList.size() >= data.length) {
+            synchronized (lock) {
+                lock.notifyAll();
+            }
+        }
+    }
+
+    /**
+     * Returns the ArrayList as a synchronized list.
+     * 
+     * @return List
+     */
+    protected List<Message> createConcurrentList() {
+        return Collections.synchronizedList(new ArrayList<Message>());
+    }
+
+    /**
+     * Hook invoked by {@code testSendReceive()} right after each message has
+     * been sent. The default implementation does nothing; subclasses can
+     * override it to insert failure tests.
+     *
+     * @throws Exception if an overriding implementation fails
+     */
+    protected void messageSent() throws Exception {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java
new file mode 100644
index 0000000..391253e
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.Date;
+import java.util.Vector;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.apache.activemq.broker.BrokerRegistry;
+import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class JmsSendReceiveWithMessageExpirationTest extends TestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsSendReceiveWithMessageExpirationTest.class);
+
+    protected int messageCount = 100;
+    protected String[] data;
+    protected Session session;
+    protected Destination consumerDestination;
+    protected Destination producerDestination;
+    protected boolean durable;
+    protected int deliveryMode = DeliveryMode.PERSISTENT;
+    protected long timeToLive = 5000;
+    protected boolean verbose;
+
+    protected Connection connection;
+
+    protected void setUp() throws Exception {
+
+        super.setUp();
+
+        data = new String[messageCount];
+
+        for (int i = 0; i < messageCount; i++) {
+            data[i] = "Text for message: " + i + " at " + new Date();
+        }
+
+        connectionFactory = createConnectionFactory();
+        connection = createConnection();
+
+        if (durable) {
+            connection.setClientID(getClass().getName());
+        }
+
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    /**
+     * Sends every payload to a queue using a producer created with
+     * {@code timeToLive}, sleeps one second longer than that value, and then
+     * expects the consumer to receive nothing.
+     *
+     * @throws Exception on any JMS failure
+     */
+    public void testConsumeExpiredQueue() throws Exception {
+
+        final MessageProducer sender = createProducer(timeToLive);
+
+        consumerDestination = session.createQueue(getConsumerSubject());
+        producerDestination = session.createQueue(getProducerSubject());
+
+        final MessageConsumer receiver = createConsumer();
+        connection.start();
+
+        for (int index = 0; index < data.length; index++) {
+            final String text = data[index];
+            final Message outgoing = session.createTextMessage(text);
+            outgoing.setStringProperty("stringProperty", text);
+            outgoing.setIntProperty("intProperty", index);
+
+            if (verbose && LOG.isDebugEnabled()) {
+                LOG.debug("About to send a queue message: " + outgoing + " with text: " + text);
+            }
+
+            sender.send(producerDestination, outgoing);
+        }
+
+        // Wait one second past the expiration time so the messages have expired.
+        final long pastExpiry = timeToLive + 1000;
+        Thread.sleep(pastExpiry);
+
+        // Nothing should be delivered any more.
+        assertNull(receiver.receive(1000));
+    }
+
+     public void testConsumeExpiredQueueAndDlq() throws Exception {
+
+         MessageProducer producerNormal = createProducer(0);
+         MessageProducer producerExpire = createProducer(500);
+
+         consumerDestination = session.createQueue("ActiveMQ.DLQ");
+         MessageConsumer dlqConsumer = createConsumer();
+
+         consumerDestination = session.createQueue(getConsumerSubject());
+         producerDestination = session.createQueue(getProducerSubject());
+
+
+         Connection consumerConnection = createConnection();
+         ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
+         prefetchPolicy.setAll(10);
+         ((ActiveMQConnection)consumerConnection).setPrefetchPolicy(prefetchPolicy);
+         Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);
+         consumerConnection.start();
+         connection.start();
+
+         String msgBody = new String(new byte[20*1024]);
+         for (int i = 0; i < data.length; i++) {
+             Message message = session.createTextMessage(msgBody);
+             producerExpire.send(producerDestination, message);
+         }
+
+         for (int i = 0; i < data.length; i++) {
+             Message message = session.createTextMessage(msgBody);
+             producerNormal.send(producerDestination, message);
+         }
+
+         Vector<Message> messages = new Vector<Message>();
+         Message received;
+         while ((received = consumer.receive(1000)) != null) {
+             messages.add(received);
+             if (messages.size() == 1) {
+                TimeUnit.SECONDS.sleep(1);
+             }
+             received.acknowledge();
+         };
+
+         assertEquals("got all (normal plus one with ttl) messages", messageCount + 1, messages.size());
+
+         Vector<Message> dlqMessages = new Vector<Message>();
+         while ((received = dlqConsumer.receive(1000)) != null) {
+             dlqMessages.add(received);
+         };
+
+         assertEquals("got dlq messages", data.length - 1, dlqMessages.size());
+
+         final DestinationStatistics view = getDestinationStatistics(BrokerRegistry.getInstance().findFirst(), ActiveMQDestination.transform(consumerDestination));
+
+         // wait for all to inflight to expire
+         assertTrue("all inflight messages expired ", Wait.waitFor(new Wait.Condition() {
+             @Override
+             public boolean isSatisified() throws Exception {
+                 return view.getInflight().getCount() == 0;
+             }
+         }));
+         assertEquals("Wrong inFlightCount: ", 0, view.getInflight().getCount());
+
+         LOG.info("Stats: received: "  + messages.size() + ", messages: " + view.getMessages().getCount() + ", enqueues: " + view.getEnqueues().getCount() + ", dequeues: " + view.getDequeues().getCount()
+                 + ", dispatched: " + view.getDispatched().getCount() + ", inflight: " + view.getInflight().getCount() + ", expired: " + view.getExpired().getCount());
+
+    }
+    
+    /**
+     * Sends every payload to a queue using a producer created with an
+     * argument of 0 and expects the consumer to receive a message within one
+     * second.
+     *
+     * @throws Exception on any JMS failure
+     */
+    public void testConsumeQueue() throws Exception {
+
+        final MessageProducer sender = createProducer(0);
+
+        consumerDestination = session.createQueue(getConsumerSubject());
+        producerDestination = session.createQueue(getProducerSubject());
+
+        final MessageConsumer receiver = createConsumer();
+        connection.start();
+
+        for (int index = 0; index < data.length; index++) {
+            final String text = data[index];
+            final Message outgoing = session.createTextMessage(text);
+            outgoing.setStringProperty("stringProperty", text);
+            outgoing.setIntProperty("intProperty", index);
+
+            if (verbose && LOG.isDebugEnabled()) {
+                LOG.debug("About to send a queue message: " + outgoing + " with text: " + text);
+            }
+
+            sender.send(producerDestination, outgoing);
+        }
+
+        // A message must arrive since there is no expiration.
+        assertNotNull(receiver.receive(1000));
+    }
+
+    /**
+     * Tests that messages sent to a topic with a time-to-live are no longer delivered once they have expired.
+     * 
+     * @throws Exception
+     */
+    public void testConsumeExpiredTopic() throws Exception {
+
+        MessageProducer producer = createProducer(timeToLive);
+
+        consumerDestination = session.createTopic(getConsumerSubject());
+        producerDestination = session.createTopic(getProducerSubject());
+
+        MessageConsumer consumer = createConsumer();
+        connection.start();
+
+        for (int i = 0; i < data.length; i++) {
+            Message message = session.createTextMessage(data[i]);
+            message.setStringProperty("stringProperty", data[i]);
+            message.setIntProperty("intProperty", i);
+
+            if (verbose) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("About to send a topic message: " + message + " with text: " + data[i]);
+                }
+            }
+
+            producer.send(producerDestination, message);
+        }
+
+        // sleeps a second longer than the expiration time.
+        // Basically waits till topic expires.
+        Thread.sleep(timeToLive + 1000);
+
+        // message should have expired.
+        assertNull(consumer.receive(1000));
+    }
+
+    /**
+     * Sends messages without expiration to a topic destination and verifies that one can be consumed.
+     * 
+     * @throws Exception
+     */
+    public void testConsumeTopic() throws Exception {
+
+        MessageProducer producer = createProducer(0);
+
+        consumerDestination = session.createTopic(getConsumerSubject());
+        producerDestination = session.createTopic(getProducerSubject());
+
+        MessageConsumer consumer = createConsumer();
+        connection.start();
+
+        for (int i = 0; i < data.length; i++) {
+            Message message = session.createTextMessage(data[i]);
+            message.setStringProperty("stringProperty", data[i]);
+            message.setIntProperty("intProperty", i);
+
+            if (verbose) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("About to send a topic message: " + message + " with text: " + data[i]);
+                }
+            }
+
+            producer.send(producerDestination, message);
+        }
+
+        // should receive a topic since there is no expiration.
+        assertNotNull(consumer.receive(1000));
+    }
+
+    protected MessageProducer createProducer(long timeToLive) throws JMSException {
+        MessageProducer producer = session.createProducer(null);
+        producer.setDeliveryMode(deliveryMode);
+        producer.setTimeToLive(timeToLive);
+
+        return producer;
+    }
+
+    protected MessageConsumer createConsumer() throws JMSException {
+        if (durable) {
+            LOG.info("Creating durable consumer");
+            return session.createDurableSubscriber((Topic)consumerDestination, getName());
+        }
+        return session.createConsumer(consumerDestination);
+    }
+
+    /**
+     * Closes the session and then the connection used by the test.
+     *
+     * @throws Exception if closing the session or the connection fails
+     */
+    protected void tearDown() throws Exception {
+        LOG.info("Dumping stats...");
+        LOG.info("Closing down connection");
+
+        try {
+            session.close();
+        } finally {
+            // Release the connection even when closing the session fails,
+            // so a failing test does not leak it into the following tests.
+            connection.close();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsSendWithAsyncCallbackTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsSendWithAsyncCallbackTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsSendWithAsyncCallbackTest.java
new file mode 100644
index 0000000..cf14453
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsSendWithAsyncCallbackTest.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
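+/**
+ * Compares the send rate of an {@link ActiveMQMessageProducer} when sending with an
+ * {@link AsyncCallback} against the send rate of plain sends of persistent messages.
+ */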
+public class JmsSendWithAsyncCallbackTest extends TestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsSendWithAsyncCallbackTest.class);
+
+    /** Number of messages sent by each benchmark run. */
+    private static final int MESSAGE_COUNT = 1000;
+
+    private Connection connection;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        connection = createConnection();
+    }
+
+    /**
+     * Closes the connection and always runs the parent tear-down afterwards.
+     *
+     * @see junit.framework.TestCase#tearDown()
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            if (connection != null) {
+                connection.close();
+                connection = null;
+            }
+        } finally {
+            // Run the parent clean-up even when closing the connection fails.
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Verifies that sending with an {@link AsyncCallback} is at least 1.5 times faster
+     * than sending without one.
+     *
+     * @throws JMSException if a JMS operation fails, including any asynchronous send
+     * @throws InterruptedException if interrupted while waiting for the async sends
+     */
+    public void testAsyncCallbackIsFaster() throws JMSException, InterruptedException {
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getName());
+
+        // setup a consumer to drain messages..
+        MessageConsumer consumer = session.createConsumer(queue);
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+            }
+        });
+
+        // warmup...
+        for (int i = 0; i < 10; i++) {
+            benchmarkNonCallbackRate();
+            benchmarkCallbackRate();
+        }
+
+        double callbackRate = benchmarkCallbackRate();
+        double nonCallbackRate = benchmarkNonCallbackRate();
+
+        LOG.info(String.format("AsyncCallback Send rate: %,.2f m/s", callbackRate));
+        LOG.info(String.format("NonAsyncCallback Send rate: %,.2f m/s", nonCallbackRate));
+
+        // The async style HAS to be faster than the non-async style..
+        assertTrue("async rate[" + callbackRate + "] should beat non-async rate[" + nonCallbackRate + "]", callbackRate / nonCallbackRate > 1.5);
+    }
+
+    /**
+     * Sends {@link #MESSAGE_COUNT} persistent messages without a callback.
+     *
+     * @return the measured send rate in messages per second
+     * @throws JMSException if a JMS operation fails
+     */
+    private double benchmarkNonCallbackRate() throws JMSException {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        try {
+            Queue queue = session.createQueue(getName());
+            ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(queue);
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+            long start = System.nanoTime();
+            for (int i = 0; i < MESSAGE_COUNT; i++) {
+                producer.send(session.createTextMessage("Hello"));
+            }
+            return messagesPerSecond(MESSAGE_COUNT, System.nanoTime() - start);
+        } finally {
+            // Each run creates its own session; close it so sessions do not pile up on the connection.
+            session.close();
+        }
+    }
+
+    /**
+     * Sends {@link #MESSAGE_COUNT} persistent messages with an {@link AsyncCallback}
+     * and waits until every send has been reported back.
+     *
+     * @return the measured send rate in messages per second
+     * @throws JMSException if a JMS operation fails or an asynchronous send reports an exception
+     * @throws InterruptedException if interrupted while waiting for the callbacks
+     */
+    private double benchmarkCallbackRate() throws JMSException, InterruptedException {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        try {
+            Queue queue = session.createQueue(getName());
+            final CountDownLatch messagesSent = new CountDownLatch(MESSAGE_COUNT);
+            // Written by the callback before countDown() and read after await(), so the latch publishes it.
+            final JMSException[] failure = new JMSException[1];
+            ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(queue);
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+            long start = System.nanoTime();
+            for (int i = 0; i < MESSAGE_COUNT; i++) {
+                producer.send(session.createTextMessage("Hello"), new AsyncCallback() {
+                    @Override
+                    public void onSuccess() {
+                        messagesSent.countDown();
+                    }
+
+                    @Override
+                    public void onException(JMSException exception) {
+                        LOG.error("Async send failed", exception);
+                        failure[0] = exception;
+                        // A failed send must release the latch too, otherwise await() never returns.
+                        messagesSent.countDown();
+                    }
+                });
+            }
+            messagesSent.await();
+            long elapsedNanos = System.nanoTime() - start;
+            if (failure[0] != null) {
+                throw failure[0];
+            }
+            return messagesPerSecond(MESSAGE_COUNT, elapsedNanos);
+        } finally {
+            // Each run creates its own session; close it so sessions do not pile up on the connection.
+            session.close();
+        }
+    }
+
+    /**
+     * Converts a message count and an elapsed time into a rate. The elapsed time is
+     * clamped to at least one nanosecond so the result is always finite.
+     *
+     * @param count number of messages sent
+     * @param elapsedNanos elapsed time in nanoseconds
+     * @return messages per second
+     */
+    private static double messagesPerSecond(int count, long elapsedNanos) {
+        return count * 1e9 / Math.max(1L, elapsedNanos);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsSessionRecoverTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsSessionRecoverTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsSessionRecoverTest.java
new file mode 100644
index 0000000..a9e1b24
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsSessionRecoverTest.java
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+
+/**
+ * Test cases verifying that Session.recover() works.
+ * 
+ * 
+ */
+public class JmsSessionRecoverTest extends TestCase {
+
+    /** Maximum time, in seconds, to wait for asynchronous delivery to complete. */
+    private static final long ASYNC_TIMEOUT_SECONDS = 5;
+
+    private Connection connection;
+    private ActiveMQConnectionFactory factory;
+    private Destination dest;
+
+    /**
+     * Creates a connection to a non-persistent broker on the vm transport.
+     *
+     * @see junit.framework.TestCase#setUp()
+     */
+    @Override
+    protected void setUp() throws Exception {
+        factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+        connection = factory.createConnection();
+    }
+
+    /**
+     * Closes the connection if one was created.
+     *
+     * @see junit.framework.TestCase#tearDown()
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+            connection = null;
+        }
+    }
+
+    /**
+     * Runs the synchronous recover scenario against a queue.
+     *
+     * @throws JMSException
+     * @throws InterruptedException
+     */
+    public void testQueueSynchRecover() throws JMSException, InterruptedException {
+        dest = new ActiveMQQueue("Queue-" + System.currentTimeMillis());
+        doTestSynchRecover();
+    }
+
+    /**
+     * Runs the asynchronous recover scenario against a queue.
+     *
+     * @throws JMSException
+     * @throws InterruptedException
+     */
+    public void testQueueAsynchRecover() throws JMSException, InterruptedException {
+        dest = new ActiveMQQueue("Queue-" + System.currentTimeMillis());
+        doTestAsynchRecover();
+    }
+
+    /**
+     * Runs the synchronous recover scenario against a topic.
+     *
+     * @throws JMSException
+     * @throws InterruptedException
+     */
+    public void testTopicSynchRecover() throws JMSException, InterruptedException {
+        dest = new ActiveMQTopic("Topic-" + System.currentTimeMillis());
+        doTestSynchRecover();
+    }
+
+    /**
+     * Runs the asynchronous recover scenario against a topic.
+     *
+     * @throws JMSException
+     * @throws InterruptedException
+     */
+    public void testTopicAsynchRecover() throws JMSException, InterruptedException {
+        dest = new ActiveMQTopic("Topic-" + System.currentTimeMillis());
+        doTestAsynchRecover();
+    }
+
+    /**
+     * Runs the asynchronous recover scenario with AUTO_ACKNOWLEDGE against a queue.
+     *
+     * @throws JMSException
+     * @throws InterruptedException
+     */
+    public void testQueueAsynchRecoverWithAutoAck() throws JMSException, InterruptedException {
+        dest = new ActiveMQQueue("Queue-" + System.currentTimeMillis());
+        doTestAsynchRecoverWithAutoAck();
+    }
+
+    /**
+     * Runs the asynchronous recover scenario with AUTO_ACKNOWLEDGE against a topic.
+     *
+     * @throws JMSException
+     * @throws InterruptedException
+     */
+    public void testTopicAsynchRecoverWithAutoAck() throws JMSException, InterruptedException {
+        dest = new ActiveMQTopic("Topic-" + System.currentTimeMillis());
+        doTestAsynchRecoverWithAutoAck();
+    }
+
+    /**
+     * Test to make sure that a Sync recover works: "First" is acknowledged, "Second" is
+     * received but not acknowledged, and after {@code Session.recover()} "Second" must
+     * arrive again flagged as redelivered.
+     *
+     * @throws JMSException
+     */
+    public void doTestSynchRecover() throws JMSException {
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(dest);
+        connection.start();
+
+        MessageProducer producer = session.createProducer(dest);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+        producer.send(session.createTextMessage("First"));
+        producer.send(session.createTextMessage("Second"));
+
+        // Check every receive() for null so a missing message fails as an assertion, not an NPE.
+        TextMessage message = (TextMessage)consumer.receive(1000);
+        assertNotNull("First message was not received", message);
+        assertEquals("First", message.getText());
+        assertFalse(message.getJMSRedelivered());
+        message.acknowledge();
+
+        message = (TextMessage)consumer.receive(1000);
+        assertNotNull("Second message was not received", message);
+        assertEquals("Second", message.getText());
+        assertFalse(message.getJMSRedelivered());
+
+        session.recover();
+
+        message = (TextMessage)consumer.receive(2000);
+        assertNotNull("Second message was not redelivered after recover()", message);
+        assertEquals("Second", message.getText());
+        assertTrue(message.getJMSRedelivered());
+
+        message.acknowledge();
+    }
+
+    /**
+     * Test to make sure that a Async recover works when using CLIENT_ACKNOWLEDGE.
+     *
+     * @throws JMSException
+     * @throws InterruptedException
+     */
+    public void doTestAsynchRecover() throws JMSException, InterruptedException {
+        runAsynchRecover(Session.CLIENT_ACKNOWLEDGE);
+    }
+
+    /**
+     * Test to make sure that a Async recover works when using AUTO_ACKNOWLEDGE.
+     *
+     * @throws JMSException
+     * @throws InterruptedException
+     */
+    public void doTestAsynchRecoverWithAutoAck() throws JMSException, InterruptedException {
+        runAsynchRecover(Session.AUTO_ACKNOWLEDGE);
+    }
+
+    /**
+     * Sends "First" and "Second", consumes them through a message listener that calls
+     * {@code Session.recover()} on "Second", and expects "Second" to be delivered a
+     * second time flagged as redelivered. Messages are acknowledged explicitly only
+     * when {@code ackMode} is CLIENT_ACKNOWLEDGE.
+     *
+     * @param ackMode the acknowledge mode the session is created with
+     * @throws JMSException if a JMS operation on the calling thread fails
+     * @throws InterruptedException if interrupted while waiting for the listener
+     */
+    private void runAsynchRecover(final int ackMode) throws JMSException, InterruptedException {
+
+        final boolean clientAck = ackMode == Session.CLIENT_ACKNOWLEDGE;
+        final Session session = connection.createSession(false, ackMode);
+        // Written by the listener before countDown() and read after await(), so the latch publishes it.
+        final String[] errorMessage = new String[] {null};
+        final CountDownLatch doneCountDownLatch = new CountDownLatch(1);
+
+        MessageConsumer consumer = session.createConsumer(dest);
+
+        MessageProducer producer = session.createProducer(dest);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+        producer.send(session.createTextMessage("First"));
+        producer.send(session.createTextMessage("Second"));
+
+        consumer.setMessageListener(new MessageListener() {
+            int counter;
+
+            @Override
+            public void onMessage(Message msg) {
+                counter++;
+                try {
+                    TextMessage message = (TextMessage)msg;
+                    switch (counter) {
+                    case 1:
+                        assertEquals("First", message.getText());
+                        assertFalse(message.getJMSRedelivered());
+                        if (clientAck) {
+                            message.acknowledge();
+                        }
+                        break;
+
+                    case 2:
+                        // This should rollback the delivery of this message..
+                        // and re-deliver.
+                        assertEquals("Second", message.getText());
+                        assertFalse(message.getJMSRedelivered());
+                        session.recover();
+                        break;
+
+                    case 3:
+                        assertEquals("Second", message.getText());
+                        assertTrue(message.getJMSRedelivered());
+                        if (clientAck) {
+                            message.acknowledge();
+                        }
+                        doneCountDownLatch.countDown();
+                        break;
+
+                    default:
+                        errorMessage[0] = "Got too many messages: " + counter;
+                        doneCountDownLatch.countDown();
+                    }
+                } catch (Throwable e) {
+                    // Assertion failures are thrown on the delivery thread; record them
+                    // so the test thread can report the failure.
+                    e.printStackTrace();
+                    errorMessage[0] = "Got exception: " + e;
+                    doneCountDownLatch.countDown();
+                }
+            }
+        });
+        connection.start();
+
+        // Both acknowledge modes share one bounded wait, so a stalled delivery fails fast.
+        if (doneCountDownLatch.await(ASYNC_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+            if (errorMessage[0] != null) {
+                fail(errorMessage[0]);
+            }
+        } else {
+            fail("Timeout waiting for async message delivery to complete.");
+        }
+    }
+}


Mime
View raw message