activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [43/52] [abbrv] [partial] activemq-artemis git commit: ARTEMIS-127 Adding activemq unit test module to Artemis
Date Tue, 09 Jun 2015 16:37:14 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTempDestinationTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTempDestinationTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTempDestinationTest.java
new file mode 100644
index 0000000..bd87b3d
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTempDestinationTest.java
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.transport.vm.VMTransport;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests creation, consumption, publishing and deletion of temporary queues.
+ */
+public class JmsTempDestinationTest extends TestCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsTempDestinationTest.class);
+    private Connection connection;
+    private ActiveMQConnectionFactory factory;
+    protected List<Connection> connections = Collections.synchronizedList(new ArrayList<Connection>());
+
+    /**
+     * Creates the connection factory for the {@code vm://localhost} URI with
+     * {@code broker.persistent=false}, opens the default connection (without
+     * starting it) and registers it in {@link #connections} so that
+     * {@link #tearDown()} closes it.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+        // NOTE(review): presumably makes sends synchronous so send failures surface
+        // as exceptions in the calling test thread - confirm
+        factory.setAlwaysSyncSend(true);
+        connection = factory.createConnection();
+        connections.add(connection);
+    }
+
+    /**
+     * Closes every connection registered in {@link #connections} and empties
+     * the list. A failure to close one connection is logged and does not
+     * prevent the remaining connections from being closed.
+     *
+     * @see junit.framework.TestCase#tearDown()
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        for (Iterator<Connection> iter = connections.iterator(); iter.hasNext();) {
+            Connection conn = iter.next();
+            try {
+                conn.close();
+            } catch (Throwable e) {
+                // Best-effort cleanup: record the failure but keep closing the rest.
+                LOG.debug("Ignoring failure while closing connection during tearDown", e);
+            }
+            iter.remove();
+        }
+    }
+
+    /**
+     * Make sure a temp destination can only be consumed by the connection
+     * that created it.
+     *
+     * @throws JMSException if any JMS operation other than the expected
+     *         cross-connection consumer creation fails
+     */
+    public void testTempDestOnlyConsumedByLocalConn() throws JMSException {
+        connection.start();
+
+        Session tempSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        TemporaryQueue queue = tempSession.createTemporaryQueue();
+        MessageProducer producer = tempSession.createProducer(queue);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+        TextMessage message = tempSession.createTextMessage("First");
+        producer.send(message);
+
+        // temp destination should not be consumed when using another connection
+        // NOTE(review): otherConnection is never started and otherQueue is a
+        // different, empty queue, so this receive cannot return a message
+        // either way - confirm whether a stronger check was intended.
+        Connection otherConnection = factory.createConnection();
+        connections.add(otherConnection);
+        Session otherSession = otherConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        TemporaryQueue otherQueue = otherSession.createTemporaryQueue();
+        MessageConsumer consumer = otherSession.createConsumer(otherQueue);
+        Message msg = consumer.receive(3000);
+        assertNull(msg);
+
+        // should throw InvalidDestinationException when consuming a temp
+        // destination from another connection
+        try {
+            consumer = otherSession.createConsumer(queue);
+            fail("createConsumer should fail since a temp destination cannot be consumed from another connection");
+        } catch (InvalidDestinationException expected) {
+            // expected: only the creating connection may consume from its temp queue
+        }
+
+        // should be able to consume temp destination from the same connection
+        consumer = tempSession.createConsumer(queue);
+        msg = consumer.receive(3000);
+        assertNotNull(msg);
+
+    }
+
+    /**
+     * Make sure that a temp queue does not drop a message if there is an
+     * active consumer.
+     *
+     * @throws JMSException if a JMS operation fails
+     */
+    public void testTempQueueHoldsMessagesWithConsumers() throws JMSException {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createTemporaryQueue();
+        // The consumer is registered before anything is sent.
+        MessageConsumer consumer = session.createConsumer(queue);
+        connection.start();
+
+        MessageProducer producer = session.createProducer(queue);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+        TextMessage message = session.createTextMessage("Hello");
+        producer.send(message);
+
+        Message message2 = consumer.receive(1000);
+        assertNotNull(message2);
+        assertTrue("Expected message to be a TextMessage", message2 instanceof TextMessage);
+        // assertEquals reports both texts on failure, unlike assertTrue(a.equals(b)).
+        assertEquals("Unexpected message text", message.getText(), ((TextMessage)message2).getText());
+    }
+
+    /**
+     * Make sure that a temp queue does not drop a message if there are no
+     * active consumers at the time it is sent.
+     *
+     * @throws JMSException if a JMS operation fails
+     */
+    public void testTempQueueHoldsMessagesWithoutConsumers() throws JMSException {
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createTemporaryQueue();
+        MessageProducer producer = session.createProducer(queue);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+        TextMessage message = session.createTextMessage("Hello");
+        producer.send(message);
+
+        // The consumer is only created after the message has been sent.
+        connection.start();
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message message2 = consumer.receive(3000);
+        assertNotNull(message2);
+        assertTrue("Expected message to be a TextMessage", message2 instanceof TextMessage);
+        // assertEquals reports both texts on failure, unlike assertTrue(a.equals(b)).
+        assertEquals("Unexpected message text", message.getText(), ((TextMessage)message2).getText());
+
+    }
+
+    /**
+     * Sends 500 bytes messages of 1024 bytes each to a temp queue before any
+     * consumer exists, then checks that they are all received, in send order.
+     *
+     * @throws JMSException if a JMS operation fails
+     */
+    public void testTmpQueueWorksUnderLoad() throws JMSException {
+        int count = 500;
+        int dataSize = 1024;
+
+        List<BytesMessage> sent = new ArrayList<BytesMessage>(count);
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createTemporaryQueue();
+        MessageProducer producer = session.createProducer(queue);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+        byte[] data = new byte[dataSize];
+        for (int i = 0; i < count; i++) {
+            BytesMessage message = session.createBytesMessage();
+            message.writeBytes(data);
+            // "c" carries the send index so ordering can be verified on receipt.
+            message.setIntProperty("c", i);
+            producer.send(message);
+            sent.add(message);
+        }
+
+        connection.start();
+        MessageConsumer consumer = session.createConsumer(queue);
+        for (int i = 0; i < count; i++) {
+            Message message2 = consumer.receive(2000);
+            assertNotNull("No message received at index " + i, message2);
+            assertEquals(i, message2.getIntProperty("c"));
+            assertTrue("Message " + i + " differs from the one sent", message2.equals(sent.get(i)));
+        }
+    }
+
+    /**
+     * Make sure you cannot publish to a temp queue once the connection that
+     * created it has been closed.
+     *
+     * @throws Exception if a JMS operation other than the final, expected
+     *         send failure fails, or if the thread is interrupted while
+     *         waiting
+     */
+    public void testPublishFailsForClosedConnection() throws Exception {
+
+        Connection tempConnection = factory.createConnection();
+        connections.add(tempConnection);
+        Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final TemporaryQueue queue = tempSession.createTemporaryQueue();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        connection.start();
+
+        // Wait until the default connection has learned about the temp queue
+        // created on the other connection.
+        final ActiveMQConnection activeMQConnection = (ActiveMQConnection) connection;
+        assertTrue("creation advisory received in time with async dispatch", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return activeMQConnection.activeTempDestinations.containsKey(queue);
+            }
+        }));
+
+        // This message delivery should work since the temp connection is still
+        // open.
+        MessageProducer producer = session.createProducer(queue);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+        TextMessage message = session.createTextMessage("First");
+        producer.send(message);
+
+        // Closing the connection should destroy the temp queue that was
+        // created.
+        tempConnection.close();
+        Thread.sleep(5000); // Wait a little bit to let the delete take effect.
+
+        // This message delivery should NOT work since the temp connection is
+        // now closed.
+        try {
+            message = session.createTextMessage("Hello");
+            producer.send(message);
+            fail("Send should fail since temp destination should not exist anymore.");
+        } catch (JMSException e) { // expected: the temp queue no longer exists
+        }
+    }
+
+    /**
+     * Make sure you cannot publish to a temp queue after it has been
+     * explicitly deleted, even though the connection that created it is
+     * still open.
+     *
+     * @throws Exception if a JMS operation other than the final, expected
+     *         send failure fails, or if the thread is interrupted while
+     *         waiting
+     */
+    public void testPublishFailsForDestroyedTempDestination() throws Exception {
+
+        Connection tempConnection = factory.createConnection();
+        connections.add(tempConnection);
+        Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final TemporaryQueue queue = tempSession.createTemporaryQueue();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        connection.start();
+
+        // Wait until the default connection has learned about the temp queue
+        // created on the other connection.
+        final ActiveMQConnection activeMQConnection = (ActiveMQConnection) connection;
+        assertTrue("creation advisory received in time with async dispatch", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return activeMQConnection.activeTempDestinations.containsKey(queue);
+            }
+        }));
+
+        // This message delivery should work since the temp queue still
+        // exists.
+        MessageProducer producer = session.createProducer(queue);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+        TextMessage message = session.createTextMessage("First");
+        producer.send(message);
+
+        // deleting the Queue will cause sends to fail
+        queue.delete();
+        Thread.sleep(5000); // Wait a little bit to let the delete take effect.
+
+        // This message delivery should NOT work since the temp queue has
+        // been deleted.
+        try {
+            message = session.createTextMessage("Hello");
+            producer.send(message);
+            fail("Send should fail since temp destination should not exist anymore.");
+        } catch (JMSException e) {
+            // Reaching this catch is the expected outcome; the assertion below always passes.
+            assertTrue("failed to throw an exception", true);
+        }
+    }
+
+    /**
+     * Test you can't delete a temp queue that still has an active consumer.
+     *
+     * @throws JMSException if a JMS operation other than the expected
+     *         delete failure fails
+     */
+    public void testDeleteDestinationWithSubscribersFails() throws JMSException {
+        // Note: this local shadows the 'connection' field for the rest of the method.
+        Connection connection = factory.createConnection();
+        connections.add(connection);
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        TemporaryQueue queue = session.createTemporaryQueue();
+
+        connection.start();
+
+        // The consumer is left open on purpose so the queue has a subscriber.
+        session.createConsumer(queue);
+
+        // Deleting the queue should NOT work while that consumer is still
+        // active.
+        try {
+            queue.delete();
+            fail("Should fail as Subscribers are active");
+        } catch (JMSException e) {
+            // Reaching this catch is the expected outcome; the assertion below always passes.
+            assertTrue("failed to throw an exception", true);
+        }
+    }
+
+    /**
+     * Blocks the first command dispatched to one VM-transport connection and
+     * checks that a second connection can still create, consume from and
+     * delete 2500 temp queues. The test fails if the blocked listener's
+     * 35-second wait expires before that work has finished.
+     *
+     * @throws Exception if connection setup or any JMS operation fails
+     */
+    public void testSlowConsumerDoesNotBlockFastTempUsers() throws Exception {
+        ActiveMQConnectionFactory advisoryConnFactory = new ActiveMQConnectionFactory("vm://localhost?asyncQueueDepth=20");
+        // Locals are named so that they do not shadow the 'connection' field.
+        Connection slowConnection = advisoryConnFactory.createConnection();
+        connections.add(slowConnection);
+        slowConnection.start();
+
+        final CountDownLatch done = new CountDownLatch(1);
+        // Stays true unless the listener's wait times out before 'done' is released.
+        final AtomicBoolean ok = new AtomicBoolean(true);
+        final AtomicBoolean first = new AtomicBoolean(true);
+        VMTransport t = ((ActiveMQConnection)slowConnection).getTransport().narrow(VMTransport.class);
+        t.setTransportListener(new TransportListener() {
+            @Override
+            public void onCommand(Object command) {
+                // block first dispatch for a while so broker backs up, but other connection should be able to proceed
+                if (first.compareAndSet(true, false)) {
+                    try {
+                        ok.set(done.await(35, TimeUnit.SECONDS));
+                        LOG.info("Done waiting: " + ok.get());
+                    } catch (InterruptedException e) {
+                        LOG.warn("Interrupted while blocking the first dispatch", e);
+                        // Restore the interrupt status for whoever owns this thread.
+                        Thread.currentThread().interrupt();
+                    }
+                }
+            }
+
+            @Override
+            public void onException(IOException error) {
+            }
+
+            @Override
+            public void transportInterupted() {
+            }
+
+            @Override
+            public void transportResumed() {
+            }
+        });
+
+        Connection fastConnection = factory.createConnection();
+        connections.add(fastConnection);
+        ((ActiveMQConnection)fastConnection).setWatchTopicAdvisories(false);
+        fastConnection.start();
+        Session session = fastConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        for (int i = 0; i < 2500; i++) {
+            TemporaryQueue queue = session.createTemporaryQueue();
+            MessageConsumer consumer = session.createConsumer(queue);
+            consumer.close();
+            queue.delete();
+        }
+        LOG.info("Done with work: " + ok.get());
+        done.countDown();
+        assertTrue("ok", ok.get());
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTestSupport.java
new file mode 100644
index 0000000..5531410
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTestSupport.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQDestination;
+
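+/**
+ * Base class for JMS tests: starts a broker and opens a connection in
+ * {@code setUp()}, closes both in {@code tearDown()}, and provides helpers
+ * for creating destinations and sending messages.
+ */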
+public class JmsTestSupport extends CombinationTestSupport {
+
+    static final private AtomicLong TEST_COUNTER = new AtomicLong();
+    public String userName;
+    public String password;
+    public String messageTextPrefix = "";
+
+    protected ConnectionFactory factory;
+    protected ActiveMQConnection connection;
+    protected BrokerService broker;
+
+    protected List<Connection> connections = Collections.synchronizedList(new ArrayList<Connection>());
+
+    // /////////////////////////////////////////////////////////////////
+    //
+    // Test support methods.
+    //
+    // /////////////////////////////////////////////////////////////////
+    protected ActiveMQDestination createDestination(Session session, byte type) throws JMSException {
+        String testMethod = getName();
+        if( testMethod.indexOf(" ")>0 ) {
+            testMethod = testMethod.substring(0, testMethod.indexOf(" "));
+        }
+        String name = "TEST." + getClass().getName() + "." +testMethod+"."+TEST_COUNTER.getAndIncrement();
+        switch (type) {
+        case ActiveMQDestination.QUEUE_TYPE:
+            return (ActiveMQDestination)session.createQueue(name);
+        case ActiveMQDestination.TOPIC_TYPE:
+            return (ActiveMQDestination)session.createTopic(name);
+        case ActiveMQDestination.TEMP_QUEUE_TYPE:
+            return (ActiveMQDestination)session.createTemporaryQueue();
+        case ActiveMQDestination.TEMP_TOPIC_TYPE:
+            return (ActiveMQDestination)session.createTemporaryTopic();
+        default:
+            throw new IllegalArgumentException("type: " + type);
+        }
+    }
+
+    /**
+     * Sends {@code count} text messages to {@code destination} over a
+     * dedicated connection that is closed again before returning, whether or
+     * not sending succeeds.
+     *
+     * @param destination destination to send to
+     * @param count number of messages to send
+     * @throws Exception if the connection cannot be created or sending fails
+     */
+    protected void sendMessages(Destination destination, int count) throws Exception {
+        // Local names avoid shadowing the 'factory' and 'connection' fields.
+        ConnectionFactory sendFactory = createConnectionFactory();
+        Connection sendConnection = sendFactory.createConnection();
+        try {
+            sendConnection.start();
+            sendMessages(sendConnection, destination, count);
+        } finally {
+            // Close even when sending fails: this connection is not tracked
+            // in 'connections', so tearDown() would never close it.
+            sendConnection.close();
+        }
+    }
+
+    /**
+     * Sends {@code count} text messages to {@code destination} using a
+     * temporary auto-acknowledge session on the given connection. The
+     * session is closed before returning, whether or not sending succeeds.
+     *
+     * @param connection connection on which the session is created
+     * @param destination destination to send to
+     * @param count number of messages to send
+     * @throws JMSException if the session cannot be created or sending fails
+     */
+    protected void sendMessages(Connection connection, Destination destination, int count) throws JMSException {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        try {
+            sendMessages(session, destination, count);
+        } finally {
+            // Release the session even when a send throws.
+            session.close();
+        }
+    }
+
+    /**
+     * Sends {@code count} text messages to {@code destination} using a
+     * temporary producer on the given session. The producer is closed before
+     * returning, whether or not sending succeeds.
+     *
+     * @param session session used to create the producer and the messages
+     * @param destination destination to send to
+     * @param count number of messages to send
+     * @throws JMSException if the producer cannot be created or sending fails
+     */
+    protected void sendMessages(Session session, Destination destination, int count) throws JMSException {
+        MessageProducer producer = session.createProducer(destination);
+        try {
+            sendMessages(session, producer, count);
+        } finally {
+            // Release the producer even when a send throws.
+            producer.close();
+        }
+    }
+
+    /**
+     * Sends {@code count} text messages through {@code producer}. The text of
+     * each message is {@link #messageTextPrefix} followed by its zero-based
+     * index.
+     *
+     * @param session session used to create the messages
+     * @param producer producer used to send them
+     * @param count number of messages to send
+     * @throws JMSException if creating or sending a message fails
+     */
+    protected void sendMessages(Session session, MessageProducer producer, int count) throws JMSException {
+        int index = 0;
+        while (index < count) {
+            producer.send(session.createTextMessage(messageTextPrefix  + index));
+            index++;
+        }
+    }
+
+    /**
+     * Creates the connection factory used by this test. This implementation
+     * returns an {@link ActiveMQConnectionFactory} for {@code vm://localhost}.
+     *
+     * @return a new connection factory
+     * @throws Exception declared for the benefit of overriding
+     *         implementations; not thrown explicitly here
+     */
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory("vm://localhost");
+    }
+
+    /**
+     * Creates the broker used by this test from the URI
+     * {@code broker://()/localhost?persistent=false}. {@link #setUp()} calls
+     * {@code start()} on the returned broker.
+     *
+     * @return the broker service built by {@link BrokerFactory}
+     * @throws Exception if the URI is invalid or the broker cannot be created
+     */
+    protected BrokerService createBroker() throws Exception {
+        return BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false"));
+    }
+
+    /**
+     * Prepares the fixture: defaults the {@code basedir} system property to
+     * the current directory when it is unset, creates and starts the broker,
+     * then opens a connection with {@link #userName} / {@link #password} and
+     * registers it in {@link #connections} for cleanup.
+     *
+     * @throws Exception if the broker or the connection cannot be set up
+     */
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        // Only set basedir when nothing has provided it already.
+        if (System.getProperty("basedir") == null) {
+            File file = new File(".");
+            System.setProperty("basedir", file.getAbsolutePath());
+        }
+
+        // The broker is started before the factory creates a connection.
+        broker = createBroker();
+        broker.start();
+        factory = createConnectionFactory();
+        connection = (ActiveMQConnection)factory.createConnection(userName, password);
+        connections.add(connection);
+    }
+
+    /**
+     * Closes every connection registered in {@link #connections}, stops the
+     * broker and finally delegates to the superclass. Failures while closing
+     * individual connections are ignored, and the superclass tear-down runs
+     * even if stopping the broker throws.
+     *
+     * @throws Exception if stopping the broker or the superclass tear-down fails
+     */
+    protected void tearDown() throws Exception {
+        try {
+            for (Iterator<Connection> iter = connections.iterator(); iter.hasNext();) {
+                Connection conn = iter.next();
+                try {
+                    conn.close();
+                } catch (Throwable ignored) {
+                    // Best-effort cleanup: one bad connection must not stop the rest.
+                }
+                iter.remove();
+            }
+            broker.stop();
+        } finally {
+            // Always give the superclass its chance to clean up.
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Closes the connection, discarding anything thrown while doing so
+     * (including the failure caused by a {@code null} argument).
+     *
+     * @param c connection to close
+     */
+    protected void safeClose(Connection c) {
+        try {
+            c.close();
+        } catch (Throwable ignored) {
+            // deliberately discarded: this helper must never throw
+        }
+    }
+
+    /**
+     * Closes the session, discarding anything thrown while doing so
+     * (including the failure caused by a {@code null} argument).
+     *
+     * @param s session to close
+     */
+    protected void safeClose(Session s) {
+        try {
+            s.close();
+        } catch (Throwable ignored) {
+            // deliberately discarded: this helper must never throw
+        }
+    }
+
+    /**
+     * Closes the consumer, discarding anything thrown while doing so
+     * (including the failure caused by a {@code null} argument).
+     *
+     * @param c consumer to close
+     */
+    protected void safeClose(MessageConsumer c) {
+        try {
+            c.close();
+        } catch (Throwable ignored) {
+            // deliberately discarded: this helper must never throw
+        }
+    }
+
+    /**
+     * Closes the producer, discarding anything thrown while doing so
+     * (including the failure caused by a {@code null} argument).
+     *
+     * @param p producer to close
+     */
+    protected void safeClose(MessageProducer p) {
+        try {
+            p.close();
+        } catch (Throwable ignored) {
+            // deliberately discarded: this helper must never throw
+        }
+    }
+
+    /**
+     * Pauses via {@link #pause(String)}, but only when the {@code profiler}
+     * system property is set; otherwise returns immediately.
+     *
+     * @param prompt text shown in front of the "press enter" message
+     * @throws IOException if reading from standard input fails
+     */
+    protected void profilerPause(String prompt) throws IOException {
+        if (System.getProperty("profiler") != null) {
+            pause(prompt);
+        }
+    }
+
+    /**
+     * Prints a prompt on standard output and blocks until a newline is read
+     * from standard input, or until standard input reaches end of stream.
+     *
+     * @param prompt text shown in front of the "press enter" message
+     * @throws IOException if reading from standard input fails
+     */
+    protected void pause(String prompt) throws IOException {
+        System.out.println();
+        System.out.println(prompt + "> Press enter to continue: ");
+        int ch;
+        do {
+            ch = System.in.read();
+            // read() returns -1 at end of stream; without that check a closed
+            // or exhausted stdin would make this loop spin forever.
+        } while (ch != '\n' && ch != -1);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicCompositeSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicCompositeSendReceiveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicCompositeSendReceiveTest.java
new file mode 100644
index 0000000..a948ffa
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicCompositeSendReceiveTest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.Topic;
+
+import org.apache.activemq.test.JmsTopicSendReceiveTest;
+
+
+/**
+ * 
+ */
+public class JmsTopicCompositeSendReceiveTest extends JmsTopicSendReceiveTest {
+    private static final org.apache.commons.logging.Log LOG = org.apache.commons.logging.LogFactory
+            .getLog(JmsTopicCompositeSendReceiveTest.class);
+    
+    Destination consumerDestination2;
+    MessageConsumer consumer2;
+
+    /**
+     * Selects non-persistent delivery, runs the base set-up, then creates a
+     * second consumer on the topic {@code FOO.BAR.HUMBUG2}. When
+     * {@code durable} is set the second consumer is a durable subscriber
+     * named after the test.
+     *
+     * @see junit.framework.TestCase#setUp()
+     */
+    protected void setUp() throws Exception {
+        // Assigned before super.setUp() is called.
+        deliveryMode = DeliveryMode.NON_PERSISTENT;
+        super.setUp();
+        consumerDestination2 = consumeSession.createTopic("FOO.BAR.HUMBUG2");
+        LOG.info("Created  consumer destination: " + consumerDestination2 + " of type: " + consumerDestination2.getClass());
+        if (durable) {
+            LOG.info("Creating durable consumer");
+            consumer2 = consumeSession.createDurableSubscriber((Topic) consumerDestination2, getName());
+        } else {
+            consumer2 = consumeSession.createConsumer(consumerDestination2);
+        }
+
+    }
+
+    /**
+     * Returns the consumer subject, {@code FOO.BAR.HUMBUG}, which is the
+     * first of the two names returned by {@link #getProducerSubject()}.
+     *
+     * @return String - consumer subject
+     * @see org.apache.activemq.test.TestSupport#getConsumerSubject()
+     */
+    protected String getConsumerSubject() {
+        return "FOO.BAR.HUMBUG";
+    }
+
+    /**
+     * Returns the producer subject: a comma-separated list naming both
+     * {@code FOO.BAR.HUMBUG} and {@code FOO.BAR.HUMBUG2}.
+     * NOTE(review): presumably interpreted as a composite destination so that
+     * each send reaches both topics - confirm.
+     *
+     * @return String - producer subject
+     * @see org.apache.activemq.test.TestSupport#getProducerSubject()
+     */
+    protected String getProducerSubject() {
+        return "FOO.BAR.HUMBUG,FOO.BAR.HUMBUG2";
+    }
+
+    /**
+     * Runs the inherited send/receive test, then clears the collected
+     * messages, attaches this test as the listener of the second consumer
+     * and asserts that the messages are received on it as well.
+     *
+     * @throws Exception if the inherited test or the second check fails
+     */
+    public void testSendReceive() throws Exception {
+        super.testSendReceive();
+        // Start from an empty list so only messages delivered to consumer2 are counted.
+        messages.clear();
+        consumer2.setMessageListener(this);
+        assertMessagesAreReceived();
+        LOG.info("" + data.length + " messages(s) received, closing down connections");
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicRedeliverTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicRedeliverTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicRedeliverTest.java
new file mode 100644
index 0000000..188e4cd
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicRedeliverTest.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ */
+public class JmsTopicRedeliverTest extends TestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsTopicRedeliverTest.class);
+
+    protected Connection connection;
+    protected Session session;
+    protected Session consumeSession;
+    protected MessageConsumer consumer;
+    protected MessageProducer producer;
+    protected Destination consumerDestination;
+    protected Destination producerDestination;
+    protected boolean topic = true;
+    protected boolean durable;
+    protected boolean verbose;
+    protected long initRedeliveryDelay;
+
+    /**
+     * Creates the connection, two client-acknowledge sessions (one for
+     * producing, one for consuming), an unbound producer, the producer and
+     * consumer destinations (topics or queues depending on {@code topic})
+     * and the consumer, then starts the connection.
+     *
+     * @throws Exception if any part of the JMS set-up fails
+     */
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        connectionFactory = createConnectionFactory();
+        connection = createConnection();
+        // Remembered so testRecover() can size its receive timeouts.
+        initRedeliveryDelay = ((ActiveMQConnection)connection).getRedeliveryPolicy().getInitialRedeliveryDelay();
+
+        if (durable) {
+            connection.setClientID(getClass().getName());
+        }
+
+        LOG.info("Created connection: " + connection);
+
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        consumeSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+        LOG.info("Created session: " + session);
+        LOG.info("Created consumeSession: " + consumeSession);
+        // No default destination: the destination is passed on each send.
+        producer = session.createProducer(null);
+        // producer.setDeliveryMode(deliveryMode);
+
+        LOG.info("Created producer: " + producer);
+
+        if (topic) {
+            consumerDestination = session.createTopic(getConsumerSubject());
+            producerDestination = session.createTopic(getProducerSubject());
+        } else {
+            consumerDestination = session.createQueue(getConsumerSubject());
+            producerDestination = session.createQueue(getProducerSubject());
+        }
+
+        LOG.info("Created  consumer destination: " + consumerDestination + " of type: " + consumerDestination.getClass());
+        LOG.info("Created  producer destination: " + producerDestination + " of type: " + producerDestination.getClass());
+        consumer = createConsumer();
+        connection.start();
+
+        // NOTE(review): same text as the log statement near the top of this
+        // method, although the connection has been started by now.
+        LOG.info("Created connection: " + connection);
+    }
+
+    /**
+     * Closes the connection, if one was created, and then delegates to the
+     * superclass. The superclass tear-down runs even if closing the
+     * connection throws.
+     *
+     * @throws Exception if closing the connection or the superclass tear-down fails
+     */
+    protected void tearDown() throws Exception {
+        try {
+            if (connection != null) {
+                connection.close();
+            }
+        } finally {
+            // Always give the superclass its chance to clean up.
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Returns the consumer subject, {@code TEST}, which is the same name the
+     * producer uses.
+     * NOTE(review): this class extends the TestSupport of its own package (no
+     * import is present), so the {@code @see} target below may be stale -
+     * confirm.
+     *
+     * @return String - consumer subject
+     * @see org.apache.activemq.test.TestSupport#getConsumerSubject()
+     */
+    protected String getConsumerSubject() {
+        return "TEST";
+    }
+
+    /**
+     * Returns the producer subject, {@code TEST}, which is the same name the
+     * consumer uses.
+     * NOTE(review): this class extends the TestSupport of its own package (no
+     * import is present), so the {@code @see} target below may be stale -
+     * confirm.
+     *
+     * @return String - producer subject
+     * @see org.apache.activemq.test.TestSupport#getProducerSubject()
+     */
+    protected String getProducerSubject() {
+        return "TEST";
+    }
+
+    /**
+     * Verifies redelivery after {@code Session.recover()}: a message that is
+     * received but not acknowledged is delivered again with the same message
+     * ID and the redelivered flag set, and is not delivered a third time
+     * once it has been acknowledged.
+     *
+     * @throws Exception if a JMS operation fails
+     */
+    public void testRecover() throws Exception {
+        String text = "TEST";
+        Message sendMessage = session.createTextMessage(text);
+
+        if (verbose) {
+            LOG.info("About to send a message: " + sendMessage + " with text: " + text);
+        }
+        producer.send(producerDestination, sendMessage);
+
+        // receive but don't acknowledge
+        Message unackMessage = consumer.receive(initRedeliveryDelay + 1000);
+        assertNotNull(unackMessage);
+        String unackId = unackMessage.getJMSMessageID();
+        // Expected value first, so a failure is reported the right way round.
+        assertEquals(text, ((TextMessage)unackMessage).getText());
+        assertFalse(unackMessage.getJMSRedelivered());
+        // assertEquals(unackMessage.getIntProperty("JMSXDeliveryCount"),1);
+
+        // receive then acknowledge
+        consumeSession.recover();
+        Message ackMessage = consumer.receive(initRedeliveryDelay + 1000);
+        assertNotNull(ackMessage);
+        ackMessage.acknowledge();
+        String ackId = ackMessage.getJMSMessageID();
+        assertEquals(text, ((TextMessage)ackMessage).getText());
+        assertTrue(ackMessage.getJMSRedelivered());
+        // assertEquals(ackMessage.getIntProperty("JMSXDeliveryCount"),2);
+        assertEquals(unackId, ackId);
+        // Nothing is left unacknowledged, so a second recover must not redeliver.
+        consumeSession.recover();
+        assertNull(consumer.receiveNoWait());
+    }
+
+    protected MessageConsumer createConsumer() throws JMSException {
+        if (durable) {
+            LOG.info("Creating durable consumer");
+            return consumeSession.createDurableSubscriber((Topic)consumerDestination, getName());
+        }
+        return consumeSession.createConsumer(consumerDestination);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicRequestReplyTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicRequestReplyTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicRequestReplyTest.java
new file mode 100644
index 0000000..f5d1d2c
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicRequestReplyTest.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.List;
+import java.util.Vector;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.test.TestSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
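+/**
+ * Request/reply round trip: a client sends a request carrying a temporary reply destination and a server-side handler answers with "Hello: " plus the request text.
+ */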
+public class JmsTopicRequestReplyTest extends TestSupport implements MessageListener {
+    private static final Logger LOG = LoggerFactory.getLogger(JmsTopicRequestReplyTest.class);
+
+    protected boolean useAsyncConsume;
+    private Connection serverConnection;
+    private Connection clientConnection;
+    private MessageProducer replyProducer;
+    private Session serverSession;
+    private Destination requestDestination;
+    private List<JMSException> failures = new Vector<JMSException>();
+    private boolean dynamicallyCreateProducer;
+    private String clientSideClientID;
+
+    /**
+     * Sends a request that carries a temporary reply destination and checks
+     * that the reply "Hello: Olivier" arrives on that destination within five
+     * seconds and that no JMSException was recorded in {@code failures}.
+     */
+    public void testSendAndReceive() throws Exception {
+        clientConnection = createConnection();
+        clientConnection.setClientID("ClientConnection:" + getSubject());
+
+        Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        clientConnection.start();
+
+        Destination replyTo = createTemporaryDestination(clientSession);
+
+        // kept for the log line below and for the disabled TODO check
+        clientSideClientID = clientConnection.getClientID();
+
+        // TODO
+        // String value = ActiveMQDestination.getClientId((ActiveMQDestination)
+        // replyTo);
+        // assertEquals("clientID from the temporary destination must be the
+        // same", clientSideClientID, value);
+        LOG.info("Both the clientID and destination clientID match properly: " + clientSideClientID);
+
+        // the request goes to the shared request destination, the reply
+        // comes back on the temporary one
+        MessageProducer requestProducer = clientSession.createProducer(requestDestination);
+        MessageConsumer replyConsumer = clientSession.createConsumer(replyTo);
+
+        TextMessage request = clientSession.createTextMessage("Olivier");
+        request.setJMSReplyTo(replyTo);
+        requestProducer.send(request);
+
+        LOG.info("Sent request.");
+        LOG.info(request.toString());
+
+        Message reply = replyConsumer.receive(5000);
+
+        // a timeout (null) and a non-text message both count as "no reply"
+        if (!(reply instanceof TextMessage)) {
+            fail("Should have received a reply by now");
+        }
+        TextMessage replyMessage = (TextMessage)reply;
+        LOG.info("Received reply.");
+        LOG.info(replyMessage.toString());
+        assertEquals("Wrong message content", "Hello: Olivier", replyMessage.getText());
+
+        replyConsumer.close();
+        deleteTemporaryDestination(replyTo);
+
+        assertEquals("Should not have had any failures: " + failures, 0, failures.size());
+    }
+
+    /**
+     * Repeats {@link #testSendAndReceive()} with the server creating a new
+     * reply producer for each request instead of reusing the shared one.
+     */
+    public void testSendAndReceiveWithDynamicallyCreatedProducer() throws Exception {
+        this.dynamicallyCreateProducer = true;
+        this.testSendAndReceive();
+    }
+
+    /**
+     * Server side of the exchange: answers a request with a text message
+     * "Hello: " + request text, sent to the request's JMSReplyTo destination
+     * and correlated with the request's message ID. Used both as the
+     * asynchronous listener callback and by
+     * {@link #syncConsumeLoop(MessageConsumer)}. A JMSException is handed to
+     * {@link #onException(JMSException)} instead of being propagated.
+     *
+     * @param message the incoming request; cast to TextMessage without a check
+     */
+    public void onMessage(Message message) {
+        try {
+            TextMessage request = (TextMessage)message;
+
+            LOG.info("Received request.");
+            LOG.info(request.toString());
+
+            Destination replyTo = request.getJMSReplyTo();
+
+            // TODO
+            // String value =
+            // ActiveMQDestination.getClientId((ActiveMQDestination)
+            // replyTo);
+            // assertEquals("clientID from the temporary destination must be the
+            // same", clientSideClientID, value);
+
+            String greeting = "Hello: " + request.getText();
+            TextMessage reply = serverSession.createTextMessage(greeting);
+
+            reply.setJMSCorrelationID(request.getJMSMessageID());
+
+            if (!dynamicallyCreateProducer) {
+                // the shared producer was created without a destination,
+                // so the target is named on each send
+                replyProducer.send(replyTo, reply);
+            } else {
+                replyProducer = serverSession.createProducer(replyTo);
+                replyProducer.send(reply);
+            }
+
+            LOG.info("Sent reply.");
+            LOG.info(reply.toString());
+        } catch (JMSException e) {
+            onException(e);
+        }
+    }
+
+    /**
+     * Synchronous alternative to the listener: waits up to five seconds for a
+     * single request and passes it to {@link #onMessage(Message)}; logs an
+     * error when nothing arrives. Despite its name it handles at most one
+     * message.
+     *
+     * @param requestConsumer consumer to receive the request from
+     */
+    protected void syncConsumeLoop(MessageConsumer requestConsumer) {
+        try {
+            Message request = requestConsumer.receive(5000);
+            if (request == null) {
+                LOG.error("No message received");
+                return;
+            }
+            onMessage(request);
+        } catch (JMSException e) {
+            onException(e);
+        }
+    }
+
+    /**
+     * Builds the server half of the test: a connection with its own client ID,
+     * a session, a destination-less reply producer and a consumer on the
+     * request destination. Requests are handled either by registering this
+     * test as message listener ({@code useAsyncConsume}) or by a background
+     * thread running {@link #syncConsumeLoop(MessageConsumer)}.
+     */
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        serverConnection = createConnection();
+        serverConnection.setClientID("serverConnection:" + getSubject());
+        serverSession = serverConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        // no default destination: each reply names its own target
+        replyProducer = serverSession.createProducer(null);
+
+        requestDestination = createDestination(serverSession);
+
+        final MessageConsumer requestConsumer = serverSession.createConsumer(requestDestination);
+        if (!useAsyncConsume) {
+            Runnable syncServer = new Runnable() {
+                public void run() {
+                    syncConsumeLoop(requestConsumer);
+                }
+            };
+            new Thread(syncServer).start();
+        } else {
+            requestConsumer.setMessageListener(this);
+        }
+        serverConnection.start();
+    }
+
+    /**
+     * Releases both connections. The client connection is only created inside
+     * the test method, so it may still be null when the test failed early. The
+     * nested finally blocks make sure that a failure in the base-class tear
+     * down or while closing one connection does not leave the other open.
+     */
+    protected void tearDown() throws Exception {
+        try {
+            super.tearDown();
+        } finally {
+            try {
+                if (serverConnection != null) {
+                    serverConnection.close();
+                }
+            } finally {
+                if (clientConnection != null) {
+                    clientConnection.stop();
+                    clientConnection.close();
+                }
+            }
+        }
+    }
+
+    /**
+     * Records a JMS failure raised while serving a request so the test method
+     * can assert on {@code failures} afterwards.
+     *
+     * @param e the exception to log and remember
+     */
+    protected void onException(JMSException e) {
+        // pass the throwable to the logger so the stack trace lands in the
+        // test log instead of being printed separately to stderr
+        LOG.info("Caught: " + e, e);
+        failures.add(e);
+    }
+
+    /**
+     * Creates the request destination, named with {@code getSubject()}.
+     *
+     * @param session session used to create the destination
+     * @return a topic when {@code topic} is set, otherwise a queue
+     * @throws JMSException if the destination cannot be created
+     */
+    protected Destination createDestination(Session session) throws JMSException {
+        if (!topic) {
+            return session.createQueue(getSubject());
+        }
+        return session.createTopic(getSubject());
+    }
+
+    /**
+     * Creates the temporary destination on which the client expects the reply.
+     *
+     * @param session session used to create the destination
+     * @return a temporary topic when {@code topic} is set, otherwise a
+     *         temporary queue
+     * @throws JMSException if the destination cannot be created
+     */
+    protected Destination createTemporaryDestination(Session session) throws JMSException {
+        if (!topic) {
+            return session.createTemporaryQueue();
+        }
+        return session.createTemporaryTopic();
+    }
+    
+    /**
+     * Deletes a destination obtained from
+     * {@link #createTemporaryDestination(Session)}. The cast is chosen from the
+     * {@code topic} flag, not from the runtime type of {@code dest}.
+     *
+     * @param dest the temporary topic or queue to delete
+     * @throws JMSException if the provider refuses the deletion
+     */
+    protected void deleteTemporaryDestination(Destination dest) throws JMSException {
+        if (!topic) {
+            TemporaryQueue tempQueue = (TemporaryQueue)dest;
+            tempQueue.delete();
+            return;
+        }
+        TemporaryTopic tempTopic = (TemporaryTopic)dest;
+        tempTopic.delete();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSelectorTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSelectorTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSelectorTest.java
new file mode 100644
index 0000000..e6a2503
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSelectorTest.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
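+/**
+ * Checks that a consumer created with a message selector receives only the matching messages out of a fixed set of five.
+ */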
+public class JmsTopicSelectorTest extends TestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(JmsTopicSelectorTest.class);
+
+    protected Connection connection;
+    protected Session session;
+    protected MessageConsumer consumer;
+    protected MessageProducer producer;
+    protected Destination consumerDestination;
+    protected Destination producerDestination;
+    protected boolean topic = true;
+    protected boolean durable;
+    protected int deliveryMode = DeliveryMode.PERSISTENT;
+
+    public void setUp() throws Exception {
+        super.setUp();
+
+        connectionFactory = createConnectionFactory();
+        connection = createConnection();
+        if (durable) {
+            connection.setClientID(getClass().getName());
+        }
+
+        LOG.info("Created connection: " + connection);
+
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        LOG.info("Created session: " + session);
+
+        if (topic) {
+            consumerDestination = session.createTopic(getConsumerSubject());
+            producerDestination = session.createTopic(getProducerSubject());
+        } else {
+            consumerDestination = session.createQueue(getConsumerSubject());
+            producerDestination = session.createQueue(getProducerSubject());
+        }
+
+        LOG.info("Created  consumer destination: " + consumerDestination + " of type: " + consumerDestination.getClass());
+        LOG.info("Created  producer destination: " + producerDestination + " of type: " + producerDestination.getClass());
+        producer = session.createProducer(producerDestination);
+        producer.setDeliveryMode(deliveryMode);
+
+        LOG.info("Created producer: " + producer + " delivery mode = " + (deliveryMode == DeliveryMode.PERSISTENT ? "PERSISTENT" : "NON_PERSISTENT"));
+        connection.start();
+    }
+
+    /**
+     * Closes the session and then the connection opened by {@link #setUp()}.
+     * The connection is closed in a finally block so that a failure while
+     * closing the session cannot leave it open.
+     */
+    public void tearDown() throws Exception {
+        // NOTE(review): super.tearDown() is not invoked although setUp() calls
+        // super.setUp() - confirm whether the base class relies on it.
+        try {
+            if (session != null) {
+                session.close();
+            }
+        } finally {
+            if (connection != null) {
+                connection.close();
+            }
+        }
+    }
+
+    /**
+     * Creates a consumer on {@code consumerDestination} filtered by the given
+     * selector.
+     *
+     * @param selector JMS selector expression passed through unchanged; the
+     *        tests also pass {@code null} and the empty string
+     * @return a durable subscriber named with {@code getName()} (noLocal
+     *         disabled) when {@code durable} is set, otherwise a plain consumer
+     * @throws JMSException if the session cannot create the consumer
+     */
+    protected MessageConsumer createConsumer(String selector) throws JMSException {
+        if (!durable) {
+            return session.createConsumer(consumerDestination, selector);
+        }
+        LOG.info("Creating durable consumer");
+        return session.createDurableSubscriber((Topic)consumerDestination, getName(), selector, false);
+    }
+
+    /**
+     * Publishes the five fixture messages the selector tests filter on. The
+     * body text is the decimal id; JMSType and "stringProperty" always carry
+     * the same value.
+     *
+     * <pre>
+     * id  type  longProperty  booleanProperty
+     *  1   a        1             true
+     *  2   a        1             false
+     *  3   a        1             true
+     *  4   b        2             false
+     *  5   c        3             true
+     * </pre>
+     */
+    public void sendMessages() throws Exception {
+        sendFixtureMessage(1, "a", 1, true);
+        sendFixtureMessage(2, "a", 1, false);
+        sendFixtureMessage(3, "a", 1, true);
+        sendFixtureMessage(4, "b", 2, false);
+        sendFixtureMessage(5, "c", 3, true);
+    }
+
+    /**
+     * Builds one fixture message and sends it through {@code producer}.
+     *
+     * @param id value of the "id" property; its decimal form is the body text
+     * @param type value used for both JMSType and "stringProperty"
+     * @param longValue value of "longProperty"
+     * @param flag value of "booleanProperty"
+     * @throws JMSException if the message cannot be created or sent
+     */
+    private void sendFixtureMessage(int id, String type, long longValue, boolean flag) throws JMSException {
+        TextMessage message = session.createTextMessage(String.valueOf(id));
+        message.setIntProperty("id", id);
+        message.setJMSType(type);
+        message.setStringProperty("stringProperty", type);
+        message.setLongProperty("longProperty", longValue);
+        message.setBooleanProperty("booleanProperty", flag);
+        producer.send(message);
+    }
+
+    public void consumeMessages(int remaining) throws Exception {
+        consumer = createConsumer(null);
+        for (int i = 0; i < remaining; i++) {
+            consumer.receive(1000);
+        }
+        consumer.close();
+
+    }
+
+    /**
+     * An empty selector string must not filter anything: all five fixture
+     * messages are expected, leaving zero remaining.
+     */
+    public void testEmptyPropertySelector() throws Exception {
+        int remaining = 5;
+        consumer = createConsumer("");
+        sendMessages();
+        // drain until a one-second receive times out
+        while (consumer.receive(1000) != null) {
+            remaining--;
+        }
+        // expected value first, so a failure reports expected/actual correctly
+        assertEquals(0, remaining);
+        consumer.close();
+        consumeMessages(remaining);
+    }
+
+    /**
+     * A selector over three application properties must deliver only fixture
+     * messages "1" and "3", leaving three of the five unmatched.
+     */
+    public void testPropertySelector() throws Exception {
+        int remaining = 5;
+        consumer = createConsumer("stringProperty = 'a' and longProperty = 1 and booleanProperty = true");
+        sendMessages();
+        Message message;
+        // drain until a one-second receive times out
+        while ((message = consumer.receive(1000)) != null) {
+            String text = ((TextMessage)message).getText();
+            if (!text.equals("1") && !text.equals("3")) {
+                fail("unexpected message: " + text);
+            }
+            remaining--;
+        }
+        // expected value first, so a failure reports expected/actual correctly
+        assertEquals(3, remaining);
+        consumer.close();
+        consumeMessages(remaining);
+    }
+
+    /**
+     * A selector combining the JMSType header with an application property
+     * must deliver only fixture messages "1", "2" and "3", leaving two of the
+     * five unmatched.
+     */
+    public void testJMSPropertySelector() throws Exception {
+        int remaining = 5;
+        consumer = createConsumer("JMSType = 'a' and stringProperty = 'a'");
+        sendMessages();
+        Message message;
+        // drain until a one-second receive times out
+        while ((message = consumer.receive(1000)) != null) {
+            String text = ((TextMessage)message).getText();
+            if (!text.equals("1") && !text.equals("2") && !text.equals("3")) {
+                fail("unexpected message: " + text);
+            }
+            remaining--;
+        }
+        // expected value first, so a failure reports expected/actual correctly
+        assertEquals(2, remaining);
+        consumer.close();
+        consumeMessages(remaining);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveSubscriberTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveSubscriberTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveSubscriberTest.java
new file mode 100644
index 0000000..7cb09ea
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveSubscriberTest.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+
+/**
+ * Variant of {@link JmsTopicSendReceiveTest} whose non-durable consumer is
+ * created through {@code TopicSession.createSubscriber} instead of the
+ * generic {@code Session.createConsumer}.
+ */
+public class JmsTopicSendReceiveSubscriberTest extends JmsTopicSendReceiveTest {
+    /**
+     * @return the parent's consumer when {@code durable} is set, otherwise a
+     *         topic subscriber with no selector and noLocal disabled
+     * @throws JMSException if the subscriber cannot be created
+     */
+    protected MessageConsumer createConsumer() throws JMSException {
+        if (!durable) {
+            // both casts assume the test runs against a topic session/destination
+            TopicSession topicSession = (TopicSession)session;
+            Topic subscribedTopic = (Topic)consumerDestination;
+            return topicSession.createSubscriber(subscribedTopic, null, false);
+        }
+        return super.createConsumer();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveTest.java
new file mode 100644
index 0000000..fd91ec4
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveTest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
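+/**
+ * Send/receive test in which the producer and the listener-based consumer share a single connection and session.
+ */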
+public class JmsTopicSendReceiveTest extends JmsSendReceiveTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(JmsTopicSendReceiveTest.class);
+
+    protected Connection connection;
+
+    /**
+     * Opens one connection and session, creates a destination-less producer
+     * with the configured delivery mode, resolves the consumer and producer
+     * destinations (topics or queues depending on {@code topic}), registers
+     * this test as listener on the consumer and starts the connection.
+     */
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        connectionFactory = createConnectionFactory();
+        connection = createConnection();
+        if (durable) {
+            // a durable subscription requires a client ID on the connection
+            connection.setClientID(getClass().getName());
+        }
+
+        LOG.info("Created connection: " + connection);
+
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        LOG.info("Created session: " + session);
+        producer = session.createProducer(null);
+        producer.setDeliveryMode(deliveryMode);
+
+        String modeName = deliveryMode == DeliveryMode.PERSISTENT ? "PERSISTENT" : "NON_PERSISTENT";
+        LOG.info("Created producer: " + producer + " delivery mode = " + modeName);
+
+        if (!topic) {
+            consumerDestination = session.createQueue(getConsumerSubject());
+            producerDestination = session.createQueue(getProducerSubject());
+        } else {
+            consumerDestination = session.createTopic(getConsumerSubject());
+            producerDestination = session.createTopic(getProducerSubject());
+        }
+
+        LOG.info("Created  consumer destination: " + consumerDestination + " of type: " + consumerDestination.getClass());
+        LOG.info("Created  producer destination: " + producerDestination + " of type: " + producerDestination.getClass());
+        consumer = createConsumer();
+        consumer.setMessageListener(this);
+        connection.start();
+    }
+
+    /**
+     * Creates the consumer on {@code consumerDestination}.
+     *
+     * @return a durable subscriber named with {@code getName()} when
+     *         {@code durable} is set, otherwise a plain consumer
+     * @throws JMSException if the session cannot create the consumer
+     */
+    protected MessageConsumer createConsumer() throws JMSException {
+        if (!durable) {
+            return session.createConsumer(consumerDestination);
+        }
+        LOG.info("Creating durable consumer");
+        return session.createDurableSubscriber((Topic)consumerDestination, getName());
+    }
+
+    /**
+     * Closes the session and then the connection opened by {@link #setUp()}.
+     * The connection is closed in a finally block so that a failure while
+     * closing the session cannot leave it open.
+     */
+    protected void tearDown() throws Exception {
+        LOG.info("Dumping stats...");
+        // connectionFactory.getStats().reset();
+
+        LOG.info("Closing down connection");
+
+        /** TODO we should be able to shut down properly */
+        try {
+            if (session != null) {
+                session.close();
+            }
+        } finally {
+            if (connection != null) {
+                connection.close();
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsTest.java
new file mode 100644
index 0000000..b56c15d
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
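+/**
+ * Send/receive test that uses one connection and session for producing and a second connection and session for consuming.
+ */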
+public class JmsTopicSendReceiveWithTwoConnectionsTest extends JmsSendReceiveTestSupport {
+
+    private static final org.apache.commons.logging.Log LOG = org.apache.commons.logging.LogFactory
+        .getLog(JmsTopicSendReceiveWithTwoConnectionsTest.class);
+
+    protected Connection sendConnection;
+    protected Connection receiveConnection;
+    protected Session receiveSession;
+
+    /**
+     * Opens and starts a sending and a receiving connection, creates one
+     * session on each, a destination-less producer on the send session and a
+     * consumer on the receive session with this test registered as listener.
+     * Destinations are topics or queues depending on {@code topic}.
+     */
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        connectionFactory = createConnectionFactory();
+
+        sendConnection = createSendConnection();
+        sendConnection.start();
+
+        receiveConnection = createReceiveConnection();
+        receiveConnection.start();
+
+        LOG.info("Created sendConnection: " + sendConnection);
+        LOG.info("Created receiveConnection: " + receiveConnection);
+
+        session = createSendSession(sendConnection);
+        receiveSession = createReceiveSession(receiveConnection);
+
+        LOG.info("Created sendSession: " + session);
+        LOG.info("Created receiveSession: " + receiveSession);
+
+        producer = session.createProducer(null);
+        producer.setDeliveryMode(deliveryMode);
+
+        String modeName = deliveryMode == DeliveryMode.PERSISTENT ? "PERSISTENT" : "NON_PERSISTENT";
+        LOG.info("Created producer: " + producer + " delivery mode = " + modeName);
+
+        // destinations are created from the send session in both cases
+        if (!topic) {
+            consumerDestination = session.createQueue(getConsumerSubject());
+            producerDestination = session.createQueue(getProducerSubject());
+        } else {
+            consumerDestination = session.createTopic(getConsumerSubject());
+            producerDestination = session.createTopic(getProducerSubject());
+        }
+
+        LOG.info("Created  consumer destination: " + consumerDestination + " of type: "
+                 + consumerDestination.getClass());
+        LOG.info("Created  producer destination: " + producerDestination + " of type: "
+                 + producerDestination.getClass());
+
+        consumer = createConsumer(receiveSession, consumerDestination);
+        consumer.setMessageListener(this);
+
+        LOG.info("Started connections");
+    }
+
+    protected Session createReceiveSession(Connection receiveConnection) throws Exception {
+        return receiveConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    protected Session createSendSession(Connection sendConnection) throws Exception {
+        return sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    /**
+     * Creates the connection used for consuming; delegates to
+     * {@code createConnection()} and exists as an override point.
+     */
+    protected Connection createReceiveConnection() throws Exception {
+        Connection receiver = createConnection();
+        return receiver;
+    }
+
+    /**
+     * Creates the connection used for producing; delegates to
+     * {@code createConnection()} and exists as an override point.
+     */
+    protected Connection createSendConnection() throws Exception {
+        Connection sender = createConnection();
+        return sender;
+    }
+
+    /**
+     * Creates a plain consumer without a selector.
+     *
+     * @param session session to create the consumer on
+     * @param dest destination to consume from
+     * @throws JMSException if the session cannot create the consumer
+     */
+    protected MessageConsumer createConsumer(Session session, Destination dest) throws JMSException {
+        MessageConsumer created = session.createConsumer(dest);
+        return created;
+    }
+
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+    }
+
+    /**
+     * Closes both sessions and then both connections, in the original order.
+     * Each close sits in its own try/finally so that an exception from one
+     * resource cannot prevent the remaining ones from being released.
+     */
+    protected void tearDown() throws Exception {
+        try {
+            session.close();
+        } finally {
+            try {
+                receiveSession.close();
+            } finally {
+                try {
+                    sendConnection.close();
+                } finally {
+                    receiveConnection.close();
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsWithJMXTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsWithJMXTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsWithJMXTest.java
new file mode 100644
index 0000000..da2c80c
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsWithJMXTest.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+/**
+ * Runs the two-connection send/receive test with the {@code broker.useJmx=true}
+ * option added to the broker URL.
+ */
+public class JmsTopicSendReceiveWithTwoConnectionsWithJMXTest extends
+    JmsTopicSendReceiveWithTwoConnectionsTest {
+
+    /**
+     * @return a factory for the "vm://localhost" broker URL with the
+     *         {@code broker.persistent=false} and {@code broker.useJmx=true}
+     *         options
+     */
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        final String brokerUrl = "vm://localhost?broker.persistent=false&broker.useJmx=true";
+        return new ActiveMQConnectionFactory(brokerUrl);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendSameMessageTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendSameMessageTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendSameMessageTest.java
new file mode 100644
index 0000000..9a92e49
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendSameMessageTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.TextMessage;
+
+/**
+ * 
+ */
+public class JmsTopicSendSameMessageTest extends JmsTopicSendReceiveWithTwoConnectionsTest {
+
+    private static final org.apache.commons.logging.Log LOG = org.apache.commons.logging.LogFactory
+        .getLog(JmsTopicSendSameMessageTest.class);
+
+    public void testSendReceive() throws Exception {
+        messages.clear();
+
+        TextMessage message = session.createTextMessage();
+
+        for (int i = 0; i < data.length; i++) {
+            message.setText(data[i]);
+            message.setStringProperty("stringProperty", data[i]);
+            message.setIntProperty("intProperty", i);
+
+            if (verbose) {
+                LOG.info("About to send a message: " + message + " with text: " + data[i]);
+            }
+
+            producer.send(producerDestination, message);
+        }
+
+        assertMessagesAreReceived();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicTransactionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicTransactionTest.java
new file mode 100644
index 0000000..cf5cdc0
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicTransactionTest.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import org.apache.activemq.test.JmsResourceProvider;
+
+
+/**
+ * Specialization of {@link JmsTransactionTestSupport} that supplies a topic-flagged resource provider.
+ */
+public class JmsTopicTransactionTest extends JmsTransactionTestSupport {
+
+    /**
+     * Builds a provider flagged as topic, with durable name "testsub" and client id "testclient".
+     * @see org.apache.activemq.JmsTransactionTestSupport#getJmsResourceProvider()
+     */
+    protected JmsResourceProvider getJmsResourceProvider() {
+        final JmsResourceProvider topicProvider = new JmsResourceProvider();
+        topicProvider.setTopic(true);
+        topicProvider.setDurableName("testsub");
+        topicProvider.setClientID("testclient");
+        return topicProvider;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicWildcardSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicWildcardSendReceiveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicWildcardSendReceiveTest.java
new file mode 100644
index 0000000..eeb5999
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicWildcardSendReceiveTest.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.test.JmsTopicSendReceiveTest;
+
+/**
+ * Wildcard-subscription tests: each case subscribes to a wildcard topic,
+ * publishes to concrete topics and checks the text of every message received.
+ * <p>
+ * The "*" and ">" wildcards are exercised at the end and in the middle of a
+ * subscription name; after the expected messages each case also asserts that
+ * nothing further is pending on the consumer.
+ */
+public class JmsTopicWildcardSendReceiveTest extends JmsTopicSendReceiveTest {
+
+    // Concrete topic names; they double as the message bodies that are sent.
+    private String destination1String = "TEST.ONE.ONE";
+    private String destination2String = "TEST.ONE.ONE.ONE";
+    private String destination3String = "TEST.ONE.TWO";
+    private String destination4String = "TEST.TWO.ONE";
+
+    /**
+     * Selects topic, non-durable, non-persistent settings, then runs the inherited set-up.
+     */
+    protected void setUp() throws Exception {
+        topic = true;
+        durable = false;
+        deliveryMode = DeliveryMode.NON_PERSISTENT;
+        super.setUp();
+    }
+
+    /** Consumer-side subject: the wildcard "FOO.>". */
+    protected String getConsumerSubject() {
+        return "FOO.>";
+    }
+
+    /** Producer-side subject: the concrete name "FOO.BAR.HUMBUG". */
+    protected String getProducerSubject() {
+        return "FOO.BAR.HUMBUG";
+    }
+
+    /**
+     * Subscribes to "TEST.ONE.*" and expects exactly two messages, each carrying
+     * "TEST.ONE.ONE" or "TEST.ONE.TWO" as its text.
+     */
+    public void testReceiveWildcardTopicEndAsterisk() throws Exception {
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        ActiveMQDestination destination1 = (ActiveMQDestination)session.createTopic(destination1String);
+        ActiveMQDestination destination3 = (ActiveMQDestination)session.createTopic(destination3String);
+
+        ActiveMQDestination destination6 = (ActiveMQDestination)session.createTopic("TEST.ONE.*");
+        MessageConsumer consumer = session.createConsumer(destination6);
+        sendMessage(session, destination1, destination1String);
+        sendMessage(session, destination3, destination3String);
+
+        assertNextTextIsOneOf(consumer, destination1String, destination3String);
+        assertNextTextIsOneOf(consumer, destination1String, destination3String);
+        assertNull(consumer.receiveNoWait());
+    }
+
+    /**
+     * Subscribes to "TEST.ONE.>" and expects exactly three messages, each
+     * carrying one of the three topic names published below "TEST.ONE".
+     */
+    public void testReceiveWildcardTopicEndGreaterThan() throws Exception {
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        ActiveMQDestination destination1 = (ActiveMQDestination)session.createTopic(destination1String);
+        ActiveMQDestination destination2 = (ActiveMQDestination)session.createTopic(destination2String);
+        ActiveMQDestination destination3 = (ActiveMQDestination)session.createTopic(destination3String);
+
+        ActiveMQDestination destination7 = (ActiveMQDestination)session.createTopic("TEST.ONE.>");
+        MessageConsumer consumer = session.createConsumer(destination7);
+        sendMessage(session, destination1, destination1String);
+        sendMessage(session, destination2, destination2String);
+        sendMessage(session, destination3, destination3String);
+
+        // Each call reads the text of the message it just received; re-checking
+        // a stale value would let the second and third messages go unverified.
+        assertNextTextIsOneOf(consumer, destination1String, destination2String, destination3String);
+        assertNextTextIsOneOf(consumer, destination1String, destination2String, destination3String);
+        assertNextTextIsOneOf(consumer, destination1String, destination2String, destination3String);
+        assertNull(consumer.receiveNoWait());
+    }
+
+    /**
+     * Subscribes to "TEST.*.ONE" and expects exactly two messages, each carrying
+     * "TEST.ONE.ONE" or "TEST.TWO.ONE" as its text.
+     */
+    public void testReceiveWildcardTopicMidAsterisk() throws Exception {
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        ActiveMQDestination destination1 = (ActiveMQDestination)session.createTopic(destination1String);
+        ActiveMQDestination destination4 = (ActiveMQDestination)session.createTopic(destination4String);
+
+        ActiveMQDestination destination8 = (ActiveMQDestination)session.createTopic("TEST.*.ONE");
+        MessageConsumer consumer = session.createConsumer(destination8);
+        sendMessage(session, destination1, destination1String);
+        sendMessage(session, destination4, destination4String);
+
+        assertNextTextIsOneOf(consumer, destination1String, destination4String);
+        assertNextTextIsOneOf(consumer, destination1String, destination4String);
+        assertNull(consumer.receiveNoWait());
+    }
+
+    /**
+     * Subscribes to "a.*.>.>" and expects the single message published to
+     * "a.b" to be delivered, with nothing after it.
+     */
+    public void testReceiveWildcardTopicMatchDoubleWildcard() throws Exception {
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        ActiveMQDestination destination1 = (ActiveMQDestination)session.createTopic("a.*.>.>");
+        ActiveMQDestination destination2 = (ActiveMQDestination)session.createTopic("a.b");
+
+        MessageConsumer consumer = session.createConsumer(destination1);
+        // The body sent is destination3String, not the name of the target topic.
+        sendMessage(session, destination2, destination3String);
+
+        assertNextTextIsOneOf(consumer, destination1String, destination3String);
+        assertNull(consumer.receiveNoWait());
+    }
+
+    /**
+     * Subscribes to "a.>" and expects the single message published to "a"
+     * to be delivered, with nothing after it.
+     */
+    public void testReceiveWildcardTopicMatchSinglePastTheEndWildcard() throws Exception {
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        ActiveMQDestination destination1 = (ActiveMQDestination)session.createTopic("a.>");
+        ActiveMQDestination destination2 = (ActiveMQDestination)session.createTopic("a");
+
+        MessageConsumer consumer = session.createConsumer(destination1);
+        // The body sent is destination3String, not the name of the target topic.
+        sendMessage(session, destination2, destination3String);
+
+        assertNextTextIsOneOf(consumer, destination1String, destination3String);
+        assertNull(consumer.receiveNoWait());
+    }
+
+    /**
+     * Receives the next message, waiting up to one second, and fails unless a
+     * message arrived and its text equals one of the expected values.
+     *
+     * @param consumer consumer to receive from
+     * @param expected texts that are acceptable for the received message
+     * @throws JMSException if receiving or reading the message text fails
+     */
+    private void assertNextTextIsOneOf(MessageConsumer consumer, String... expected) throws JMSException {
+        Message m = consumer.receive(1000);
+        assertNotNull(m);
+        String text = ((TextMessage)m).getText();
+        for (String candidate : expected) {
+            if (candidate.equals(text)) {
+                return;
+            }
+        }
+        fail("unexpected message:" + text);
+    }
+
+    /**
+     * Publishes one text message to the destination through a short-lived producer.
+     *
+     * @param session session used to create the producer and the message
+     * @param destination destination to publish to
+     * @param text body of the message
+     * @throws JMSException if creating the producer, sending or closing fails
+     */
+    private void sendMessage(Session session, Destination destination, String text) throws JMSException {
+        MessageProducer producer = session.createProducer(destination);
+        try {
+            producer.send(session.createTextMessage(text));
+        } finally {
+            // Release the producer even when the send throws.
+            producer.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeMessageTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeMessageTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeMessageTestSupport.java
new file mode 100644
index 0000000..fc77218
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeMessageTestSupport.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.IdGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Support class for large-message tests: sends {@link #MESSAGE_COUNT} bytes
+ * messages of {@link #LARGE_MESSAGE_SIZE} bytes each and checks, through an
+ * asynchronous {@link MessageListener}, that every one arrives unchanged.
+ * <p>
+ * The protected configuration fields ({@code isTopic}, {@code isDurable},
+ * {@code deliveryMode}, {@code prefetchValue}) are read while {@link #setUp()}
+ * runs, so they have to be set before it is called.
+ */
+public class LargeMessageTestSupport extends ClientTestSupport implements MessageListener {
+
+    protected static final int LARGE_MESSAGE_SIZE = 128 * 1024;
+    protected static final int MESSAGE_COUNT = 100;
+
+    private static final Logger LOG = LoggerFactory.getLogger(LargeMessageTestSupport.class);
+
+    protected Connection producerConnection;
+    protected Connection consumerConnection;
+    protected MessageConsumer consumer;
+    protected MessageProducer producer;
+    protected Session producerSession;
+    protected Session consumerSession;
+    protected byte[] largeMessageData;
+    protected Destination destination;
+    protected boolean isTopic = true;
+    protected boolean isDurable = true;
+    protected int deliveryMode = DeliveryMode.PERSISTENT;
+    protected IdGenerator idGen = new IdGenerator();
+    // Cleared by the listener thread and read by the test thread, hence volatile.
+    protected volatile boolean validMessageConsumption = true;
+    // Number of messages the listener has counted; also the monitor the test waits on.
+    protected AtomicInteger messageCount = new AtomicInteger(0);
+
+    protected int prefetchValue = 10000000;
+
+    /**
+     * Creates a topic or a queue, depending on {@code isTopic}, named after the runtime class.
+     */
+    protected Destination createDestination() {
+        String subject = getClass().getName();
+        if (isTopic) {
+            return new ActiveMQTopic(subject);
+        } else {
+            return new ActiveMQQueue(subject);
+        }
+    }
+
+    /**
+     * Creates a durable subscriber with a generated subscription name when both
+     * {@code isTopic} and {@code isDurable} are set, otherwise a plain consumer.
+     *
+     * @throws JMSException if the consumer cannot be created
+     */
+    protected MessageConsumer createConsumer() throws JMSException {
+        if (isTopic && isDurable) {
+            return consumerSession.createDurableSubscriber((Topic)destination, idGen.generateId());
+        } else {
+            return consumerSession.createConsumer(destination);
+        }
+    }
+
+    /**
+     * Builds the reference payload, opens and starts a producer and a consumer
+     * connection, and registers this instance as the consumer's listener.
+     *
+     * @throws Exception if the inherited set-up or any JMS call fails, or if
+     *         the start-up pause is interrupted
+     */
+    public void setUp() throws Exception {
+        super.setUp();
+        ClientTestSupport.removeMessageStore();
+        LOG.info("Setting up . . . . . ");
+        messageCount.set(0);
+
+        destination = createDestination();
+        largeMessageData = new byte[LARGE_MESSAGE_SIZE];
+        for (int i = 0; i < LARGE_MESSAGE_SIZE; i++) {
+            // Alternating 'a' / 'z' pattern.
+            largeMessageData[i] = (byte)(i % 2 == 0 ? 'a' : 'z');
+        }
+
+        try {
+            // allow the broker to start
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag and keep the cause attached to the
+            // JMSException that callers already expect from this path.
+            Thread.currentThread().interrupt();
+            JMSException interrupted = new JMSException(e.getMessage());
+            interrupted.setLinkedException(e);
+            interrupted.initCause(e);
+            throw interrupted;
+        }
+
+        ActiveMQConnectionFactory fac = getConnectionFactory();
+        producerConnection = fac.createConnection();
+        setPrefetchPolicy((ActiveMQConnection)producerConnection);
+        producerConnection.start();
+
+        consumerConnection = fac.createConnection();
+        setPrefetchPolicy((ActiveMQConnection)consumerConnection);
+        consumerConnection.setClientID(idGen.generateId());
+        consumerConnection.start();
+        producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        producer = producerSession.createProducer(createDestination());
+        producer.setDeliveryMode(deliveryMode);
+        consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        consumer = createConsumer();
+        consumer.setMessageListener(this);
+        LOG.info("Setup complete");
+    }
+
+    /**
+     * Applies {@code prefetchValue} to the topic, queue, durable-topic,
+     * queue-browser and optimized-durable-topic prefetch settings of the connection.
+     */
+    protected void setPrefetchPolicy(ActiveMQConnection activeMQConnection) {
+        activeMQConnection.getPrefetchPolicy().setTopicPrefetch(prefetchValue);
+        activeMQConnection.getPrefetchPolicy().setQueuePrefetch(prefetchValue);
+        activeMQConnection.getPrefetchPolicy().setDurableTopicPrefetch(prefetchValue);
+        activeMQConnection.getPrefetchPolicy().setQueueBrowserPrefetch(prefetchValue);
+        activeMQConnection.getPrefetchPolicy().setOptimizeDurableTopicPrefetch(prefetchValue);
+    }
+
+    /**
+     * Closes both connections and runs the inherited tear-down. Each step runs
+     * even when an earlier one throws, so a failed close cannot skip the rest.
+     *
+     * @throws Exception if closing a connection or the inherited tear-down fails
+     */
+    public void tearDown() throws Exception {
+        try {
+            Thread.sleep(1000);
+            try {
+                if (producerConnection != null) {
+                    producerConnection.close();
+                }
+            } finally {
+                if (consumerConnection != null) {
+                    consumerConnection.close();
+                }
+            }
+        } finally {
+            try {
+                super.tearDown();
+            } finally {
+                largeMessageData = null;
+            }
+        }
+    }
+
+    /**
+     * Compares the first {@link #LARGE_MESSAGE_SIZE} bytes of the message body
+     * with the reference payload.
+     *
+     * @param msg1 message whose body is read
+     * @return {@code true} if every compared byte matches, {@code false} at the first mismatch
+     * @throws Exception if the body cannot be read
+     */
+    protected boolean isSame(BytesMessage msg1) throws Exception {
+        // Mark the body read-only before reading it back.
+        ((ActiveMQMessage)msg1).setReadOnlyBody(true);
+
+        for (int i = 0; i < LARGE_MESSAGE_SIZE; i++) {
+            if (msg1.readByte() != largeMessageData[i]) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Validates one received message and counts it. Any problem is recorded in
+     * {@code validMessageConsumption} rather than thrown, because this method
+     * runs on the listener's thread, where a thrown assertion would not fail
+     * the test directly.
+     */
+    public void onMessage(Message msg) {
+        try {
+            BytesMessage ba = (BytesMessage)msg;
+            boolean intact = isSame(ba) && ba.getBodyLength() == LARGE_MESSAGE_SIZE;
+            validMessageConsumption &= intact;
+            if (messageCount.incrementAndGet() >= MESSAGE_COUNT) {
+                // Wake the test thread waiting in testLargeMessages().
+                synchronized (messageCount) {
+                    messageCount.notify();
+                }
+            }
+            LOG.info("got message = " + messageCount);
+            if (messageCount.get() % 50 == 0) {
+                LOG.info("count = " + messageCount);
+            }
+        } catch (Exception e) {
+            // A message that cannot be read is an invalid consumption, not something to ignore.
+            validMessageConsumption = false;
+            LOG.error("Failed to process received message", e);
+        }
+    }
+
+    /**
+     * Sends {@link #MESSAGE_COUNT} large messages, waits up to 60 seconds for
+     * the listener to count them all, then asserts the count and the validity flag.
+     */
+    public void testLargeMessages() throws Exception {
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            LOG.info("Sending message: " + i);
+            BytesMessage msg = producerSession.createBytesMessage();
+            msg.writeBytes(largeMessageData);
+            producer.send(msg);
+        }
+        long now = System.currentTimeMillis();
+        while (now + 60000 > System.currentTimeMillis() && messageCount.get() < MESSAGE_COUNT) {
+            LOG.info("message count = " + messageCount);
+            // The one-second timeout re-checks the loop condition even if a notify was missed.
+            synchronized (messageCount) {
+                messageCount.wait(1000);
+            }
+        }
+        LOG.info("Finished count = " + messageCount);
+        assertTrue("Not enough messages - expected " + MESSAGE_COUNT + " but got " + messageCount, messageCount.get() == MESSAGE_COUNT);
+        assertTrue("received messages are not valid", validMessageConsumption);
+        Thread.sleep(1000);
+        LOG.info("FINAL count = " + messageCount);
+    }
+}


Mime
View raw message