activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [16/52] [abbrv] [partial] activemq-artemis git commit: ARTEMIS-127 Adding activemq unit test module to Artemis
Date Tue, 09 Jun 2015 16:36:47 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java
new file mode 100644
index 0000000..21c389f
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java
@@ -0,0 +1,466 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.QueueConnection;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.management.ObjectName;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.util.TimeUtils;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ4485LowLimitTest extends JmsMultipleBrokersTestSupport {
+    // Filler text built from a 10 KiB zero-filled byte array; appended to every produced message body.
+    static final String payload = new String(new byte[10 * 1024]);
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ4485LowLimitTest.class);
+    // Broker "B<i>" listens on tcp://localhost:(portBase + i).
+    final int portBase = 61600;
+    // Number of brokers created and networked by testBrokers().
+    int numBrokers = 8;
+    // Size of the thread pool used by produce(); one producer task is submitted per thread.
+    final int numProducers = 30;
+    // Total number of messages sent to the "IN" queue, shared between all producer tasks.
+    final int numMessages = 1000;
+    // Milliseconds each GW consumer sleeps before counting a message.
+    final int consumerSleepTime = 40;
+    // Comma separated list of all broker transport URLs; filled in by buildUrlList().
+    StringBuilder brokersUrl = new StringBuilder();
+    // Received-message counter per GW queue; each ConsumerState shares the counter of its destination.
+    HashMap<ActiveMQQueue, AtomicInteger> accumulators = new HashMap<ActiveMQQueue, AtomicInteger>();
+    // Throwables caught on producer threads; testBrokers() asserts this list is empty.
+    private ArrayList<Throwable> exceptions = new ArrayList<Throwable>();
+
+    protected void buildUrlList() throws Exception {
+        for (int i = 0; i < numBrokers; i++) {
+            brokersUrl.append("tcp://localhost:" + (portBase + i));
+            if (i != numBrokers - 1) {
+                brokersUrl.append(',');
+            }
+        }
+    }
+
+    protected BrokerService createBroker(int brokerid) throws Exception {
+        return createBroker(brokerid, true);
+    }
+
+    protected BrokerService createBroker(int brokerid, boolean addToNetwork) throws Exception {
+
+        BrokerService broker = new BrokerService();
+        broker.setPersistent(true);
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.getManagementContext().setCreateConnector(false);
+
+
+        broker.setUseJmx(true);
+        broker.setBrokerName("B" + brokerid);
+        broker.addConnector(new URI("tcp://localhost:" + (portBase + brokerid)));
+
+        if (addToNetwork) {
+            addNetworkConnector(broker);
+        }
+        broker.setSchedulePeriodForDestinationPurge(0);
+        broker.getSystemUsage().getMemoryUsage().setLimit(256 * 1024 * 1024l);
+
+
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry policyEntry = new PolicyEntry();
+        policyEntry.setExpireMessagesPeriod(0);
+        policyEntry.setQueuePrefetch(1000);
+        policyEntry.setMemoryLimit(2 * 1024 * 1024l);
+        policyEntry.setProducerFlowControl(false);
+        policyEntry.setEnableAudit(true);
+        policyEntry.setUseCache(true);
+        policyMap.put(new ActiveMQQueue("GW.>"), policyEntry);
+
+        PolicyEntry inPolicyEntry = new PolicyEntry();
+        inPolicyEntry.setExpireMessagesPeriod(0);
+        inPolicyEntry.setQueuePrefetch(1000);
+        inPolicyEntry.setMemoryLimit(5 * 1024 * 1024l);
+        inPolicyEntry.setProducerFlowControl(true);
+        inPolicyEntry.setEnableAudit(true);
+        inPolicyEntry.setUseCache(true);
+        policyMap.put(new ActiveMQQueue("IN"), inPolicyEntry);
+
+        broker.setDestinationPolicy(policyMap);
+
+        KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
+        kahaDBPersistenceAdapter.setConcurrentStoreAndDispatchQueues(true);
+
+        brokers.put(broker.getBrokerName(), new BrokerItem(broker));
+        return broker;
+    }
+
+    private void addNetworkConnector(BrokerService broker) throws Exception {
+        StringBuilder networkConnectorUrl = new StringBuilder("static:(").append(brokersUrl.toString());
+        networkConnectorUrl.append(')');
+
+        for (int i = 0; i < 2; i++) {
+            NetworkConnector nc = new DiscoveryNetworkConnector(new URI(networkConnectorUrl.toString()));
+            nc.setName("Bridge-" + i);
+            nc.setNetworkTTL(1);
+            nc.setDecreaseNetworkConsumerPriority(true);
+            nc.setDynamicOnly(true);
+            nc.setPrefetchSize(100);
+            nc.setDynamicallyIncludedDestinations(
+                    Arrays.asList(new ActiveMQDestination[]{new ActiveMQQueue("GW.*")}));
+            broker.addNetworkConnector(nc);
+        }
+    }
+
+    // used to explore contention with concurrentStoreandDispatch - sync commit and task queue reversing
+    // order of cursor add and sequence assignment
+    public void x_testInterleavedSend() throws Exception {
+
+        BrokerService b = createBroker(0, false);
+        b.start();
+
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:" + (portBase + 0));
+        connectionFactory.setWatchTopicAdvisories(false);
+
+        QueueConnection c1 = connectionFactory.createQueueConnection();
+        QueueConnection c2 = connectionFactory.createQueueConnection();
+        QueueConnection c3 = connectionFactory.createQueueConnection();
+
+        c1.start();
+        c2.start();
+        c3.start();
+
+        ActiveMQQueue dest = new ActiveMQQueue("IN");
+        final Session s1 = c1.createQueueSession(true, Session.SESSION_TRANSACTED);
+        final TextMessage txMessage = s1.createTextMessage("TX");
+        final TextMessage noTxMessage = s1.createTextMessage("NO_TX");
+
+        final MessageProducer txProducer = s1.createProducer(dest);
+        final MessageProducer nonTxProducer = c2.createQueueSession(false, Session.AUTO_ACKNOWLEDGE).createProducer(dest);
+
+        txProducer.send(txMessage);
+
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
+        executorService.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    s1.commit();
+                } catch (JMSException e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        executorService.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    nonTxProducer.send(noTxMessage);
+                } catch (JMSException e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        executorService.shutdown();
+        executorService.awaitTermination(10, TimeUnit.MINUTES);
+
+    }
+
+    /**
+     * End-to-end scenario: starts {@code numBrokers} networked brokers, attaches one consumer per
+     * {@code GW.<id>} queue plus a fan-out consumer on {@code IN} at every broker, produces
+     * {@code numMessages} messages and waits until every GW consumer has counted the expected
+     * number. Finally asserts that no producer failed and that the DLQ queues are empty.
+     */
+    public void testBrokers() throws Exception {
+
+        buildUrlList();
+        for (int brokerId = 0; brokerId < numBrokers; brokerId++) {
+            createBroker(brokerId);
+        }
+        startAllBrokers();
+
+        // every broker should end up bridged to all the others
+        waitForBridgeFormation(numBrokers - 1);
+        verifyPeerBrokerInfos(numBrokers - 1);
+
+        final List<ConsumerState> consumerStates = startAllGWConsumers(numBrokers);
+        startAllGWFanoutConsumers(numBrokers);
+
+        LOG.info("Waiting for percolation of consumers..");
+        TimeUnit.SECONDS.sleep(5);
+
+        LOG.info("Produce mesages..");
+        long startTime = System.currentTimeMillis();
+        produce(numMessages);
+
+        final Wait.Condition allDelivered = new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                for (ConsumerState state : consumerStates) {
+                    final int fanout = state.destination.isComposite() ? state.destination.getCompositeDestinations().length : 1;
+                    final int expected = numMessages * fanout;
+                    LOG.info("Tally for: " + state.brokerName + ", dest: " + state.destination + " - " + state.accumulator.get());
+                    if (state.accumulator.get() == expected) {
+                        LOG.info("got tally on " + state.brokerName);
+                        continue;
+                    }
+                    LOG.info("Tally for: " + state.brokerName + ", dest: " + state.destination + " - " + state.accumulator.get() + " != " + expected + ", " + state.expected);
+                    // close to done: dump queue statistics
+                    if (state.accumulator.get() > expected - 50) {
+                        dumpQueueStat(null);
+                    }
+                    // exactly one message outstanding: attach a short lived extra consumer
+                    if (state.expected.size() == 1) {
+                        startConsumer(state.brokerName, state.destination);
+                    }
+                    return false;
+                }
+                return true;
+            }
+        };
+        assertTrue("Got all sent", Wait.waitFor(allDelivered, 1000 * 60 * 1000L, 20 * 1000));
+
+        assertTrue("No exceptions:" + exceptions, exceptions.isEmpty());
+
+        LOG.info("done");
+        long duration = System.currentTimeMillis() - startTime;
+        LOG.info("Duration:" + TimeUtils.printDuration(duration));
+
+        assertEquals("nothing in the dlq's", 0, dumpQueueStat(new ActiveMQQueue("ActiveMQ.DLQ")));
+
+    }
+
+    /**
+     * Opens a short-lived connection to the given broker, creates a consumer on the destination
+     * and closes the connection again straight away.
+     * NOTE(review): presumably meant to nudge the broker into re-dispatching a stuck message - confirm.
+     *
+     * @param brokerName  broker name of the form {@code "B<id>"}; the numeric suffix selects the port
+     * @param destination destination the temporary consumer subscribes to
+     * @throws NumberFormatException if the name has no numeric suffix after its first character
+     */
+    private void startConsumer(String brokerName, ActiveMQDestination destination) throws Exception {
+        int id = Integer.parseInt(brokerName.substring(1));
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:" + (portBase + id));
+        connectionFactory.setWatchTopicAdvisories(false);
+        QueueConnection queueConnection = connectionFactory.createQueueConnection();
+        try {
+            queueConnection.start();
+            queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(destination);
+        } finally {
+            // always release the connection, also when start() or createConsumer() fails
+            queueConnection.close();
+        }
+    }
+
+    /**
+     * Logs name, enqueue count and size of the matching queues on every broker and returns the
+     * summed queue size.
+     *
+     * @param destination queues whose JMX object name contains this destination's physical name
+     *                    are reported; {@code null} reports every queue (previously a
+     *                    {@code null} argument matched nothing, making the call a no-op)
+     * @return sum of {@code QueueSize} over all reported queues on all brokers
+     */
+    private long dumpQueueStat(ActiveMQDestination destination) throws Exception {
+        long sumTotal = 0;
+        for (BrokerItem brokerItem : brokers.values()) {
+            BrokerService brokerService = brokerItem.broker;
+            for (ObjectName objectName : brokerService.getAdminView().getQueues()) {
+                if (destination == null || objectName.toString().contains(destination.getPhysicalName())) {
+                    QueueViewMBean qViewMBean = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance(objectName, QueueViewMBean.class, false);
+                    LOG.info(brokerService.getBrokerName() + ", " + qViewMBean.getName() + ", Enqueue:"  + qViewMBean.getEnqueueCount() + ", Size: " + qViewMBean.getQueueSize());
+                    sumTotal += qViewMBean.getQueueSize();
+                }
+            }
+        }
+        return sumTotal;
+    }
+
+    /**
+     * On each of the first {@code nBrokers} brokers, starts a transacted session that consumes
+     * from {@code IN} and forwards every message to the composite queue
+     * {@code GW.0,GW.1,...,GW.(nBrokers-1)}, committing after each forwarded message.
+     * Failures in the listener are logged only.
+     *
+     * @param nBrokers number of brokers, which is also the number of GW queues in the composite
+     */
+    private void startAllGWFanoutConsumers(int nBrokers) throws Exception {
+
+        final StringBuilder gatewayNames = new StringBuilder();
+        for (int gw = 0; gw < nBrokers; gw++) {
+            if (gw > 0) {
+                gatewayNames.append(',');
+            }
+            gatewayNames.append("GW." + gw);
+        }
+        final ActiveMQQueue fanoutTarget = new ActiveMQQueue(gatewayNames.toString());
+
+        for (int brokerId = 0; brokerId < nBrokers; brokerId++) {
+            final String brokerUrl = "failover:(tcp://localhost:" + (portBase + brokerId) + ")";
+            final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
+            factory.setWatchTopicAdvisories(false);
+
+            final QueueConnection connection = factory.createQueueConnection();
+            connection.start();
+
+            final QueueSession txSession = connection.createQueueSession(true, Session.SESSION_TRANSACTED);
+            final MessageProducer fanoutProducer = txSession.createProducer(fanoutTarget);
+            final QueueReceiver inReceiver = txSession.createReceiver(new ActiveMQQueue("IN"));
+
+            inReceiver.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                    try {
+                        // receipt and fan-out send share one transaction
+                        fanoutProducer.send(message);
+                        txSession.commit();
+                    } catch (Exception e) {
+                        LOG.error("Failed to fanout to GW: " + message, e);
+                    }
+                }
+            });
+        }
+    }
+
+    /**
+     * Starts one auto-acknowledge consumer on queue {@code GW.<id>} at broker {@code id} for each
+     * of the first {@code nBrokers} brokers. Each listener sleeps {@code consumerSleepTime}
+     * milliseconds, increments the destination's accumulator and removes the message's
+     * {@code NUM} property value from the state's {@code expected} collection.
+     *
+     * @param nBrokers number of brokers to attach a consumer to
+     * @return one {@code ConsumerState} per started consumer, in broker id order
+     */
+    private List<ConsumerState> startAllGWConsumers(int nBrokers) throws Exception {
+        final List<ConsumerState> states = new LinkedList<ConsumerState>();
+        for (int brokerId = 0; brokerId < nBrokers; brokerId++) {
+            final String brokerUrl = "failover:(tcp://localhost:" + (portBase + brokerId) + ")";
+            final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
+            factory.setWatchTopicAdvisories(false);
+
+            final QueueConnection connection = factory.createQueueConnection();
+            connection.start();
+
+            final QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+            final ActiveMQQueue gwQueue = new ActiveMQQueue("GW." + brokerId);
+            final QueueReceiver receiver = session.createReceiver(gwQueue);
+
+            final ConsumerState state = new ConsumerState();
+            state.brokerName = ((ActiveMQConnection) connection).getBrokerName();
+            state.receiver = receiver;
+            state.destination = gwQueue;
+
+            // pre-load the message numbers this consumer is expected to see
+            final int fanout = state.destination.isComposite() ? state.destination.getCompositeDestinations().length : 1;
+            final int expectedCount = numMessages * fanout;
+            for (int num = 0; num < expectedCount; num++) {
+                state.expected.add(num);
+            }
+
+            // one shared counter per destination
+            AtomicInteger counter = accumulators.get(gwQueue);
+            if (counter == null) {
+                counter = new AtomicInteger(0);
+                accumulators.put(gwQueue, counter);
+            }
+            state.accumulator = counter;
+
+            receiver.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                    if (consumerSleepTime > 0) {
+                        try {
+                            TimeUnit.MILLISECONDS.sleep(consumerSleepTime);
+                        } catch (InterruptedException e) {
+                            e.printStackTrace();
+                        }
+                    }
+                    try {
+                        state.accumulator.incrementAndGet();
+                        try {
+                            final Object num = ((ActiveMQMessage) message).getProperty("NUM");
+                            state.expected.remove(num);
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                        }
+                    } catch (Exception e) {
+                        LOG.error("Failed to commit slow receipt of " + message, e);
+                    }
+                }
+            });
+
+            states.add(state);
+        }
+        return states;
+    }
+
+    private void produce(final int numMessages) throws Exception {
+        ExecutorService executorService = Executors.newFixedThreadPool(numProducers);
+        final AtomicInteger toSend = new AtomicInteger(numMessages);
+        for (int i = 1; i <= numProducers; i++) {
+            final int id = i % numBrokers;
+            executorService.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")");
+                        connectionFactory.setWatchTopicAdvisories(false);
+                        QueueConnection queueConnection = connectionFactory.createQueueConnection();
+                        queueConnection.start();
+                        QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+                        MessageProducer producer = queueSession.createProducer(null);
+                        int val = 0;
+                        while ((val = toSend.decrementAndGet()) >= 0) {
+
+                            int id = numMessages - val - 1;
+
+                            ActiveMQQueue compositeQ = new ActiveMQQueue("IN");
+                            Message textMessage = queueSession.createTextMessage(((ActiveMQConnection) queueConnection).getBrokerName() + "->" + id + " payload:" + payload);
+                            textMessage.setIntProperty("NUM", id);
+                            producer.send(compositeQ, textMessage);
+                        }
+                        queueConnection.close();
+
+                    } catch (Throwable throwable) {
+                        throwable.printStackTrace();
+                        exceptions.add(throwable);
+                    }
+                }
+            });
+        }
+    }
+
+    /**
+     * Waits for the broker to report exactly {@code max} peer broker infos and asserts that it does.
+     * On a mismatch the names of the peers that are not reported are logged first.
+     * <p>
+     * The candidate peers are {@code B0..B<max>} minus the broker itself. The earlier version
+     * built {@code B0..B<max-1>}, which always listed the broker itself as missing and could
+     * never report the highest-numbered broker.
+     *
+     * @param brokerItem holder of the broker to check
+     * @param max        expected number of peer brokers
+     */
+    private void verifyPeerBrokerInfo(BrokerItem brokerItem, final int max) throws Exception {
+        final BrokerService broker = brokerItem.broker;
+        final RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                LOG.info("verify infos " + broker.getBrokerName() + ", len: " + regionBroker.getPeerBrokerInfos().length);
+                return max == regionBroker.getPeerBrokerInfos().length;
+            }
+        });
+        // single snapshot so that the log output and the assertion agree
+        final BrokerInfo[] peerInfos = regionBroker.getPeerBrokerInfos();
+        LOG.info("verify infos " + broker.getBrokerName() + ", len: " + peerInfos.length);
+        if (max != peerInfos.length) {
+            List<String> missing = new ArrayList<String>();
+            for (int i = 0; i <= max; i++) {
+                String candidate = "B" + i;
+                // a broker is never its own peer
+                if (!candidate.equals(broker.getBrokerName())) {
+                    missing.add(candidate);
+                }
+            }
+            for (BrokerInfo info : peerInfos) {
+                LOG.info(info.getBrokerName());
+                missing.remove(info.getBrokerName());
+            }
+            LOG.info("Broker infos off.." + missing);
+        }
+        assertEquals(broker.getBrokerName(), max, peerInfos.length);
+    }
+
+    /**
+     * Runs {@code verifyPeerBrokerInfo} for every registered broker.
+     *
+     * @param max expected number of peer brokers per broker
+     */
+    private void verifyPeerBrokerInfos(final int max) throws Exception {
+        for (BrokerItem item : brokers.values()) {
+            verifyPeerBrokerInfo(item, max);
+        }
+    }
+
+    /** Delegates to the superclass; this test adds no clean-up of its own. */
+    @Override
+    protected void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+    /** Mutable bookkeeping for one GW consumer; populated by startAllGWConsumers(). */
+    class ConsumerState {
+        /** Name of the broker the consumer's connection reports. */
+        String brokerName;
+        /** Queue the consumer receives from. */
+        ActiveMQDestination destination;
+        /** The JMS receiver carrying the message listener. */
+        QueueReceiver receiver;
+        /** Count of messages received so far; shared per destination. */
+        AtomicInteger accumulator;
+        /** Message numbers ({@code NUM} property values) not yet received. */
+        ConcurrentLinkedQueue<Integer> expected = new ConcurrentLinkedQueue<Integer>();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.java
new file mode 100644
index 0000000..c2cf53a
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.java
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Vector;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.QueueConnection;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.util.TimeUtils;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest extends JmsMultipleBrokersTestSupport {
+    // Filler text built from a 10 KiB zero-filled byte array.
+    // NOTE(review): presumably appended to message bodies by a producer outside this excerpt - confirm.
+    static final String payload = new String(new byte[10 * 1024]);
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.class);
+    // Broker "B<i>" listens on tcp://localhost:(portBase + i).
+    final int portBase = 61600;
+    // Number of brokers created and networked by testBrokers().
+    final int numBrokers = 4;
+    // NOTE(review): presumably the number of producer threads; its use is outside this excerpt.
+    final int numProducers = 10;
+    // Number of messages passed to produce(); each GW consumer expects this many.
+    final int numMessages = 800;
+    // Milliseconds each GW consumer sleeps before counting a message.
+    final int consumerSleepTime = 20;
+    // Comma separated list of all broker transport URLs; filled in by buildUrlList().
+    StringBuilder brokersUrl = new StringBuilder();
+    // Received-message counter per GW queue; each ConsumerState shares the counter of its destination.
+    HashMap<ActiveMQQueue, AtomicInteger> accumulators = new HashMap<ActiveMQQueue, AtomicInteger>();
+    // testBrokers() asserts this list is empty; what adds to it is outside this excerpt.
+    private ArrayList<Throwable> exceptions = new ArrayList<Throwable>();
+
+    protected void buildUrlList() throws Exception {
+        for (int i = 0; i < numBrokers; i++) {
+            brokersUrl.append("tcp://localhost:" + (portBase + i));
+            if (i != numBrokers - 1) {
+                brokersUrl.append(',');
+            }
+        }
+    }
+
+    protected BrokerService createBroker(int brokerid) throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setPersistent(true);
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.getManagementContext().setCreateConnector(false);
+
+
+        broker.setUseJmx(true);
+        broker.setBrokerName("B" + brokerid);
+        broker.addConnector(new URI("tcp://localhost:" + (portBase + brokerid)));
+
+        addNetworkConnector(broker);
+        broker.setSchedulePeriodForDestinationPurge(0);
+        broker.getSystemUsage().setSendFailIfNoSpace(true);
+        broker.getSystemUsage().getMemoryUsage().setLimit(512 * 1024 * 1024);
+
+
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry policyEntry = new PolicyEntry();
+        policyEntry.setExpireMessagesPeriod(0);
+        policyEntry.setQueuePrefetch(1000);
+        policyEntry.setMemoryLimit(1024 * 1024l);
+        policyEntry.setOptimizedDispatch(false);
+        policyEntry.setProducerFlowControl(false);
+        policyEntry.setEnableAudit(true);
+        policyEntry.setUseCache(true);
+        policyMap.put(new ActiveMQQueue("GW.>"), policyEntry);
+        broker.setDestinationPolicy(policyMap);
+
+        KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
+        kahaDBPersistenceAdapter.setConcurrentStoreAndDispatchQueues(false);
+
+        brokers.put(broker.getBrokerName(), new BrokerItem(broker));
+        return broker;
+    }
+
+    private void addNetworkConnector(BrokerService broker) throws Exception {
+        StringBuilder networkConnectorUrl = new StringBuilder("static:(").append(brokersUrl.toString());
+        networkConnectorUrl.append(')');
+
+        for (int i = 0; i < 2; i++) {
+            NetworkConnector nc = new DiscoveryNetworkConnector(new URI(networkConnectorUrl.toString()));
+            nc.setName("Bridge-" + i);
+            nc.setNetworkTTL(1);
+            nc.setDecreaseNetworkConsumerPriority(true);
+            nc.setDynamicOnly(true);
+            nc.setPrefetchSize(100);
+            nc.setDynamicallyIncludedDestinations(
+                    Arrays.asList(new ActiveMQDestination[]{new ActiveMQQueue("GW.*")}));
+            broker.addNetworkConnector(nc);
+        }
+    }
+
+    /**
+     * End-to-end scenario: starts {@code numBrokers} networked brokers, attaches one consumer per
+     * {@code GW.<id>} queue plus a fan-out consumer on {@code IN} at every broker, produces
+     * {@code numMessages} messages and waits until every GW consumer has counted the expected
+     * number. Finally asserts that the {@code exceptions} list is empty.
+     */
+    public void testBrokers() throws Exception {
+
+        buildUrlList();
+        for (int brokerId = 0; brokerId < numBrokers; brokerId++) {
+            createBroker(brokerId);
+        }
+        startAllBrokers();
+
+        // every broker should end up bridged to all the others
+        final int expectedPeers = numBrokers - 1;
+        waitForBridgeFormation(expectedPeers);
+        verifyPeerBrokerInfos(expectedPeers);
+
+        final List<ConsumerState> consumerStates = startAllGWConsumers(numBrokers);
+        startAllGWFanoutConsumers(numBrokers);
+
+        LOG.info("Waiting for percolation of consumers..");
+        TimeUnit.SECONDS.sleep(5);
+
+        LOG.info("Produce mesages..");
+        long startTime = System.currentTimeMillis();
+        produce(numMessages);
+
+        final Wait.Condition allDelivered = new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                for (ConsumerState state : consumerStates) {
+                    final int fanout = state.destination.isComposite() ? state.destination.getCompositeDestinations().length : 1;
+                    final int expected = numMessages * fanout;
+                    LOG.info("Tally for: " + state.brokerName + ", dest: " + state.destination + " - " + state.accumulator.get());
+                    if (state.accumulator.get() == expected) {
+                        LOG.info("got tally on " + state.brokerName);
+                        continue;
+                    }
+                    LOG.info("Tally for: " + state.brokerName + ", dest: " + state.destination + " - " + state.accumulator.get() + " != " + expected + ", " + state.expected);
+                    return false;
+                }
+                return true;
+            }
+        };
+        assertTrue("Got all sent", Wait.waitFor(allDelivered, 1000 * 60 * 1000L));
+
+        assertTrue("No exceptions:" + exceptions, exceptions.isEmpty());
+
+        LOG.info("done");
+        long duration = System.currentTimeMillis() - startTime;
+        LOG.info("Duration:" + TimeUtils.printDuration(duration));
+    }
+
+    /**
+     * On each of the first {@code nBrokers} brokers, starts a transacted session that consumes
+     * from {@code IN} and forwards every message to the composite queue
+     * {@code GW.0,GW.1,...,GW.(nBrokers-1)}, committing after each forwarded message.
+     * Failures in the listener are logged only.
+     *
+     * @param nBrokers number of brokers, which is also the number of GW queues in the composite
+     */
+    private void startAllGWFanoutConsumers(int nBrokers) throws Exception {
+
+        final StringBuilder gatewayNames = new StringBuilder();
+        for (int gw = 0; gw < nBrokers; gw++) {
+            if (gw > 0) {
+                gatewayNames.append(',');
+            }
+            gatewayNames.append("GW." + gw);
+        }
+        final ActiveMQQueue fanoutTarget = new ActiveMQQueue(gatewayNames.toString());
+
+        for (int brokerId = 0; brokerId < nBrokers; brokerId++) {
+            final String brokerUrl = "failover:(tcp://localhost:" + (portBase + brokerId) + ")";
+            final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
+            factory.setWatchTopicAdvisories(false);
+
+            final QueueConnection connection = factory.createQueueConnection();
+            connection.start();
+
+            final QueueSession txSession = connection.createQueueSession(true, Session.SESSION_TRANSACTED);
+            final MessageProducer fanoutProducer = txSession.createProducer(fanoutTarget);
+            final QueueReceiver inReceiver = txSession.createReceiver(new ActiveMQQueue("IN"));
+
+            inReceiver.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                    try {
+                        // receipt and fan-out send share one transaction
+                        fanoutProducer.send(message);
+                        txSession.commit();
+                    } catch (Exception e) {
+                        LOG.error("Failed to fanout to GW: " + message, e);
+                    }
+                }
+            });
+        }
+    }
+
+    /**
+     * Starts one auto-acknowledge consumer on queue {@code GW.<id>} at broker {@code id} for each
+     * of the first {@code nBrokers} brokers. Each listener sleeps {@code consumerSleepTime}
+     * milliseconds, increments the destination's accumulator and removes the message's
+     * {@code NUM} property value from the state's {@code expected} collection.
+     *
+     * @param nBrokers number of brokers to attach a consumer to
+     * @return one {@code ConsumerState} per started consumer, in broker id order
+     */
+    private List<ConsumerState> startAllGWConsumers(int nBrokers) throws Exception {
+        final List<ConsumerState> states = new LinkedList<ConsumerState>();
+        for (int brokerId = 0; brokerId < nBrokers; brokerId++) {
+            final String brokerUrl = "failover:(tcp://localhost:" + (portBase + brokerId) + ")";
+            final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
+            factory.setWatchTopicAdvisories(false);
+
+            final QueueConnection connection = factory.createQueueConnection();
+            connection.start();
+
+            final QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+            final ActiveMQQueue gwQueue = new ActiveMQQueue("GW." + brokerId);
+            final QueueReceiver receiver = session.createReceiver(gwQueue);
+
+            final ConsumerState state = new ConsumerState();
+            state.brokerName = ((ActiveMQConnection) connection).getBrokerName();
+            state.receiver = receiver;
+            state.destination = gwQueue;
+
+            // pre-load the message numbers this consumer is expected to see
+            final int fanout = state.destination.isComposite() ? state.destination.getCompositeDestinations().length : 1;
+            final int expectedCount = numMessages * fanout;
+            for (int num = 0; num < expectedCount; num++) {
+                state.expected.add(num);
+            }
+
+            // one shared counter per destination
+            AtomicInteger counter = accumulators.get(gwQueue);
+            if (counter == null) {
+                counter = new AtomicInteger(0);
+                accumulators.put(gwQueue, counter);
+            }
+            state.accumulator = counter;
+
+            receiver.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                    if (consumerSleepTime > 0) {
+                        try {
+                            TimeUnit.MILLISECONDS.sleep(consumerSleepTime);
+                        } catch (InterruptedException e) {
+                            e.printStackTrace();
+                        }
+                    }
+                    try {
+                        state.accumulator.incrementAndGet();
+                        try {
+                            final Object num = ((ActiveMQMessage) message).getProperty("NUM");
+                            state.expected.remove(num);
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                        }
+                    } catch (Exception e) {
+                        LOG.error("Failed to commit slow receipt of " + message, e);
+                    }
+                }
+            });
+
+            states.add(state);
+        }
+        return states;
+    }
+
+    private void produce(int numMessages) throws Exception {
+        ExecutorService executorService = Executors.newFixedThreadPool(numProducers);
+        final AtomicInteger toSend = new AtomicInteger(numMessages);
+        for (int i = 1; i <= numProducers; i++) {
+            final int id = i % numBrokers;
+            executorService.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")");
+                        connectionFactory.setWatchTopicAdvisories(false);
+                        QueueConnection queueConnection = connectionFactory.createQueueConnection();
+                        queueConnection.start();
+                        QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+                        MessageProducer producer = queueSession.createProducer(null);
+                        int val = 0;
+                        while ((val = toSend.decrementAndGet()) >= 0) {
+
+                            ActiveMQQueue compositeQ = new ActiveMQQueue("IN");
+                            LOG.info("Send to: " + ((ActiveMQConnection) queueConnection).getBrokerName() + ", " + val + ", dest:" + compositeQ);
+                            Message textMessage = queueSession.createTextMessage(((ActiveMQConnection) queueConnection).getBrokerName() + "->" + val + " payload:" + payload);
+                            textMessage.setIntProperty("NUM", val);
+                            producer.send(compositeQ, textMessage);
+                        }
+                        queueConnection.close();
+
+                    } catch (Throwable throwable) {
+                        throwable.printStackTrace();
+                        exceptions.add(throwable);
+                    }
+                }
+            });
+        }
+    }
+
+    /**
+     * Waits for one broker to report {@code max} peer broker infos and asserts that it does.
+     * <p>
+     * When the count is still off after waiting, the peers that are present are logged along
+     * with the {@code B<n>} names that were not seen, before the assertion fails.
+     *
+     * @param brokerItem holder of the broker to inspect
+     * @param max expected number of peer broker infos
+     * @throws Exception if waiting fails
+     */
+    private void verifyPeerBrokerInfo(BrokerItem brokerItem, final int max) throws Exception {
+        final BrokerService broker = brokerItem.broker;
+        final RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
+
+        // The wait result is not checked here; the assertion at the end decides the outcome.
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                LOG.info("verify infos " + broker.getBrokerName() + ", len: " + regionBroker.getPeerBrokerInfos().length);
+                return regionBroker.getPeerBrokerInfos().length == max;
+            }
+        });
+        LOG.info("verify infos " + broker.getBrokerName() + ", len: " + regionBroker.getPeerBrokerInfos().length);
+
+        // Start from every expected name and strike off the ones that are actually present.
+        List<String> unseen = new ArrayList<String>();
+        int idx = 0;
+        while (idx < max) {
+            unseen.add("B" + idx);
+            idx++;
+        }
+
+        boolean countMatches = regionBroker.getPeerBrokerInfos().length == max;
+        if (!countMatches) {
+            BrokerInfo[] peers = regionBroker.getPeerBrokerInfos();
+            for (int p = 0; p < peers.length; p++) {
+                String peerName = peers[p].getBrokerName();
+                LOG.info(peerName);
+                unseen.remove(peerName);
+            }
+            LOG.info("Broker infos off.." + unseen);
+        }
+
+        assertEquals(broker.getBrokerName(), max, regionBroker.getPeerBrokerInfos().length);
+    }
+
+    /**
+     * Runs {@link #verifyPeerBrokerInfo(BrokerItem, int)} against every broker held in
+     * {@code brokers}.
+     *
+     * @param max number of peer broker infos each broker is expected to report
+     * @throws Exception if any per-broker verification fails
+     */
+    private void verifyPeerBrokerInfos(final int max) throws Exception {
+        for (BrokerItem item : brokers.values()) {
+            verifyPeerBrokerInfo(item, max);
+        }
+    }
+
+    /**
+     * Delegates straight to the superclass {@code tearDown}; this class adds no cleanup of
+     * its own here.
+     *
+     * @throws Exception if the superclass teardown fails
+     */
+    protected void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+    /**
+     * Mutable bookkeeping for a single test consumer.
+     * <p>
+     * Declared {@code static} because it never touches the enclosing test instance, so it
+     * need not carry a hidden reference to it.
+     */
+    static class ConsumerState {
+        /** Receipt counter; assigned by the code that creates the consumer. */
+        AtomicInteger accumulator;
+        /** Name of the broker the consumer's connection reported. */
+        String brokerName;
+        /** The receiver whose listener updates this state. */
+        QueueReceiver receiver;
+        /** Destination the receiver consumes from. */
+        ActiveMQDestination destination;
+        /**
+         * Values that have not been received yet. A synchronized {@link Vector} because the
+         * listener removes entries on a delivery thread.
+         */
+        Vector<Integer> expected = new Vector<Integer>();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java
new file mode 100644
index 0000000..1126d31
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.Vector;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerPluginSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.TransactionBroker;
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Sends {@code messageCount} small transacted messages, commits them in parallel, then sends
+ * one large message, while a broker plugin delays the {@code afterCommit} callbacks of
+ * selected messages so that transactions complete out of order against a low destination
+ * memory limit. The test then consumes every message and verifies that none was lost.
+ */
+public class AMQ4485Test extends TestCase {
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ4485Test.class);
+    BrokerService broker;
+    ActiveMQConnectionFactory factory;
+    final int messageCount = 20;
+    int memoryLimit = 40 * 1024;
+    final ActiveMQQueue destination = new ActiveMQQueue("QUEUE." + this.getClass().getName());
+    /** Failures raised by the parallel commit tasks; asserted empty at the end of the test. */
+    final Vector<Throwable> exceptions = new Vector<Throwable>();
+    /** Released by the big message's afterCommit so that message 5's afterCommit may proceed. */
+    final CountDownLatch slowSendResume = new CountDownLatch(1);
+
+
+    /**
+     * Configures the broker with a per-destination memory limit, producer flow control off,
+     * and a plugin that registers an {@code afterCommit} synchronization on every
+     * transactional message carrying a {@code NUM} property.
+     *
+     * @param memoryLimit destination memory limit in bytes
+     * @throws Exception if the broker rejects the configuration
+     */
+    protected void configureBroker(long memoryLimit) throws Exception {
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setAdvisorySupport(false);
+
+        PolicyEntry policy = new PolicyEntry();
+        policy.setExpireMessagesPeriod(0);
+        policy.setMemoryLimit(memoryLimit);
+        policy.setProducerFlowControl(false);
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+        broker.setDestinationPolicy(pMap);
+
+        broker.setPlugins(new BrokerPlugin[] {new BrokerPluginSupport() {
+            @Override
+            public void send(ProducerBrokerExchange producerExchange, final Message messageSend) throws Exception {
+                if (messageSend.isInTransaction() && messageSend.getProperty("NUM") != null) {
+                    final Integer num = (Integer) messageSend.getProperty("NUM");
+                    TransactionBroker transactionBroker = (TransactionBroker)broker.getBroker().getAdaptor(TransactionBroker.class);
+                    transactionBroker.getTransaction(producerExchange.getConnectionContext(), messageSend.getTransactionId(), false).addSynchronization(
+                            new Synchronization() {
+                                @Override
+                                public void afterCommit() throws Exception {
+                                    LOG.error("AfterCommit, NUM:" + num + ", " + messageSend.getMessageId() + ", tx: " + messageSend.getTransactionId());
+                                    if (num == 5) {
+                                        // we want to add to cursor after usage is exhausted by message 20 and when
+                                        // all other messages have been processed
+                                        LOG.error("Pausing on latch in afterCommit for: " + num + ", " + messageSend.getMessageId());
+                                        slowSendResume.await(20, TimeUnit.SECONDS);
+                                        LOG.error("resuming on latch afterCommit for: " + num + ", " + messageSend.getMessageId());
+                                    } else if (messageCount + 1 == num) {
+                                        LOG.error("releasing latch. " + num + ", " + messageSend.getMessageId());
+                                        slowSendResume.countDown();
+                                        // for message X, we need to delay so message 5 can setBatch
+                                        TimeUnit.SECONDS.sleep(5);
+                                        LOG.error("resuming afterCommit for: " + num + ", " + messageSend.getMessageId());
+                                    }
+                                }
+                            });
+                }
+                super.send(producerExchange, messageSend);
+            }
+        }
+        });
+
+    }
+
+
+    /**
+     * Commits {@code messageCount} single-message transactions in parallel, sends one large
+     * message to exhaust the destination's memory, then checks that every message is
+     * delivered and that no commit failed.
+     *
+     * @throws Exception on any broker or JMS failure
+     */
+    public void testOutOfOrderTransactionCompletionOnMemoryLimit() throws Exception {
+
+        Set<Integer> expected = new HashSet<Integer>();
+        final Vector<Session> sessionVector = new Vector<Session>();
+        ExecutorService executorService = Executors.newCachedThreadPool();
+        for (int i = 1; i <= messageCount; i++) {
+           sessionVector.add(send(i, 1, true));
+           expected.add(i);
+        }
+
+        // get parallel commit so that the sync writes are batched
+        for (int i = 0; i < messageCount; i++) {
+            final int id = i;
+            executorService.submit(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        sessionVector.get(id).commit();
+                    } catch (Exception fail) {
+                        exceptions.add(fail);
+                    }
+                }
+            });
+        }
+        // Non-blocking: the commits already submitted keep running; the pool just stops
+        // accepting work and releases its threads once they finish.
+        executorService.shutdown();
+
+        final DestinationViewMBean queueViewMBean = (DestinationViewMBean)
+                broker.getManagementContext().newProxyInstance(broker.getAdminView().getQueues()[0], DestinationViewMBean.class, false);
+
+        // not sure how many messages will get enqueued
+        TimeUnit.SECONDS.sleep(3);
+        // Deliberately disabled check, kept for reference: the enqueue count at this point is
+        // not deterministic (see the comment above).
+        if (false) {
+            assertTrue("all " + messageCount + " on the q", Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    LOG.info("enqueueCount: " + queueViewMBean.getEnqueueCount());
+                    return messageCount == queueViewMBean.getEnqueueCount();
+                }
+            }));
+        }
+
+        LOG.info("Big send to blow available destination usage before slow send resumes");
+        send(messageCount + 1, 35*1024, true).commit();
+
+
+        // consume and verify all received
+        Connection consumerConnection = factory.createConnection();
+        try {
+            consumerConnection.start();
+            MessageConsumer consumer = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(destination);
+            for (int i = 1; i <= messageCount + 1; i++) {
+                BytesMessage bytesMessage = (BytesMessage) consumer.receive(10000);
+                assertNotNull("Got message: " + i + ", " + expected, bytesMessage);
+                MessageId mqMessageId = ((ActiveMQBytesMessage) bytesMessage).getMessageId();
+                LOG.info("got: " + expected + ", "  + mqMessageId + ", NUM=" + ((ActiveMQBytesMessage) bytesMessage).getProperty("NUM"));
+                expected.remove(((ActiveMQBytesMessage) bytesMessage).getProperty("NUM"));
+            }
+        } finally {
+            // Close even when an assertion fails so the consumer does not outlive the test.
+            consumerConnection.close();
+        }
+
+        // messageCount + 1 messages arrived; if any small message is still listed, it was
+        // lost and something else was delivered in its place.
+        assertTrue("small messages never received: " + expected, expected.isEmpty());
+        assertTrue("commit failures: " + exceptions, exceptions.isEmpty());
+    }
+
+    /**
+     * Opens a new connection and session and sends one bytes message tagged with
+     * {@code NUM = id}. The session is returned without being committed or closed so the
+     * caller controls when, and on which thread, the commit happens.
+     *
+     * @param id value stored in the {@code NUM} int property
+     * @param messageSize payload size in bytes
+     * @param transacted whether the session is transacted
+     * @return the session that sent the message
+     * @throws Exception on any JMS failure
+     */
+    private Session send(int id, int messageSize, boolean transacted) throws Exception {
+        Connection connection = factory.createConnection();
+        connection.start();
+        Session session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(destination);
+        BytesMessage bytesMessage = session.createBytesMessage();
+        bytesMessage.writeBytes(new byte[messageSize]);
+        bytesMessage.setIntProperty("NUM", id);
+        producer.send(bytesMessage);
+        LOG.info("Sent:" + bytesMessage.getJMSMessageID() + " session tx: " + ((ActiveMQBytesMessage) bytesMessage).getTransactionId());
+        return session;
+    }
+
+    /** Starts a broker named {@code thisOne} and a VM-transport factory that forces sync sends. */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        broker = new BrokerService();
+        broker.setBrokerName("thisOne");
+        configureBroker(memoryLimit);
+        broker.start();
+        factory = new ActiveMQConnectionFactory("vm://thisOne?jms.alwaysSyncSend=true");
+        factory.setWatchTopicAdvisories(false);
+
+    }
+
+    /** Stops the broker, even if the superclass teardown throws. */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            super.tearDown();
+        } finally {
+            if (broker != null) {
+                broker.stop();
+                broker = null;
+            }
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4487Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4487Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4487Test.java
new file mode 100644
index 0000000..346650e
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4487Test.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.Enumeration;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Verifies that a queue browser sees every message on a queue when each message was sent
+ * through its own session and producer, for message counts at and well above the policy's
+ * {@code maxProducersToAudit} setting of 75.
+ */
+public class AMQ4487Test {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ4487Test.class);
+
+    private final String destinationName = "TEST.QUEUE";
+    private BrokerService broker;
+    private ActiveMQConnectionFactory factory;
+
+    /** Starts a broker without JMX or advisories whose default policy audits 75 producers. */
+    @Before
+    public void startBroker() throws Exception {
+        broker = new BrokerService();
+        broker.deleteAllMessages();
+        broker.setUseJmx(false);
+        broker.setAdvisorySupport(false);
+
+        PolicyEntry policy = new PolicyEntry();
+        policy.setQueue(">");
+        policy.setMaxProducersToAudit(75);
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+        broker.setDestinationPolicy(pMap);
+
+        broker.start();
+        broker.waitUntilStarted();
+        factory = new ActiveMQConnectionFactory("vm://localhost");
+    }
+
+    /** Stops the broker; a no-op when {@link #startBroker()} failed before creating it. */
+    @After
+    public void stopBroker() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    /**
+     * Sends {@code messageToSend} text messages of 2048 characters each to the test queue.
+     *
+     * @param messageToSend number of messages to send
+     * @throws Exception on any JMS failure
+     */
+    private void sendMessages(int messageToSend) throws Exception {
+        final int payloadLength = 1024 * 2;
+        StringBuilder payload = new StringBuilder(payloadLength);
+        for (int i = 0; i < payloadLength; i++) {
+            payload.append('x');
+        }
+        String data = payload.toString();
+
+        Connection connection = factory.createConnection();
+        try {
+            connection.start();
+
+            // A fresh session and producer per message. Presumably this makes every message
+            // come from a distinct producer, which is what the audit limit counts (confirm).
+            for (int i = 0; i < messageToSend; i++) {
+                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                Queue queue = session.createQueue(destinationName);
+                MessageProducer producer = session.createProducer(queue);
+                producer.send(session.createTextMessage(data));
+                session.close();
+            }
+        } finally {
+            connection.close();
+        }
+    }
+
+    @Test
+    public void testBrowsingWithLessThanMaxAuditDepth() throws Exception {
+        doTestBrowsing(75);
+    }
+
+    @Test
+    public void testBrowsingWithMoreThanMaxAuditDepth() throws Exception {
+        doTestBrowsing(300);
+    }
+
+    /**
+     * Sends the given number of messages and asserts that a browser enumerates exactly that
+     * many.
+     *
+     * @param messagesToSend number of messages to send and expect to browse
+     * @throws Exception on any JMS failure
+     */
+    @SuppressWarnings("rawtypes")
+    private void doTestBrowsing(int messagesToSend) throws Exception {
+
+        Connection connection = factory.createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue(destinationName);
+
+            sendMessages(messagesToSend);
+
+            QueueBrowser browser = session.createBrowser(queue);
+            Enumeration enumeration = browser.getEnumeration();
+            int received = 0;
+            while (enumeration.hasMoreElements()) {
+                Message m = (Message) enumeration.nextElement();
+                assertNotNull(m);
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Browsed Message: {}", m.getJMSMessageID());
+                }
+
+                received++;
+                // Stop once the count overshoots; the assertion below then reports it.
+                if (received > messagesToSend) {
+                    break;
+                }
+            }
+
+            browser.close();
+
+            assertEquals(messagesToSend, received);
+        } finally {
+            // Previously this connection was never closed.
+            connection.close();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4504Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4504Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4504Test.java
new file mode 100644
index 0000000..64204bd
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4504Test.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Sends messages to a composite queue of 20 destinations and checks that a consumer on the
+ * same composite destination receives one copy per destination of every message.
+ */
+public class AMQ4504Test {
+
+    BrokerService brokerService;
+
+    /** Starts a non-persistent broker. */
+    @Before
+    public void setup() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setPersistent(false);
+        brokerService.start();
+    }
+
+    /** Stops the broker; a no-op when {@link #setup()} failed before creating it. */
+    @After
+    public void stop() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+        }
+    }
+
+    @Test
+    public void testCompositeDestConsumer() throws Exception {
+
+        final int numDests = 20;
+        final int numMessages = 200;
+
+        // Composite name "ST.0,ST.1,...,ST.19" followed by the consumer prefetch option.
+        StringBuilder compositeName = new StringBuilder();
+        for (int i = 0; i < numDests; i++) {
+            if (compositeName.length() != 0) {
+                compositeName.append(',');
+            }
+            compositeName.append("ST.").append(i);
+        }
+        compositeName.append("?consumer.prefetchSize=100");
+        ActiveMQQueue activeMQQueue = new ActiveMQQueue(compositeName.toString());
+
+        ConnectionFactory factory = new ActiveMQConnectionFactory(brokerService.getVmConnectorURI());
+        Connection connection = factory.createConnection();
+        // The try covers the sends as well, so a failed send does not leak the connection.
+        try {
+            connection.start();
+            MessageProducer producer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createProducer(activeMQQueue);
+            for (int i = 0; i < numMessages; i++) {
+                producer.send(new ActiveMQTextMessage());
+            }
+
+            MessageConsumer consumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(activeMQQueue);
+            for (int i = 0; i < numMessages * numDests; i++) {
+                assertNotNull("recieved:"  + i, consumer.receive(4000));
+            }
+        } finally {
+            connection.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4513Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4513Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4513Test.java
new file mode 100644
index 0000000..cf19982
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4513Test.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Stress test that runs 3000 short-lived clients, each creating a temporary queue and a
+ * producer and then closing its connection, some of them after sending short-lived
+ * non-persistent messages. It asserts only that all clients finish within five minutes.
+ */
+public class AMQ4513Test {
+
+    private BrokerService brokerService;
+    private String connectionUri;
+
+    /** Starts a non-persistent broker on an ephemeral TCP port with an individual DLQ policy. */
+    @Before
+    public void setup() throws Exception {
+        brokerService = new BrokerService();
+
+        connectionUri = brokerService.addConnector("tcp://localhost:0").getPublishableConnectString();
+
+        // Configure Dead Letter Strategy
+        DeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
+        ((IndividualDeadLetterStrategy)strategy).setUseQueueForQueueMessages(true);
+        ((IndividualDeadLetterStrategy)strategy).setQueuePrefix("DLQ.");
+        strategy.setProcessNonPersistent(false);
+        strategy.setProcessExpired(false);
+
+        // Add policy and individual DLQ strategy
+        PolicyEntry policy = new PolicyEntry();
+        policy.setTimeBeforeDispatchStarts(3000);
+        policy.setDeadLetterStrategy(strategy);
+
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+
+        brokerService.setDestinationPolicy(pMap);
+
+        brokerService.setPersistent(false);
+        brokerService.start();
+    }
+
+    /** Stops the broker; a no-op when {@link #setup()} failed before creating it. */
+    @After
+    public void stop() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+        }
+    }
+
+    @Test(timeout=360000)
+    public void test() throws Exception {
+
+        final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionUri);
+
+        ExecutorService service = Executors.newFixedThreadPool(25);
+
+        final Random ripple = new Random(System.currentTimeMillis());
+
+        // Same submission order as before: quick client, expiring sender, quick client.
+        for (int i = 0; i < 1000; ++i) {
+            service.execute(quickTempQueueClient(cf, ripple));
+            service.execute(expiringMessageSender(cf));
+            service.execute(quickTempQueueClient(cf, ripple));
+        }
+
+        service.shutdown();
+        assertTrue(service.awaitTermination(5, TimeUnit.MINUTES));
+    }
+
+    /**
+     * Builds a task that creates a temporary queue and a producer on it, closes the
+     * connection at once, and then pauses for a random time below 20 ms.
+     */
+    private static Runnable quickTempQueueClient(final ActiveMQConnectionFactory cf, final Random ripple) {
+        return new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
+                    try {
+                        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                        Destination destination = session.createTemporaryQueue();
+                        session.createProducer(destination);
+                    } finally {
+                        // Close on the failure path too, so a failed client does not leak its connection.
+                        connection.close();
+                    }
+                    TimeUnit.MILLISECONDS.sleep(ripple.nextInt(20));
+                } catch (InterruptedException interrupted) {
+                    Thread.currentThread().interrupt();
+                } catch (Exception ignored) {
+                    // Deliberately ignored: the test asserts only that every task finishes in
+                    // time, not that each individual client succeeds.
+                }
+            }
+        };
+    }
+
+    /**
+     * Builds a task that sends two non-persistent messages with a 400 ms time-to-live to a
+     * temporary queue, waits 500 ms, and then closes the connection.
+     */
+    private static Runnable expiringMessageSender(final ActiveMQConnectionFactory cf) {
+        return new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
+                    try {
+                        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                        Destination destination = session.createTemporaryQueue();
+                        MessageProducer producer = session.createProducer(destination);
+                        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                        producer.setTimeToLive(400);
+                        producer.send(session.createTextMessage());
+                        producer.send(session.createTextMessage());
+                        // Longer than the TTL, so the connection closes after the messages have expired.
+                        TimeUnit.MILLISECONDS.sleep(500);
+                    } finally {
+                        connection.close();
+                    }
+                } catch (InterruptedException interrupted) {
+                    Thread.currentThread().interrupt();
+                } catch (Exception ignored) {
+                    // Deliberately ignored: the test asserts only that every task finishes in
+                    // time, not that each individual client succeeds.
+                }
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4517Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4517Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4517Test.java
new file mode 100644
index 0000000..6f5556d
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4517Test.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that no DLQ advisory is published for non-persistent messages whose time-to-live
+ * elapses on a temporary queue when the dead letter strategy is configured to process
+ * neither non-persistent nor expired messages.
+ */
+public class AMQ4517Test {
+
+    private BrokerService brokerService;
+    private String connectionUri;
+
+    /**
+     * Starts a non-persistent broker on an ephemeral TCP port whose default destination
+     * policy uses an individual dead letter strategy with non-persistent and expired
+     * message processing both disabled.
+     */
+    @Before
+    public void setup() throws Exception {
+        brokerService = new BrokerService();
+
+        connectionUri = brokerService.addConnector("tcp://localhost:0").getPublishableConnectString();
+
+        // Configure Dead Letter Strategy
+        IndividualDeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
+        strategy.setUseQueueForQueueMessages(true);
+        strategy.setQueuePrefix("DLQ.");
+        strategy.setProcessNonPersistent(false);
+        strategy.setProcessExpired(false);
+
+        // Add policy and individual DLQ strategy
+        PolicyEntry policy = new PolicyEntry();
+        policy.setTimeBeforeDispatchStarts(3000);
+        policy.setDeadLetterStrategy(strategy);
+
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+
+        brokerService.setDestinationPolicy(pMap);
+        brokerService.setPersistent(false);
+        brokerService.start();
+    }
+
+    /**
+     * Stops the broker, tolerating a setup that failed before the broker was created.
+     */
+    @After
+    public void stop() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+            brokerService.waitUntilStopped();
+        }
+    }
+
+    /**
+     * Subscribes to every DLQ advisory topic, sends two short-lived non-persistent messages
+     * to a temporary queue from a background task, and asserts that no advisory arrived.
+     * Any failure inside the producer task is rethrown so the test cannot pass vacuously.
+     */
+    @Test(timeout=360000)
+    public void test() throws Exception {
+
+        final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionUri);
+
+        final AtomicBoolean advised = new AtomicBoolean(false);
+        Connection connection = cf.createConnection();
+        ExecutorService service = Executors.newSingleThreadExecutor();
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Destination dlqDestination = session.createTopic(AdvisorySupport.MESSAGE_DLQ_TOPIC_PREFIX + ">");
+            MessageConsumer consumer = session.createConsumer(dlqDestination);
+            consumer.setMessageListener(new MessageListener() {
+
+                @Override
+                public void onMessage(Message message) {
+                    advised.set(true);
+                }
+            });
+            connection.start();
+
+            // A Callable (instead of a Runnable with an empty catch) lets producer failures
+            // surface through Future.get() below.
+            java.util.concurrent.Future<Void> produced = service.submit(new java.util.concurrent.Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    ActiveMQConnection producerConnection = (ActiveMQConnection) cf.createConnection();
+                    try {
+                        Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                        Destination destination = producerSession.createTemporaryQueue();
+                        MessageProducer producer = producerSession.createProducer(destination);
+                        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                        producer.setTimeToLive(400);
+                        producer.send(producerSession.createTextMessage());
+                        producer.send(producerSession.createTextMessage());
+                        // Sleep past the 400 ms time-to-live before closing the connection.
+                        TimeUnit.MILLISECONDS.sleep(500);
+                    } finally {
+                        producerConnection.close();
+                    }
+                    return null;
+                }
+            });
+
+            service.shutdown();
+            assertTrue(service.awaitTermination(1, TimeUnit.MINUTES));
+            // Rethrows (wrapped in ExecutionException) anything the producer task threw.
+            produced.get();
+            assertFalse("Should not get any Advisories for DLQ'd Messages", advised.get());
+        } finally {
+            // No-op when the task already finished; interrupts it otherwise.
+            service.shutdownNow();
+            connection.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4518Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4518Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4518Test.java
new file mode 100644
index 0000000..e544642
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4518Test.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that no expired-queue-message advisory is published for non-persistent messages
+ * whose time-to-live elapses on a temporary queue when the dead letter strategy is
+ * configured to process neither non-persistent nor expired messages.
+ */
+public class AMQ4518Test {
+
+    private BrokerService brokerService;
+    private String connectionUri;
+
+    /**
+     * Starts a non-persistent broker on an ephemeral TCP port whose default destination
+     * policy uses an individual dead letter strategy with non-persistent and expired
+     * message processing both disabled.
+     */
+    @Before
+    public void setup() throws Exception {
+        brokerService = new BrokerService();
+
+        connectionUri = brokerService.addConnector("tcp://localhost:0").getPublishableConnectString();
+
+        // Configure Dead Letter Strategy
+        IndividualDeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
+        strategy.setUseQueueForQueueMessages(true);
+        strategy.setQueuePrefix("DLQ.");
+        strategy.setProcessNonPersistent(false);
+        strategy.setProcessExpired(false);
+
+        // Add policy and individual DLQ strategy
+        PolicyEntry policy = new PolicyEntry();
+        policy.setTimeBeforeDispatchStarts(3000);
+        policy.setDeadLetterStrategy(strategy);
+
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+
+        brokerService.setDestinationPolicy(pMap);
+        brokerService.setPersistent(false);
+        brokerService.start();
+    }
+
+    /**
+     * Stops the broker, tolerating a setup that failed before the broker was created.
+     */
+    @After
+    public void stop() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+            brokerService.waitUntilStopped();
+        }
+    }
+
+    /**
+     * Subscribes to every expired-queue-message advisory topic, sends two short-lived
+     * non-persistent messages to a temporary queue from a background task, and asserts
+     * that no advisory arrived. Any failure inside the producer task is rethrown so the
+     * test cannot pass vacuously.
+     */
+    @Test(timeout=360000)
+    public void test() throws Exception {
+
+        final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionUri);
+
+        final AtomicBoolean advised = new AtomicBoolean(false);
+        Connection connection = cf.createConnection();
+        ExecutorService service = Executors.newSingleThreadExecutor();
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Destination dlqDestination = session.createTopic(AdvisorySupport.EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX + ">");
+            MessageConsumer consumer = session.createConsumer(dlqDestination);
+            consumer.setMessageListener(new MessageListener() {
+
+                @Override
+                public void onMessage(Message message) {
+                    advised.set(true);
+                }
+            });
+            connection.start();
+
+            // A Callable (instead of a Runnable with an empty catch) lets producer failures
+            // surface through Future.get() below.
+            java.util.concurrent.Future<Void> produced = service.submit(new java.util.concurrent.Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    ActiveMQConnection producerConnection = (ActiveMQConnection) cf.createConnection();
+                    try {
+                        Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                        Destination destination = producerSession.createTemporaryQueue();
+                        MessageProducer producer = producerSession.createProducer(destination);
+                        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                        producer.setTimeToLive(400);
+                        producer.send(producerSession.createTextMessage());
+                        producer.send(producerSession.createTextMessage());
+                        // Sleep past the 400 ms time-to-live before closing the connection.
+                        TimeUnit.MILLISECONDS.sleep(500);
+                    } finally {
+                        producerConnection.close();
+                    }
+                    return null;
+                }
+            });
+
+            service.shutdown();
+            assertTrue(service.awaitTermination(1, TimeUnit.MINUTES));
+            // Rethrows (wrapped in ExecutionException) anything the producer task threw.
+            produced.get();
+            assertFalse("Should not get any Advisories for Expired Messages", advised.get());
+        } finally {
+            // No-op when the task already finished; interrupts it otherwise.
+            service.shutdownNow();
+            connection.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4530Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4530Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4530Test.java
new file mode 100644
index 0000000..e8ab9f4
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4530Test.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularDataSupport;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.CompositeDataConstants;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that a string property set on a queued message is exposed through the
+ * {@link CompositeDataConstants#STRING_PROPERTIES} entry of the {@link CompositeData}
+ * returned by {@link QueueViewMBean#browse()}.
+ */
+public class AMQ4530Test {
+
+    private static final String TEST_QUEUE = "testQueue";
+    private static final ActiveMQQueue queue = new ActiveMQQueue(TEST_QUEUE);
+    private static final String BROKER_ADDRESS = "tcp://localhost:0";
+    private static final String KEY = "testproperty";
+    private static final String VALUE = "propvalue";
+
+    // Per-test state: a fresh broker is created in setUp(), so it must not be static.
+    private BrokerService brokerService;
+    private ActiveMQConnectionFactory connectionFactory;
+    private String connectionUri;
+
+    /**
+     * Starts a non-persistent, JMX-enabled broker on an ephemeral port and places one
+     * message carrying the test property on the test queue.
+     */
+    @Before
+    public void setUp() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setPersistent(false);
+        brokerService.setUseJmx(true);
+        connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString();
+        brokerService.start();
+        brokerService.waitUntilStarted();
+
+        connectionFactory = new ActiveMQConnectionFactory(connectionUri);
+        sendMessage();
+    }
+
+    /**
+     * Sends a single message with the string property {@code KEY=VALUE} to the test queue,
+     * always closing the connection afterwards.
+     */
+    public void sendMessage() throws Exception {
+        final Connection conn = connectionFactory.createConnection();
+        try {
+            conn.start();
+            final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final Destination destination = session.createQueue(TEST_QUEUE);
+            final Message toSend = session.createMessage();
+            toSend.setStringProperty(KEY, VALUE);
+            final MessageProducer producer = session.createProducer(destination);
+            producer.send(destination, toSend);
+        } finally {
+            conn.close();
+        }
+    }
+
+    /**
+     * Stops the broker, tolerating a setup that failed before the broker was created.
+     */
+    @After
+    public void tearDown() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+            brokerService.waitUntilStopped();
+        }
+    }
+
+    /**
+     * Browses the queue over JMX and asserts that the first message's string-properties
+     * table contains an entry whose key and value match what {@link #sendMessage()} set.
+     */
+    @Test
+    public void testStringPropertiesFromCompositeData() throws Exception {
+        final QueueViewMBean queueView = getProxyToQueueViewMBean();
+        final CompositeData[] browsed = queueView.browse();
+        // Fail with a clear message instead of ArrayIndexOutOfBoundsException on an empty queue.
+        assertThat("Queue should contain the message sent in setUp", browsed.length, is(greaterThan(0)));
+        final CompositeData message = browsed[0];
+        assertNotNull(message);
+        TabularDataSupport stringProperties = (TabularDataSupport) message.get(CompositeDataConstants.STRING_PROPERTIES);
+        assertNotNull(stringProperties);
+        assertThat(stringProperties.size(), is(greaterThan(0)));
+        // TabularDataSupport is a Map<Object, Object>, so no unchecked cast is required.
+        Map.Entry<Object, Object> compositeDataEntry = stringProperties.entrySet().iterator().next();
+        CompositeData stringEntry = (CompositeData) compositeDataEntry.getValue();
+        assertThat(String.valueOf(stringEntry.get("key")), equalTo(KEY));
+        assertThat(String.valueOf(stringEntry.get("value")), equalTo(VALUE));
+    }
+
+    /**
+     * Builds a JMX proxy for the test queue's {@link QueueViewMBean} using the broker's
+     * management context.
+     *
+     * @return a proxy bound to the test queue's MBean object name
+     * @throws MalformedObjectNameException if the constructed object name is invalid
+     * @throws JMSException if the queue name cannot be read
+     */
+    private QueueViewMBean getProxyToQueueViewMBean() throws MalformedObjectNameException, NullPointerException,
+            JMSException {
+        final ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + queue.getQueueName());
+        return (QueueViewMBean) brokerService.getManagementContext().newProxyInstance(
+                queueViewMBeanName, QueueViewMBean.class, true);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4531Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4531Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4531Test.java
new file mode 100644
index 0000000..0be3226
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4531Test.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.CountDownLatch;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Checks that repeated client connection attempts against a transport connector configured
+ * with {@code maximumConnections=1}, made while one connection is already open, do not leave
+ * the JVM with a growing number of open file descriptors.
+ */
+public class AMQ4531Test extends TestCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ4531Test.class);
+
+    private String connectionURI;
+    private MBeanServer mbeanServer;
+    private BrokerService broker;
+
+    /**
+     * Starts a non-persistent broker whose TCP connector accepts at most one connection.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        broker = new BrokerService();
+        connectionURI = broker.addConnector("tcp://0.0.0.0:0?maximumConnections=1").getPublishableConnectString();
+        broker.setPersistent(false);
+        broker.start();
+        mbeanServer = ManagementFactory.getPlatformMBeanServer();
+    }
+
+    /**
+     * Stops the broker and always delegates to the superclass tear-down.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            if (broker != null) {
+                broker.stop();
+            }
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Create the test case
+     *
+     * @param testName
+     *            name of the test case
+     */
+    public AMQ4531Test(String testName) {
+        super(testName);
+    }
+
+    /**
+     * @return the suite of tests being tested
+     */
+    public static Test suite() {
+        return new TestSuite(AMQ4531Test.class);
+    }
+
+    /**
+     * Opens one connection, records the open file descriptor count, then lets 100 worker
+     * threads each try to connect and close. Passes when the descriptor count settles to
+     * fewer than 10 above the recorded baseline.
+     */
+    public void testFDSLeak() throws Exception {
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionURI);
+        ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
+        try {
+            connection.start();
+
+            int connections = 100;
+            final long original = openFileDescriptorCount();
+            LOG.info("FD count: " + original);
+            final CountDownLatch done = new CountDownLatch(connections);
+            for (int i = 0; i < connections; i++) {
+                new Thread("worker: " + i) {
+                    @Override
+                    public void run() {
+                        ActiveMQConnection workerConnection = null;
+                        try {
+                            ActiveMQConnectionFactory workerFactory = new ActiveMQConnectionFactory(connectionURI);
+                            workerConnection = (ActiveMQConnection) workerFactory.createConnection();
+                            workerConnection.start();
+                        } catch (Exception e) {
+                            LOG.debug(getStack(e));
+                        } finally {
+                            // createConnection() may have failed, leaving nothing to close.
+                            if (workerConnection != null) {
+                                try {
+                                    workerConnection.close();
+                                } catch (Exception e) {
+                                    LOG.debug(getStack(e));
+                                }
+                            }
+                            done.countDown();
+                            LOG.debug("Latch count down called.");
+                        }
+                    }
+                }.start();
+            }
+
+            // Wait for all the clients to finish, but never hang the build indefinitely.
+            LOG.info("Waiting for latch...");
+            assertTrue("Timed out waiting for worker connections to finish",
+                    done.await(5, java.util.concurrent.TimeUnit.MINUTES));
+            LOG.info("Latch complete.");
+            LOG.info("FD count: " + openFileDescriptorCount());
+
+            boolean settled = Wait.waitFor(new Wait.Condition() {
+
+                @Override
+                public boolean isSatisified() throws Exception {
+                    long openFDs = openFileDescriptorCount();
+                    LOG.info("Current FD count [{}], original FD count[{}]", openFDs, original);
+                    return (openFDs - original) < 10;
+                }
+            });
+            // Read the count after waiting so the failure message reports the final value.
+            assertTrue("Too many open file descriptors: " + openFileDescriptorCount(), settled);
+        } finally {
+            connection.close();
+        }
+    }
+
+    /**
+     * Reads the {@code OpenFileDescriptorCount} attribute of the platform
+     * {@code java.lang:type=OperatingSystem} MBean.
+     */
+    private long openFileDescriptorCount() throws Exception {
+        return ((Long) mbeanServer.getAttribute(new ObjectName("java.lang:type=OperatingSystem"), "OpenFileDescriptorCount")).longValue();
+    }
+
+    /**
+     * Renders the stack trace of the given throwable as a string.
+     */
+    private String getStack(Throwable aThrowable) {
+        final Writer result = new StringWriter();
+        final PrintWriter printWriter = new PrintWriter(result);
+        aThrowable.printStackTrace(printWriter);
+        return result.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4554Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4554Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4554Test.java
new file mode 100644
index 0000000..47ce642
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4554Test.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Checks that a message sent from a transacted session can be read back with a non-null
+ * {@code JMSXProducerTXID} string property.
+ */
+public class AMQ4554Test extends TestCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ4554Test.class);
+
+    private String connectionURI;
+    private BrokerService broker;
+
+    /**
+     * Starts a non-persistent broker with a single TCP connector on an ephemeral port.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        broker = new BrokerService();
+        connectionURI = broker.addConnector("tcp://0.0.0.0:0?maximumConnections=1").getPublishableConnectString();
+        broker.setPersistent(false);
+        broker.start();
+    }
+
+    /**
+     * Stops the broker and always delegates to the superclass tear-down.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            if (broker != null) {
+                broker.stop();
+            }
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Create the test case
+     *
+     * @param testName
+     *            name of the test case
+     */
+    public AMQ4554Test(String testName) {
+        super(testName);
+    }
+
+    /**
+     * @return the suite of tests being tested
+     */
+    public static Test suite() {
+        return new TestSuite(AMQ4554Test.class);
+    }
+
+    /**
+     * Sends one text message in a committed transaction, consumes it in a second transacted
+     * session, and asserts that its {@code JMSXProducerTXID} property is present.
+     */
+    public void testMSXProducerTXID() throws Exception {
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionURI);
+        Connection connection = factory.createConnection();
+        try {
+            connection.start();
+
+            Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+            MessageProducer producer = producerSession.createProducer(producerSession.createQueue("myQueue"));
+            TextMessage producerMessage = producerSession.createTextMessage("Test Message");
+            producer.send(producerMessage);
+            producer.close();
+            producerSession.commit();
+            producerSession.close();
+
+            Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+            MessageConsumer consumer = consumerSession.createConsumer(consumerSession.createQueue("myQueue"));
+            Message consumerMessage = consumer.receive(1000);
+            // receive() returns null on timeout; report that directly rather than as an
+            // NullPointerException masked by the "Should not throw" failure below.
+            assertNotNull("No message received within 1000 ms", consumerMessage);
+            try {
+                String txId = consumerMessage.getStringProperty("JMSXProducerTXID");
+                assertNotNull(txId);
+            } catch(Exception e) {
+                LOG.info("Caught Exception that was not expected:", e);
+                fail("Should not throw");
+            }
+            consumer.close();
+            consumerSession.commit();
+            consumerSession.close();
+        } finally {
+            // Release the connection even when an assertion above fails.
+            connection.close();
+        }
+    }
+
+}


Mime
View raw message