activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [23/52] [abbrv] [partial] activemq-artemis git commit: ARTEMIS-127 Adding activemq unit test module to Artemis
Date Tue, 09 Jun 2015 16:36:54 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2413Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2413Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2413Test.java
new file mode 100644
index 0000000..25a95bc
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2413Test.java
@@ -0,0 +1,346 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Vector;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.Test;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.CombinationTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ2413Test extends CombinationTestSupport implements MessageListener {
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ2413Test.class);
+    BrokerService broker;
+    private ActiveMQConnectionFactory factory;
+
+    // Maximum time, in seconds, to wait for the next message before the test is declared hung.
+    private static final int HANG_THRESHOLD = 60;
+    // Total number of messages expected across all producers.
+    private static final int SEND_COUNT = 1000;
+    // Delay, in milliseconds, the listener sleeps after each received message.
+    private static final int RECEIVER_THINK_TIME = 1;
+    private static final int CONSUMER_COUNT = 1;
+    private static final int PRODUCER_COUNT = 50;
+    // Number of messages each individual producer sends.
+    private static final int TO_SEND = SEND_COUNT / PRODUCER_COUNT;
+
+    // Public, non-final fields: the first three are registered by name in initCombos().
+    public int deliveryMode = DeliveryMode.NON_PERSISTENT;
+    public int ackMode = Session.DUPS_OK_ACKNOWLEDGE;
+    public boolean useVMCursor = false;
+    // The combination for this flag is commented out in initCombos(), so it keeps its default here.
+    public boolean useOptimizeAcks = false;
+
+    // Every started producer/consumer; closed in tearDown().
+    private final ArrayList<Service> services = new ArrayList<Service>(CONSUMER_COUNT + PRODUCER_COUNT);
+    // Number of messages delivered to onMessage() so far.
+    AtomicInteger count = new AtomicInteger(0);
+    // One permit is released per delivered message; waitForMessageReceipt() acquires them.
+    Semaphore receivedMessages;
+    // Set while the test body is running; read by TestConsumer.run().
+    AtomicBoolean running = new AtomicBoolean(false);
+
+    /**
+     * Registers the values to combine for the {@code deliveryMode}, {@code ackMode}
+     * and {@code useVMCursor} fields. The {@code useOptimizeAcks} combination is
+     * currently disabled.
+     */
+    public void initCombos() {
+        addCombinationValues("deliveryMode", new Object[] { DeliveryMode.PERSISTENT, DeliveryMode.NON_PERSISTENT });
+        addCombinationValues("ackMode", new Object[] { Session.DUPS_OK_ACKNOWLEDGE, Session.AUTO_ACKNOWLEDGE });
+        addCombinationValues("useVMCursor", new Object[] { true, false });
+        // addCombinationValues("useOptimizeAcks", new Object[] {true, false});
+    }
+
+    /**
+     * Starts a broker listening on TCP port 2401 with a 1 MB per-destination memory
+     * limit and producer flow control, resets the receive counters and creates the
+     * connection factory used by the producers and consumers.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        broker = new BrokerService();
+        // NOTE(review): the directory is named after AMQ2401Test, not this class - presumably copied; confirm.
+        broker.setDataDirectory("target" + File.separator + "test-data" + File.separator + "AMQ2401Test");
+        broker.setDeleteAllMessagesOnStartup(true);
+
+        // The cast requires the broker's persistence adapter to be KahaDB.
+        KahaDBPersistenceAdapter kahaDb = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
+        kahaDb.setConcurrentStoreAndDispatchQueues(false);
+        broker.addConnector("tcp://0.0.0.0:2401");
+        PolicyMap policies = new PolicyMap();
+        PolicyEntry entry = new PolicyEntry();
+        entry.setMemoryLimit(1024 * 1024);
+        entry.setProducerFlowControl(true);
+        if (useVMCursor) {
+            entry.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
+        }
+        // ">" is presumably the match-everything destination wildcard - TODO confirm.
+        entry.setQueue(">");
+        policies.setDefaultEntry(entry);
+        broker.setDestinationPolicy(policies);
+        broker.start();
+        broker.waitUntilStarted();
+
+        count.set(0);
+        receivedMessages = new Semaphore(0);
+
+        factory = new ActiveMQConnectionFactory("tcp://0.0.0.0:2401");
+        // factory = new ActiveMQConnectionFactory("vm://localhost?broker.useJmx=false&broker.persistent=false");
+        setAutoFail(true);
+        super.setUp();
+    }
+
+    /**
+     * Closes every registered producer/consumer, stops the broker and delegates
+     * to the superclass.
+     * <p>
+     * Each stage runs in a {@code finally} block so that a failure in an earlier
+     * stage cannot leave the broker running (and its TCP port bound) for the
+     * next test.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        running.set(false);
+        try {
+            for (Service service : services) {
+                service.close();
+            }
+            services.clear();
+        } finally {
+            try {
+                // broker is still null when setUp() failed before creating it
+                if (broker != null) {
+                    broker.stop();
+                    broker.waitUntilStopped();
+                }
+            } finally {
+                super.tearDown();
+            }
+        }
+    }
+
+    /**
+     * Starts {@code CONSUMER_COUNT} consumers and {@code PRODUCER_COUNT} producers
+     * and waits until all {@code SEND_COUNT} messages have been received.
+     * <p>
+     * Every service is added to {@code services} before it is started, so that
+     * {@code tearDown()} closes its connection even when {@code start()} throws.
+     */
+    public void testReceipt() throws Exception {
+        running.set(true);
+
+        for (int i = 0; i < CONSUMER_COUNT; i++) {
+            TestConsumer consumer = new TestConsumer();
+            services.add(consumer);
+            consumer.start();
+        }
+        for (int i = 0; i < PRODUCER_COUNT; i++) {
+            TestProducer producer = new TestProducer(i);
+            services.add(producer);
+            producer.start();
+        }
+
+        waitForMessageReceipt();
+    }
+
+    /**
+     * Listener callback shared by all consumers: releases one permit for
+     * {@code waitForMessageReceipt()}, counts the message, records it in the
+     * duplicate tracker and then sleeps for {@code RECEIVER_THINK_TIME} ms.
+     *
+     * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
+     */
+    @Override
+    public void onMessage(Message message) {
+        receivedMessages.release();
+        // Log the value this call produced rather than re-reading the shared counter,
+        // which another listener thread may already have advanced.
+        final int received = count.incrementAndGet();
+        if (received % 100 == 0) {
+            LOG.info("Received message " + received);
+        }
+        track(message);
+        if (RECEIVER_THINK_TIME > 0) {
+            try {
+                Thread.sleep(RECEIVER_THINK_TIME);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    HashMap<ProducerId, boolean[]> tracker = new HashMap<ProducerId, boolean[]>();
+
+    /**
+     * Records the producer id and producer sequence number of a received message
+     * and fails the test if the same message has been seen before.
+     * <p>
+     * Exceptions raised while decoding the message id are logged with their stack
+     * trace and otherwise ignored; assertion failures are errors and still propagate.
+     *
+     * @param message the message just delivered to the listener
+     */
+    private synchronized void track(Message message) {
+        try {
+            MessageId id = new MessageId(message.getJMSMessageID());
+            ProducerId pid = id.getProducerId();
+            int seq = (int) id.getProducerSequenceId();
+            boolean[] ids = tracker.get(pid);
+            if (ids == null) {
+                // index 0 is unused: sequence numbers are checked from 1 in verifyTracking()
+                ids = new boolean[TO_SEND + 1];
+                tracker.put(pid, ids);
+            }
+            assertTrue("not already received: " + id, !ids[seq]);
+            ids[seq] = true;
+        } catch (Exception e) {
+            LOG.error(e.toString(), e);
+        }
+    }
+
+    /**
+     * Blocks until {@code SEND_COUNT} messages have been counted, acquiring one
+     * semaphore permit per loop iteration.
+     * <p>
+     * If no permit arrives within {@code HANG_THRESHOLD} seconds and the count is
+     * still short, the tracker is verified and a {@link TimeoutException} is
+     * thrown. The {@code running} flag is cleared on every exit path.
+     *
+     * @throws InterruptedException if the thread is interrupted while waiting for a permit
+     * @throws TimeoutException if no message is received for more than {@code HANG_THRESHOLD} seconds
+     */
+    private void waitForMessageReceipt() throws InterruptedException, TimeoutException {
+        try {
+            while (count.get() < SEND_COUNT) {
+                if (!receivedMessages.tryAcquire(HANG_THRESHOLD, TimeUnit.SECONDS)) {
+                    // re-check: the last message may have been counted while we timed out
+                    if (count.get() == SEND_COUNT)
+                        break;
+                    verifyTracking();
+                    throw new TimeoutException("@count=" + count.get() + " Message not received for more than " + HANG_THRESHOLD + " seconds");
+                }
+            }
+        } finally {
+            running.set(false);
+        }
+    }
+
+    /**
+     * Fails the test with the list of message ids that were never received from
+     * any producer already present in the tracker.
+     * <p>
+     * Synchronized on the same monitor as {@code track()} because the listener
+     * thread may still be adding entries to {@code tracker} while this runs.
+     */
+    private synchronized void verifyTracking() {
+        ArrayList<MessageId> missing = new ArrayList<MessageId>();
+        for (java.util.Map.Entry<ProducerId, boolean[]> entry : tracker.entrySet()) {
+            boolean[] ids = entry.getValue();
+            // sequence numbers run from 1 to TO_SEND; slot 0 is unused
+            for (int i = 1; i < TO_SEND + 1; i++) {
+                if (!ids[i]) {
+                    missing.add(new MessageId(entry.getKey(), i));
+                }
+            }
+        }
+        assertTrue("No missing messages: " + missing, missing.isEmpty());
+    }
+
+    /** Common lifecycle of the test's producers and consumers, so tearDown() can close them uniformly. */
+    private interface Service {
+        /** Begins sending or receiving. */
+        public void start() throws Exception;
+
+        /** Releases the underlying connection; implementations in this file do not throw. */
+        public void close();
+    }
+
+    private class TestProducer implements Runnable, Service {
+        Thread thread;
+        BytesMessage message;
+        Connection connection;
+        Session session;
+        MessageProducer producer;
+
+        TestProducer(int id) throws Exception {
+            thread = new Thread(this, "TestProducer-" + id);
+            connection = factory.createConnection();
+            connection.start();
+            session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+            producer = session.createProducer(session.createQueue("AMQ2401Test"));
+        }
+
+        @Override
+        public void start() {
+            thread.start();
+        }
+
+        @Override
+        public void run() {
+
+            int i = 1;
+            for (; i <= TO_SEND; i++) {
+                try {
+
+                    if (+i % 100 == 0) {
+                        LOG.info(Thread.currentThread().getName() + " Sending message " + i);
+                    }
+                    message = session.createBytesMessage();
+                    message.writeBytes(new byte[1024]);
+                    producer.setDeliveryMode(deliveryMode);
+                    producer.send(message);
+                } catch (JMSException jmse) {
+                    jmse.printStackTrace();
+                    break;
+                }
+            }
+            LOG.info(Thread.currentThread().getName() + " Sent: " + (i - 1));
+        }
+
+        @Override
+        public void close() {
+            try {
+                connection.close();
+            } catch (JMSException e) {
+                // TODO Auto-generated catch block
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private class TestConsumer implements Runnable, Service {
+        ActiveMQConnection connection;
+        Session session;
+        MessageConsumer consumer;
+
+        TestConsumer() throws Exception {
+            factory.setOptimizeAcknowledge(false);
+            connection = (ActiveMQConnection) factory.createConnection();
+            if (useOptimizeAcks) {
+                connection.setOptimizeAcknowledge(true);
+            }
+
+            session = connection.createSession(false, ackMode);
+            consumer = session.createConsumer(session.createQueue("AMQ2401Test"));
+
+            consumer.setMessageListener(AMQ2413Test.this);
+        }
+
+        @Override
+        public void start() throws Exception {
+            connection.start();
+        }
+
+        @Override
+        public void close() {
+            try {
+                connection.close();
+            } catch (JMSException e) {
+                // TODO Auto-generated catch block
+                e.printStackTrace();
+            }
+        }
+
+        /*
+         * (non-Javadoc)
+         *
+         * @see java.lang.Runnable#run()
+         */
+        @Override
+        public void run() {
+            while (running.get()) {
+                try {
+                    onMessage(consumer.receive());
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    /** Builds the JUnit 3 suite for this class through the inherited {@code suite(Class)} helper. */
+    public static Test suite() {
+        return suite(AMQ2413Test.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2439Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2439Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2439Test.java
new file mode 100644
index 0000000..cd447f4
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2439Test.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import java.net.URI;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.jmx.BrokerView;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Sends 1000 messages to BrokerA, consumes them in two batches of 500 through
+ * the bridged BrokerB and then checks BrokerA's total enqueue and dequeue
+ * counters.
+ */
+public class AMQ2439Test extends JmsMultipleBrokersTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ2439Test.class);
+    Destination dest;
+
+    /** Consumes two batches of 500 messages from BrokerB, then validates BrokerA's statistics. */
+    public void testDuplicatesThroughNetwork() throws Exception {
+        assertEquals("received expected amount", 500, receiveExactMessages("BrokerB", 500));
+        assertEquals("received expected amount", 500, receiveExactMessages("BrokerB", 500));
+        validateQueueStats();
+    }
+
+    /**
+     * Asserts that BrokerA enqueued exactly 1000 messages and waits for its
+     * dequeue count to reach 1000 as well.
+     */
+    private void validateQueueStats() throws Exception {
+        final BrokerView brokerView = brokers.get("BrokerA").broker.getAdminView();
+        assertEquals("enequeue is correct", 1000, brokerView.getTotalEnqueueCount());
+
+        assertTrue("dequeue is correct", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                LOG.info("dequeue count (want 1000), is : " + brokerView.getTotalDequeueCount());
+                return 1000 == brokerView.getTotalDequeueCount();
+            }
+        }));
+    }
+
+    /**
+     * Receives up to {@code msgCount} messages from {@code dest} on the named
+     * broker, stopping early when a receive times out after one second.
+     * <p>
+     * The connection is closed and removed from the broker item's connection
+     * list even when starting it or receiving fails.
+     *
+     * @param brokerName name of the broker to consume from
+     * @param msgCount maximum number of messages to receive
+     * @return the number of messages actually received
+     */
+    protected int receiveExactMessages(String brokerName, int msgCount) throws Exception {
+        BrokerItem brokerItem = brokers.get(brokerName);
+        Connection connection = brokerItem.createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createConsumer(dest);
+
+            int received = 0;
+            while (received < msgCount) {
+                Message msg = consumer.receive(1000);
+                if (msg == null) {
+                    break;
+                }
+                received++;
+            }
+            return received;
+        } finally {
+            try {
+                connection.close();
+            } finally {
+                brokerItem.connections.remove(connection);
+            }
+        }
+    }
+
+    /**
+     * Creates BrokerA and BrokerB, bridges BrokerA to BrokerB, starts both and
+     * sends 1000 messages to the TEST.FOO destination on BrokerA.
+     */
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=true&deleteAllMessagesOnStartup=true&advisorySupport=false"));
+        createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=true&deleteAllMessagesOnStartup=true&useJmx=false"));
+        bridgeBrokers("BrokerA", "BrokerB");
+
+        startAllBrokers();
+
+        // Create queue
+        dest = createDestination("TEST.FOO", false);
+        sendMessages("BrokerA", dest, 1000);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2489Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2489Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2489Test.java
new file mode 100644
index 0000000..b581e6d
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2489Test.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.command.ActiveMQQueue;
+
+/**
+ * In CLIENT_ACKNOWLEDGE and INDIVIDUAL_ACKNOWLEDGE modes the following
+ * exception occurs when asynchronous consumers acknowledge messages in a
+ * different order than the one in which they received them.
+ * <p>
+ * Exception thrown on broker side:
+ * <p>
+ * {@code javax.jms.JMSException: Could not correlate acknowledgment with
+ * dispatched message: MessageAck}
+ * 
+ * @author daroo
+ */
+public class AMQ2489Test extends TestSupport {
+    // Name of the int message property carrying the producer-assigned sequence number.
+    private final static String SEQ_NUM_PROPERTY = "seqNum";
+
+    private final static int TOTAL_MESSAGES_CNT = 2;
+    private final static int CONSUMERS_CNT = 2;
+
+    // Counted down once per onMessage() call; the test waits on it for all messages.
+    private final CountDownLatch LATCH = new CountDownLatch(TOTAL_MESSAGES_CNT);
+
+    // Single connection shared by the producer session and all consumer sessions.
+    private Connection connection;
+
+    /** Runs the superclass set-up and then creates the connection shared by the test. */
+    protected void setUp() throws Exception {
+        super.setUp();
+        connection = createConnection();
+    }
+
+    /**
+     * Closes the shared connection, if any, and then runs the superclass
+     * tear-down. The superclass tear-down runs even when closing the connection
+     * throws.
+     */
+    protected void tearDown() throws Exception {
+        try {
+            if (connection != null) {
+                connection.close();
+            }
+        } finally {
+            connection = null;
+            super.tearDown();
+        }
+    }
+
+    /** Runs the unordered-acknowledge scenario with {@code Session.CLIENT_ACKNOWLEDGE}. */
+    public void testUnorderedClientAcknowledge() throws Exception {
+        doUnorderedAck(Session.CLIENT_ACKNOWLEDGE);
+    }
+
+    /** Runs the unordered-acknowledge scenario with {@code ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE}. */
+    public void testUnorderedIndividualAcknowledge() throws Exception {
+        doUnorderedAck(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+    }
+
+    /**
+     * Main test method: starts {@code CONSUMERS_CNT} asynchronous consumers,
+     * sends {@code TOTAL_MESSAGES_CNT} persistent messages tagged with a sequence
+     * number, waits until every message has passed through a listener and then
+     * asserts that the connection's exception listener saw no errors.
+     * <p>
+     * The wait is bounded so that a lost message fails the test instead of
+     * hanging the build.
+     *
+     * @param acknowledgmentMode
+     *            - ACK mode to be used by consumers
+     * @throws Exception
+     *             if any JMS operation fails or the wait is interrupted
+     */
+    protected void doUnorderedAck(int acknowledgmentMode) throws Exception {
+        // generous upper bound: the slowest listener only sleeps for one second
+        final long latchTimeoutSeconds = 30;
+        List<Consumer> consumers = null;
+        Session producerSession = null;
+
+        connection.start();
+        // Because exception is thrown on broker side only, let's set up
+        // exception listener to get it
+        final TestExceptionListener exceptionListener = new TestExceptionListener();
+        connection.setExceptionListener(exceptionListener);
+        try {
+            consumers = new ArrayList<Consumer>();
+            // start consumers
+            for (int i = 0; i < CONSUMERS_CNT; i++) {
+                consumers.add(new Consumer(acknowledgmentMode));
+            }
+
+            // produce a few test messages
+            producerSession = connection.createSession(false,
+                    Session.AUTO_ACKNOWLEDGE);
+            final MessageProducer producer = producerSession
+                    .createProducer(new ActiveMQQueue(getQueueName()));
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+            for (int i = 0; i < TOTAL_MESSAGES_CNT; i++) {
+                final Message message = producerSession
+                        .createTextMessage("test");
+                // assign each message a sequence number
+                message.setIntProperty(SEQ_NUM_PROPERTY, i);
+                producer.send(message);
+            }
+
+            // each onMessage() call counts the LATCH down once, so wait until
+            // all messages are consumed - but never forever
+            assertTrue("all " + TOTAL_MESSAGES_CNT + " messages consumed within "
+                    + latchTimeoutSeconds + " seconds",
+                    LATCH.await(latchTimeoutSeconds, TimeUnit.SECONDS));
+
+            // wait a bit more to give the exception listener a chance to be
+            // populated with the broker's error
+            TimeUnit.SECONDS.sleep(1);
+
+            assertFalse(exceptionListener.getStatusText(), exceptionListener.hasExceptions());
+
+        } finally {
+            if (producerSession != null)
+                producerSession.close();
+
+            if (consumers != null) {
+                for (Consumer c : consumers) {
+                    c.close();
+                }
+            }
+        }
+    }
+
+    /** Returns the queue name for the running test: the class name, a dot, and the value of {@code getName()}. */
+    protected String getQueueName() {
+        return getClass().getName() + "." + getName();
+    }
+
+    /**
+     * Asynchronous consumer with its own session and a prefetch size of one.
+     * Messages with an even sequence number are acknowledged after a one-second
+     * pause; every handled message counts the test latch down.
+     */
+    public final class Consumer implements MessageListener {
+        final Session session;
+
+        private Consumer(int acknowledgmentMode) {
+            try {
+                session = connection.createSession(false, acknowledgmentMode);
+                final String destinationName = getQueueName() + "?consumer.prefetchSize=1";
+                final Queue queue = session.createQueue(destinationName);
+                session.createConsumer(queue).setMessageListener(this);
+            } catch (JMSException e) {
+                e.printStackTrace();
+                throw new RuntimeException(e);
+            }
+        }
+
+        public void onMessage(Message message) {
+            try {
+                // sequence number assigned by the producer
+                final int seqNum = message.getIntProperty(SEQ_NUM_PROPERTY);
+                final boolean evenSequence = (seqNum % 2) == 0;
+                if (evenSequence) {
+                    pauseBeforeAck(seqNum);
+                }
+                message.acknowledge();
+            } catch (JMSException e) {
+                e.printStackTrace();
+                throw new RuntimeException(e);
+            } finally {
+                // lets the main test method know this message was handled
+                LATCH.countDown();
+            }
+        }
+
+        /** Reports the delayed message and sleeps one second, restoring the interrupt flag if interrupted. */
+        private void pauseBeforeAck(int seqNum) {
+            System.out.println("Delayed message sequence numeber: " + seqNum);
+            try {
+                TimeUnit.SECONDS.sleep(1);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        private void close() {
+            if (session == null) {
+                return;
+            }
+            try {
+                session.close();
+            } catch (JMSException e) {
+                e.printStackTrace();
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /** Collects every exception reported on the connection so the test can inspect them afterwards. */
+    public final class TestExceptionListener implements ExceptionListener {
+        private final java.util.Queue<Exception> exceptions = new ConcurrentLinkedQueue<Exception>();
+
+        public void onException(JMSException e) {
+            exceptions.add(e);
+        }
+
+        /** @return {@code true} once at least one exception has been recorded */
+        public boolean hasExceptions() {
+            return !exceptions.isEmpty();
+        }
+
+        /** @return the number of recorded exceptions followed by each exception message */
+        public String getStatusText() {
+            final StringBuilder report = new StringBuilder();
+            report.append("Exceptions count on broker side: ")
+                    .append(exceptions.size())
+                    .append(".\nMessages:\n");
+            for (Exception recorded : exceptions) {
+                report.append(recorded.getMessage()).append("\n\n");
+            }
+            return report.toString();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java
new file mode 100644
index 0000000..669066e
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.apache.activemq.util.IOHelper;
+
+public class AMQ2512Test extends EmbeddedBrokerTestSupport {
+    // Static so the static nested Consumer class can create its sessions from it.
+    private static Connection connection;
+    private final static String QUEUE_NAME = "dee.q";
+    // Messages sent directly by the test method.
+    private final static int INITIAL_MESSAGES_CNT = 1000;
+    // Follow-up messages a consumer sends for each message lacking the worker-id property.
+    private final static int WORKER_INTERNAL_ITERATIONS = 100;
+    // Initial messages plus all follow-up messages.
+    private final static int TOTAL_MESSAGES_CNT = INITIAL_MESSAGES_CNT * WORKER_INTERNAL_ITERATIONS
+            + INITIAL_MESSAGES_CNT;
+    private final static byte[] payload = new byte[5 * 1024];
+    private final static String TEXT = new String(payload);
+
+    private final static String PRP_INITIAL_ID = "initial-id";
+    private final static String PRP_WORKER_ID = "worker-id";
+
+    // Counted down once per onMessage() call.
+    private final static CountDownLatch LATCH = new CountDownLatch(TOTAL_MESSAGES_CNT);
+
+    private final static AtomicInteger ON_MSG_COUNTER = new AtomicInteger();
+
+    /**
+     * Starts 20 consumers, sends {@code INITIAL_MESSAGES_CNT} persistent messages
+     * and waits until {@code TOTAL_MESSAGES_CNT} messages have been handled, then
+     * prints the elapsed time and throughput.
+     * <p>
+     * The connection is closed in a {@code finally} block so a failure does not
+     * leak it. The rate is computed against at least one second to avoid
+     * dividing by zero on a sub-second run.
+     */
+    public void testKahaDBFailure() throws Exception {
+        final ConnectionFactory fac = new ActiveMQConnectionFactory(this.bindAddress);
+        connection = fac.createConnection();
+        try {
+            final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final Queue queue = session.createQueue(QUEUE_NAME);
+            final MessageProducer producer = session.createProducer(queue);
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+            connection.start();
+
+            final long startTime = System.nanoTime();
+
+            final List<Consumer> consumers = new ArrayList<Consumer>();
+            for (int i = 0; i < 20; i++) {
+                consumers.add(new Consumer("worker-" + i));
+            }
+
+            for (int i = 0; i < INITIAL_MESSAGES_CNT; i++) {
+                final TextMessage msg = session.createTextMessage(TEXT);
+                msg.setStringProperty(PRP_INITIAL_ID, "initial-" + i);
+                producer.send(msg);
+            }
+
+            LATCH.await();
+            final long elapsedNanos = System.nanoTime() - startTime;
+            System.out.println("Total execution time = "
+                    + TimeUnit.NANOSECONDS.toMillis(elapsedNanos) + " [ms].");
+            // whole seconds, clamped to 1 so the division below cannot throw
+            final long elapsedSeconds = Math.max(1L, TimeUnit.NANOSECONDS.toSeconds(elapsedNanos));
+            System.out.println("Rate = " + TOTAL_MESSAGES_CNT / elapsedSeconds + " [msg/s].");
+
+            for (Consumer c : consumers) {
+                c.close();
+            }
+        } finally {
+            connection.close();
+        }
+    }
+
+    /**
+     * Worker that both consumes from and produces to the test queue on its own
+     * CLIENT_ACKNOWLEDGE session. For every message without the worker-id property it
+     * sends {@code WORKER_INTERNAL_ITERATIONS} follow-up messages before acknowledging.
+     */
+    private final static class Consumer implements MessageListener {
+        private final String name;
+        private final Session session;
+        private final MessageProducer producer;
+
+        /**
+         * Creates the session, producer and consumer on the shared static connection
+         * and registers this object as the message listener.
+         *
+         * @param name prefix for the worker-id property of the messages this worker sends
+         */
+        private Consumer(String name) {
+            this.name = name;
+            try {
+                session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+                final Queue queue = session.createQueue(QUEUE_NAME + "?consumer.prefetchSize=10");
+                producer = session.createProducer(queue);
+                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+                final MessageConsumer consumer = session.createConsumer(queue);
+                consumer.setMessageListener(this);
+            } catch (JMSException e) {
+                e.printStackTrace();
+                throw new RuntimeException(e);
+            }
+        }
+
+        /**
+         * Fans an initial message out into follow-up messages, acknowledges it, and
+         * always counts the latch down, even when a JMS call fails.
+         */
+        public void onMessage(Message message) {
+            final TextMessage msg = (TextMessage) message;
+            try {
+                // only messages not produced by a worker are fanned out
+                if (!msg.propertyExists(PRP_WORKER_ID)) {
+                    for (int i = 0; i < WORKER_INTERNAL_ITERATIONS; i++) {
+                        final TextMessage newMsg = session.createTextMessage(msg.getText());
+                        newMsg.setStringProperty(PRP_WORKER_ID, name + "-" + i);
+                        newMsg.setStringProperty(PRP_INITIAL_ID, msg.getStringProperty(PRP_INITIAL_ID));
+                        producer.send(newMsg);
+                    }
+                }
+                msg.acknowledge();
+
+            } catch (JMSException e) {
+                e.printStackTrace();
+                throw new RuntimeException(e);
+            } finally {
+                // progress report on every 1000th message handled across all workers
+                final int onMsgCounter = ON_MSG_COUNTER.getAndIncrement();
+                if (onMsgCounter % 1000 == 0) {
+                    System.out.println("message received: " + onMsgCounter);
+                }
+                LATCH.countDown();
+            }
+        }
+
+        /** Closes the worker's session, rethrowing a JMS failure as a RuntimeException. */
+        private void close() {
+            if (session != null) {
+                try {
+                    session.close();
+                } catch (JMSException e) {
+                    e.printStackTrace();
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
+
+    /** Sets the bind address to TCP port 61617 on all interfaces, then runs the superclass set-up. */
+    @Override
+    protected void setUp() throws Exception {
+        bindAddress = "tcp://0.0.0.0:61617";
+        super.setUp();
+    }
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        File dataFileDir = new File("target/test-amq-2512/datadb");
+        IOHelper.mkdirs(dataFileDir);
+        IOHelper.deleteChildren(dataFileDir);
+        KahaDBStore kaha = new KahaDBStore();
+        kaha.setDirectory(dataFileDir); 
+        BrokerService answer = new BrokerService();
+        answer.setPersistenceAdapter(kaha);
+      
+        kaha.setEnableJournalDiskSyncs(false);
+        //kaha.setIndexCacheSize(10);
+        answer.setDataDirectoryFile(dataFileDir);
+        answer.setUseJmx(false);
+        answer.addConnector(bindAddress);
+        return answer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2513Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2513Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2513Test.java
new file mode 100644
index 0000000..b9cfbd9
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2513Test.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.ObjectName;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.broker.jmx.ManagementContext;
+
+/**
+ * This unit test verifies an issue when
+ * javax.management.InstanceNotFoundException is thrown after subsequent startups when
+ * managementContext createConnector="false"
+ *
+ */
+public class AMQ2513Test extends TestCase {
+
+    private BrokerService broker;
+    private String connectionUri;
+    // Held in a field so tearDown() can close it when the test aborts half-way.
+    private Connection connection;
+
+    /**
+     * Closes the client connection and stops the broker if the test left either
+     * of them open, so a failed assertion does not leak a running broker.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            closeConnection();
+        } finally {
+            try {
+                stopBroker();
+            } finally {
+                super.tearDown();
+            }
+        }
+    }
+
+    /**
+     * Starts a JMX-enabled broker named "localhost" with a TCP connector on
+     * port 0 and a {@link ManagementContext} that does not create a connector,
+     * then records the connector's publishable connect string.
+     *
+     * @param deleteAllMessagesOnStartup passed straight to
+     *        {@link BrokerService#setDeleteAllMessagesOnStartup(boolean)}
+     */
+    void createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
+        broker = new BrokerService();
+        broker.setBrokerName("localhost");
+        broker.setUseJmx(true);
+        broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
+        broker.addConnector("tcp://localhost:0");
+
+        ManagementContext ctx = new ManagementContext();
+        //if createConnector == true everything is fine
+        ctx.setCreateConnector(false);
+        broker.setManagementContext(ctx);
+
+        broker.start();
+        broker.waitUntilStarted();
+
+        connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
+    }
+
+    /**
+     * Sends a persistent message and reads the queue MBean, first on a broker
+     * started with message deletion and then again on a second broker started
+     * without it. The second MBean lookup is the one the class comment refers to.
+     */
+    public void testJmx() throws Exception {
+        createBroker(true);
+
+        sendPersistentMessage();
+
+        // First start: the view is read while the client connection is still open.
+        DestinationViewMBean dv = createView();
+        assertTrue(dv.getQueueSize() > 0);
+
+        closeConnection();
+        stopBroker();
+
+        // Second start keeps existing messages; the view is read after the client has gone.
+        createBroker(false);
+        sendPersistentMessage();
+        closeConnection();
+
+        dv = createView();
+        assertTrue(dv.getQueueSize() > 0);
+
+        stopBroker();
+    }
+
+    /**
+     * Opens a connection to the current broker (kept in {@link #connection}) and
+     * sends one persistent text message to queue "test".
+     */
+    private void sendPersistentMessage() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
+        connection = factory.createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(session.createQueue("test"));
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        connection.start();
+        producer.send(session.createTextMessage("test123"));
+    }
+
+    /** Closes the client connection if one is open; safe to call repeatedly. */
+    private void closeConnection() throws Exception {
+        Connection open = connection;
+        connection = null;
+        if (open != null) {
+            open.close();
+        }
+    }
+
+    /** Stops the broker and waits for it to finish stopping; safe to call repeatedly. */
+    private void stopBroker() throws Exception {
+        BrokerService running = broker;
+        broker = null;
+        if (running != null) {
+            running.stop();
+            running.waitUntilStopped();
+        }
+    }
+
+    /**
+     * Creates a JMX proxy for queue "test" on broker "localhost" through the
+     * broker's management context.
+     *
+     * @return the proxy for the queue's {@link DestinationViewMBean}
+     */
+    DestinationViewMBean createView() throws Exception {
+        String domain = "org.apache.activemq";
+        ObjectName name = new ObjectName(domain + ":type=Broker,brokerName=localhost," +
+                                                  "destinationType=Queue,destinationName=test");
+        return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class,
+                true);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2528Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2528Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2528Test.java
new file mode 100644
index 0000000..80c036f
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2528Test.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.region.Queue;
+import org.junit.Assert;
+
+/**
+ * This test demonstrates a bug in which calling
+ * Queue#removeMatchingMessages("") generates an exception, whereas the JMS
+ * specification states that an empty selector is valid.
+ */
+public class AMQ2528Test extends EmbeddedBrokerTestSupport {
+
+    /**
+     * Setup the test so that the destination is a queue.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        useTopic = false;
+        super.setUp();
+    }
+
+    /**
+     * This test enqueues test messages to destination and then verifies that
+     * {@code Queue#removeMatchingMessages("")} removes all the remaining messages.
+     * <p>
+     * Each message carries an int property "id" in the range 0..99. A selector
+     * first removes the lower half, then the empty selector must remove the rest.
+     */
+    public void testRemoveMatchingMessages() throws Exception {
+        final int NUM_MESSAGES = 100;
+        final String MESSAGE_ID = "id";
+
+        // Enqueue the test messages.
+        Connection conn = createConnection();
+        try {
+            conn.start();
+            Session session = conn.createSession(false,
+                    Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(destination);
+            for (int id = 0; id < NUM_MESSAGES; id++) {
+                Message message = session.createMessage();
+                message.setIntProperty(MESSAGE_ID, id);
+                producer.send(message);
+            }
+            producer.close();
+            session.close();
+        } finally {
+            conn.close();
+        }
+
+        // Verify that half of the messages can be removed by selector.
+        Queue queue = (Queue) broker.getRegionBroker().getDestinations(
+                destination).iterator().next();
+
+        Assert.assertEquals(NUM_MESSAGES / 2, queue
+                .removeMatchingMessages(MESSAGE_ID + " < " + NUM_MESSAGES / 2));
+
+        // Verify that the remainder of the messages can be removed by empty
+        // selector.
+        Assert.assertEquals(NUM_MESSAGES - NUM_MESSAGES / 2, queue
+                .removeMatchingMessages(""));
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2571Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2571Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2571Test.java
new file mode 100644
index 0000000..533ae0c
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2571Test.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TextMessage;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.BrokerService;
+
+/**
+ * Verifies that a producer can no longer send to a temporary queue once the
+ * connection that created the queue has been closed.
+ */
+public class AMQ2571Test extends EmbeddedBrokerTestSupport {
+
+    /** Time limit, in milliseconds, for the background sender and for joining it. */
+    private static final long SEND_TIMEOUT_MS = 5 * 60 * 1000;
+
+    /**
+     * Connection A creates a temporary queue and connection B sends to it from a
+     * background thread. Connection A is then closed, and a further send from
+     * connection B is expected to fail with a {@link JMSException}.
+     */
+    public void testTempQueueClosing() {
+        Connection connectionA = null;
+        Connection connectionB = null;
+        try {
+            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.bindAddress);
+            connectionFactory.setAlwaysSyncSend(true);
+
+            // First create session that will own the TempQueue
+            connectionA = connectionFactory.createConnection();
+            connectionA.start();
+
+            Session sessionA = connectionA.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            TemporaryQueue tempQueue = sessionA.createTemporaryQueue();
+
+            // Next, create session that will put messages on the queue.
+            connectionB = connectionFactory.createConnection();
+            connectionB.start();
+
+            Session sessionB = connectionB.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            // Create a producer for connection B.
+            final MessageProducer producerB = sessionB.createProducer(tempQueue);
+            producerB.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+            final TextMessage message = sessionB.createTextMessage("Testing AMQ TempQueue.");
+
+            Thread sendingThread = new Thread(new Runnable() {
+                public void run() {
+                    try {
+                        long end = System.currentTimeMillis() + SEND_TIMEOUT_MS;
+                        // wait for exception on send
+                        while (System.currentTimeMillis() < end) {
+                            producerB.send(message);
+                        }
+                    } catch (JMSException e) {
+                        e.printStackTrace();
+                    }
+                }
+            });
+
+            // Keep sending in the background until a send fails or the time limit passes.
+            sendingThread.start();
+            // Now close connection A. This will remove the TempQueue.
+            connectionA.close();
+            // Wait for the thread to finish.
+            sendingThread.join(SEND_TIMEOUT_MS);
+
+            // Now we test if we are able to send again.
+            try {
+                producerB.send(message);
+                fail("Involuntary recreated temporary queue.");
+            } catch (JMSException e) {
+                // Got exception, just as we wanted because the creator of
+                // the TempQueue had closed the connection prior to the send.
+            }
+        } catch (Exception e) {
+            fail("Unexpected exception " + e);
+        } finally {
+            // Release both connections whatever the outcome; closing an already
+            // closed connection is harmless.
+            closeQuietly(connectionB);
+            closeQuietly(connectionA);
+        }
+    }
+
+    /** Best-effort close used for cleanup; a failure here must not hide the test result. */
+    private static void closeQuietly(Connection connection) {
+        if (connection == null) {
+            return;
+        }
+        try {
+            connection.close();
+        } catch (JMSException ignored) {
+            // cleanup only
+        }
+    }
+
+    /** Uses the in-VM transport and enables auto-fail before the base-class set-up runs. */
+    @Override
+    protected void setUp() throws Exception {
+        bindAddress = "vm://localhost";
+        setAutoFail(true);
+        super.setUp();
+    }
+
+    /** @return a non-persistent broker with JMX disabled */
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        answer.setPersistent(false);
+        answer.setUseJmx(false);
+        return answer;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2580Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2580Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2580Test.java
new file mode 100644
index 0000000..2bcb983
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2580Test.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import junit.framework.Test;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;      
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
+
+/**
+ * Smoke test for a durable topic subscription with a selector: after the
+ * subscriber disconnects, five non-matching messages and one matching message
+ * are published, and the re-created subscriber must receive a message.
+ */
+public class AMQ2580Test extends TestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ2580Test.class);
+
+    private static final String TOPIC_NAME = "topicName";
+    private static final String CLIENT_ID = "client_id";
+    private static final String TEXT_OF_SELECTED_MSG = "good_message";
+
+    protected TopicConnection connection;
+
+    private Topic topic;
+    private Session session;
+    private MessageProducer producer;
+    private ConnectionFactory connectionFactory;
+    private BrokerService service;
+
+    public static Test suite() {
+        return suite(AMQ2580Test.class);
+    }
+
+    /** Starts the broker, builds the connection factory and creates the topic. */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        initDurableBroker();
+        initConnectionFactory();
+        initTopic();
+    }
+
+    /**
+     * Closes the client and stops the broker. The broker is stopped even when
+     * closing the client throws, so a failing test does not leave it running.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            shutdownClient();
+        } finally {
+            try {
+                if (service != null) {
+                    service.stop();
+                }
+            } finally {
+                super.tearDown();
+            }
+        }
+    }
+
+    /** Creates and starts the connection unless one already exists. */
+    private void initConnection() throws JMSException {
+        if (connection == null) {
+            LOG.info("Initializing connection");
+
+            connection = (TopicConnection) connectionFactory.createConnection();
+            connection.start();
+        }
+    }
+
+    /** Runs the smoke test once per available persistence adapter choice. */
+    public void initCombosForTestTopicIsDurableSmokeTest() throws Exception {
+        addCombinationValues("defaultPersistenceAdapter", PersistenceAdapterChoice.values());
+    }
+
+    public void testTopicIsDurableSmokeTest() throws Exception {
+
+        // Register the durable subscription; nothing has been published yet.
+        initClient();
+        MessageConsumer consumer = createMessageConsumer();
+        LOG.info("Consuming message");
+        assertNull(consumer.receive(1));
+        shutdownClient();
+        consumer.close();
+
+        // Publish while the subscriber is offline.
+        sendMessages();
+        shutdownClient();
+
+        // Reconnect: the matching message must be delivered.
+        initClient();
+        consumer = createMessageConsumer();
+
+        LOG.info("Consuming message");
+        TextMessage answer1 = (TextMessage) consumer.receive(1000);
+        assertNotNull("we got our message", answer1);
+
+        consumer.close();
+    }
+
+    /**
+     * Creates the durable subscriber on the current session, using the topic
+     * name as subscription name and the selector {@code name='value'}.
+     */
+    private MessageConsumer createMessageConsumer() throws JMSException {
+        LOG.info("creating durable subscriber");
+        return session.createDurableSubscriber(topic,
+                TOPIC_NAME,
+                "name='value'",
+                false);
+    }
+
+    private void initClient() throws JMSException {
+        LOG.info("Initializing client");
+
+        initConnection();
+        initSession();
+    }
+
+    /**
+     * Closes the session and the connection and clears both fields. Either may
+     * already be {@code null} (for example when tearDown runs after a failure
+     * that happened between a shutdown and the next init); the connection is
+     * closed even if closing the session throws.
+     */
+    private void shutdownClient()
+            throws JMSException {
+        LOG.info("Closing session and connection");
+        Session closingSession = session;
+        TopicConnection closingConnection = connection;
+        session = null;
+        connection = null;
+        try {
+            if (closingSession != null) {
+                closingSession.close();
+            }
+        } finally {
+            if (closingConnection != null) {
+                closingConnection.close();
+            }
+        }
+    }
+
+    /** Publishes five messages that fail the selector followed by one that matches it. */
+    private void sendMessages()
+            throws JMSException {
+        initConnection();
+
+        initSession();
+
+        LOG.info("Creating producer");
+        producer = session.createProducer(topic);
+
+        sendMessageThatFailsSelection();
+
+        sendMessage(TEXT_OF_SELECTED_MSG, "value");
+    }
+
+    private void initSession() throws JMSException {
+        LOG.info("Initializing session");
+        session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    private void sendMessageThatFailsSelection() throws JMSException {
+        for (int i = 0; i < 5; i++) {
+            String textOfNotSelectedMsg = "Msg_" + i;
+            sendMessage(textOfNotSelectedMsg, "not_value");
+            LOG.info("#");
+        }
+    }
+
+    /**
+     * Sends one text message whose string property "name" is set to the given value.
+     *
+     * @param msgText       body of the message
+     * @param propertyValue value of the "name" property the selector tests
+     */
+    private void sendMessage(
+            String msgText,
+            String propertyValue) throws JMSException {
+        LOG.info("Creating message: " + msgText);
+        TextMessage messageToSelect = session.createTextMessage(msgText);
+        messageToSelect.setStringProperty("name", propertyValue);
+        LOG.info("Sending message");
+        producer.send(messageToSelect);
+    }
+
+    protected void initConnectionFactory() throws Exception {
+        ActiveMQConnectionFactory activeMqConnectionFactory = createActiveMqConnectionFactory();
+        connectionFactory = activeMqConnectionFactory;
+    }
+
+
+    /**
+     * Builds a failover connection factory for the broker's first transport
+     * connector with topic advisories off, a durable topic prefetch of 2 and a
+     * fixed client id.
+     */
+    private ActiveMQConnectionFactory createActiveMqConnectionFactory() throws Exception {
+        ActiveMQConnectionFactory activeMqConnectionFactory = new ActiveMQConnectionFactory(
+                "failover:" + service.getTransportConnectors().get(0).getConnectUri().toString());
+        activeMqConnectionFactory.setWatchTopicAdvisories(false);
+        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
+        prefetchPolicy.setDurableTopicPrefetch(2);
+        prefetchPolicy.setOptimizeDurableTopicPrefetch(2);
+        activeMqConnectionFactory.setPrefetchPolicy(prefetchPolicy);
+        activeMqConnectionFactory.setClientID(CLIENT_ID);
+        return activeMqConnectionFactory;
+    }
+
+    /**
+     * Starts a persistent broker on a TCP connector with port 0, with stored
+     * messages deleted on startup and advisories and JMX disabled.
+     */
+    private void initDurableBroker() throws Exception {
+        service = new BrokerService();
+        setDefaultPersistenceAdapter(service);
+        service.setDeleteAllMessagesOnStartup(true);
+        service.setAdvisorySupport(false);
+        service.setTransportConnectorURIs(new String[]{"tcp://localhost:0"});
+        service.setPersistent(true);
+        service.setUseJmx(false);
+        service.start();
+
+    }
+
+    /** Creates the topic through a throw-away session on the shared connection. */
+    private void initTopic() throws JMSException {
+        initConnection();
+        TopicSession topicSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+        topic = topicSession.createTopic(TOPIC_NAME);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java
new file mode 100644
index 0000000..3e41dc9
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.Vector;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TopicSubscriber;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.BrokerView;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// variation on AMQ2584 where the DLQ consumer works in parallel to producer so
+// that some dups are not suppressed as they are already acked by the consumer
+// the audit needs to be disabled to allow these dupes to be consumed
+public class AMQ2584ConcurrentDlqTest extends org.apache.activemq.TestSupport {
+
+    static final Logger LOG = LoggerFactory.getLogger(AMQ2584ConcurrentDlqTest.class);
+    BrokerService broker = null;
+    ActiveMQTopic topic;
+
+    ActiveMQConnection consumerConnection = null, producerConnection = null, dlqConnection = null;
+    Session consumerSession;
+    Session producerSession;
+    MessageProducer producer;
+    Vector<TopicSubscriber> duralbeSubs = new Vector<TopicSubscriber>();
+    final int numMessages = 1000;
+    final int numDurableSubs = 2;
+
+    String data;
+    // Written in the DLQ listener callback and polled by the test thread in
+    // closeDlqConsumer(), hence volatile.
+    private volatile long dlqConsumerLastReceivedTimeStamp;
+    private AtomicLong dlqReceivedCount = new AtomicLong(0);
+
+    // 2 deliveries of each message to each of the numDurableSubs durable subscribers
+    CountDownLatch redeliveryConsumerLatch = new CountDownLatch(((2 * numMessages) * numDurableSubs) - 1);
+    // should get at least numMessages, possibly more
+    CountDownLatch dlqConsumerLatch = new CountDownLatch((numMessages - 1));
+
+    /**
+     * Publishes {@code numMessages} messages to a topic whose durable
+     * subscribers call {@code recover()} on every delivery while a DLQ consumer
+     * runs in parallel, then checks that the KahaDB directory is left with a
+     * single ".log" file after the consumers are closed.
+     */
+    public void testSize() throws Exception {
+        openConsumer(redeliveryConsumerLatch);
+        openDlqConsumer(dlqConsumerLatch);
+
+
+        assertEquals(0, broker.getAdminView().getStorePercentUsage());
+
+        for (int i = 0; i < numMessages; i++) {
+            sendMessage(false);
+        }
+
+        final BrokerView brokerView = broker.getAdminView();
+
+        // NOTE(review): the result is discarded; presumably called for its side effect - confirm.
+        broker.getSystemUsage().getStoreUsage().isFull();
+        LOG.info("store percent usage: " + brokerView.getStorePercentUsage());
+        assertTrue("redelivery consumer got all it needs, remaining: "
+                + redeliveryConsumerLatch.getCount(), redeliveryConsumerLatch.await(60, TimeUnit.SECONDS));
+        assertTrue("dql  consumer got all it needs", dlqConsumerLatch.await(60, TimeUnit.SECONDS));
+        closeConsumer();
+
+        LOG.info("Giving dlq a chance to clear down once topic consumer is closed");
+
+        // consume all of the duplicates that arrived after the first ack
+        closeDlqConsumer();
+
+        // give broker a chance to clean obsolete messages, wait 2*cleanupInterval
+        Thread.sleep(5000);
+
+        FilenameFilter justLogFiles = new FilenameFilter() {
+            public boolean accept(File file, String s) {
+                return s.endsWith(".log");
+            }
+        };
+        int numFiles = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getDirectory().list(justLogFiles).length;
+        if (numFiles > 2) {
+            LOG.info(Arrays.toString(((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getDirectory().list(justLogFiles)));
+        }
+        LOG.info("num files: " + numFiles);
+        assertEquals("kahaDB dir should contain 1 db file,is: " + numFiles, 1, numFiles);
+    }
+
+    /**
+     * Opens the consumer connection and registers {@code numDurableSubs}
+     * durable subscribers that share one listener. The listener counts the
+     * latch down and then calls {@code recover()} on the session.
+     *
+     * @param latch counted down once per delivery
+     */
+    private void openConsumer(final CountDownLatch latch) throws Exception {
+        consumerConnection = (ActiveMQConnection) createConnection();
+        consumerConnection.setClientID("cliID");
+        consumerConnection.start();
+        consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        MessageListener listener = new MessageListener() {
+            public void onMessage(Message message) {
+                latch.countDown();
+                try {
+                    consumerSession.recover();
+                } catch (Exception ignored) {
+                    ignored.printStackTrace();
+                }
+            }
+        };
+
+        for (int i = 1; i <= numDurableSubs; i++) {
+            TopicSubscriber sub = consumerSession.createDurableSubscriber(topic, "subName" + i);
+            sub.setMessageListener(listener);
+            duralbeSubs.add(sub);
+        }
+    }
+
+    /**
+     * Starts a consumer on "ActiveMQ.DLQ" that counts the latch down, records
+     * the time of the last delivery and increments the received counter.
+     *
+     * @param received counted down once per DLQ delivery
+     */
+    private void openDlqConsumer(final CountDownLatch received) throws Exception {
+
+        dlqConnection = (ActiveMQConnection) createConnection();
+        Session dlqSession = dlqConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer dlqConsumer = dlqSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
+        dlqConsumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message message) {
+                if (received.getCount() > 0 && received.getCount() % 200 == 0) {
+                    LOG.info("remaining on DLQ: " + received.getCount());
+                }
+                received.countDown();
+                dlqConsumerLastReceivedTimeStamp = System.currentTimeMillis();
+                dlqReceivedCount.incrementAndGet();
+            }
+        });
+        dlqConnection.start();
+    }
+
+
+    /** Closes the durable subscribers, removes their subscriptions and closes the consumer connection. */
+    private void closeConsumer() throws JMSException {
+        for (TopicSubscriber sub : duralbeSubs) {
+            sub.close();
+        }
+        if (consumerSession != null) {
+            for (int i = 1; i <= numDurableSubs; i++) {
+                consumerSession.unsubscribe("subName" + i);
+            }
+        }
+        if (consumerConnection != null) {
+            consumerConnection.close();
+            consumerConnection = null;
+        }
+    }
+
+    /**
+     * Waits until the DLQ consumer has been idle for 5 seconds, or 30 seconds
+     * have passed in total, and then closes the DLQ connection.
+     */
+    private void closeDlqConsumer() throws JMSException, InterruptedException {
+        final long limit = System.currentTimeMillis() + 30 * 1000;
+        if (dlqConsumerLastReceivedTimeStamp > 0) {
+            while (System.currentTimeMillis() < dlqConsumerLastReceivedTimeStamp + 5000
+                    && System.currentTimeMillis() < limit) {
+                LOG.info("waiting for DLQ do drain, receivedCount: " + dlqReceivedCount);
+                TimeUnit.SECONDS.sleep(1);
+            }
+        }
+        if (dlqConnection != null) {
+            dlqConnection.close();
+            dlqConnection = null;
+        }
+    }
+
+    /**
+     * Sends one message carrying the payload in its "data" property, creating
+     * the producer connection on first use.
+     *
+     * @param filter not used by this method
+     */
+    private void sendMessage(boolean filter) throws Exception {
+        if (producerConnection == null) {
+            producerConnection = (ActiveMQConnection) createConnection();
+            producerConnection.start();
+            producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            producer = producerSession.createProducer(topic);
+        }
+
+        Message message = producerSession.createMessage();
+        message.setStringProperty("data", data);
+        producer.send(message);
+    }
+
+    /**
+     * Starts broker "testStoreSize" with advisories off, auditing disabled in
+     * the default destination policy and a store usage limit of 200,000,000 bytes.
+     *
+     * @param deleteMessages whether stored messages are deleted on startup
+     */
+    private void startBroker(boolean deleteMessages) throws Exception {
+        broker = new BrokerService();
+        broker.setAdvisorySupport(false);
+        broker.setBrokerName("testStoreSize");
+
+        PolicyMap map = new PolicyMap();
+        PolicyEntry entry = new PolicyEntry();
+        entry.setEnableAudit(false);
+        map.setDefaultEntry(entry);
+        broker.setDestinationPolicy(map);
+
+        if (deleteMessages) {
+            broker.setDeleteAllMessagesOnStartup(true);
+        }
+        configurePersistenceAdapter(broker.getPersistenceAdapter());
+        broker.getSystemUsage().getStoreUsage().setLimit(200 * 1000 * 1000);
+        broker.start();
+    }
+
+    /**
+     * Applies a 2 MiB journal file length and 2 second cleanup and checkpoint
+     * intervals to the adapter by property name.
+     */
+    private void configurePersistenceAdapter(PersistenceAdapter persistenceAdapter) {
+        Properties properties = new Properties();
+        String maxFileLengthVal = String.valueOf(2 * 1024 * 1024);
+        properties.put("journalMaxFileLength", maxFileLengthVal);
+        properties.put("maxFileLength", maxFileLengthVal);
+        properties.put("cleanupInterval", "2000");
+        properties.put("checkpointInterval", "2000");
+        // there are problems with duplicate dispatch in the cursor, which maintain
+        // a map of messages. A dup dispatch can be dropped.
+        // see: org.apache.activemq.broker.region.cursors.OrderedPendingList
+        // Adding duplicate detection to the default DLQ strategy removes the problem
+        // which means we can leave the default for concurrent store and dispatch q
+        //properties.put("concurrentStoreAndDispatchQueues", "false");
+
+        IntrospectionSupport.setProperties(persistenceAdapter, properties);
+    }
+
+    private void stopBroker() throws Exception {
+        if (broker != null)
+            broker.stop();
+        broker = null;
+    }
+
+    /** Best-effort close used during tearDown; failures are logged, not thrown. */
+    private static void closeQuietly(ActiveMQConnection connection) {
+        if (connection == null) {
+            return;
+        }
+        try {
+            connection.close();
+        } catch (JMSException e) {
+            LOG.warn("failed to close connection during tearDown", e);
+        }
+    }
+
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory("vm://testStoreSize?jms.watchTopicAdvisories=false&jms.redeliveryPolicy.maximumRedeliveries=1&jms.redeliveryPolicy.initialRedeliveryDelay=0&waitForStart=5000&create=false");
+    }
+
+    /** Builds the 5000-character payload, starts a clean broker and creates the topic. */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        StringBuilder sb = new StringBuilder(5000);
+        for (int i = 0; i < 5000; i++) {
+            sb.append('a');
+        }
+        data = sb.toString();
+
+        startBroker(true);
+        topic = (ActiveMQTopic) createDestination();
+    }
+
+    /**
+     * Closes any connection the test left open (the producer connection is
+     * never closed by the test itself) and then stops the broker.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        closeQuietly(producerConnection);
+        producerConnection = null;
+        closeQuietly(consumerConnection);
+        consumerConnection = null;
+        closeQuietly(dlqConnection);
+        dlqConnection = null;
+        try {
+            stopBroker();
+        } finally {
+            super.tearDown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584Test.java
new file mode 100644
index 0000000..b84b9ab
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584Test.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.BrokerView;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Verifies that persistent store usage drops back below a small threshold after messages
+ * published to durable topic subscribers have been delivered, have reached the dead letter
+ * queue and have finally been consumed from it.
+ * <p>
+ * The test is parameterized and runs once for each persistence adapter returned by
+ * {@link #getTestParameters()}.
+ */
+@RunWith(value = Parameterized.class)
+public class AMQ2584Test extends org.apache.activemq.TestSupport {
+
+    static final Logger LOG = LoggerFactory.getLogger(AMQ2584Test.class);
+    BrokerService broker = null;
+    ActiveMQTopic topic;
+
+    ActiveMQConnection consumerConnection = null, producerConnection = null;
+    Session producerSession;
+    MessageProducer producer;
+    /** Store usage percentage the test expects to exceed after sending and to fall below after draining. */
+    final int minPercentUsageForStore = 3;
+    /** Payload attached to every message as the {@code data} string property; built in {@link #setUp()}. */
+    String data;
+
+    private final TestSupport.PersistenceAdapterChoice persistenceAdapterChoice;
+
+    /**
+     * Supplies the persistence adapters to run the test against.
+     *
+     * @return one single-element parameter array each for KahaDB and LevelDB
+     */
+    @Parameterized.Parameters(name="{0}")
+    public static Collection<TestSupport.PersistenceAdapterChoice[]> getTestParameters() {
+        TestSupport.PersistenceAdapterChoice[] kahaDb = {TestSupport.PersistenceAdapterChoice.KahaDB};
+        TestSupport.PersistenceAdapterChoice[] levelDb = {TestSupport.PersistenceAdapterChoice.LevelDB};
+        List<TestSupport.PersistenceAdapterChoice[]> choices = new ArrayList<TestSupport.PersistenceAdapterChoice[]>();
+        choices.add(kahaDb);
+        choices.add(levelDb);
+
+        return choices;
+    }
+
+    /**
+     * @param choice the persistence adapter the broker is started with for this run
+     */
+    public AMQ2584Test(TestSupport.PersistenceAdapterChoice choice) {
+        this.persistenceAdapterChoice = choice;
+    }
+
+    /**
+     * Sends 1000 messages to a topic with three durable subscribers, waits for every
+     * subscriber to see every message, drains the DLQ and then waits for the reported store
+     * usage to fall below {@link #minPercentUsageForStore}.
+     *
+     * @throws Exception on any broker or JMS failure
+     */
+    @Test(timeout = 120000)
+    public void testSize() throws Exception {
+        int messages = 1000;
+        // Three durable subscribers, each expected to receive every message once.
+        CountDownLatch redeliveryConsumerLatch = new CountDownLatch((messages*3));
+        openConsumer(redeliveryConsumerLatch);
+
+        assertEquals(0, broker.getAdminView().getStorePercentUsage());
+
+        for (int i = 0; i < messages; i++) {
+            sendMessage(false);
+        }
+
+        final BrokerView brokerView = broker.getAdminView();
+
+        // Return value intentionally unused.
+        // NOTE(review): presumably called to force the store usage to be recalculated - confirm.
+        broker.getSystemUsage().getStoreUsage().isFull();
+        LOG.info("store percent usage: "+brokerView.getStorePercentUsage());
+        int storePercentUsage = broker.getAdminView().getStorePercentUsage();
+        assertTrue("some store in use", storePercentUsage > minPercentUsageForStore);
+
+        assertTrue("redelivery consumer got all it needs", redeliveryConsumerLatch.await(60, TimeUnit.SECONDS));
+        closeConsumer();
+
+        // consume from DLQ
+        final CountDownLatch received = new CountDownLatch(messages);
+        consumerConnection = (ActiveMQConnection) createConnection();
+        Session dlqSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer dlqConsumer = dlqSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
+        dlqConsumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                if (received.getCount() % 500 == 0) {
+                    LOG.info("remaining on DLQ: " + received.getCount());
+                }
+                received.countDown();
+            }
+        });
+        consumerConnection.start();
+
+        assertTrue("Not all messages reached the DLQ", received.await(60, TimeUnit.SECONDS));
+
+        assertTrue("Store usage exceeds expected usage",
+                Wait.waitFor(new Wait.Condition() {
+                    @Override
+                    public boolean isSatisified() throws Exception {
+                        // Return value intentionally unused; see the note on the earlier call.
+                        broker.getSystemUsage().getStoreUsage().isFull();
+                        LOG.info("store precent usage: "+brokerView.getStorePercentUsage());
+                        return broker.getAdminView().getStorePercentUsage() < minPercentUsageForStore;
+                    }
+                }));
+
+        closeConsumer();
+
+    }
+
+    /**
+     * Opens the consumer connection with client id {@code cliID} and registers three durable
+     * subscribers on {@link #topic} that share one listener.
+     * <p>
+     * The listener counts the latch down and then calls {@code session.recover()}. Combined
+     * with {@code maximumRedeliveries=0} from {@link #createConnectionFactory()} this is
+     * presumably what routes each message to the DLQ - confirm against the redelivery policy.
+     *
+     * @param latch counted down once per message received by any of the subscribers
+     * @throws Exception on any JMS failure
+     */
+    private void openConsumer(final CountDownLatch latch) throws Exception {
+        consumerConnection = (ActiveMQConnection) createConnection();
+        consumerConnection.setClientID("cliID");
+        consumerConnection.start();
+        final Session session = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        MessageListener listener = new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                latch.countDown();
+                try {
+                    session.recover();
+                } catch (Exception e) {
+                    // Best effort: a failed recover must not kill the listener, but it is
+                    // reported through the test logger rather than stderr.
+                    LOG.warn("session.recover() failed in durable subscriber listener", e);
+                }
+
+            }
+        };
+
+        session.createDurableSubscriber(topic, "subName1").setMessageListener(listener);
+        session.createDurableSubscriber(topic, "subName2").setMessageListener(listener);
+        session.createDurableSubscriber(topic, "subName3").setMessageListener(listener);
+    }
+
+    /**
+     * Closes the consumer connection, if open, and clears the reference.
+     *
+     * @throws JMSException if closing the connection fails
+     */
+    private void closeConsumer() throws JMSException {
+        if (consumerConnection != null) {
+            consumerConnection.close();
+        }
+        consumerConnection = null;
+    }
+
+    /**
+     * Closes the lazily created producer connection, if open, and clears the producer state
+     * so that the connection does not outlive the test.
+     *
+     * @throws JMSException if closing the connection fails
+     */
+    private void closeProducer() throws JMSException {
+        if (producerConnection != null) {
+            producerConnection.close();
+        }
+        producerConnection = null;
+        producerSession = null;
+        producer = null;
+    }
+
+    /**
+     * Sends one message to {@link #topic} carrying {@link #data} as its {@code data} string
+     * property, creating the producer connection, session and producer on first use.
+     *
+     * @param filter currently unused
+     * @throws Exception on any JMS failure
+     */
+    private void sendMessage(boolean filter) throws Exception {
+        if (producerConnection == null) {
+            producerConnection = (ActiveMQConnection) createConnection();
+            producerConnection.start();
+            producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            producer = producerSession.createProducer(topic);
+        }
+
+        Message message = producerSession.createMessage();
+        message.setStringProperty("data", data);
+        producer.send(message);
+    }
+
+    /**
+     * Creates and starts the broker named {@code testStoreSize} with advisories disabled,
+     * the parameterized persistence adapter and a store usage limit of 200 * 1000 * 1000.
+     *
+     * @param deleteMessages when {@code true}, the broker is told to delete all messages on startup
+     * @throws Exception if the broker cannot be configured or started
+     */
+    private void startBroker(boolean deleteMessages) throws Exception {
+        broker = new BrokerService();
+        broker.setAdvisorySupport(false);
+        broker.setBrokerName("testStoreSize");
+
+        if (deleteMessages) {
+            broker.setDeleteAllMessagesOnStartup(true);
+        }
+        LOG.info("Starting broker with persistenceAdapterChoice " + persistenceAdapterChoice.toString());
+        setPersistenceAdapter(broker, persistenceAdapterChoice);
+        configurePersistenceAdapter(broker.getPersistenceAdapter());
+        broker.getSystemUsage().getStoreUsage().setLimit(200 * 1000 * 1000);
+        broker.start();
+    }
+
+    /**
+     * Applies journal file length, cleanup interval and checkpoint interval settings to the
+     * adapter by property name.
+     * <p>
+     * Both {@code journalMaxFileLength} and {@code maxFileLength} are set; presumably each
+     * adapter type only recognises one of the two names - confirm.
+     *
+     * @param persistenceAdapter the adapter to configure
+     */
+    private void configurePersistenceAdapter(PersistenceAdapter persistenceAdapter) {
+        Properties properties = new Properties();
+        String maxFileLengthVal = String.valueOf(1 * 1024 * 1024);
+        properties.put("journalMaxFileLength", maxFileLengthVal);
+        properties.put("maxFileLength", maxFileLengthVal);
+        properties.put("cleanupInterval", "2000");
+        properties.put("checkpointInterval", "2000");
+
+        IntrospectionSupport.setProperties(persistenceAdapter, properties);
+    }
+
+    /**
+     * Stops the broker, if one was created, and clears the reference.
+     *
+     * @throws Exception if the broker fails to stop
+     */
+    private void stopBroker() throws Exception {
+        if (broker != null) {
+            broker.stop();
+        }
+        broker = null;
+    }
+
+    /**
+     * Creates the factory for in-VM connections to the {@code testStoreSize} broker, with
+     * topic advisory watching disabled, {@code maximumRedeliveries=0},
+     * {@code closeTimeout=60000}, {@code waitForStart=5000} and {@code create=false}.
+     *
+     * @return the configured connection factory
+     * @throws Exception declared by the overridden method; not thrown here directly
+     */
+    @Override
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory("vm://testStoreSize?jms.watchTopicAdvisories=false&jms.redeliveryPolicy.maximumRedeliveries=0&jms.closeTimeout=60000&waitForStart=5000&create=false");
+    }
+
+    /**
+     * Builds the 5000-character payload, starts a broker with message deletion on startup
+     * and creates the test topic.
+     *
+     * @throws Exception if the broker cannot be started or the destination cannot be created
+     */
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        StringBuilder sb = new StringBuilder(5000);
+        for (int i = 0; i < 5000; i++) {
+            sb.append('a');
+        }
+        data = sb.toString();
+
+        startBroker(true);
+        topic = (ActiveMQTopic) createDestination();
+    }
+
+    /**
+     * Closes any connection the test left open and then stops the broker.
+     * <p>
+     * The broker is stopped in a {@code finally} block so that it is shut down even when
+     * closing a connection fails.
+     *
+     * @throws Exception if closing a connection or stopping the broker fails
+     */
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        try {
+            closeProducer();
+            closeConsumer();
+        } finally {
+            stopBroker();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2585Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2585Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2585Test.java
new file mode 100644
index 0000000..3f515d9
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2585Test.java
@@ -0,0 +1,83 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.EmbeddedBrokerAndConnectionTestSupport;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.spring.ConsumerBean;
+
+/**
+ * Sends one text message carrying a single string property over a connection whose bind
+ * address has {@code ?marshal=true} appended, and checks that the received copy keeps its
+ * body and property and reports a size of at least the expected minimum.
+ */
+public class AMQ2585Test extends EmbeddedBrokerAndConnectionTestSupport {
+    /** Ten-character string used as message body, property name and property value. */
+    static final String LENGTH10STRING = "1234567890";
+
+    private final Destination destination = new ActiveMQQueue("MyQueue");
+    private Session session;
+    private MessageProducer producer;
+    private ConsumerBean messageList;
+
+    /**
+     * Round-trips a single message and verifies its text, its property and its reported size.
+     *
+     * @throws Exception on any JMS failure
+     */
+    public void testOneMessageWithProperties() throws Exception {
+        TextMessage outgoing = session.createTextMessage(LENGTH10STRING);
+        outgoing.setStringProperty(LENGTH10STRING, LENGTH10STRING);
+        producer.send(outgoing);
+
+        messageList.assertMessagesArrived(1);
+
+        ActiveMQTextMessage received = (ActiveMQTextMessage) messageList.flushMessages().get(0);
+
+        assertEquals(LENGTH10STRING, received.getText());
+        assertTrue(received.getProperties().size() > 0);
+        assertTrue(received.propertyExists(LENGTH10STRING));
+        assertEquals(LENGTH10STRING, received.getStringProperty(LENGTH10STRING));
+
+        // As specified by getSize(), the size (memory usage) of the body should be the
+        // length of the text * 2. It is unclear how memory usage is calculated for
+        // properties, but it should probably not be less than the sum of the (string)
+        // lengths of the key name and value.
+        final int minimumExpectedSize = Message.DEFAULT_MINIMUM_MESSAGE_SIZE + 4 * LENGTH10STRING.length();
+        assertTrue("Message size was smaller than expected: " + received.getSize(),
+                minimumExpectedSize <= received.getSize());
+
+        // The size must cover more than the body text alone.
+        final int bodyOnlySize = 2 * LENGTH10STRING.length();
+        assertFalse(bodyOnlySize == received.getSize());
+    }
+
+    /**
+     * Appends {@code ?marshal=true} to the bind address before the parent set-up runs, then
+     * creates the session, attaches a verbose {@link ConsumerBean} as listener on the test
+     * queue and creates the producer.
+     *
+     * @throws Exception if the parent set-up or any JMS call fails
+     */
+    @Override
+    protected void setUp() throws Exception {
+        bindAddress += "?marshal=true";
+        super.setUp();
+
+        messageList = new ConsumerBean();
+        messageList.setVerbose(true);
+
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        MessageConsumer queueConsumer = session.createConsumer(destination);
+        queueConsumer.setMessageListener(messageList);
+
+        producer = session.createProducer(destination);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java
new file mode 100644
index 0000000..4f6f168
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.util.IOHelper;
+
+/**
+ * Checks that broker memory usage returns to its starting value once the connection that
+ * owns a temporary queue is closed, even though messages were sent to that queue and never
+ * consumed.
+ */
+public class AMQ2616Test extends TestCase {
+    /** Number of messages sent to the temporary queue. */
+    private static final int NUMBER = 2000;
+    /** Bind address for the broker's transport connector. */
+    private static final String ACTIVEMQ_BROKER_BIND = "tcp://0.0.0.0:0";
+
+    private BrokerService brokerService;
+    private final ArrayList<Thread> threads = new ArrayList<Thread>();
+    private final AtomicBoolean shutdown = new AtomicBoolean();
+
+    private String connectionUri;
+
+    /**
+     * Fills a temporary queue with {@link #NUMBER} 4 KiB messages from a second connection,
+     * closes the connection that created the queue and expects memory usage to be back at
+     * the value measured before sending.
+     * <p>
+     * Both connections are closed in a {@code finally} block so that a failed assertion
+     * does not leave them open.
+     *
+     * @throws Exception on any broker or JMS failure
+     */
+    public void testQueueResourcesReleased() throws Exception{
+        ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(connectionUri);
+        Connection tempConnection = fac.createConnection();
+        Connection testConnection = null;
+        try {
+            tempConnection.start();
+            Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue tempQueue = tempSession.createTemporaryQueue();
+
+            testConnection = fac.createConnection();
+            long startUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage();
+            Session testSession = testConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer testProducer = testSession.createProducer(tempQueue);
+            byte[] payload = new byte[1024*4];
+            for (int i = 0; i < NUMBER; i++ ) {
+                BytesMessage msg = testSession.createBytesMessage();
+                msg.writeBytes(payload);
+                testProducer.send(msg);
+            }
+            long endUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage();
+            assertFalse(startUsage==endUsage);
+
+            // Closing the owning connection is what should release the queue's resources.
+            tempConnection.close();
+            // Give the broker time to process the close before sampling usage again.
+            Thread.sleep(1000);
+            endUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage();
+            assertEquals(startUsage,endUsage);
+        } finally {
+            try {
+                if (testConnection != null) {
+                    testConnection.close();
+                }
+            } finally {
+                // Already closed on the success path; closing it again is relied on to be harmless.
+                tempConnection.close();
+            }
+        }
+    }
+
+
+    /**
+     * Starts an embedded broker backed by KahaDB in {@code target/AMQ2616Test} (emptied
+     * first), applies a destination policy to all queues, sets the memory and temp usage
+     * limits, and records the connector's publishable connect string in {@link #connectionUri}.
+     *
+     * @throws Exception if the broker cannot be configured or started
+     */
+    @Override
+    protected void setUp() throws Exception {
+        // Start an embedded broker up.
+        brokerService = new BrokerService();
+
+        KahaDBPersistenceAdapter adaptor = new KahaDBPersistenceAdapter();
+        adaptor.setEnableJournalDiskSyncs(false);
+        File file = new File("target/AMQ2616Test");
+        IOHelper.mkdirs(file);
+        IOHelper.deleteChildren(file);
+        adaptor.setDirectory(file);
+        brokerService.setPersistenceAdapter(adaptor);
+
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry pe = new PolicyEntry();
+        pe.setMemoryLimit(10 * 1024 * 1024);
+        pe.setOptimizedDispatch(true);
+        pe.setProducerFlowControl(false);
+        pe.setExpireMessagesPeriod(1000);
+        pe.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy());
+        policyMap.put(new ActiveMQQueue(">"), pe);
+        brokerService.setDestinationPolicy(policyMap);
+        brokerService.getSystemUsage().getMemoryUsage().setLimit(20 * 1024 * 1024);
+        brokerService.getSystemUsage().getTempUsage().setLimit(200 * 1024 * 1024);
+        brokerService.addConnector(ACTIVEMQ_BROKER_BIND);
+        brokerService.start();
+        brokerService.waitUntilStarted();
+
+        connectionUri = brokerService.getTransportConnectors().get(0).getPublishableConnectString();
+    }
+
+    /**
+     * Signals shutdown, interrupts and joins any registered threads, and stops the broker.
+     * <p>
+     * The broker is stopped in a {@code finally} block so that it is shut down even if
+     * joining a thread throws.
+     *
+     * @throws Exception if joining a thread or stopping the broker fails
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            // Stop any running threads.
+            shutdown.set(true);
+            for (Thread t : threads) {
+                t.interrupt();
+                t.join();
+            }
+        } finally {
+            if (brokerService != null) {
+                brokerService.stop();
+            }
+        }
+    }
+
+}


Mime
View raw message