activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [46/52] [abbrv] [partial] activemq-artemis git commit: ARTEMIS-127 Adding activemq unit test module to Artemis
Date Tue, 09 Jun 2015 16:37:17 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSMessageTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSMessageTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSMessageTest.java
new file mode 100644
index 0000000..5cc279c
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSMessageTest.java
@@ -0,0 +1,547 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.net.URISyntaxException;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Vector;
+
+import javax.jms.BytesMessage;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+import junit.framework.Test;
+
+import org.apache.activemq.command.ActiveMQDestination;
+
+/**
+ * Test cases used to test the JMS message consumer.
+ *
+ *
+ */
+public class JMSMessageTest extends JmsTestSupport {
+
+    public ActiveMQDestination destination;
+    public int deliveryMode = DeliveryMode.NON_PERSISTENT;
+    public int prefetch;
+    public int ackMode;
+    public byte destinationType = ActiveMQDestination.QUEUE_TYPE;
+    public boolean durableConsumer;
+    public String connectURL = "vm://localhost?marshal=false";
+
+    /**
+     * Run all these tests in both marshaling and non-marshaling mode.
+     */
+    public void initCombos() {
+        addCombinationValues("connectURL", new Object[] {"vm://localhost?marshal=false",
+                                                         "vm://localhost?marshal=true"});
+        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
+        addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE)});
+    }
+
+    public void testTextMessage() throws Exception {
+
+        // Receive a message with the JMS API
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(session, destinationType);
+        MessageConsumer consumer = session.createConsumer(destination);
+        MessageProducer producer = session.createProducer(destination);
+
+        // Send the message.
+        {
+            TextMessage message = session.createTextMessage();
+            message.setText("Hi");
+            producer.send(message);
+        }
+
+        // Check the Message
+        {
+            TextMessage message = (TextMessage)consumer.receive(1000);
+            assertNotNull(message);
+            assertEquals("Hi", message.getText());
+        }
+
+        assertNull(consumer.receiveNoWait());
+    }
+
+    public static Test suite() {
+        return suite(JMSMessageTest.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+    @Override
+    protected ConnectionFactory createConnectionFactory() throws URISyntaxException {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectURL);
+        return factory;
+    }
+
+    public void testBytesMessageLength() throws Exception {
+
+        // Receive a message with the JMS API
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(session, destinationType);
+        MessageConsumer consumer = session.createConsumer(destination);
+        MessageProducer producer = session.createProducer(destination);
+
+        // Send the message
+        {
+            BytesMessage message = session.createBytesMessage();
+            message.writeInt(1);
+            message.writeInt(2);
+            message.writeInt(3);
+            message.writeInt(4);
+            producer.send(message);
+        }
+
+        // Check the message.
+        {
+            BytesMessage message = (BytesMessage)consumer.receive(1000);
+            assertNotNull(message);
+            assertEquals(16, message.getBodyLength());
+        }
+
+        assertNull(consumer.receiveNoWait());
+    }
+
+    public void testObjectMessage() throws Exception {
+
+        // Receive a message with the JMS API
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(session, destinationType);
+        MessageConsumer consumer = session.createConsumer(destination);
+        MessageProducer producer = session.createProducer(destination);
+
+        // send the message.
+        {
+            ObjectMessage message = session.createObjectMessage();
+            message.setObject("Hi");
+            producer.send(message);
+        }
+
+        // Check the message
+        {
+            ObjectMessage message = (ObjectMessage)consumer.receive(1000);
+            assertNotNull(message);
+            assertEquals("Hi", message.getObject());
+        }
+        assertNull(consumer.receiveNoWait());
+    }
+
+    public void testBytesMessage() throws Exception {
+
+        // Receive a message with the JMS API
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(session, destinationType);
+        MessageConsumer consumer = session.createConsumer(destination);
+        MessageProducer producer = session.createProducer(destination);
+
+        // Send the message
+        {
+            BytesMessage message = session.createBytesMessage();
+            message.writeBoolean(true);
+            producer.send(message);
+        }
+
+        // Check the message
+        {
+            BytesMessage message = (BytesMessage)consumer.receive(1000);
+            assertNotNull(message);
+            assertTrue(message.readBoolean());
+
+            try {
+                message.readByte();
+                fail("Expected exception not thrown.");
+            } catch (MessageEOFException e) {
+            }
+
+        }
+        assertNull(consumer.receiveNoWait());
+    }
+
+    public void testStreamMessage() throws Exception {
+
+        // Receive a message with the JMS API
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(session, destinationType);
+        MessageConsumer consumer = session.createConsumer(destination);
+        MessageProducer producer = session.createProducer(destination);
+
+        // Send the message.
+        {
+            StreamMessage message = session.createStreamMessage();
+            message.writeString("This is a test to see how it works.");
+            producer.send(message);
+        }
+
+        // Check the message.
+        {
+            StreamMessage message = (StreamMessage)consumer.receive(1000);
+            assertNotNull(message);
+
+            // Invalid conversion should throw exception and not move the stream
+            // position.
+            try {
+                message.readByte();
+                fail("Should have received NumberFormatException");
+            } catch (NumberFormatException e) {
+            }
+
+            assertEquals("This is a test to see how it works.", message.readString());
+
+            // Invalid conversion should throw exception and not move the stream
+            // position.
+            try {
+                message.readByte();
+                fail("Should have received MessageEOFException");
+            } catch (MessageEOFException e) {
+            }
+        }
+        assertNull(consumer.receiveNoWait());
+    }
+
+    public void testMapMessage() throws Exception {
+
+        // Receive a message with the JMS API
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(session, destinationType);
+        MessageConsumer consumer = session.createConsumer(destination);
+        MessageProducer producer = session.createProducer(destination);
+
+        // send the message.
+        {
+            MapMessage message = session.createMapMessage();
+            message.setBoolean("boolKey", true);
+            producer.send(message);
+        }
+
+        // get the message.
+        {
+            MapMessage message = (MapMessage)consumer.receive(1000);
+            assertNotNull(message);
+            assertTrue(message.getBoolean("boolKey"));
+        }
+        assertNull(consumer.receiveNoWait());
+    }
+
+    static class ForeignMessage implements TextMessage {
+
+        public int deliveryMode;
+
+        private String messageId;
+        private long timestamp;
+        private String correlationId;
+        private Destination replyTo;
+        private Destination destination;
+        private boolean redelivered;
+        private String type;
+        private long expiration;
+        private int priority;
+        private String text;
+        private final HashMap<String, Object> props = new HashMap<String, Object>();
+
+        @Override
+        public String getJMSMessageID() throws JMSException {
+            return messageId;
+        }
+
+        @Override
+        public void setJMSMessageID(String arg0) throws JMSException {
+            messageId = arg0;
+        }
+
+        @Override
+        public long getJMSTimestamp() throws JMSException {
+            return timestamp;
+        }
+
+        @Override
+        public void setJMSTimestamp(long arg0) throws JMSException {
+            timestamp = arg0;
+        }
+
+        @Override
+        public byte[] getJMSCorrelationIDAsBytes() throws JMSException {
+            return null;
+        }
+
+        @Override
+        public void setJMSCorrelationIDAsBytes(byte[] arg0) throws JMSException {
+        }
+
+        @Override
+        public void setJMSCorrelationID(String arg0) throws JMSException {
+            correlationId = arg0;
+        }
+
+        @Override
+        public String getJMSCorrelationID() throws JMSException {
+            return correlationId;
+        }
+
+        @Override
+        public Destination getJMSReplyTo() throws JMSException {
+            return replyTo;
+        }
+
+        @Override
+        public void setJMSReplyTo(Destination arg0) throws JMSException {
+            replyTo = arg0;
+        }
+
+        @Override
+        public Destination getJMSDestination() throws JMSException {
+            return destination;
+        }
+
+        @Override
+        public void setJMSDestination(Destination arg0) throws JMSException {
+            destination = arg0;
+        }
+
+        @Override
+        public int getJMSDeliveryMode() throws JMSException {
+            return deliveryMode;
+        }
+
+        @Override
+        public void setJMSDeliveryMode(int arg0) throws JMSException {
+            deliveryMode = arg0;
+        }
+
+        @Override
+        public boolean getJMSRedelivered() throws JMSException {
+            return redelivered;
+        }
+
+        @Override
+        public void setJMSRedelivered(boolean arg0) throws JMSException {
+            redelivered = arg0;
+        }
+
+        @Override
+        public String getJMSType() throws JMSException {
+            return type;
+        }
+
+        @Override
+        public void setJMSType(String arg0) throws JMSException {
+            type = arg0;
+        }
+
+        @Override
+        public long getJMSExpiration() throws JMSException {
+            return expiration;
+        }
+
+        @Override
+        public void setJMSExpiration(long arg0) throws JMSException {
+            expiration = arg0;
+        }
+
+        @Override
+        public int getJMSPriority() throws JMSException {
+            return priority;
+        }
+
+        @Override
+        public void setJMSPriority(int arg0) throws JMSException {
+            priority = arg0;
+        }
+
+        @Override
+        public void clearProperties() throws JMSException {
+        }
+
+        @Override
+        public boolean propertyExists(String arg0) throws JMSException {
+            return false;
+        }
+
+        @Override
+        public boolean getBooleanProperty(String arg0) throws JMSException {
+            return false;
+        }
+
+        @Override
+        public byte getByteProperty(String arg0) throws JMSException {
+            return 0;
+        }
+
+        @Override
+        public short getShortProperty(String arg0) throws JMSException {
+            return 0;
+        }
+
+        @Override
+        public int getIntProperty(String arg0) throws JMSException {
+            return 0;
+        }
+
+        @Override
+        public long getLongProperty(String arg0) throws JMSException {
+            return 0;
+        }
+
+        @Override
+        public float getFloatProperty(String arg0) throws JMSException {
+            return 0;
+        }
+
+        @Override
+        public double getDoubleProperty(String arg0) throws JMSException {
+            return 0;
+        }
+
+        @Override
+        public String getStringProperty(String arg0) throws JMSException {
+            return (String)props.get(arg0);
+        }
+
+        @Override
+        public Object getObjectProperty(String arg0) throws JMSException {
+            return props.get(arg0);
+        }
+
+        @Override
+        public Enumeration<?> getPropertyNames() throws JMSException {
+            return new Vector<String>(props.keySet()).elements();
+        }
+
+        @Override
+        public void setBooleanProperty(String arg0, boolean arg1) throws JMSException {
+        }
+
+        @Override
+        public void setByteProperty(String arg0, byte arg1) throws JMSException {
+        }
+
+        @Override
+        public void setShortProperty(String arg0, short arg1) throws JMSException {
+        }
+
+        @Override
+        public void setIntProperty(String arg0, int arg1) throws JMSException {
+        }
+
+        @Override
+        public void setLongProperty(String arg0, long arg1) throws JMSException {
+        }
+
+        @Override
+        public void setFloatProperty(String arg0, float arg1) throws JMSException {
+        }
+
+        @Override
+        public void setDoubleProperty(String arg0, double arg1) throws JMSException {
+        }
+
+        @Override
+        public void setStringProperty(String arg0, String arg1) throws JMSException {
+            props.put(arg0, arg1);
+        }
+
+        @Override
+        public void setObjectProperty(String arg0, Object arg1) throws JMSException {
+            props.put(arg0, arg1);
+        }
+
+        @Override
+        public void acknowledge() throws JMSException {
+        }
+
+        @Override
+        public void clearBody() throws JMSException {
+        }
+
+        @Override
+        public void setText(String arg0) throws JMSException {
+            text = arg0;
+        }
+
+        @Override
+        public String getText() throws JMSException {
+            return text;
+        }
+    }
+
+    public void testForeignMessage() throws Exception {
+
+        // Receive a message with the JMS API
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(session, destinationType);
+        MessageConsumer consumer = session.createConsumer(destination);
+        MessageProducer producer = session.createProducer(destination);
+
+        // Send the message.
+        {
+            ForeignMessage message = new ForeignMessage();
+            message.text = "Hello";
+            message.setStringProperty("test", "value");
+            long timeToLive = 10000L;
+            long start = System.currentTimeMillis();
+            producer.send(message, Session.AUTO_ACKNOWLEDGE, 7, timeToLive);
+            long end = System.currentTimeMillis();
+
+
+            //validate jms spec 1.1 section 3.4.11 table 3.1
+            // JMSDestination, JMSDeliveryMode,  JMSExpiration, JMSPriority, JMSMessageID, and JMSTimestamp
+            //must be set by sending a message.
+
+            assertNotNull(message.getJMSDestination());
+            assertEquals(Session.AUTO_ACKNOWLEDGE, message.getJMSDeliveryMode());
+            assertTrue(start  + timeToLive <= message.getJMSExpiration());
+            assertTrue(end + timeToLive >= message.getJMSExpiration());
+            assertEquals(7, message.getJMSPriority());
+            assertNotNull(message.getJMSMessageID());
+            assertTrue(start <= message.getJMSTimestamp());
+            assertTrue(end >= message.getJMSTimestamp());
+        }
+
+        // Validate message is OK.
+        {
+            TextMessage message = (TextMessage)consumer.receive(1000);
+            assertNotNull(message);
+            assertEquals("Hello", message.getText());
+            assertEquals("value", message.getStringProperty("test"));
+        }
+
+        assertNull(consumer.receiveNoWait());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSQueueRedeliverTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSQueueRedeliverTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSQueueRedeliverTest.java
new file mode 100644
index 0000000..be8a6f2
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSQueueRedeliverTest.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+/**
+ * 
+ */
+public class JMSQueueRedeliverTest extends JmsTopicRedeliverTest {
+    protected void setUp() throws Exception {
+        topic = false;
+        super.setUp();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSUsecaseTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSUsecaseTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSUsecaseTest.java
new file mode 100644
index 0000000..d22907a
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSUsecaseTest.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.Enumeration;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.Test;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+
+public class JMSUsecaseTest extends JmsTestSupport {
+
+    public ActiveMQDestination destination;
+    public int deliveryMode;
+    public int prefetch;
+    public byte destinationType;
+    public boolean durableConsumer;
+
+    public static Test suite() {
+        return suite(JMSUsecaseTest.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+    public void initCombosForTestQueueBrowser() {
+        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
+        addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE)});
+    }
+
+    public void testQueueBrowser() throws Exception {
+
+        // Send a message to the broker.
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(session, destinationType);
+        MessageProducer producer = session.createProducer(destination);
+        producer.setDeliveryMode(this.deliveryMode);
+        sendMessages(session, producer, 5);
+        producer.close();
+
+        QueueBrowser browser = session.createBrowser((Queue)destination);
+        Enumeration<?> enumeration = browser.getEnumeration();
+        for (int i = 0; i < 5; i++) {
+            Thread.sleep(100);
+            assertTrue(enumeration.hasMoreElements());
+            Message m = (Message)enumeration.nextElement();
+            assertNotNull(m);
+            assertEquals("" + i, ((TextMessage)m).getText());
+        }
+        assertFalse(enumeration.hasMoreElements());
+    }
+
+    public void initCombosForTestSendReceive() {
+        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
+        addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
+                                                              Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)});
+    }
+
+    public void testSendReceive() throws Exception {
+        // Send a message to the broker.
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(session, destinationType);
+        MessageProducer producer = session.createProducer(destination);
+        producer.setDeliveryMode(this.deliveryMode);
+        MessageConsumer consumer = session.createConsumer(destination);
+        ActiveMQMessage message = new ActiveMQMessage();
+        producer.send(message);
+
+        // Make sure only 1 message was delivered.
+        assertNotNull(consumer.receive(1000));
+        assertNull(consumer.receiveNoWait());
+    }
+
+    public void initCombosForTestSendReceiveTransacted() {
+        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
+        addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
+                                                              Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)});
+    }
+
+    public void testSendReceiveTransacted() throws Exception {
+        // Send a message to the broker.
+        connection.start();
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        destination = createDestination(session, destinationType);
+        MessageProducer producer = session.createProducer(destination);
+        producer.setDeliveryMode(this.deliveryMode);
+        MessageConsumer consumer = session.createConsumer(destination);
+        producer.send(session.createTextMessage("test"));
+
+        // Message should not be delivered until commit.
+        assertNull(consumer.receiveNoWait());
+        session.commit();
+
+        // Make sure only 1 message was delivered.
+        Message message = consumer.receive(1000);
+        assertNotNull(message);
+        assertFalse(message.getJMSRedelivered());
+        assertNull(consumer.receiveNoWait());
+
+        // Message should be redelivered is rollback is used.
+        session.rollback();
+
+        // Make sure only 1 message was delivered.
+        message = consumer.receive(2000);
+        assertNotNull(message);
+        assertTrue(message.getJMSRedelivered());
+        assertNull(consumer.receiveNoWait());
+
+        // If we commit now, the message should not be redelivered.
+        session.commit();
+        assertNull(consumer.receiveNoWait());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java
new file mode 100644
index 0000000..7deff27
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.ConnectionFactory;
+import junit.framework.Test;
+
+/*
+ * allow an XA session to be used as an auto ack session when no XA transaction
+ * https://issues.apache.org/activemq/browse/AMQ-2659
+ */
+public class JMSXAConsumerTest extends JMSConsumerTest {
+
+    public static Test suite() {
+        return suite(JMSXAConsumerTest.class);
+    }
+
+    @Override
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQXAConnectionFactory("vm://localhost");
+    }
+
+    // some tests use transactions, these will not work unless an XA transaction is in place
+    // slip these
+    public void testPrefetch1MessageNotDispatched() throws Exception {
+    }
+
+    public void testRedispatchOfUncommittedTx() throws Exception {
+    }
+
+    public void testRedispatchOfRolledbackTx() throws Exception {
+    }
+
+    public void testMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() throws Exception {
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsAutoAckListenerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsAutoAckListenerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsAutoAckListenerTest.java
new file mode 100644
index 0000000..5f34106
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsAutoAckListenerTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+/**
+ * 
+ */
+public class JmsAutoAckListenerTest extends TestSupport implements MessageListener {
+
+    private Connection connection;
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        connection = createConnection();
+    }
+
+    /**
+     * @see junit.framework.TestCase#tearDown()
+     */
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+            connection = null;
+        }
+        super.tearDown();
+    }
+
+    /**
+     * Tests if acknowleged messages are being consumed.
+     * 
+     * @throws javax.jms.JMSException
+     */
+    public void testAckedMessageAreConsumed() throws Exception {
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue("test");
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Hello"));
+
+        // Consume the message...
+        MessageConsumer consumer = session.createConsumer(queue);
+        consumer.setMessageListener(this);
+
+        Thread.sleep(10000);
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        // Attempt to Consume the message...check if message was acknowledge
+        consumer = session.createConsumer(queue);
+        Message msg = consumer.receive(1000);
+        assertNull(msg);
+
+        session.close();
+    }
+
+    public void onMessage(Message message) {
+        assertNotNull(message);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsAutoAckTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsAutoAckTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsAutoAckTest.java
new file mode 100644
index 0000000..90ee032
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsAutoAckTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+/**
+ * 
+ */
+public class JmsAutoAckTest extends TestSupport {
+
+    private Connection connection;
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        connection = createConnection();
+    }
+
+    /**
+     * @see junit.framework.TestCase#tearDown()
+     */
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+            connection = null;
+        }
+        super.tearDown();
+    }
+    
+    /**
+     * Tests if acknowleged messages are being consumed.
+     * 
+     * @throws javax.jms.JMSException
+     */
+    public void testAckedMessageAreConsumed() throws JMSException {
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue("test");
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Hello"));
+
+        // Consume the message...
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message msg = consumer.receive(1000);
+        assertNotNull(msg);
+        
+        // Reset the session.
+        session.close();
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        
+        // Attempt to Consume the message...
+        consumer = session.createConsumer(queue);
+        msg = consumer.receive(1000);
+        assertNull(msg);        
+
+        session.close();
+    }
+    
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsBenchmark.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsBenchmark.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsBenchmark.java
new file mode 100644
index 0000000..52a70a0
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsBenchmark.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.Test;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Benchmarks the broker by starting many consumer and producers against the
+ * same destination. Make sure you run with jvm option -server (makes a big
+ * difference). The tests simulate storing 1000 1k jms messages to see the rate
+ * of processing msg/sec.
+ *
+ *
+ */
+public class JmsBenchmark extends JmsTestSupport {
+    private static final transient Logger LOG = LoggerFactory.getLogger(JmsBenchmark.class);
+
+    private static final long SAMPLE_DELAY = Integer.parseInt(System.getProperty("SAMPLE_DELAY", "" + 1000 * 5));
+    private static final long SAMPLES = Integer.parseInt(System.getProperty("SAMPLES", "10"));
+    private static final long SAMPLE_DURATION = Integer.parseInt(System.getProperty("SAMPLES_DURATION", "" + 1000 * 60));
+    private static final int PRODUCER_COUNT = Integer.parseInt(System.getProperty("PRODUCER_COUNT", "10"));
+    private static final int CONSUMER_COUNT = Integer.parseInt(System.getProperty("CONSUMER_COUNT", "10"));
+
+    public ActiveMQDestination destination;
+
+    public static Test suite() {
+        return suite(JmsBenchmark.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(JmsBenchmark.class);
+    }
+
+    public void initCombos() {
+        addCombinationValues("destination", new Object[] {new ActiveMQQueue("TEST")});
+    }
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        return BrokerFactory.createBroker(new URI("broker://(tcp://localhost:0)?persistent=false"));
+    }
+
+    @Override
+    protected ConnectionFactory createConnectionFactory() throws URISyntaxException, IOException {
+        return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getServer().getConnectURI());
+    }
+
+    /**
+     * @throws Throwable
+     */
+    public void testConcurrentSendReceive() throws Throwable {
+
+        final Semaphore connectionsEstablished = new Semaphore(1 - (CONSUMER_COUNT + PRODUCER_COUNT));
+        final Semaphore workerDone = new Semaphore(1 - (CONSUMER_COUNT + PRODUCER_COUNT));
+        final CountDownLatch sampleTimeDone = new CountDownLatch(1);
+
+        final AtomicInteger producedMessages = new AtomicInteger(0);
+        final AtomicInteger receivedMessages = new AtomicInteger(0);
+
+        final Callable<Object> producer = new Callable<Object>() {
+            @Override
+            public Object call() throws JMSException, InterruptedException {
+                Connection connection = factory.createConnection();
+                connections.add(connection);
+                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                MessageProducer producer = session.createProducer(destination);
+                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                BytesMessage message = session.createBytesMessage();
+                message.writeBytes(new byte[1024]);
+                connection.start();
+                connectionsEstablished.release();
+
+                while (!sampleTimeDone.await(0, TimeUnit.MILLISECONDS)) {
+                    producer.send(message);
+                    producedMessages.incrementAndGet();
+                }
+
+                connection.close();
+                workerDone.release();
+                return null;
+            }
+        };
+
+        final Callable<Object> consumer = new Callable<Object>() {
+            @Override
+            public Object call() throws JMSException, InterruptedException {
+                Connection connection = factory.createConnection();
+                connections.add(connection);
+                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                MessageConsumer consumer = session.createConsumer(destination);
+
+                consumer.setMessageListener(new MessageListener() {
+                    @Override
+                    public void onMessage(Message msg) {
+                        receivedMessages.incrementAndGet();
+                    }
+                });
+                connection.start();
+
+                connectionsEstablished.release();
+                sampleTimeDone.await();
+
+                connection.close();
+                workerDone.release();
+                return null;
+            }
+        };
+
+        final Throwable workerError[] = new Throwable[1];
+        for (int i = 0; i < PRODUCER_COUNT; i++) {
+            new Thread("Producer:" + i) {
+                @Override
+                public void run() {
+                    try {
+                        producer.call();
+                    } catch (Throwable e) {
+                        e.printStackTrace();
+                        workerError[0] = e;
+                    }
+                }
+            }.start();
+        }
+
+        for (int i = 0; i < CONSUMER_COUNT; i++) {
+            new Thread("Consumer:" + i) {
+                @Override
+                public void run() {
+                    try {
+                        consumer.call();
+                    } catch (Throwable e) {
+                        e.printStackTrace();
+                        workerError[0] = e;
+                    }
+                }
+            }.start();
+        }
+
+        LOG.info(getName() + ": Waiting for Producers and Consumers to startup.");
+        connectionsEstablished.acquire();
+        LOG.info("Producers and Consumers are now running.  Waiting for system to reach steady state: " + (SAMPLE_DELAY / 1000.0f) + " seconds");
+        Thread.sleep(1000 * 10);
+
+        LOG.info("Starting sample: " + SAMPLES + " each lasting " + (SAMPLE_DURATION / 1000.0f) + " seconds");
+
+        for (int i = 0; i < SAMPLES; i++) {
+
+            long start = System.currentTimeMillis();
+            producedMessages.set(0);
+            receivedMessages.set(0);
+
+            Thread.sleep(SAMPLE_DURATION);
+
+            long end = System.currentTimeMillis();
+            int r = receivedMessages.get();
+            int p = producedMessages.get();
+
+            LOG.info("published: " + p + " msgs at " + (p * 1000f / (end - start)) + " msgs/sec, " + "consumed: " + r + " msgs at " + (r * 1000f / (end - start)) + " msgs/sec");
+        }
+
+        LOG.info("Sample done.");
+        sampleTimeDone.countDown();
+
+        workerDone.acquire();
+        if (workerError[0] != null) {
+            throw workerError[0];
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsClientAckListenerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsClientAckListenerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsClientAckListenerTest.java
new file mode 100644
index 0000000..c122807
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsClientAckListenerTest.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+/**
+ * 
+ */
+public class JmsClientAckListenerTest extends TestSupport implements MessageListener {
+
+    private Connection connection;
+    private boolean dontAck;
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        connection = createConnection();
+    }
+
+    /**
+     * @see junit.framework.TestCase#tearDown()
+     */
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+            connection = null;
+        }
+        super.tearDown();
+    }
+
+    /**
+     * Tests if acknowleged messages are being consumed.
+     * 
+     * @throws javax.jms.JMSException
+     */
+    public void testAckedMessageAreConsumed() throws Exception {
+        connection.start();
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue("test");
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Hello"));
+
+        // Consume the message...
+        MessageConsumer consumer = session.createConsumer(queue);
+        consumer.setMessageListener(this);
+
+        Thread.sleep(10000);
+
+        // Reset the session.
+        session.close();
+
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+        // Attempt to Consume the message...
+        consumer = session.createConsumer(queue);
+        Message msg = consumer.receive(1000);
+        assertNull(msg);
+
+        session.close();
+    }
+
+    /**
+     * Tests if unacknowleged messages are being redelivered when the consumer
+     * connects again.
+     * 
+     * @throws javax.jms.JMSException
+     */
+    public void testUnAckedMessageAreNotConsumedOnSessionClose() throws Exception {
+        connection.start();
+        // don't aknowledge message on onMessage() call
+        dontAck = true;
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue("test");
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Hello"));
+
+        // Consume the message...
+        MessageConsumer consumer = session.createConsumer(queue);
+        consumer.setMessageListener(this);
+        // Don't ack the message.
+
+        // Reset the session. This should cause the Unacked message to be
+        // redelivered.
+        session.close();
+
+        Thread.sleep(10000);
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        // Attempt to Consume the message...
+        consumer = session.createConsumer(queue);
+        Message msg = consumer.receive(2000);
+        assertNotNull(msg);
+        msg.acknowledge();
+
+        session.close();
+    }
+
+    public void onMessage(Message message) {
+
+        assertNotNull(message);
+        if (!dontAck) {
+            try {
+                message.acknowledge();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsClientAckTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsClientAckTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsClientAckTest.java
new file mode 100644
index 0000000..ef33f9a
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsClientAckTest.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+/**
+ * 
+ */
+public class JmsClientAckTest extends TestSupport {
+
+    private Connection connection;
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        connection = createConnection();
+    }
+
+    /**
+     * @see junit.framework.TestCase#tearDown()
+     */
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+            connection = null;
+        }
+        super.tearDown();
+    }
+
+    /**
+     * Tests if acknowledged messages are being consumed.
+     *
+     * @throws JMSException
+     */
+    public void testAckedMessageAreConsumed() throws JMSException {
+        connection.start();
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getQueueName());
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Hello"));
+
+        // Consume the message...
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message msg = consumer.receive(1000);
+        assertNotNull(msg);
+        msg.acknowledge();
+
+        // Reset the session.
+        session.close();
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+        // Attempt to Consume the message...
+        consumer = session.createConsumer(queue);
+        msg = consumer.receive(1000);
+        assertNull(msg);
+
+        session.close();
+    }
+
+    /**
+     * Tests if acknowledged messages are being consumed.
+     *
+     * @throws JMSException
+     */
+    public void testLastMessageAcked() throws JMSException {
+        connection.start();
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getQueueName());
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Hello"));
+        producer.send(session.createTextMessage("Hello2"));
+        producer.send(session.createTextMessage("Hello3"));
+
+        // Consume the message...
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message msg = consumer.receive(1000);
+        assertNotNull(msg);
+        msg = consumer.receive(1000);
+        assertNotNull(msg);        
+        msg = consumer.receive(1000);
+        assertNotNull(msg);
+        msg.acknowledge();
+
+        // Reset the session.
+        session.close();
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+        // Attempt to Consume the message...
+        consumer = session.createConsumer(queue);
+        msg = consumer.receive(1000);
+        assertNull(msg);
+
+        session.close();
+    }
+    
+    /**
+     * Tests if unacknowledged messages are being re-delivered when the consumer connects again.
+     * 
+     * @throws JMSException
+     */
+    public void testUnAckedMessageAreNotConsumedOnSessionClose() throws JMSException {
+        connection.start();
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getQueueName());
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Hello"));
+
+        // Consume the message...
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message msg = consumer.receive(1000);
+        assertNotNull(msg);        
+        // Don't ack the message.
+        
+        // Reset the session.  This should cause the unacknowledged message to be re-delivered.
+        session.close();
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+                
+        // Attempt to Consume the message...
+        consumer = session.createConsumer(queue);
+        msg = consumer.receive(2000);
+        assertNotNull(msg);        
+        msg.acknowledge();
+        
+        session.close();
+    }
+
+    protected String getQueueName() {
+        return getClass().getName() + "." + getName();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsConnectionStartStopTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsConnectionStartStopTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsConnectionStartStopTest.java
new file mode 100644
index 0000000..c972b1e
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsConnectionStartStopTest.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.Random;
+import java.util.Vector;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+/**
+ *
+ */
+public class JmsConnectionStartStopTest extends TestSupport {
+
+    private static final org.apache.commons.logging.Log LOG = org.apache.commons.logging.LogFactory
+        .getLog(JmsConnectionStartStopTest.class);
+
+    private Connection startedConnection;
+    private Connection stoppedConnection;
+
+    /**
+     * @see junit.framework.TestCase#setUp()
+     */
+    @Override
+    protected void setUp() throws Exception {
+
+        LOG.info(getClass().getClassLoader().getResource("log4j.properties"));
+
+        ActiveMQConnectionFactory factory = createConnectionFactory();
+        startedConnection = factory.createConnection();
+        startedConnection.start();
+        stoppedConnection = factory.createConnection();
+    }
+
+    /**
+     * @see junit.framework.TestCase#tearDown()
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        stoppedConnection.close();
+        startedConnection.close();
+    }
+
+    /**
+     * Tests if the consumer receives the messages that were sent before the
+     * connection was started.
+     *
+     * @throws JMSException
+     */
+    public void testStoppedConsumerHoldsMessagesTillStarted() throws JMSException {
+        Session startedSession = startedConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Session stoppedSession = stoppedConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        // Setup the consumers.
+        Topic topic = startedSession.createTopic("test");
+        MessageConsumer startedConsumer = startedSession.createConsumer(topic);
+        MessageConsumer stoppedConsumer = stoppedSession.createConsumer(topic);
+
+        // Send the message.
+        MessageProducer producer = startedSession.createProducer(topic);
+        TextMessage message = startedSession.createTextMessage("Hello");
+        producer.send(message);
+
+        // Test the assertions.
+        Message m = startedConsumer.receive(1000);
+        assertNotNull(m);
+
+        m = stoppedConsumer.receive(1000);
+        assertNull(m);
+
+        stoppedConnection.start();
+        m = stoppedConsumer.receive(5000);
+        assertNotNull(m);
+
+        startedSession.close();
+        stoppedSession.close();
+    }
+
+    /**
+     * Tests if the consumer is able to receive messages eveb when the
+     * connecction restarts multiple times.
+     *
+     * @throws Exception
+     */
+    public void testMultipleConnectionStops() throws Exception {
+        testStoppedConsumerHoldsMessagesTillStarted();
+        stoppedConnection.stop();
+        testStoppedConsumerHoldsMessagesTillStarted();
+        stoppedConnection.stop();
+        testStoppedConsumerHoldsMessagesTillStarted();
+    }
+
+
+    public void testConcurrentSessionCreateWithStart() throws Exception {
+        ThreadPoolExecutor executor = new ThreadPoolExecutor(50, Integer.MAX_VALUE,
+                                      60L, TimeUnit.SECONDS,
+                                      new SynchronousQueue<Runnable>());
+        final Vector<Throwable> exceptions = new Vector<Throwable>();
+        final Random rand = new Random();
+        Runnable createSessionTask = new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(rand.nextInt(10));
+                    stoppedConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                } catch (Exception e) {
+                    exceptions.add(e);
+                }
+            }
+        };
+
+        Runnable startStopTask = new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(rand.nextInt(10));
+                    stoppedConnection.start();
+                    stoppedConnection.stop();
+                } catch (Exception e) {
+                    exceptions.add(e);
+                }
+            }
+        };
+
+        for (int i=0; i<1000; i++) {
+            executor.execute(createSessionTask);
+            executor.execute(startStopTask);
+        }
+
+        executor.shutdown();
+        assertTrue("executor terminated", executor.awaitTermination(30, TimeUnit.SECONDS));
+        assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsConsumerResetActiveListenerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsConsumerResetActiveListenerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsConsumerResetActiveListenerTest.java
new file mode 100644
index 0000000..1a1a958
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsConsumerResetActiveListenerTest.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.Vector;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+
+public class JmsConsumerResetActiveListenerTest extends TestCase {
+
+    private Connection connection;
+    private ActiveMQConnectionFactory factory;
+   
+    protected void setUp() throws Exception {
+        factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+        connection = factory.createConnection();
+    }
+
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+            connection = null;
+        }
+    }
+    
+    /**
+     * verify the (undefined by spec) behaviour of setting a listener while receiving a message.
+     * 
+     * @throws Exception
+     */
+    public void testSetListenerFromListener() throws Exception {
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Destination dest = session.createQueue("Queue-" + getName());
+        final MessageConsumer consumer = session.createConsumer(dest);
+       
+        final CountDownLatch latch = new CountDownLatch(2);
+        final AtomicBoolean first = new AtomicBoolean(true);
+        final Vector<Object> results = new Vector<Object>();
+        consumer.setMessageListener(new MessageListener() {
+
+            public void onMessage(Message message) {
+                if (first.compareAndSet(true, false)) {
+                    try {
+                        consumer.setMessageListener(this);
+                        results.add(message);
+                    } catch (JMSException e) {
+                        results.add(e);
+                    }
+                } else {
+                    results.add(message);
+                }
+                latch.countDown();
+            }
+        });
+
+        connection.start();
+        
+        MessageProducer producer = session.createProducer(dest);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+        producer.send(session.createTextMessage("First"));
+        producer.send(session.createTextMessage("Second"));
+        
+        assertTrue("we did not timeout", latch.await(5, TimeUnit.SECONDS));
+        
+        assertEquals("we have a result", 2, results.size());
+        Object result = results.get(0);
+        assertTrue(result instanceof TextMessage);
+        assertEquals("result is first", "First", ((TextMessage)result).getText());
+        result = results.get(1);
+        assertTrue(result instanceof TextMessage);
+        assertEquals("result is first", "Second", ((TextMessage)result).getText());
+    }
+    
+    /**
+     * and a listener on a new consumer, just in case.
+      *
+     * @throws Exception
+     */
+    public void testNewConsumerSetListenerFromListener() throws Exception {
+        final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        final Destination dest = session.createQueue("Queue-" + getName());
+        final MessageConsumer consumer = session.createConsumer(dest);
+       
+        final CountDownLatch latch = new CountDownLatch(2);
+        final AtomicBoolean first = new AtomicBoolean(true);
+        final Vector<Object> results = new Vector<Object>();
+        consumer.setMessageListener(new MessageListener() {
+
+            public void onMessage(Message message) {
+                if (first.compareAndSet(true, false)) {
+                    try {
+                        MessageConsumer anotherConsumer = session.createConsumer(dest);
+                        anotherConsumer.setMessageListener(this);
+                        results.add(message);
+                    } catch (JMSException e) {
+                        results.add(e);
+                    }
+                } else {
+                    results.add(message);
+                }
+                latch.countDown();
+            }
+        });
+
+        connection.start();
+        
+        MessageProducer producer = session.createProducer(dest);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+        producer.send(session.createTextMessage("First"));
+        producer.send(session.createTextMessage("Second"));
+        
+        assertTrue("we did not timeout", latch.await(5, TimeUnit.SECONDS));
+        
+        assertEquals("we have a result", 2, results.size());
+        Object result = results.get(0);
+        assertTrue(result instanceof TextMessage);
+        assertEquals("result is first", "First", ((TextMessage)result).getText());
+        result = results.get(1);
+        assertTrue(result instanceof TextMessage);
+        assertEquals("result is first", "Second", ((TextMessage)result).getText());
+    }
+ }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java
new file mode 100644
index 0000000..7a219e2
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+/**
+ * 
+ */
+public class JmsCreateConsumerInOnMessageTest extends TestSupport implements MessageListener {
+
+    private Connection connection;
+    private Session publisherSession;
+    private Session consumerSession;
+    private MessageConsumer consumer;
+    private MessageConsumer testConsumer;
+    private MessageProducer producer;
+    private Topic topic;
+    private Object lock = new Object();
+
+    /*
+     * @see junit.framework.TestCase#setUp()
+     */
+    protected void setUp() throws Exception {
+        super.setUp();
+        super.topic = true;
+        connection = createConnection();
+        connection.setClientID("connection:" + getSubject());
+        publisherSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        topic = (Topic)super.createDestination("Test.Topic");
+        consumer = consumerSession.createConsumer(topic);
+        consumer.setMessageListener(this);
+        producer = publisherSession.createProducer(topic);
+        connection.start();
+    }
+
+    /*
+     * @see junit.framework.TestCase#tearDown()
+     */
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        connection.close();
+    }
+
+    /**
+     * Tests if a consumer can be created asynchronusly
+     * 
+     * @throws Exception
+     */
+    public void testCreateConsumer() throws Exception {
+        Message msg = super.createMessage();
+        producer.send(msg);
+        if (testConsumer == null) {
+            synchronized (lock) {
+                lock.wait(3000);
+            }
+        }
+        assertTrue(testConsumer != null);
+    }
+
+    /**
+     * Use the asynchronous subscription mechanism
+     * 
+     * @param message
+     */
+    public void onMessage(Message message) {
+        try {
+            testConsumer = consumerSession.createConsumer(topic);
+            consumerSession.createProducer(topic);
+            synchronized (lock) {
+                lock.notify();
+            }
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            assertTrue(false);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableQueueWildcardSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableQueueWildcardSendReceiveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableQueueWildcardSendReceiveTest.java
new file mode 100644
index 0000000..72dd8bc
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableQueueWildcardSendReceiveTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.DeliveryMode;
+
+import org.apache.activemq.test.JmsTopicSendReceiveTest;
+
+/**
+ * 
+ */
+public class JmsDurableQueueWildcardSendReceiveTest extends JmsTopicSendReceiveTest {
+
+    /**
+     * Set up the test with a queue and persistent delivery mode.
+     * 
+     * @see junit.framework.TestCase#setUp()
+     */
+    protected void setUp() throws Exception {
+        topic = false;
+        deliveryMode = DeliveryMode.PERSISTENT;
+        super.setUp();
+    }
+
+    /**
+     * Returns the consumer subject.
+     */
+    protected String getConsumerSubject() {
+        return "FOO.>";
+    }
+
+    /**
+     * Returns the producer subject.
+     */
+    protected String getProducerSubject() {
+        return "FOO.BAR.HUMBUG";
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableTopicSelectorTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableTopicSelectorTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableTopicSelectorTest.java
new file mode 100644
index 0000000..cc212ab
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableTopicSelectorTest.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+/**
+ * 
+ */
+public class JmsDurableTopicSelectorTest extends JmsTopicSelectorTest {
+    public void setUp() throws Exception {
+        durable = true;
+        super.setUp();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableTopicSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableTopicSendReceiveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableTopicSendReceiveTest.java
new file mode 100644
index 0000000..fa47ea9
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableTopicSendReceiveTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.test.JmsTopicSendReceiveTest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ */
+public class JmsDurableTopicSendReceiveTest extends JmsTopicSendReceiveTest {
+    private static final Logger LOG = LoggerFactory.getLogger(JmsDurableTopicSendReceiveTest.class);
+
+    protected Connection connection2;
+    protected Session session2;
+    protected Session consumeSession2;
+    protected MessageConsumer consumer2;
+    protected MessageProducer producer2;
+    protected Destination consumerDestination2;
+    protected Destination producerDestination2;
+
+    /**
+     * Set up a durable suscriber test.
+     * 
+     * @see junit.framework.TestCase#setUp()
+     */
+    protected void setUp() throws Exception {
+        this.durable = true;
+        super.setUp();
+    }
+
+    /**
+     * Test if all the messages sent are being received.
+     * 
+     * @throws Exception
+     */
+    public void testSendWhileClosed() throws Exception {
+        connection2 = createConnection();
+        connection2.setClientID("test");
+        connection2.start();
+        session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        producer2 = session2.createProducer(null);
+        producer2.setDeliveryMode(deliveryMode);
+        producerDestination2 = session2.createTopic(getProducerSubject() + "2");
+        Thread.sleep(1000);
+
+        consumeSession2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        consumerDestination2 = session2.createTopic(getConsumerSubject() + "2");
+        consumer2 = consumeSession2.createDurableSubscriber((Topic)consumerDestination2, getName());
+        Thread.sleep(1000);
+        consumer2.close();
+        TextMessage message = session2.createTextMessage("test");
+        message.setStringProperty("test", "test");
+        message.setJMSType("test");
+        producer2.send(producerDestination2, message);
+        LOG.info("Creating durable consumer");
+        consumer2 = consumeSession2.createDurableSubscriber((Topic)consumerDestination2, getName());
+        Message msg = consumer2.receive(1000);
+        assertNotNull(msg);
+        assertEquals(((TextMessage)msg).getText(), "test");
+        assertEquals(msg.getJMSType(), "test");
+        assertEquals(msg.getStringProperty("test"), "test");
+        connection2.stop();
+        connection2.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableTopicTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableTopicTransactionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableTopicTransactionTest.java
new file mode 100644
index 0000000..c01fb7f
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableTopicTransactionTest.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.DeliveryMode;
+
+import org.apache.activemq.test.JmsResourceProvider;
+
+/**
+ * 
+ */
+public class JmsDurableTopicTransactionTest extends JmsTopicTransactionTest {
+
+    /**
+     * @see JmsTransactionTestSupport#getJmsResourceProvider()
+     */
+    protected JmsResourceProvider getJmsResourceProvider() {
+        JmsResourceProvider provider = new JmsResourceProvider();
+        provider.setTopic(true);
+        provider.setDeliveryMode(DeliveryMode.PERSISTENT);
+        provider.setClientID(getClass().getName());
+        provider.setDurableName(getName());
+        return provider;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableTopicWildcardSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableTopicWildcardSendReceiveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableTopicWildcardSendReceiveTest.java
new file mode 100644
index 0000000..3058a57
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableTopicWildcardSendReceiveTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.DeliveryMode;
+
+import org.apache.activemq.test.JmsTopicSendReceiveTest;
+
+/**
+ * 
+ */
+public class JmsDurableTopicWildcardSendReceiveTest extends JmsTopicSendReceiveTest {
+
+    /**
+     * Sets up a test with a topic destination, durable suscriber and persistent
+     * delivery mode.
+     * 
+     * @see junit.framework.TestCase#setUp()
+     */
+    protected void setUp() throws Exception {
+        topic = true;
+        durable = true;
+        deliveryMode = DeliveryMode.PERSISTENT;
+        super.setUp();
+    }
+
+    /**
+     * Returns the consumer subject.
+     */
+    protected String getConsumerSubject() {
+        return "FOO.>";
+    }
+
+    /**
+     * Returns the producer subject.
+     */
+    protected String getProducerSubject() {
+        return "FOO.BAR.HUMBUG";
+    }
+}


Mime
View raw message