activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [25/52] [abbrv] [partial] activemq-artemis git commit: ARTEMIS-127 Adding activemq unit test module to Artemis
Date Tue, 09 Jun 2015 16:36:56 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1282.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1282.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1282.java
new file mode 100644
index 0000000..9cd240f
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1282.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+
+/**
+ * An AMQ-1282 Test
+ */
+public class AMQ1282 extends TestCase {
+    private ConnectionFactory factory;
+    private Connection connection;
+    private MapMessage message;
+
+    @Override
+    protected void setUp() throws Exception {
+        factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+        connection = factory.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        message = session.createMapMessage();
+        super.setUp();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        connection.close();
+        super.tearDown();
+    }
+
+    public void testUnmappedBooleanMessage() throws JMSException {
+        Object expected;
+        try {
+            expected = Boolean.valueOf(null);
+        } catch (Exception ex) {
+            expected = ex;
+        }
+        try {
+            Boolean actual = message.getBoolean("foo");
+            assertEquals(expected, actual);
+        } catch (Exception ex) {
+            assertEquals(expected, ex);
+        }
+    }
+
+    public void testUnmappedIntegerMessage() throws JMSException {
+        Object expected;
+        try {
+            expected = Integer.valueOf(null);
+        } catch (Exception ex) {
+            expected = ex;
+        }
+        try {
+            Integer actual = message.getInt("foo");
+            assertEquals(expected, actual);
+        } catch (Exception ex) {
+            Class<?> aClass = expected.getClass();
+            assertTrue(aClass.isInstance(ex));
+        }
+    }
+
+    public void testUnmappedShortMessage() throws JMSException {
+        Object expected;
+        try {
+            expected = Short.valueOf(null);
+        } catch (Exception ex) {
+            expected = ex;
+        }
+        try {
+            Short actual = message.getShort("foo");
+            assertEquals(expected, actual);
+        } catch (Exception ex) {
+            Class<?> aClass = expected.getClass();
+            assertTrue(aClass.isInstance(ex));
+        }
+    }
+
+    public void testUnmappedLongMessage() throws JMSException {
+        Object expected;
+        try {
+            expected = Long.valueOf(null);
+        } catch (Exception ex) {
+            expected = ex;
+        }
+        try {
+            Long actual = message.getLong("foo");
+            assertEquals(expected, actual);
+        } catch (Exception ex) {
+            Class<?> aClass = expected.getClass();
+            assertTrue(aClass.isInstance(ex));
+        }
+    }
+
+    public void testUnmappedStringMessage() throws JMSException {
+        Object expected;
+        try {
+            expected = String.valueOf(null);
+        } catch (Exception ex) {
+            expected = ex;
+        }
+        try {
+            String actual = message.getString("foo");
+            assertEquals(expected, actual);
+        } catch (Exception ex) {
+            Class<?> aClass = expected.getClass();
+            assertTrue(aClass.isInstance(ex));
+        }
+    }
+
+    public void testUnmappedCharMessage() throws JMSException {
+        try {
+            message.getChar("foo");
+            fail("should have thrown NullPointerException");
+        } catch (NullPointerException success) {
+            assertNotNull(success);
+        }
+    }
+
+    public void testUnmappedByteMessage() throws JMSException {
+        Object expected;
+        try {
+            expected = Byte.valueOf(null);
+        } catch (Exception ex) {
+            expected = ex;
+        }
+        try {
+            Byte actual = message.getByte("foo");
+            assertEquals(expected, actual);
+        } catch (Exception ex) {
+            Class<?> aClass = expected.getClass();
+            assertTrue(aClass.isInstance(ex));
+        }
+    }
+
+    public void testUnmappedDoubleMessage() throws JMSException {
+        Object expected;
+        try {
+            expected = Double.valueOf(null);
+        } catch (Exception ex) {
+            expected = ex;
+        }
+        try {
+            Double actual = message.getDouble("foo");
+            assertEquals(expected, actual);
+        } catch (Exception ex) {
+            Class<?> aClass = expected.getClass();
+            assertTrue(aClass.isInstance(ex));
+        }
+    }
+
+    public void testUnmappedFloatMessage() throws JMSException {
+        Object expected;
+        try {
+            expected = Float.valueOf(null);
+        } catch (Exception ex) {
+            expected = ex;
+        }
+        try {
+            Float actual = message.getFloat("foo");
+            assertEquals(expected, actual);
+        } catch (Exception ex) {
+            Class<?> aClass = expected.getClass();
+            assertTrue(aClass.isInstance(ex));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1687Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1687Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1687Test.java
new file mode 100644
index 0000000..8e636d0
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1687Test.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.spring.ConsumerBean;
+
+/**
+ *
+ *
+ */
+public class AMQ1687Test extends EmbeddedBrokerTestSupport {
+
+    private Connection connection;
+
+    @Override
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        //prefetch change is not required, but test will not fail w/o it, only spew errors in the AMQ log.
+        return new ActiveMQConnectionFactory(
+                broker.getTransportConnectors().get(0).getPublishableConnectString() +"?jms.prefetchPolicy.all=5");
+    }
+
+    public void testVirtualTopicCreation() throws Exception {
+        if (connection == null) {
+            connection = createConnection();
+        }
+        connection.start();
+
+        ConsumerBean messageList = new ConsumerBean();
+        messageList.setVerbose(true);
+
+        String queueAName = getVirtualTopicConsumerName();
+        String queueBName = getVirtualTopicConsumerNameB();
+
+        // create consumer 'cluster'
+        ActiveMQQueue queue1 = new ActiveMQQueue(queueAName);
+        ActiveMQQueue queue2 = new ActiveMQQueue(queueBName);
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer c1 = session.createConsumer(queue1);
+        MessageConsumer c2 = session.createConsumer(queue2);
+
+        c1.setMessageListener(messageList);
+        c2.setMessageListener(messageList);
+
+        // create topic producer
+        ActiveMQTopic topic = new ActiveMQTopic(getVirtualTopicName());
+        MessageProducer producer = session.createProducer(topic);
+        assertNotNull(producer);
+
+        int total = 100;
+        for (int i = 0; i < total; i++) {
+            producer.send(session.createTextMessage("message: " + i));
+        }
+
+        messageList.assertMessagesArrived(total*2);
+    }
+
+    protected String getVirtualTopicName() {
+        return "VirtualTopic.TEST";
+    }
+
+    protected String getVirtualTopicConsumerName() {
+        return "Consumer.A.VirtualTopic.TEST";
+    }
+
+    protected String getVirtualTopicConsumerNameB() {
+        return "Consumer.B.VirtualTopic.TEST";
+    }
+
+    protected void setUp() throws Exception {
+        this.bindAddress="tcp://localhost:0";
+        super.setUp();
+    }
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+        }
+        super.tearDown();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1853Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1853Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1853Test.java
new file mode 100644
index 0000000..a3c3b1d
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1853Test.java
@@ -0,0 +1,368 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.*;
+
+import java.net.URI;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQMessageProducer;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.Wait;
+import org.apache.activemq.util.Wait.Condition;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test validates that the AMQ consumer blocks on redelivery of a message,
+ * through all redeliveries, until the message is either successfully consumed
+ * or sent to the DLQ.
+ */
+public class AMQ1853Test {
+    private static BrokerService broker;
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ1853Test.class);
+    static final String jmsConnectionURI = "failover:(vm://localhost)";
+
+    // Virtual Topic that the test publishes 10 messages to
+    private static final String queueFail = "Queue.BlockingConsumer.QueueFail";
+
+    // Number of messages
+
+    private final int producerMessages = 5;
+    private final int totalNumberMessages = producerMessages * 2;
+    private final int maxRedeliveries = 2;
+    private final int redeliveryDelay = 1000;
+
+    private Map<String, AtomicInteger> messageList = null;
+
+    @Before
+    public void setUp() throws Exception {
+        broker = BrokerFactory.createBroker(new URI("broker:()/localhost?persistent=false"));
+        broker.setUseJmx(false);
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+            broker = null;
+        }
+    }
+
+    @Test
+    public void testConsumerMessagesAreNotOrdered() throws Exception {
+
+        TestConsumer consumerAllFail = null;
+        messageList = new Hashtable<String, AtomicInteger>();
+
+        try {
+
+            // The first 2 consumers will rollback, ultimately causing messages to land on the DLQ
+
+            TestProducer producerAllFail = new TestProducer(queueFail);
+            thread(producerAllFail, false);
+
+            consumerAllFail = new TestConsumer(queueFail, true);
+            thread(consumerAllFail, false);
+
+            // Give the consumers a second to start
+            Thread.sleep(1000);
+
+            thread(producerAllFail, false);
+
+            // Give the consumers a second to start
+            Thread.sleep(1000);
+
+            producerAllFail.getLatch().await();
+
+            LOG.info("producer successful, count = " + producerAllFail.getLatch().getCount());
+            LOG.info("final message list size =  " + messageList.size());
+
+            assertTrue("message list size =  " + messageList.size() + " exptected:" + totalNumberMessages,
+                Wait.waitFor(new Condition() {
+                    @Override
+                    public boolean isSatisified() throws Exception {
+                        return totalNumberMessages == messageList.size();
+                    }
+                }));
+
+            consumerAllFail.getLatch().await();
+
+            LOG.info("consumerAllFail successful, count = " + consumerAllFail.getLatch().getCount());
+
+            Iterator<String> keys = messageList.keySet().iterator();
+            for (AtomicInteger counter : messageList.values()) {
+                String message = keys.next();
+                LOG.info("final count for message " + message + " counter =  " + counter.get());
+                assertTrue("for message " + message + " counter =  " + counter.get(), counter.get() == maxRedeliveries + 1);
+            }
+
+            assertFalse(consumerAllFail.messageReceiptIsOrdered());
+        } finally {
+            if (consumerAllFail != null) {
+                consumerAllFail.setStop(true);
+            }
+        }
+    }
+
+    private static Thread thread(Runnable runnable, boolean daemon) {
+        Thread brokerThread = new Thread(runnable);
+        brokerThread.setDaemon(daemon);
+        brokerThread.start();
+        return brokerThread;
+    }
+
+    private class TestProducer implements Runnable {
+
+        private CountDownLatch latch = null;
+        private String destinationName = null;
+
+        public TestProducer(String destinationName) {
+            this.destinationName = destinationName;
+            // We run the producer 2 times
+            latch = new CountDownLatch(totalNumberMessages);
+        }
+
+        public CountDownLatch getLatch() {
+            return latch;
+        }
+
+        public void run() {
+
+            ActiveMQConnectionFactory connectionFactory = null;
+            ActiveMQConnection connection = null;
+            ActiveMQSession session = null;
+            Destination destination = null;
+
+            try {
+                LOG.info("Started TestProducer for destination (" + destinationName + ")");
+
+                connectionFactory = new ActiveMQConnectionFactory(jmsConnectionURI);
+                connection = (ActiveMQConnection) connectionFactory.createConnection();
+                connection.setCopyMessageOnSend(false);
+                connection.start();
+                session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+                destination = session.createQueue(this.destinationName);
+
+                // Create a MessageProducer from the Session to the Topic or Queue
+                ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
+                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+                for (int i = 0; i < (producerMessages); i++) {
+                    TextMessage message = (TextMessage) session.createTextMessage();
+                    message.setLongProperty("TestTime", (System.currentTimeMillis()));
+                    try {
+                        producer.send(message);
+                        LOG.info("Producer (" + destinationName + ")\n" + message.getJMSMessageID() + " = sent messageId\n");
+
+                        latch.countDown();
+                        LOG.info(" Latch count  " + latch.getCount());
+                        LOG.info("Producer message list size = " + messageList.keySet().size());
+                        messageList.put(message.getJMSMessageID(), new AtomicInteger(0));
+                        LOG.info("Producer message list size = " + messageList.keySet().size());
+
+                    } catch (Exception deeperException) {
+                        LOG.info("Producer for destination (" + destinationName + ") Caught: " + deeperException);
+                    }
+
+                    Thread.sleep(1000);
+                }
+
+                LOG.info("Finished TestProducer for destination (" + destinationName + ")");
+
+            } catch (Exception e) {
+                LOG.error("Terminating TestProducer(" + destinationName + ")Caught: " + e);
+            } finally {
+                try {
+                    if (session != null) {
+                        session.close();
+                    }
+                    if (connection != null) {
+                        connection.close();
+                    }
+                } catch (Exception e) {
+                    LOG.error("Closing connection/session (" + destinationName + ")Caught: " + e);
+                }
+            }
+        }
+    }
+
+    private class TestConsumer implements Runnable, ExceptionListener, MessageListener {
+
+        private CountDownLatch latch = null;
+        private int receivedMessageCounter = 0;
+        private boolean bFakeFail = false;
+        String destinationName = null;
+        boolean bMessageReceiptIsOrdered = true;
+        boolean bStop = false;
+        String previousMessageId = null;
+
+        private ActiveMQConnectionFactory connectionFactory = null;
+        private ActiveMQConnection connection = null;
+        private Session session = null;
+        private MessageConsumer consumer = null;
+
+        public TestConsumer(String destinationName, boolean bFakeFail) {
+            this.bFakeFail = bFakeFail;
+            latch = new CountDownLatch(totalNumberMessages * (this.bFakeFail ? (maxRedeliveries + 1) : 1));
+            this.destinationName = destinationName;
+        }
+
+        public CountDownLatch getLatch() {
+            return latch;
+        }
+
+        public boolean messageReceiptIsOrdered() {
+            return bMessageReceiptIsOrdered;
+        }
+
+        public void run() {
+
+            try {
+                LOG.info("Started TestConsumer for destination (" + destinationName + ")");
+
+                connectionFactory = new ActiveMQConnectionFactory(jmsConnectionURI);
+                connection = (ActiveMQConnection) connectionFactory.createConnection();
+                connection.setNonBlockingRedelivery(true);
+                session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+                RedeliveryPolicy policy = connection.getRedeliveryPolicy();
+                policy.setInitialRedeliveryDelay(redeliveryDelay);
+                policy.setBackOffMultiplier(-1);
+                policy.setRedeliveryDelay(redeliveryDelay);
+                policy.setMaximumRedeliveryDelay(-1);
+                policy.setUseExponentialBackOff(false);
+                policy.setMaximumRedeliveries(maxRedeliveries);
+
+                connection.setExceptionListener(this);
+                Destination destination = session.createQueue(destinationName);
+                consumer = session.createConsumer(destination);
+                consumer.setMessageListener(this);
+
+                connection.start();
+
+                while (!bStop) {
+                    Thread.sleep(100);
+                }
+
+                LOG.info("Finished TestConsumer for destination name (" + destinationName + ") remaining " + this.latch.getCount()
+                        + " messages " + this.toString());
+
+            } catch (Exception e) {
+                LOG.error("Consumer (" + destinationName + ") Caught: " + e);
+            } finally {
+                try {
+                    if (consumer != null) {
+                        consumer.close();
+                    }
+                    if (session != null) {
+                        session.close();
+                    }
+                    if (connection != null) {
+                        connection.close();
+                    }
+                } catch (Exception e) {
+                    LOG.error("Closing connection/session (" + destinationName + ")Caught: " + e);
+                }
+            }
+        }
+
+        public synchronized void onException(JMSException ex) {
+            LOG.error("Consumer for destination, (" + destinationName + "), JMS Exception occured.  Shutting down client.");
+        }
+
+        public synchronized void setStop(boolean bStop) {
+            this.bStop = bStop;
+        }
+
+        public synchronized void onMessage(Message message) {
+            receivedMessageCounter++;
+            latch.countDown();
+
+            LOG.info("Consumer for destination (" + destinationName + ") latch countdown: " + latch.getCount() +
+                     " :: Number messages received " + this.receivedMessageCounter);
+
+            try {
+
+                if (receivedMessageCounter % (maxRedeliveries + 1) == 1) {
+                    previousMessageId = message.getJMSMessageID();
+                }
+
+                if (bMessageReceiptIsOrdered) {
+                    bMessageReceiptIsOrdered = previousMessageId.trim().equals(message.getJMSMessageID());
+                }
+
+                final String jmsMessageId = message.getJMSMessageID();
+                assertTrue("Did not find expected ", Wait.waitFor(new Wait.Condition() {
+                    @Override
+                    public boolean isSatisified() throws Exception {
+                        return messageList.containsKey(jmsMessageId);
+                    }
+                }));
+
+                AtomicInteger counter = messageList.get(jmsMessageId);
+                counter.incrementAndGet();
+
+                LOG.info("Consumer for destination (" + destinationName + ")\n" + message.getJMSMessageID() + " = currentMessageId\n"
+                        + previousMessageId + " = previousMessageId\n" + bMessageReceiptIsOrdered + "= bMessageReceiptIsOrdered\n"
+                        + ">>LATENCY " + (System.currentTimeMillis() - message.getLongProperty("TestTime")) + "\n" + "message counter = "
+                        + counter.get());
+
+                if (!bFakeFail) {
+                    LOG.debug("Consumer on destination " + destinationName + " committing JMS Session for message: " + message.toString());
+                    session.commit();
+                } else {
+                    LOG.debug("Consumer on destination " + destinationName + " rolling back JMS Session for message: " + message.toString());
+                    session.rollback(); // rolls back all the consumed messages on the session to
+                }
+
+            } catch (Exception ex) {
+                ex.printStackTrace();
+                LOG.error("Error reading JMS Message from destination " + destinationName + ".");
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1866.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1866.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1866.java
new file mode 100644
index 0000000..1bdd72e
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1866.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.leveldb.LevelDBStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a test case for the issue reported at:
+ * https://issues.apache.org/activemq/browse/AMQ-1866
+ *
+ * If you have a JMS producer sending messages to multiple fast consumers and
+ * one slow consumer, eventually all consumers will run as slow as
+ * the slowest consumer.
+ */
+public class AMQ1866 extends TestCase {
+
+    private static final Logger log = LoggerFactory.getLogger(ConsumerThread.class);
+    private BrokerService brokerService;
+    private ArrayList<Thread> threads = new ArrayList<Thread>();
+
+    private final String ACTIVEMQ_BROKER_BIND = "tcp://localhost:0";
+    private String ACTIVEMQ_BROKER_URI;
+
+    AtomicBoolean shutdown = new AtomicBoolean();
+    private ActiveMQQueue destination;
+
+    @Override
+    protected void setUp() throws Exception {
+        // Start an embedded broker up.
+        brokerService = new BrokerService();
+        LevelDBStore adaptor = new LevelDBStore();
+        brokerService.setPersistenceAdapter(adaptor);
+        brokerService.deleteAllMessages();
+
+        // A small max page size makes this issue occur faster.
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry pe = new PolicyEntry();
+        pe.setMaxPageSize(1);
+        policyMap.put(new ActiveMQQueue(">"), pe);
+        brokerService.setDestinationPolicy(policyMap);
+
+        brokerService.addConnector(ACTIVEMQ_BROKER_BIND);
+        brokerService.start();
+
+        ACTIVEMQ_BROKER_URI = brokerService.getTransportConnectors().get(0).getPublishableConnectString();
+        destination = new ActiveMQQueue(getName());
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        // Stop any running threads.
+        shutdown.set(true);
+        for (Thread t : threads) {
+            t.interrupt();
+            t.join();
+        }
+        brokerService.stop();
+    }
+
+    public void testConsumerSlowDownPrefetch0() throws Exception {
+        ACTIVEMQ_BROKER_URI = ACTIVEMQ_BROKER_URI + "?jms.prefetchPolicy.queuePrefetch=0";
+        doTestConsumerSlowDown();
+    }
+
+    public void testConsumerSlowDownPrefetch10() throws Exception {
+        ACTIVEMQ_BROKER_URI = ACTIVEMQ_BROKER_URI + "?jms.prefetchPolicy.queuePrefetch=10";
+        doTestConsumerSlowDown();
+    }
+
+    public void testConsumerSlowDownDefaultPrefetch() throws Exception {
+        doTestConsumerSlowDown();
+    }
+
+    public void doTestConsumerSlowDown() throws Exception {
+
+        // Preload the queue.
+        produce(20000);
+
+        Thread producer = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    while(!shutdown.get()) {
+                        produce(1000);
+                    }
+                } catch (Exception e) {
+                }
+            }
+        };
+        threads.add(producer);
+        producer.start();
+
+        // This is the slow consumer.
+        ConsumerThread c1 = new ConsumerThread("Consumer-1");
+        threads.add(c1);
+        c1.start();
+
+        // Wait a bit so that the slow consumer gets assigned most of the messages.
+        Thread.sleep(500);
+        ConsumerThread c2 = new ConsumerThread("Consumer-2");
+        threads.add(c2);
+        c2.start();
+
+        int totalReceived = 0;
+        for ( int i=0; i < 30; i++) {
+            Thread.sleep(1000);
+            long c1Counter = c1.counter.getAndSet(0);
+            long c2Counter = c2.counter.getAndSet(0);
+            log.debug("c1: "+c1Counter+", c2: "+c2Counter);
+            totalReceived += c1Counter;
+            totalReceived += c2Counter;
+
+            // Once message have been flowing for a few seconds, start asserting that c2 always gets messages.  It should be receiving about 100 / sec
+            if( i > 10 ) {
+                assertTrue("Total received=" + totalReceived + ", Consumer 2 should be receiving new messages every second.", c2Counter > 0);
+            }
+        }
+    }
+
+    public void produce(int count) throws Exception {
+        Connection connection=null;
+        try {
+            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_URI);
+            factory.setDispatchAsync(true);
+
+            connection = factory.createConnection();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(destination);
+            connection.start();
+
+            for( int i=0 ; i< count; i++ ) {
+                producer.send(session.createTextMessage(getName()+" Message "+(++i)));
+            }
+
+        } finally {
+            try {
+                connection.close();
+            } catch (Throwable e) {
+            }
+        }
+    }
+
+    public class ConsumerThread extends Thread {
+        final AtomicLong counter = new AtomicLong();
+
+        public ConsumerThread(String threadId) {
+            super(threadId);
+        }
+
+        public void run() {
+            Connection connection=null;
+            try {
+                log.debug(getName() + ": is running");
+
+                ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_URI);
+                factory.setDispatchAsync(true);
+
+                connection = factory.createConnection();
+
+                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                MessageConsumer consumer = session.createConsumer(destination);
+                connection.start();
+
+                while (!shutdown.get()) {
+                    TextMessage msg = (TextMessage)consumer.receive(1000);
+                    if ( msg!=null ) {
+                        int sleepingTime;
+                        if (getName().equals("Consumer-1")) {
+                            sleepingTime = 1000 * 1000;
+                        } else {
+                            sleepingTime = 1;
+                        }
+                        counter.incrementAndGet();
+                        Thread.sleep(sleepingTime);
+                    }
+                }
+
+            } catch (Exception e) {
+            } finally {
+                log.debug(getName() + ": is stopping");
+                try {
+                    connection.close();
+                } catch (Throwable e) {
+                }
+            }
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1893Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1893Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1893Test.java
new file mode 100644
index 0000000..f5ccd50
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1893Test.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class AMQ1893Test extends TestCase {
+
+    private static final Logger log = LoggerFactory.getLogger(AMQ1893Test.class);
+
+    static final String QUEUE_NAME = "TEST";
+
+    static final int MESSAGE_COUNT_OF_ONE_GROUP = 10000;
+
+    static final int[] PRIORITIES = new int[]{0, 5, 10};
+
+    static final boolean debug = false;
+
+    private BrokerService brokerService;
+
+    private ActiveMQQueue destination;
+
+    @Override
+    protected void setUp() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setDeleteAllMessagesOnStartup(true);
+        brokerService.addConnector("tcp://localhost:0");
+        brokerService.start();
+        destination = new ActiveMQQueue(QUEUE_NAME);
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        // Stop any running threads.
+        brokerService.stop();
+    }
+
+
+    public void testProduceConsumeWithSelector() throws Exception {
+        new TestProducer().produceMessages();
+        new TestConsumer().consume();
+    }
+
+    
+    class TestProducer {
+
+        public void produceMessages() throws Exception {
+            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
+                    brokerService.getTransportConnectors().get(0).getConnectUri().toString()
+            );
+            Connection connection = connectionFactory.createConnection();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Destination destination = session.createQueue(QUEUE_NAME);
+            MessageProducer producer = session.createProducer(destination);
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+            long start = System.currentTimeMillis();
+
+            for (int priority : PRIORITIES) {
+
+                String name = null;
+                if (priority == 10) {
+                    name = "high";
+                } else if (priority == 5) {
+                    name = "mid";
+                } else {
+                    name = "low";
+                }
+
+                for (int i = 1; i <= MESSAGE_COUNT_OF_ONE_GROUP; i++) {
+
+                    TextMessage message = session.createTextMessage(name + "_" + i);
+                    message.setIntProperty("priority", priority);
+
+                    producer.send(message);
+                }
+            }
+
+            long end = System.currentTimeMillis();
+
+            log.info("sent " + (MESSAGE_COUNT_OF_ONE_GROUP * 3) + " messages in " + (end - start) + " ms");
+
+            producer.close();
+            session.close();
+            connection.close();
+        }
+    }
+
+    class TestConsumer {
+
+        private CountDownLatch finishLatch = new CountDownLatch(1);
+
+
+
+        public void consume() throws Exception {
+            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
+                    brokerService.getTransportConnectors().get(0).getConnectUri().toString()
+            );
+
+
+            final int totalMessageCount = MESSAGE_COUNT_OF_ONE_GROUP * PRIORITIES.length;
+            final AtomicInteger counter = new AtomicInteger();
+            final MessageListener listener = new MessageListener() {
+                public void onMessage(Message message) {
+
+                    if (debug) {
+                        try {
+                            log.info(((TextMessage) message).getText());
+                        } catch (JMSException e) {
+                            e.printStackTrace();
+                        }
+                    }
+
+                    if (counter.incrementAndGet() == totalMessageCount) {
+
+                            finishLatch.countDown();
+
+                    }
+                }
+            };
+
+            int consumerCount = PRIORITIES.length;
+            Connection[] connections = new Connection[consumerCount];
+            Session[] sessions = new Session[consumerCount];
+            MessageConsumer[] consumers = new MessageConsumer[consumerCount];
+
+            for (int i = 0; i < consumerCount; i++) {
+                String selector = "priority = " + PRIORITIES[i];
+
+                connections[i] = connectionFactory.createConnection();
+                sessions[i] = connections[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+                consumers[i] = sessions[i].createConsumer(destination, selector);
+                consumers[i].setMessageListener(listener);
+            }
+
+            for (Connection connection : connections) {
+                connection.start();
+            }
+
+            log.info("received " + counter.get() + " messages");
+
+            assertTrue("got all messages in time", finishLatch.await(60, TimeUnit.SECONDS));
+
+            log.info("received " + counter.get() + " messages");
+
+            for (MessageConsumer consumer : consumers) {
+                consumer.close();
+            }
+
+            for (Session session : sessions) {
+                session.close();
+            }
+
+            for (Connection connection : connections) {
+                connection.close();
+            }
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java
new file mode 100644
index 0000000..f896342
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import junit.framework.TestCase;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQDestination;
+
+
+public class AMQ1917Test extends TestCase {
+
+        private static final int NUM_MESSAGES = 4000;
+        private static final int NUM_THREADS = 10;
+        private static final String REQUEST_QUEUE = "mock.in.queue";
+        private static final String REPLY_QUEUE = "mock.out.queue";
+
+        private Destination requestDestination = ActiveMQDestination.createDestination(
+                REQUEST_QUEUE, ActiveMQDestination.QUEUE_TYPE);
+        private Destination replyDestination = ActiveMQDestination.createDestination(
+                REPLY_QUEUE, ActiveMQDestination.QUEUE_TYPE);
+
+        private CountDownLatch roundTripLatch = new CountDownLatch(NUM_MESSAGES);
+        private CountDownLatch errorLatch = new CountDownLatch(1);
+        private ThreadPoolExecutor tpe;
+        private final String BROKER_URL = "tcp://localhost:0";
+        private String connectionUri;
+        private BrokerService broker = null;
+        private boolean working = true;
+
+        // trival session/producer pool
+        final Session[] sessions = new Session[NUM_THREADS];
+        final MessageProducer[] producers = new MessageProducer[NUM_THREADS];
+
+        public void setUp() throws Exception {
+            broker = new BrokerService();
+            broker.setPersistent(false);
+            broker.addConnector(BROKER_URL);
+            broker.start();
+
+            connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
+            BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(10000);
+            tpe = new ThreadPoolExecutor(NUM_THREADS, NUM_THREADS, 60000,
+                    TimeUnit.MILLISECONDS, queue);
+            ThreadFactory limitedthreadFactory = new LimitedThreadFactory(tpe.getThreadFactory());
+            tpe.setThreadFactory(limitedthreadFactory);
+        }
+
+        public void tearDown() throws Exception {
+            broker.stop();
+            tpe.shutdown();
+        }
+
+        public void testLoadedSendRecieveWithCorrelationId() throws Exception {
+
+            ActiveMQConnectionFactory connectionFactory = new org.apache.activemq.ActiveMQConnectionFactory();
+            connectionFactory.setBrokerURL(connectionUri);
+            Connection connection = connectionFactory.createConnection();
+            setupReceiver(connection);
+
+            connection = connectionFactory.createConnection();
+            connection.start();
+
+            // trival session/producer pool
+            for (int i=0; i<NUM_THREADS; i++) {
+                sessions[i] = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                producers[i] = sessions[i].createProducer(requestDestination);
+            }
+
+            for (int i = 0; i < NUM_MESSAGES; i++) {
+                MessageSenderReceiver msr = new MessageSenderReceiver(requestDestination,
+                        replyDestination, "Test Message : " + i);
+                tpe.execute(msr);
+            }
+
+            while (!roundTripLatch.await(4000, TimeUnit.MILLISECONDS)) {
+                if (errorLatch.await(1000, TimeUnit.MILLISECONDS)) {
+                    fail("there was an error, check the console for thread or thread allocation failure");
+                    break;
+                }
+            }
+            working = false;
+        }
+
+        private void setupReceiver(final Connection connection) throws Exception {
+
+            final Session session = connection.createSession(false,
+                    Session.AUTO_ACKNOWLEDGE);
+            final MessageConsumer consumer = session
+                    .createConsumer(requestDestination);
+            final MessageProducer sender = session.createProducer(replyDestination);
+            connection.start();
+
+            new Thread() {
+                public void run() {
+                    while (working) {
+                        // wait for messages in infinitive loop
+                        // time out is set to show the client is awaiting
+                        try {
+                            TextMessage msg = (TextMessage) consumer.receive(20000);
+                            if (msg == null) {
+                                errorLatch.countDown();
+                                fail("Response timed out."
+                                        + " latchCount=" + roundTripLatch.getCount());
+                            } else {
+                                String result = msg.getText();
+                                //System.out.println("Request:" + (i++)
+                                //        + ", msg=" + result + ", ID" + msg.getJMSMessageID());
+                                TextMessage response = session.createTextMessage();
+                                response.setJMSCorrelationID(msg.getJMSMessageID());
+                                response.setText(result);
+                                sender.send(response);
+                            }
+                        } catch (JMSException e) {
+                            if (working) {
+                                errorLatch.countDown();
+                                fail("Unexpected exception:" + e);
+                            }
+                        }
+                    }
+                }
+            }.start();
+        }
+
+        class MessageSenderReceiver implements Runnable {
+
+            Destination reqDest;
+            Destination replyDest;
+            String origMsg;
+
+            public MessageSenderReceiver(Destination reqDest,
+                    Destination replyDest, String msg) throws Exception {
+                this.replyDest = replyDest;
+                this.reqDest = reqDest;
+                this.origMsg = msg;
+            }
+
+            private int getIndexFromCurrentThread() {
+                String name = Thread.currentThread().getName();
+                String num = name.substring(name.lastIndexOf('-') +1);
+                int idx = Integer.parseInt(num) -1;
+                assertTrue("idx is in range: idx=" + idx,  idx < NUM_THREADS);
+                return idx;
+            }
+
+            public void run() {
+                try {
+                    // get thread session and producer from pool
+                    int threadIndex = getIndexFromCurrentThread();
+                    Session session = sessions[threadIndex];
+                    MessageProducer producer = producers[threadIndex];
+
+                    final Message sendJmsMsg = session.createTextMessage(origMsg);
+                    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                    producer.send(sendJmsMsg);
+
+                    String jmsId = sendJmsMsg.getJMSMessageID();
+                    String selector = "JMSCorrelationID='" + jmsId + "'";
+
+                    MessageConsumer consumer = session.createConsumer(replyDest,
+                            selector);
+                    Message receiveJmsMsg = consumer.receive(2000);
+                    consumer.close();
+                    if (receiveJmsMsg == null) {
+                        errorLatch.countDown();
+                        fail("Unable to receive response for:" + origMsg
+                                + ", with selector=" + selector);
+                    } else {
+                        //System.out.println("received response message :"
+                        //        + ((TextMessage) receiveJmsMsg).getText()
+                        //        + " with selector : " + selector);
+                        roundTripLatch.countDown();
+                    }
+                } catch (JMSException e) {
+                    fail("unexpected exception:" + e);
+                }
+            }
+        }
+
+        public class LimitedThreadFactory implements ThreadFactory {
+            int threadCount;
+            private ThreadFactory factory;
+            public LimitedThreadFactory(ThreadFactory threadFactory) {
+                this.factory = threadFactory;
+            }
+
+            public Thread newThread(Runnable arg0) {
+                if (++threadCount > NUM_THREADS) {
+                    errorLatch.countDown();
+                    fail("too many threads requested");
+                }
+                return factory.newThread(arg0);
+            }
+        }
+    }
+

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java
new file mode 100644
index 0000000..2c86562
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.TextMessage;
+import javax.naming.NamingException;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.AutoFailTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.Wait;
+import org.apache.log4j.Logger;
+
+/**
+ * A AMQ1936Test
+ *
+ */
+public class AMQ1936Test extends TestCase {
+    private final static Logger logger = Logger.getLogger(AMQ1936Test.class);
+    private final static String TEST_QUEUE_NAME = "dynamicQueues/duplicate.message.test.queue";
+    // //--
+    //
+    private final static long TEST_MESSAGE_COUNT = 6000; // The number of test messages to use
+    //
+    // //--
+    private final static int CONSUMER_COUNT = 2; // The number of message receiver instances
+    private final static boolean TRANSACTED_RECEIVE = true; // Flag used by receiver which indicates messages should be
+                                                            // processed within a JMS transaction
+
+    private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(CONSUMER_COUNT, CONSUMER_COUNT, Long.MAX_VALUE, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<Runnable>());
+    private final ThreadedMessageReceiver[] receivers = new ThreadedMessageReceiver[CONSUMER_COUNT];
+    private BrokerService broker = null;
+    static QueueConnectionFactory connectionFactory = null;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        broker = new BrokerService();
+        broker.getSystemUsage().getMemoryUsage().setLimit(5 * 1024 * 1024);
+        broker.setBrokerName("test");
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.start();
+        connectionFactory = new ActiveMQConnectionFactory("vm://test");
+        ;
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        super.tearDown();
+
+        if (threadPool != null) {
+            // signal receivers to stop
+            for (ThreadedMessageReceiver receiver : receivers) {
+                receiver.setShouldStop(true);
+            }
+
+            logger.info("Waiting for receivers to shutdown..");
+            if (!threadPool.awaitTermination(10, TimeUnit.SECONDS)) {
+                logger.warn("Not all receivers completed shutdown.");
+            } else {
+                logger.info("All receivers shutdown successfully..");
+            }
+        }
+
+        logger.debug("Stoping the broker.");
+
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    private void sendTextMessage(String queueName, int i) throws JMSException, NamingException {
+        QueueConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://test");
+        QueueConnection queueConnection = null;
+        QueueSession session = null;
+        QueueSender sender = null;
+        Queue queue = null;
+        TextMessage message = null;
+
+        try {
+
+            // Create the queue connection
+            queueConnection = connectionFactory.createQueueConnection();
+
+            session = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
+            queue = session.createQueue(TEST_QUEUE_NAME);
+            sender = session.createSender(queue);
+            sender.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+            message = session.createTextMessage(String.valueOf(i));
+
+            // send the message
+            sender.send(message);
+
+            if (session.getTransacted()) {
+                session.commit();
+            }
+            if (i % 1000 == 0) {
+                logger.info("Message successfully sent to : " + queue.getQueueName() + " messageid: " + message.getJMSMessageID() + " content:"
+                    + message.getText());
+            }
+        } finally {
+            if (sender != null) {
+                sender.close();
+            }
+            if (session != null) {
+                session.close();
+            }
+            if (queueConnection != null) {
+                queueConnection.close();
+            }
+        }
+    }
+
+    public void testForDuplicateMessages() throws Exception {
+        final ConcurrentHashMap<String, String> messages = new ConcurrentHashMap<String, String>();
+        final Object lock = new Object();
+        final CountDownLatch duplicateSignal = new CountDownLatch(1);
+        final AtomicInteger messageCount = new AtomicInteger(0);
+
+        // add 1/2 the number of our total messages
+        for (int i = 0; i < TEST_MESSAGE_COUNT / 2; i++) {
+            if (duplicateSignal.getCount() == 0) {
+                fail("Duplicate message id detected");
+            }
+            sendTextMessage(TEST_QUEUE_NAME, i);
+        }
+
+        // create a number of consumers to read of the messages and start them with a handler which simply stores the
+        // message ids
+        // in a Map and checks for a duplicate
+        for (int i = 0; i < CONSUMER_COUNT; i++) {
+            receivers[i] = new ThreadedMessageReceiver(TEST_QUEUE_NAME, new IMessageHandler() {
+
+                @Override
+                public void onMessage(Message message) throws Exception {
+                    synchronized (lock) {
+                        int current = messageCount.incrementAndGet();
+                        if (current % 1000 == 0) {
+                            logger.info("Received message:" + message.getJMSMessageID() + " with content: " + ((TextMessage) message).getText());
+                        }
+                        if (messages.containsKey(message.getJMSMessageID())) {
+                            duplicateSignal.countDown();
+                            logger.fatal("duplicate message id detected:" + message.getJMSMessageID());
+                            fail("Duplicate message id detected:" + message.getJMSMessageID());
+                        } else {
+                            messages.put(message.getJMSMessageID(), message.getJMSMessageID());
+                        }
+                    }
+                }
+            });
+            threadPool.submit(receivers[i]);
+        }
+
+        // starting adding the remaining messages
+        for (int i = 0; i < TEST_MESSAGE_COUNT / 2; i++) {
+            if (duplicateSignal.getCount() == 0) {
+                fail("Duplicate message id detected");
+            }
+            sendTextMessage(TEST_QUEUE_NAME, i);
+        }
+
+        logger.info("sent all " + TEST_MESSAGE_COUNT + " messages");
+
+        // allow some time for messages to be delivered to receivers.
+        boolean ok = Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return TEST_MESSAGE_COUNT == messages.size();
+            }
+        }, TimeUnit.MINUTES.toMillis(7));
+        if (!ok) {
+            AutoFailTestSupport.dumpAllThreads("--STUCK?--");
+        }
+        assertEquals("Number of messages received does not match the number sent", TEST_MESSAGE_COUNT, messages.size());
+        assertEquals(TEST_MESSAGE_COUNT, messageCount.get());
+    }
+
+    private final static class ThreadedMessageReceiver implements Runnable {
+
+        private IMessageHandler handler = null;
+        private final AtomicBoolean shouldStop = new AtomicBoolean(false);
+
+        public ThreadedMessageReceiver(String queueName, IMessageHandler handler) {
+            this.handler = handler;
+        }
+
+        @Override
+        public void run() {
+
+            QueueConnection queueConnection = null;
+            QueueSession session = null;
+            QueueReceiver receiver = null;
+            Queue queue = null;
+            Message message = null;
+            try {
+                try {
+
+                    queueConnection = connectionFactory.createQueueConnection();
+                    // create a transacted session
+                    session = queueConnection.createQueueSession(TRANSACTED_RECEIVE, QueueSession.AUTO_ACKNOWLEDGE);
+                    queue = session.createQueue(TEST_QUEUE_NAME);
+                    receiver = session.createReceiver(queue);
+
+                    // start the connection
+                    queueConnection.start();
+
+                    logger.info("Receiver " + Thread.currentThread().getName() + " connected.");
+
+                    // start receive loop
+                    while (!(shouldStop.get() || Thread.currentThread().isInterrupted())) {
+                        try {
+                            message = receiver.receive(200);
+                        } catch (Exception e) {
+                            //
+                            // ignore interrupted exceptions
+                            //
+                            if (e instanceof InterruptedException || e.getCause() instanceof InterruptedException) {
+                                /* ignore */
+                            } else {
+                                throw e;
+                            }
+                        }
+
+                        if (message != null && this.handler != null) {
+                            this.handler.onMessage(message);
+                        }
+
+                        // commit session on successful handling of message
+                        if (session.getTransacted()) {
+                            session.commit();
+                        }
+                    }
+
+                    logger.info("Receiver " + Thread.currentThread().getName() + " shutting down.");
+
+                } finally {
+                    if (receiver != null) {
+                        try {
+                            receiver.close();
+                        } catch (JMSException e) {
+                            logger.warn(e);
+                        }
+                    }
+                    if (session != null) {
+                        try {
+                            session.close();
+                        } catch (JMSException e) {
+                            logger.warn(e);
+                        }
+                    }
+                    if (queueConnection != null) {
+                        queueConnection.close();
+                    }
+                }
+            } catch (JMSException e) {
+                logger.error(e);
+                e.printStackTrace();
+            } catch (NamingException e) {
+                logger.error(e);
+            } catch (Exception e) {
+                logger.error(e);
+                e.printStackTrace();
+            }
+        }
+
+        public void setShouldStop(Boolean shouldStop) {
+            this.shouldStop.set(shouldStop);
+        }
+    }
+
+    public interface IMessageHandler {
+        void onMessage(Message message) throws Exception;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java
new file mode 100644
index 0000000..9abab3f
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.ArrayList;
+import java.util.Vector;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a test case for the issue reported at: https://issues.apache.org/activemq/browse/AMQ-2021 Bug is modification
+ * of inflight message properties so the failure can manifest itself in a bunch or ways, from message receipt with null
+ * properties to marshall errors
+ */
+public class AMQ2021Test implements ExceptionListener, UncaughtExceptionHandler {
+
+    private static final Logger log = LoggerFactory.getLogger(AMQ2021Test.class);
+    BrokerService brokerService;
+    ArrayList<Thread> threads = new ArrayList<Thread>();
+    Vector<Throwable> exceptions;
+
+    @Rule
+    public TestName name = new TestName();
+
+    AMQ2021Test testCase;
+
+    private final String ACTIVEMQ_BROKER_BIND = "tcp://localhost:0";
+    private String CONSUMER_BROKER_URL = "?jms.redeliveryPolicy.maximumRedeliveries=1&jms.redeliveryPolicy.initialRedeliveryDelay=0";
+    private String PRODUCER_BROKER_URL;
+
+    private final int numMessages = 1000;
+    private final int numConsumers = 2;
+    private final int dlqMessages = numMessages / 2;
+
+    private CountDownLatch receivedLatch;
+    private ActiveMQTopic destination;
+    private CountDownLatch started;
+
+    @Before
+    public void setUp() throws Exception {
+        Thread.setDefaultUncaughtExceptionHandler(this);
+        testCase = this;
+
+        // Start an embedded broker up.
+        brokerService = new BrokerService();
+        brokerService.setDeleteAllMessagesOnStartup(true);
+        brokerService.addConnector(ACTIVEMQ_BROKER_BIND);
+        brokerService.start();
+        destination = new ActiveMQTopic(name.getMethodName());
+        exceptions = new Vector<Throwable>();
+
+        CONSUMER_BROKER_URL = brokerService.getTransportConnectors().get(0).getPublishableConnectString() + CONSUMER_BROKER_URL;
+        PRODUCER_BROKER_URL = brokerService.getTransportConnectors().get(0).getPublishableConnectString();
+
+        receivedLatch = new CountDownLatch(numConsumers * (numMessages + dlqMessages));
+        started = new CountDownLatch(1);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        for (Thread t : threads) {
+            t.interrupt();
+            t.join();
+        }
+        brokerService.stop();
+    }
+
+    @Test(timeout=240000)
+    public void testConcurrentTopicResendToDLQ() throws Exception {
+
+        for (int i = 0; i < numConsumers; i++) {
+            ConsumerThread c1 = new ConsumerThread("Consumer-" + i);
+            threads.add(c1);
+            c1.start();
+        }
+
+        assertTrue(started.await(10, TimeUnit.SECONDS));
+
+        Thread producer = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    produce(numMessages);
+                } catch (Exception e) {
+                }
+            }
+        };
+        threads.add(producer);
+        producer.start();
+
+        boolean allGood = receivedLatch.await(90, TimeUnit.SECONDS);
+        for (Throwable t : exceptions) {
+            log.error("failing test with first exception", t);
+            fail("exception during test : " + t);
+        }
+        assertTrue("excepted messages received within time limit", allGood);
+
+        assertEquals(0, exceptions.size());
+
+        for (int i = 0; i < numConsumers; i++) {
+            // last recovery sends message to deq so is not received again
+            assertEquals(dlqMessages * 2, ((ConsumerThread) threads.get(i)).recoveries);
+            assertEquals(numMessages + dlqMessages, ((ConsumerThread) threads.get(i)).counter);
+        }
+
+        // half of the messages for each consumer should go to the dlq but duplicates will
+        // be suppressed
+        consumeFromDLQ(dlqMessages);
+
+    }
+
+    private void consumeFromDLQ(int messageCount) throws Exception {
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(CONSUMER_BROKER_URL);
+        Connection connection = connectionFactory.createConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer dlqConsumer = session.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
+        int count = 0;
+        for (int i = 0; i < messageCount; i++) {
+            if (dlqConsumer.receive(1000) == null) {
+                break;
+            }
+            count++;
+        }
+        assertEquals(messageCount, count);
+    }
+
+    public void produce(int count) throws Exception {
+        Connection connection = null;
+        try {
+            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(PRODUCER_BROKER_URL);
+            connection = factory.createConnection();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(destination);
+            producer.setTimeToLive(0);
+            connection.start();
+
+            for (int i = 0; i < count; i++) {
+                int id = i + 1;
+                TextMessage message = session.createTextMessage(name.getMethodName() + " Message " + id);
+                message.setIntProperty("MsgNumber", id);
+                producer.send(message);
+
+                if (id % 500 == 0) {
+                    log.info("sent " + id + ", ith " + message);
+                }
+            }
+        } catch (JMSException e) {
+            log.error("unexpected ex on produce", e);
+            exceptions.add(e);
+        } finally {
+            try {
+                if (connection != null) {
+                    connection.close();
+                }
+            } catch (Throwable e) {
+            }
+        }
+    }
+
+    public class ConsumerThread extends Thread implements MessageListener {
+        public long counter = 0;
+        public long recoveries = 0;
+        private Session session;
+
+        public ConsumerThread(String threadId) {
+            super(threadId);
+        }
+
+        @Override
+        public void run() {
+            try {
+                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(CONSUMER_BROKER_URL);
+                Connection connection = connectionFactory.createConnection();
+                connection.setExceptionListener(testCase);
+                connection.setClientID(getName());
+                session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+                MessageConsumer consumer = session.createDurableSubscriber(destination, getName());
+                consumer.setMessageListener(this);
+                connection.start();
+
+                started.countDown();
+
+            } catch (JMSException exception) {
+                log.error("unexpected ex in consumer run", exception);
+                exceptions.add(exception);
+            }
+        }
+
+        @Override
+        public void onMessage(Message message) {
+            try {
+                counter++;
+                int messageNumber = message.getIntProperty("MsgNumber");
+                if (messageNumber % 2 == 0) {
+                    session.recover();
+                    recoveries++;
+                } else {
+                    message.acknowledge();
+                }
+
+                if (counter % 200 == 0) {
+                    log.info("recoveries:" + recoveries + ", Received " + counter + ", counter'th " + message);
+                }
+                receivedLatch.countDown();
+            } catch (Exception e) {
+                log.error("unexpected ex on onMessage", e);
+                exceptions.add(e);
+            }
+        }
+
+    }
+
+    @Override
+    public void onException(JMSException exception) {
+        log.info("Unexpected JMSException", exception);
+        exceptions.add(exception);
+    }
+
+    @Override
+    public void uncaughtException(Thread thread, Throwable exception) {
+        log.info("Unexpected exception from thread " + thread + ", ex: " + exception);
+        exceptions.add(exception);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2084Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2084Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2084Test.java
new file mode 100644
index 0000000..1f31864
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2084Test.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import javax.naming.InitialContext;
+
+import org.apache.activemq.broker.BrokerService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ2084Test {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ2084Test.class);
+    BrokerService broker;
+    CountDownLatch qreceived;
+    String connectionUri;
+
+    @Before
+    public void startBroker() throws Exception {
+        broker = new BrokerService();
+        broker.setPersistent(false);
+        connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString();
+        broker.start();
+
+        qreceived = new CountDownLatch(1);
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    public void listenQueue(final String queueName, final String selectors) {
+        try {
+            Properties props = new Properties();
+            props.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
+            props.put("java.naming.provider.url", connectionUri);
+            props.put("queue.queueName", queueName);
+
+            javax.naming.Context ctx = new InitialContext(props);
+            QueueConnectionFactory factory = (QueueConnectionFactory) ctx.lookup("ConnectionFactory");
+            QueueConnection conn = factory.createQueueConnection();
+            final Queue queue = (Queue) ctx.lookup("queueName");
+            QueueSession session = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+            QueueReceiver receiver = session.createReceiver(queue, selectors);
+            System.out.println("Message Selector: " + receiver.getMessageSelector());
+            receiver.setMessageListener(new MessageListener() {
+                public void onMessage(Message message) {
+                    try {
+                        if (message instanceof TextMessage) {
+                            TextMessage txtMsg = (TextMessage) message;
+                            String msg = txtMsg.getText();
+                            LOG.info("Queue Message Received: " + queueName + " - " + msg);
+                            qreceived.countDown();
+
+                        }
+                        message.acknowledge();
+                    } catch (Throwable e) {
+                        e.printStackTrace();
+                    }
+                }
+            });
+            conn.start();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void listenTopic(final String topicName, final String selectors) {
+        try {
+            Properties props = new Properties();
+            props.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
+            props.put("java.naming.provider.url", connectionUri);
+            props.put("topic.topicName", topicName);
+
+            javax.naming.Context ctx = new InitialContext(props);
+            TopicConnectionFactory factory = (TopicConnectionFactory) ctx.lookup("ConnectionFactory");
+            TopicConnection conn = factory.createTopicConnection();
+            final Topic topic = (Topic) ctx.lookup("topicName");
+            TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+            TopicSubscriber receiver = session.createSubscriber(topic, selectors, false);
+
+            receiver.setMessageListener(new MessageListener() {
+                public void onMessage(Message message) {
+                    try {
+                        if (message instanceof TextMessage) {
+                            TextMessage txtMsg = (TextMessage) message;
+                            String msg = txtMsg.getText();
+                            LOG.info("Topic Message Received: " + topicName + " - " + msg);
+                        }
+                        message.acknowledge();
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            });
+            conn.start();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void publish(String topicName, String message) {
+        try {
+            Properties props = new Properties();
+            props.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
+            props.put("java.naming.provider.url", connectionUri);
+            props.put("topic.topicName", topicName);
+            javax.naming.Context ctx = new InitialContext(props);
+            TopicConnectionFactory factory = (TopicConnectionFactory) ctx.lookup("ConnectionFactory");
+            TopicConnection conn = factory.createTopicConnection();
+            Topic topic = (Topic) ctx.lookup("topicName");
+            TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+            TopicPublisher publisher = session.createPublisher(topic);
+            if (message != null) {
+                Message msg = session.createTextMessage(message);
+                publisher.send(msg);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Test
+    public void tryXpathSelectorMatch() throws Exception {
+        String xPath = "XPATH '//books//book[@lang=''en'']'";
+        listenQueue("Consumer.Sample.VirtualTopic.TestXpath", xPath);
+        publish("VirtualTopic.TestXpath", "<?xml version=\"1.0\" encoding=\"UTF-8\"?><books><book lang=\"en\">ABC</book></books>");
+        assertTrue("topic received: ", qreceived.await(20, TimeUnit.SECONDS));
+    }
+
+    @Test
+    public void tryXpathSelectorNoMatch() throws Exception {
+        String xPath = "XPATH '//books//book[@lang=''es'']'";
+        listenQueue("Consumer.Sample.VirtualTopic.TestXpath", xPath);
+        publish("VirtualTopic.TestXpath", "<?xml version=\"1.0\" encoding=\"UTF-8\"?><books><book lang=\"en\">ABC</book></books>");
+        assertFalse("topic did not receive unmatched", qreceived.await(5, TimeUnit.SECONDS));
+    }
+
+}


Mime
View raw message