activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [30/52] [abbrv] [partial] activemq-artemis git commit: ARTEMIS-127 Adding activemq unit test module to Artemis
Date Tue, 09 Jun 2015 16:37:01 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
new file mode 100644
index 0000000..9120937
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region;
+
+import java.io.File;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.CombinationTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
+import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.util.DefaultTestAppender;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Level;
+import org.apache.log4j.spi.LoggingEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class QueuePurgeTest extends CombinationTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(QueuePurgeTest.class);
+    private static final int NUM_TO_SEND = 20000;
+    private final String MESSAGE_TEXT = new String(new byte[1024]);
+    BrokerService broker;
+    ConnectionFactory factory;
+    Connection connection;
+    Session session;
+    Queue queue;
+    MessageConsumer consumer;
+
+    /** Starts a JMX-enabled broker with a KahaDB store on tcp://localhost:0 and connects to the URI its connector reports. */
+    protected void setUp() throws Exception {
+        setMaxTestTime(10*60*1000); // 10 mins
+        setAutoFail(true);
+        super.setUp();
+        broker = new BrokerService();
+        File testDataDir = new File("target/activemq-data/QueuePurgeTest");
+        broker.setDataDirectoryFile(testDataDir);
+        broker.setUseJmx(true);
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.getSystemUsage().getMemoryUsage().setLimit(64L * 1024 * 1024); // 64 MiB; upper-case L cannot be misread as 1
+        KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
+        persistenceAdapter.setDirectory(new File(testDataDir, "kahadb"));
+        broker.setPersistenceAdapter(persistenceAdapter);
+        broker.addConnector("tcp://localhost:0");
+        broker.start();
+        factory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri().toString());
+        connection = factory.createConnection();
+        connection.start();
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        try {
+            // Closing a JMS connection also closes its sessions and consumers, so no separate
+            // (and possibly null) session or consumer needs to be closed here.
+            connection.close();
+        } finally {
+            broker.stop(); // always release the broker, even if the close above fails
+        }
+    }
+
+    /** Purges a large spooled queue over JMX, then checks size, cache state and the Queue logger's purge line. */
+    public void testPurgeLargeQueue() throws Exception {
+        applyBrokerSpoolingPolicy();
+        createProducerAndSendMessages(NUM_TO_SEND);
+        QueueViewMBean proxy = getProxyToQueueViewMBean();
+        LOG.info("purging..");
+
+        final AtomicBoolean gotPurgeLogMessage = new AtomicBoolean(false);
+        // Test appender that raises the flag when the purge summary line is logged.
+        Appender appender = new DefaultTestAppender() {
+            @Override
+            public void doAppend(LoggingEvent event) {
+                Object payload = event.getMessage();
+                if (!(payload instanceof String)) {
+                    return;
+                }
+                if (((String) payload).contains("purged of " + NUM_TO_SEND +" messages")) {
+                    LOG.info("Received a log message: {} ", payload);
+                    gotPurgeLogMessage.set(true);
+                }
+            }
+        };
+
+        org.apache.log4j.Logger log4jLogger = org.apache.log4j.Logger.getLogger(org.apache.activemq.broker.region.Queue.class);
+        Level level = log4jLogger.getLevel();
+        log4jLogger.setLevel(Level.INFO);
+        log4jLogger.addAppender(appender);
+        try {
+            proxy.purge();
+        } finally { // put the logger back the way it was found
+            log4jLogger.setLevel(level);
+            log4jLogger.removeAppender(appender);
+        }
+
+        assertEquals("Queue size is not zero, it's " + proxy.getQueueSize(), 0,
+                proxy.getQueueSize());
+        assertTrue("cache is disabled, temp store being used", !proxy.isCacheEnabled());
+        assertTrue("got expected info purge log message", gotPurgeLogMessage.get());
+    }
+
+    /** Sets a 500 expiry-check period on a large spooled queue, waits 5s, and expects every message to still be there. */
+    public void testRepeatedExpiryProcessingOfLargeQueue() throws Exception {
+        applyBrokerSpoolingPolicy();
+        final int expiryPeriod = 500;
+        applyExpiryDuration(expiryPeriod);
+        createProducerAndSendMessages(NUM_TO_SEND);
+        QueueViewMBean proxy = getProxyToQueueViewMBean();
+        LOG.info("waiting for expiry to kick in a bunch of times to verify it does not blow mem");
+        Thread.sleep(5000);
+        assertEquals("Queue size has changed, it's " + proxy.getQueueSize(), NUM_TO_SEND, proxy.getQueueSize());
+    }
+    
+
+    private void applyExpiryDuration(int i) {
+        broker.getDestinationPolicy().getDefaultEntry().setExpireMessagesPeriod(i);
+    }
+
+    /** Installs a default destination policy: producer flow control off, file-based pending queue storage. */
+    private void applyBrokerSpoolingPolicy() {
+        PolicyEntry spoolingEntry = new PolicyEntry();
+        spoolingEntry.setProducerFlowControl(false);
+        spoolingEntry.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy());
+        PolicyMap policies = new PolicyMap();
+        policies.setDefaultEntry(spoolingEntry);
+        broker.setDestinationPolicy(policies);
+    }
+
+    
+    /** Purges a large queue after a consumer has been attached, then drains whatever that consumer still gets. */
+    public void testPurgeLargeQueueWithConsumer() throws Exception {
+        applyBrokerSpoolingPolicy();
+        createProducerAndSendMessages(NUM_TO_SEND);
+        QueueViewMBean proxy = getProxyToQueueViewMBean();
+        createConsumer();
+
+        long start = System.currentTimeMillis();
+        LOG.info("purging..");
+        proxy.purge();
+        LOG.info("purge done: " + (System.currentTimeMillis() - start) + "ms");
+        assertEquals("Queue size is not zero, it's " + proxy.getQueueSize(), 0,
+                proxy.getQueueSize());
+        assertEquals("usage goes to duck", 0, proxy.getMemoryPercentUsage());
+
+        // Acknowledge everything still receivable until a 1s receive comes back empty.
+        for (Message msg = consumer.receive(1000); msg != null; msg = consumer.receive(1000)) {
+            msg.acknowledge();
+        }
+        assertEquals("Queue size not valid", 0, proxy.getQueueSize());
+    }
+
+    /**
+     * Builds a JMX proxy for the QueueViewMBean of the current test queue, addressed under brokerName=localhost.
+     */
+    private QueueViewMBean getProxyToQueueViewMBean()
+            throws MalformedObjectNameException, JMSException {
+        String mbeanName = "org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="
+                + queue.getQueueName();
+        return (QueueViewMBean) broker.getManagementContext()
+                .newProxyInstance(new ObjectName(mbeanName), QueueViewMBean.class, true);
+    }
+
+    /** Creates the CLIENT_ACKNOWLEDGE session and queue "test1" (kept in fields), then sends numToSend text messages. */
+    private void createProducerAndSendMessages(int numToSend) throws Exception {
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        queue = session.createQueue("test1");
+        MessageProducer producer = session.createProducer(queue);
+        for (int sent = 0; sent < numToSend; sent++) {
+            if (sent > 0 && sent % 10000 == 0) {
+                LOG.info("sent: " + sent);
+            }
+            producer.send(session.createTextMessage(MESSAGE_TEXT + sent));
+        }
+        producer.close();
+    }
+
+    /** Attaches a consumer to the queue, sleeps 5s so its buffer can fill, then receives and acknowledges 500 messages. */
+    private void createConsumer() throws Exception {
+        consumer = session.createConsumer(queue);
+        Thread.sleep(5 * 1000);
+        int remaining = 500;
+        while (remaining-- > 0) {
+            consumer.receive().acknowledge();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueResendDuringShutdownTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueResendDuringShutdownTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueResendDuringShutdownTest.java
new file mode 100644
index 0000000..0439fa8
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueResendDuringShutdownTest.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region;
+
+import java.io.File;
+
+import static org.junit.matchers.JUnitMatchers.containsString;
+import static org.junit.Assert.*;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.broker.BrokerService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.junit.*;
+
+/**
+ * Confirm that the broker does not resend unacknowledged messages during a broker shutdown.
+ */
+public class QueueResendDuringShutdownTest {
+    private static final Logger         LOG = LoggerFactory.getLogger(QueueResendDuringShutdownTest.class);
+    public static final int             NUM_CONNECTION_TO_TEST = 8;
+
+    private static boolean              iterationFoundFailure = false;
+
+    private BrokerService               broker;
+    private ActiveMQConnectionFactory   factory;
+    private Connection[]                connections;
+    private Connection                  producerConnection;
+    private Queue                       queue;
+
+    private Object                      messageReceiveSync = new Object();
+    private int                         receiveCount;
+
+    /** Starts a non-persistent broker, opens the consumer connections and a started producer connection. */
+    @Before
+    public void setUp () throws Exception {
+        this.receiveCount = 0;
+
+        this.broker = new BrokerService();
+        this.broker.setPersistent(false);
+        this.broker.start();
+        this.broker.waitUntilStarted();
+
+        this.factory = new ActiveMQConnectionFactory(broker.getVmConnectorURI());
+        this.queue = new ActiveMQQueue("TESTQUEUE");
+
+        // Consumer connections are only created here; the test starts them after attaching listeners.
+        connections = new Connection[NUM_CONNECTION_TO_TEST];
+        for ( int slot = 0; slot < NUM_CONNECTION_TO_TEST; slot++ ) {
+            this.connections[slot] = factory.createConnection();
+        }
+
+        this.producerConnection = factory.createConnection();
+        this.producerConnection.start();
+    }
+
+    @After
+    public void cleanup () throws Exception {
+        // JUnit runs @After even if setUp() failed before creating the connections, so guard the array.
+        for ( int slot = 0; connections != null && slot < connections.length; slot++ ) {
+            if ( connections[slot] != null ) {
+                closeConnection(connections[slot]);
+            }
+        }
+        connections = null;
+        if ( this.producerConnection != null ) {
+            closeConnection(this.producerConnection);
+            this.producerConnection = null;
+        }
+
+        this.broker.stop();
+        this.broker.waitUntilStopped();
+    }
+
+    @Test(timeout=3000)
+    public void testRedeliverAtBrokerShutdownAutoAckMsgListenerIter1 () throws Throwable {
+        runTestIteration();
+    }
+
+    @Test(timeout=3000)
+    public void testRedeliverAtBrokerShutdownAutoAckMsgListenerIter2 () throws Throwable {
+        runTestIteration();
+    }
+
+    @Test(timeout=3000)
+    public void testRedeliverAtBrokerShutdownAutoAckMsgListenerIter3 () throws Throwable {
+        runTestIteration();
+    }
+
+    /**
+     * Runs one iteration unless an earlier one failed (a single failure is enough); records a failure before rethrowing it.
+     */
+    protected void  runTestIteration () throws Throwable {
+        if ( ! iterationFoundFailure ) {
+            try {
+                testRedeliverAtBrokerShutdownAutoAckMsgListener();
+            } catch ( Throwable thrown ) {
+                iterationFoundFailure = true;
+                throw thrown;
+            }
+        } else {
+            LOG.info("skipping test iteration; failure previously detected");
+        }
+    }
+
+    /** Sends one message to listener consumers on every connection, stops the broker, and expects a single receipt. */
+    protected void  testRedeliverAtBrokerShutdownAutoAckMsgListener () throws Exception {
+        // Give each connection an AUTO_ACKNOWLEDGE consumer with the slow listener, then start it.
+        for ( Connection conn : connections ) {
+            configureMessageListener(startupConsumer(conn, false, Session.AUTO_ACKNOWLEDGE));
+            conn.start();
+        }
+
+        // Publish a single message and allow up to one second for it to reach a listener.
+        this.sendMessage();
+        waitForMessage(1000);
+
+        // Exactly one consumer must have received it.
+        assertEquals(1, this.receiveCount);
+
+        // Stop the broker; the receiving listener sleeps 3s, so it is normally still busy at this point.
+        this.broker.stop();
+        this.broker.waitUntilStopped();
+        delay(100, "give queue time flush");
+
+        // The count must not have grown: no other consumer was handed the message.
+        assertEquals(1, this.receiveCount);
+    }
+
+    /**
+     * Creates a session on the given connection with the requested settings and returns a new consumer of the
+     * test queue on that session.
+     *
+     * @param conn     connection to create the session on
+     * @param transInd whether the session is transacted
+     * @param ackMode  acknowledge mode passed to createSession
+     */
+    protected MessageConsumer   startupConsumer (Connection conn, boolean transInd, int ackMode)
+    throws JMSException {
+        Session consumerSession = conn.createSession(transInd, ackMode);
+        return consumerSession.createConsumer(queue);
+    }
+
+    /**
+     * Mark the receipt of a message from one of the consumers and wake any thread blocked in waitForMessage().
+     */
+    protected void  messageReceived () {
+        // One monitor guards both the counter and the notification: it is the same monitor
+        // waitForMessage() holds while it reads receiveCount, so the read is never racy.
+        synchronized ( this.messageReceiveSync ) {
+            this.receiveCount++;
+            this.messageReceiveSync.notifyAll();
+        }
+    }
+
+    /**
+     * Installs a listener on the given consumer that records each delivery and then sleeps for three seconds,
+     * to simulate the reported case of problems at shutdown caused by a message listener's connection closing
+     * while it is still processing.
+     */
+    protected void  configureMessageListener (final MessageConsumer consumer) throws JMSException {
+        MessageListener slowListener = new MessageListener() {
+            @Override
+            public void onMessage (Message msg) {
+                LOG.debug("got a message on consumer {}", consumer);
+                messageReceived();
+
+                // Delay long enough for the consumer to get closed while this delay is active.
+                delay(3000, "pause so connection shutdown leads to unacked message redelivery");
+            }
+        };
+        consumer.setMessageListener(slowListener);
+    }
+
+    /** Sends one text message to the test queue on a short-lived session that is closed even if the send fails. */
+    protected void  sendMessage () throws JMSException {
+        Session sess = this.producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        try {
+            MessageProducer prod = sess.createProducer(queue);
+            prod.send(sess.createTextMessage("X-TEST-MSG-X"));
+        } finally {
+            sess.close(); // closing a JMS session also closes its producers
+        }
+    }
+
+    /**
+     * Close the given connection safely and log any exception caught.
+     */
+    protected void  closeConnection (Connection conn) {
+        try {
+            conn.close();
+        } catch ( JMSException jmsExc ) {
+            LOG.info("failed to cleanup connection", jmsExc);
+        }
+    }
+
+    /**
+     * Pause for the given length of time, in milliseconds, logging an interruption if one occurs.  Don't try to
+     * recover from interrupt - the test case does not support interrupting and such an occurrence likely means the
+     * test is being aborted.
+     */
+    protected void  delay (long delayMs, String desc) {
+        try {
+            Thread.sleep(delayMs);
+        } catch ( InterruptedException intExc ) {
+            LOG.warn("sleep interrupted: " + desc, intExc);
+        }
+    }
+
+    /** Wait up to delayMs milliseconds for a message to be received by any consumer; returns at once if delayMs <= 0. */
+    protected void  waitForMessage (long delayMs) {
+        long deadline = System.currentTimeMillis() + delayMs;
+        try {
+            synchronized ( this.messageReceiveSync ) {
+                // Loop against a deadline: Object.wait() may return spuriously before any message arrived.
+                for ( long left = delayMs; this.receiveCount == 0 && left > 0; left = deadline - System.currentTimeMillis() ) {
+                    this.messageReceiveSync.wait(left);
+                }
+            }
+        } catch ( InterruptedException intExc ) {
+            LOG.warn("sleep interrupted: wait for message to arrive");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
new file mode 100644
index 0000000..6964842
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.InvalidSelectorException;
+import javax.management.ObjectName;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.state.ProducerState;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.thread.TaskRunnerFactory;
+import junit.framework.TestCase;
+
+public class SubscriptionAddRemoveQueueTest extends TestCase {
+
+    Queue queue;
+
+    ConsumerInfo info = new ConsumerInfo();
+    List<SimpleImmediateDispatchSubscription> subs = new ArrayList<SimpleImmediateDispatchSubscription>();
+    ConnectionContext context = new ConnectionContext();
+    ProducerBrokerExchange producerBrokerExchange = new ProducerBrokerExchange();
+    ProducerInfo producerInfo = new ProducerInfo();
+    ProducerState producerState = new ProducerState(producerInfo);
+    ActiveMQDestination destination = new ActiveMQQueue("TEST");
+    int numSubscriptions = 1000;
+    boolean working = true;
+    int senders = 20;
+
+
+    /** Prepares the consumer info and producer exchange, then builds and initializes the Queue under test. */
+    @Override
+    public void setUp() throws Exception {
+        BrokerService brokerService = new BrokerService();
+        brokerService.start();
+        // NOTE(review): this local BrokerService is not stopped anywhere in this class.
+
+        info.setDestination(destination);
+        info.setPrefetchSize(100);
+        producerBrokerExchange.setProducerState(producerState);
+        producerBrokerExchange.setConnectionContext(context);
+
+        DestinationStatistics parentStats = new DestinationStatistics();
+        parentStats.setEnabled(true);
+        TaskRunnerFactory taskFactory = new TaskRunnerFactory();
+        MessageStore store = null;
+        queue = new Queue(brokerService, destination, store, parentStats, taskFactory);
+        queue.initialize();
+    }
+
+    /** Floods the queue from several sender threads, removes every subscription, and expects no message left locked. */
+    public void testNoDispatchToRemovedConsumers() throws Exception {
+        final AtomicInteger producerId = new AtomicInteger();
+        Runnable sender = new Runnable() {
+            public void run() {
+                AtomicInteger id = new AtomicInteger();
+                int producerIdAndIncrement = producerId.getAndIncrement();
+                while (working) {
+                    try {
+                        Message msg = new ActiveMQMessage();
+                        msg.setDestination(destination);
+                        msg.setMessageId(new MessageId(producerIdAndIncrement + ":0:" + id.getAndIncrement()));
+                        queue.send(producerBrokerExchange, msg);
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                        fail("unexpected exception in sendMessage, ex:" + e);
+                    }
+                }
+            }
+        };
+
+        Runnable subRemover = new Runnable() {
+            public void run() {
+                for (Subscription sub : subs) {
+                    try {
+                        queue.removeSubscription(context, sub, 0);
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                        fail("unexpected exception in removeSubscription, ex:" + e);
+                    }
+                }
+            }
+        };
+        for (int i=0;i<numSubscriptions; i++) {
+            SimpleImmediateDispatchSubscription sub = new SimpleImmediateDispatchSubscription();
+            subs.add(sub);
+            queue.addSubscription(context, sub);
+        }
+        assertEquals("there are X subscriptions", numSubscriptions, queue.getDestinationStatistics().getConsumers().getCount());
+        ExecutorService executor = Executors.newCachedThreadPool();
+        try {
+            for (int i=0; i<senders ; i++) {
+                executor.submit(sender);
+            }
+            Thread.sleep(1000);
+            for (SimpleImmediateDispatchSubscription sub : subs) {
+                assertTrue("There are some locked messages in the subscription", hasSomeLocks(sub.dispatched));
+            }
+            executor.submit(subRemover).get();
+            working = false;
+            assertEquals("there are no subscriptions", 0, queue.getDestinationStatistics().getConsumers().getCount());
+            for (SimpleImmediateDispatchSubscription sub : subs) {
+                assertTrue("There are no locked messages in any removed subscriptions", !hasSomeLocks(sub.dispatched));
+            }
+        } finally {
+            working = false; // stop the sender loops even when an assertion above failed
+            executor.shutdown();
+        }
+    }
+
+    /** Returns true if any reference in the given list currently has a lock owner. */
+    private boolean hasSomeLocks(List<MessageReference> dispatched) {
+        // Iterate over a snapshot: sender threads may still be appending to the synchronized list,
+        // and iterating it directly without holding its monitor can throw ConcurrentModificationException.
+        for (MessageReference mr : new ArrayList<MessageReference>(dispatched)) {
+            if (((QueueMessageReference) mr).getLockOwner() != null) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public class SimpleImmediateDispatchSubscription implements Subscription, LockOwner {
+
+        List<MessageReference> dispatched =
+            Collections.synchronizedList(new ArrayList<MessageReference>());
+
+        public void acknowledge(ConnectionContext context, MessageAck ack)
+                throws Exception {
+        }
+
+        public void add(MessageReference node) throws Exception {
+            // immediate dispatch
+            QueueMessageReference  qmr = (QueueMessageReference)node;
+            qmr.lock(this);
+            dispatched.add(qmr);
+        }
+
+        public ConnectionContext getContext() {
+            return null;
+        }
+
+        @Override
+        public int getCursorMemoryHighWaterMark() {
+            return 0;
+        }
+
+        @Override
+        public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
+        }
+
+        @Override
+        public boolean isSlowConsumer() {
+            return false;
+        }
+
+        @Override
+        public void unmatched(MessageReference node) throws IOException {
+        }
+
+        @Override
+        public long getTimeOfLastMessageAck() {
+            return 0;
+        }
+
+        @Override
+        public long getConsumedCount() {
+            return 0;
+        }
+
+        @Override
+        public void incrementConsumedCount() {
+        }
+
+        @Override
+        public void resetConsumedCount() {
+        }
+
+        public void add(ConnectionContext context, Destination destination)
+                throws Exception {
+        }
+
+        public void destroy() {
+        }
+
+        public void gc() {
+        }
+
+        public ConsumerInfo getConsumerInfo() {
+            return info;
+        }
+
+        public long getDequeueCounter() {
+            return 0;
+        }
+
+        public long getDispatchedCounter() {
+            return 0;
+        }
+
+        public int getDispatchedQueueSize() {
+            return 0;
+        }
+
+        public long getEnqueueCounter() {
+            return 0;
+        }
+
+        public int getInFlightSize() {
+            return 0;
+        }
+
+        public int getInFlightUsage() {
+            return 0;
+        }
+
+        public ObjectName getObjectName() {
+            return null;
+        }
+
+        public int getPendingQueueSize() {
+            return 0;
+        }
+
+        public int getPrefetchSize() {
+            return 0;
+        }
+
+        public String getSelector() {
+            return null;
+        }
+
+        public boolean isBrowser() {
+            return false;
+        }
+
+        public boolean isFull() {
+            return false;
+        }
+
+        public boolean isHighWaterMark() {
+            return false;
+        }
+
+        public boolean isLowWaterMark() {
+            return false;
+        }
+
+        public boolean isRecoveryRequired() {
+            return false;
+        }
+
+        public boolean isSlave() {
+            return false;
+        }
+
+        public boolean matches(MessageReference node,
+                MessageEvaluationContext context) throws IOException {
+            return true;
+        }
+
+        public boolean matches(ActiveMQDestination destination) {
+            return false;
+        }
+
+        public void processMessageDispatchNotification(
+                MessageDispatchNotification mdn) throws Exception {
+        }
+
+        public Response pullMessage(ConnectionContext context, MessagePull pull)
+                throws Exception {
+            return null;
+        }
+
+        @Override
+        public boolean isWildcard() {
+            return false;
+        }
+
+        public List<MessageReference> remove(ConnectionContext context,
+                Destination destination) throws Exception {
+            return new ArrayList<MessageReference>(dispatched);
+        }
+
+        public void setObjectName(ObjectName objectName) {
+        }
+
+        public void setSelector(String selector)
+                throws InvalidSelectorException, UnsupportedOperationException {
+        }
+
+        public void updateConsumerPrefetch(int newPrefetch) {
+        }
+
+        public boolean addRecoveredMessage(ConnectionContext context,
+                MessageReference message) throws Exception {
+            return false;
+        }
+
+        public ActiveMQDestination getActiveMQDestination() {
+            return null;
+        }
+
+        public int getLockPriority() {
+            return 0;
+        }
+
+        public boolean isLockExclusive() {
+            return false;
+        }
+
+        public void addDestination(Destination destination) {
+        }
+
+        public void removeDestination(Destination destination) {
+        }
+
+        public int countBeforeFull() {
+            return 10;
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/UniquePropertyMessageEvictionStrategyTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/UniquePropertyMessageEvictionStrategyTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/UniquePropertyMessageEvictionStrategyTest.java
new file mode 100644
index 0000000..5657e5c
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/UniquePropertyMessageEvictionStrategyTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.region;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.UniquePropertyMessageEvictionStrategy;
+
+import javax.jms.*;
+import java.util.ArrayList;
+import java.util.List;
+
+public class UniquePropertyMessageEvictionStrategyTest extends EmbeddedBrokerTestSupport {
+
+    /** Installs a catch-all topic policy: prefetch 1, pending limit 10, eviction keyed on "sequenceI". */
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = super.createBroker();
+        final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
+        final PolicyEntry entry = new PolicyEntry();
+        entry.setTopic(">");
+
+        entry.setAdvisoryForDiscardingMessages(true);
+        entry.setTopicPrefetch(1);
+
+        ConstantPendingMessageLimitStrategy pendingMessageLimitStrategy = new ConstantPendingMessageLimitStrategy();
+        pendingMessageLimitStrategy.setLimit(10);
+        entry.setPendingMessageLimitStrategy(pendingMessageLimitStrategy);
+
+        UniquePropertyMessageEvictionStrategy messageEvictionStrategy = new UniquePropertyMessageEvictionStrategy();
+        messageEvictionStrategy.setPropertyName("sequenceI");
+        entry.setMessageEvictionStrategy(messageEvictionStrategy);
+
+        // let evicted messages disappear
+        entry.setDeadLetterStrategy(null);
+        policyEntries.add(entry);
+
+        final PolicyMap policyMap = new PolicyMap();
+        policyMap.setPolicyEntries(policyEntries);
+        broker.setDestinationPolicy(policyMap);
+
+        return broker;
+    }
+
+    /**
+     * Publishes 10 x 10 messages tagged with "sequenceI"/"sequenceJ" before the subscriber starts
+     * receiving, then expects exactly 11 deliveries: message (0,0) first, then (i,9) for i = 0..9.
+     */
+    public void testEviction() throws Exception {
+        Connection conn = connectionFactory.createConnection();
+        try {
+            conn.start();
+            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            javax.jms.Topic destination = session.createTopic("TEST");
+            MessageProducer producer = session.createProducer(destination);
+            MessageConsumer consumer = session.createConsumer(destination);
+
+            for (int i = 0; i < 10; i++) {
+                for (int j = 0; j < 10; j++) {
+                    TextMessage msg = session.createTextMessage("message " + i + j);
+                    msg.setIntProperty("sequenceI", i);
+                    msg.setIntProperty("sequenceJ", j);
+                    producer.send(msg);
+                    Thread.sleep(100);
+                }
+            }
+
+            for (int i = 0; i < 11; i++) {
+                javax.jms.Message msg = consumer.receive(1000);
+                assertNotNull(msg);
+                int seqI = msg.getIntProperty("sequenceI");
+                int seqJ = msg.getIntProperty("sequenceJ");
+                if (i == 0) {
+                    assertEquals(0, seqI);
+                    assertEquals(0, seqJ);
+                } else {
+                    assertEquals(9, seqJ);
+                    assertEquals(i - 1, seqI);
+                }
+            }
+            assertNull(consumer.receive(1000));
+        } finally {
+            // always release the connection, even when an assertion above fails
+            conn.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java
new file mode 100644
index 0000000..12589ca
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import org.apache.activemq.broker.BrokerService;
+
+/**
+ * Runs the {@link CursorSupport} scenarios against a durable topic subscription named "testConsumer".
+ */
+public class CursorDurableTest extends CursorSupport {
+
+    @Override
+    protected Destination getDestination(Session session) throws JMSException {
+        return session.createTopic(getClass().getName());
+    }
+
+    @Override
+    protected Connection getConsumerConnection(ConnectionFactory fac) throws JMSException {
+        Connection connection = fac.createConnection();
+        connection.setClientID("testConsumer");
+        connection.start();
+        return connection;
+    }
+
+    @Override
+    protected MessageConsumer getConsumer(Connection connection) throws Exception {
+        Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        return consumerSession.createDurableSubscriber((Topic) getDestination(consumerSession), "testConsumer");
+    }
+
+    @Override
+    protected void configureBroker(BrokerService answer) throws Exception {
+        answer.setDeleteAllMessagesOnStartup(true);
+        answer.addConnector(bindAddress);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java
new file mode 100644
index 0000000..b0668d9
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import junit.framework.Test;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.StorePendingQueueMessageStoragePolicy;
+
+/**
+ * Runs the {@link CursorSupport} scenarios against a queue that uses the store-based pending-message policy.
+ */
+public class CursorQueueStoreTest extends CursorSupport {
+
+    @Override
+    protected Destination getDestination(Session session) throws JMSException {
+        return session.createQueue("QUEUE" + getClass().getName());
+    }
+
+    @Override
+    protected Connection getConsumerConnection(ConnectionFactory fac) throws JMSException {
+        Connection connection = fac.createConnection();
+        connection.setClientID("testConsumer");
+        connection.start();
+        return connection;
+    }
+
+    @Override
+    protected MessageConsumer getConsumer(Connection connection) throws Exception {
+        Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        return consumerSession.createConsumer(getDestination(consumerSession));
+    }
+
+    @Override
+    protected void configureBroker(BrokerService answer) throws Exception {
+        PolicyEntry policy = new PolicyEntry();
+        policy.setPendingQueuePolicy(new StorePendingQueueMessageStoragePolicy());
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+        answer.setDestinationPolicy(pMap);
+        answer.setDeleteAllMessagesOnStartup(true);
+        answer.addConnector(bindAddress);
+    }
+
+    public static Test suite() {
+        return suite(CursorQueueStoreTest.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java
new file mode 100644
index 0000000..3c014a6
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.CombinationTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+/**
+ * Base class for cursor tests; subclasses supply the destination, the consumer and the broker configuration.
+ */
+public abstract class CursorSupport extends CombinationTestSupport {
+
+    // NOTE(review): presumably set reflectively through addCombinationValues(...) below, hence public and non-final - confirm
+    public int MESSAGE_COUNT = 500;
+    public int PREFETCH_SIZE = 50;
+    private static final Logger LOG = LoggerFactory.getLogger(CursorSupport.class);
+
+    protected BrokerService broker;
+    protected String bindAddress = "tcp://localhost:60706";
+
+    protected abstract Destination getDestination(Session session) throws JMSException;
+
+    protected abstract MessageConsumer getConsumer(Connection connection) throws Exception;
+
+    protected abstract void configureBroker(BrokerService answer) throws Exception;
+
+    /** Sends MESSAGE_COUNT messages while the consumer connection is closed, then expects them all back in send order. */
+    public void testSendFirstThenConsume() throws Exception {
+        ConnectionFactory factory = createConnectionFactory();
+        Connection consumerConnection = getConsumerConnection(factory);
+        MessageConsumer consumer = getConsumer(consumerConnection);
+        consumerConnection.close();
+        Connection producerConnection = factory.createConnection();
+        producerConnection.start();
+        Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(getDestination(session));
+        List<Message> senderList = new ArrayList<Message>();
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            Message msg = session.createTextMessage("test" + i);
+            senderList.add(msg);
+            producer.send(msg);
+        }
+        producerConnection.close();
+        // now consume the messages
+        consumerConnection = getConsumerConnection(factory);
+        // create durable subs
+        consumer = getConsumer(consumerConnection);
+        List<Message> consumerList = new ArrayList<Message>();
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            Message msg = consumer.receive(1000*5);
+            assertNotNull("Message "+i+" was missing.", msg);
+            consumerList.add(msg);
+        }
+        assertEquals(senderList, consumerList);
+        consumerConnection.close();
+    }
+
+    public void initCombosForTestSendWhilstConsume() {
+        addCombinationValues("MESSAGE_COUNT", new Object[] {Integer.valueOf(400),
+                                                           Integer.valueOf(500)});
+        addCombinationValues("PREFETCH_SIZE", new Object[] {Integer.valueOf(100),
+                Integer.valueOf(50)});
+    }
+
+    /** Pre-loads a tenth of the messages, then keeps sending while a slow listener consumes; checks per-position message ids. */
+    public void testSendWhilstConsume() throws Exception {
+        ConnectionFactory factory = createConnectionFactory();
+        Connection consumerConnection = getConsumerConnection(factory);
+        // create durable subs
+        MessageConsumer consumer = getConsumer(consumerConnection);
+        consumerConnection.close();
+        Connection producerConnection = factory.createConnection();
+        producerConnection.start();
+        Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(getDestination(session));
+        List<TextMessage> senderList = new ArrayList<TextMessage>();
+        for (int i = 0; i < MESSAGE_COUNT / 10; i++) {
+            TextMessage msg = session.createTextMessage("test" + i);
+            senderList.add(msg);
+            producer.send(msg);
+        }
+        // now consume the messages
+        consumerConnection = getConsumerConnection(factory);
+        // create durable subs
+        consumer = getConsumer(consumerConnection);
+        final List<Message> consumerList = new ArrayList<Message>();
+        final CountDownLatch latch = new CountDownLatch(1);
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message msg) {
+                try {
+                    // sleep to act as a slow consumer
+                    // which will force a mix of direct and polled dispatching
+                    // using the cursor on the broker
+                    Thread.sleep(50);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt(); // keep the interrupt visible to the dispatching thread
+                }
+                consumerList.add(msg);
+                if (consumerList.size() == MESSAGE_COUNT) {
+                    latch.countDown();
+                }
+            }
+        });
+        for (int i = MESSAGE_COUNT / 10; i < MESSAGE_COUNT; i++) {
+            TextMessage msg = session.createTextMessage("test" + i);
+            senderList.add(msg);
+            producer.send(msg);
+        }
+        latch.await(300000, TimeUnit.MILLISECONDS);
+        producerConnection.close();
+        consumerConnection.close();
+        assertEquals("Still dipatching - count down latch not sprung", 0, latch.getCount());
+        for (int i = 0; i < senderList.size(); i++) {
+            Message sent = senderList.get(i);
+            Message consumed = consumerList.get(i);
+            if (!sent.equals(consumed)) {
+                LOG.error("BAD MATCH AT POS " + i);
+                LOG.error(sent.toString());
+                LOG.error(consumed.toString());
+            }
+            assertEquals("This should be the same at pos " + i + " in the list", sent.getJMSMessageID(), consumed.getJMSMessageID());
+        }
+    }
+
+    protected Connection getConsumerConnection(ConnectionFactory fac) throws JMSException {
+        Connection connection = fac.createConnection();
+        connection.setClientID("testConsumer");
+        connection.start();
+        return connection;
+    }
+
+    protected void setUp() throws Exception {
+        if (broker == null) {
+            broker = createBroker();
+        }
+        super.setUp();
+    }
+
+    protected void tearDown() throws Exception {
+        try {
+            super.tearDown();
+        } finally {
+            // stop the broker even if super.tearDown() fails, so the fixed bindAddress port is released
+            if (broker != null) {
+                broker.stop();
+            }
+        }
+    }
+
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(bindAddress);
+        Properties props = new Properties();
+        props.setProperty("prefetchPolicy.durableTopicPrefetch", "" + PREFETCH_SIZE);
+        props.setProperty("prefetchPolicy.optimizeDurableTopicPrefetch", "" + PREFETCH_SIZE);
+        props.setProperty("prefetchPolicy.queuePrefetch", "" + PREFETCH_SIZE);
+        cf.setProperties(props);
+        return cf;
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        configureBroker(answer);
+        answer.start();
+        return answer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTestSupport.java
new file mode 100644
index 0000000..123263d
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTestSupport.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.QueueMessageReference;
+import org.apache.activemq.store.PList;
+import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.util.ByteSequence;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class FilePendingMessageCursorTestSupport {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursorTestSupport.class);
+    protected BrokerService brokerService;
+    protected  FilePendingMessageCursor underTest;
+
+    @After
+    public void stopBroker() throws Exception {
+        if (brokerService != null) {
+            brokerService.getTempDataStore().stop();
+        }
+    }
+
+    private void createBrokerWithTempStoreLimit() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setUseJmx(false);
+        SystemUsage usage = brokerService.getSystemUsage();
+        usage.getTempUsage().setLimit(1025*1024*15);
+
+        // put something in the temp store to on demand initialise it
+        PList dud = brokerService.getTempDataStore().getPList("dud");
+        dud.addFirst("A", new ByteSequence("A".getBytes()));
+    }
+
+    @Test
+    public void testAddToEmptyCursorWhenTempStoreIsFull() throws Exception {
+        createBrokerWithTempStoreLimit();
+        SystemUsage usage = brokerService.getSystemUsage();
+        assertTrue("temp store is full: %" + usage.getTempUsage().getPercentUsage(), usage.getTempUsage().isFull());
+
+        underTest = new FilePendingMessageCursor(brokerService.getBroker(), "test", false);
+        underTest.setSystemUsage(usage);
+
+        // ok to add
+        underTest.addMessageLast(QueueMessageReference.NULL_MESSAGE);
+
+        assertFalse("cursor is not full", underTest.isFull());
+    }
+
+    @Test
+    public void testResetClearsIterator() throws Exception {
+        createBrokerWithTempStoreLimit();
+
+        underTest = new FilePendingMessageCursor(brokerService.getBroker(), "test", false);
+        // ok to add
+        underTest.addMessageLast(QueueMessageReference.NULL_MESSAGE);
+
+        underTest.reset();
+        underTest.release();
+
+        try {
+            underTest.hasNext();
+            fail("expect npe on use of iterator after release");
+        } catch (NullPointerException expected) {}
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
new file mode 100644
index 0000000..1401b35
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
@@ -0,0 +1,421 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.AutoFailTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.StorePendingQueueMessageStoragePolicy;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.StoreUsage;
+import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.usage.TempUsage;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Modified CursorSupport Unit test to reproduce the negative queue issue.
+ *
+ * Keys to reproducing:
+ * 1) Consecutive queues with listener on first sending to second queue
+ * 2) Push each queue to the memory limit
+ *      This seems to help reproduce the issue more consistently, but
+ *      we have seen times in our production environment where the
+ *      negative queue can occur without. Our memory limits are
+ *      very high in production and it still happens in varying
+ *      frequency.
+ * 3) Prefetch
+ *      Lowering the prefetch down to 10 and below seems to help
+ *      reduce occurrences.
+ * 4) # of consumers per queue
+ *      The issue occurs less with fewer consumers
+ *
+ * Things that do not affect reproduction:
+ * 1) Spring - we use spring in our production applications, but this test case works
+ *      with or without it.
+ * 2) transacted
+ *
+ */
+public class NegativeQueueTest extends AutoFailTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(NegativeQueueTest.class);
+
+    public static SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd,hh:mm:ss:SSS"); // NOTE(review): SimpleDateFormat is not thread-safe; "hh" is the 12-hour field
+
+    private static final String QUEUE_1_NAME = "conn.test.queue.1";
+    private static final String QUEUE_2_NAME = "conn.test.queue.2";
+
+    private static final long QUEUE_MEMORY_LIMIT = 2097152; // 2 * 1024 * 1024
+    private static final long MEMORY_USAGE = 400000000;
+    private static final long TEMP_USAGE = 200000000;
+    private static final long STORE_USAGE = 1000000000;
+    // ensure we exceed the cache 70%
+    private static final int MESSAGE_COUNT = 2100;
+
+    protected static final boolean TRANSACTED = true; // sessions are created transacted and committed explicitly
+    protected static final boolean DEBUG = true; // enables the System.out progress and queue statistics output
+    protected static int NUM_CONSUMERS = 20; // reassigned by each test method before it calls blastAndConsume()
+    protected static int PREFETCH_SIZE = 1000; // reassigned by each test method before it calls blastAndConsume()
+
+    protected BrokerService broker;
+    protected String bindAddress = "tcp://localhost:0"; // NOTE(review): port 0 presumably lets the OS pick a free port - confirm
+
+    public void testWithDefaultPrefetch() throws Exception {
+        NUM_CONSUMERS = 20;
+        PREFETCH_SIZE = 1000;
+        blastAndConsume(); // scenario: 20 consumers per queue, prefetch 1000
+    }
+
+    public void x_testWithDefaultPrefetchFiveConsumers() throws Exception {
+        NUM_CONSUMERS = 5;
+        PREFETCH_SIZE = 1000;
+        blastAndConsume(); // scenario: 5 consumers per queue, prefetch 1000 (x_ prefix presumably disables it - confirm)
+    }
+
+    public void x_testWithDefaultPrefetchTwoConsumers() throws Exception {
+        NUM_CONSUMERS = 2;
+        PREFETCH_SIZE = 1000;
+        blastAndConsume(); // scenario: 2 consumers per queue, prefetch 1000 (x_ prefix presumably disables it - confirm)
+    }
+
+    public void testWithDefaultPrefetchOneConsumer() throws Exception {
+        NUM_CONSUMERS = 1;
+        PREFETCH_SIZE = 1000;
+        blastAndConsume(); // scenario: a single consumer per queue, prefetch 1000
+    }
+
+    public void testWithMediumPrefetch() throws Exception {
+        NUM_CONSUMERS = 20;
+        PREFETCH_SIZE = 50;
+        blastAndConsume(); // scenario: 20 consumers per queue, prefetch 50
+    }
+
+    public void x_testWithSmallPrefetch() throws Exception {
+        NUM_CONSUMERS = 20;
+        PREFETCH_SIZE = 10;
+        blastAndConsume(); // scenario: 20 consumers per queue, prefetch 10 (x_ prefix presumably disables it - confirm)
+    }
+
+    public void testWithNoPrefetch() throws Exception {
+        NUM_CONSUMERS = 20;
+        PREFETCH_SIZE = 1;
+        blastAndConsume(); // scenario: 20 consumers per queue, prefetch 1 (the value used here is 1, not 0)
+    }
+
+    /**
+     * Drives the two-stage load scenario and checks that neither queue ends up with a non-zero size.
+     * <p>
+     * Stage one sends {@code MESSAGE_COUNT} text messages to queue 1 and attaches
+     * {@code NUM_CONSUMERS} listeners that forward every received message to queue 2. Stage two
+     * attaches {@code NUM_CONSUMERS} listeners to queue 2. Queue sizes are read through JMX
+     * proxies, and both are asserted to be zero after all client connections have been closed.
+     *
+     * @throws Exception if any JMS or JMX operation fails
+     */
+    public void blastAndConsume() throws Exception {
+        LOG.info(getName());
+        ConnectionFactory factory = createConnectionFactory();
+
+        //get proxy queues for statistics lookups
+        Connection proxyConnection = factory.createConnection();
+        proxyConnection.start();
+        Session proxySession = proxyConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final QueueViewMBean proxyQueue1 = getProxyToQueueViewMBean(proxySession.createQueue(QUEUE_1_NAME));
+        final QueueViewMBean proxyQueue2 = getProxyToQueueViewMBean(proxySession.createQueue(QUEUE_2_NAME));
+
+        // LOAD THE QUEUE
+        Connection producerConnection = factory.createConnection();
+        producerConnection.start();
+        Session session = producerConnection.createSession(TRANSACTED, Session.AUTO_ACKNOWLEDGE);
+        Destination queue = session.createQueue(QUEUE_1_NAME);
+        MessageProducer producer = session.createProducer(queue);
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            TextMessage msg = session.createTextMessage(i + " " + formatter.format(new Date()));
+            producer.send(msg);
+            if(TRANSACTED) session.commit();
+            // progress ticker: prints one digit (0-9, cycling) per 100 messages sent
+            if(DEBUG && i%100 == 0){
+                int index = (i/100)+1;
+                System.out.print(index-((index/10)*10));
+            }
+        }
+
+        //get access to the Queue info
+        if(DEBUG){
+            System.out.println("");
+            System.out.println("Queue1 Size = "+proxyQueue1.getQueueSize());
+            System.out.println("Queue1 Memory % Used = "+proxyQueue1.getMemoryPercentUsage());
+            System.out.println("Queue1 Memory Available = "+proxyQueue1.getMemoryLimit());
+        }
+
+        // FLUSH THE QUEUE
+        final CountDownLatch latch1 = new CountDownLatch(1);
+        final CountDownLatch latch2 = new CountDownLatch(1);
+        Connection[] consumerConnections1 = new Connection[NUM_CONSUMERS];
+        List<Message> consumerList1 = new ArrayList<Message>();
+        Connection[] consumerConnections2 = new Connection[NUM_CONSUMERS];
+        Connection[] producerConnections2 = new Connection[NUM_CONSUMERS];
+        List<Message> consumerList2 = new ArrayList<Message>();
+
+        // stage one: each queue 1 listener forwards to queue 2 over its own producer connection
+        for(int ix=0; ix<NUM_CONSUMERS; ix++){
+            producerConnections2[ix] = factory.createConnection();
+            producerConnections2[ix].start();
+            consumerConnections1[ix] = getConsumerConnection(factory);
+            Session consumerSession = consumerConnections1[ix].createSession(TRANSACTED, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = consumerSession.createConsumer(session.createQueue(QUEUE_1_NAME));
+            consumer.setMessageListener(new SessionAwareMessageListener(producerConnections2[ix], consumerSession, QUEUE_2_NAME, latch1, consumerList1));
+        }
+
+        // A timeout here is only reported, not asserted: the stage-two wait below decides
+        // whether the test passes, exactly as before.
+        if (!latch1.await(200000, TimeUnit.MILLISECONDS)) {
+            LOG.warn("Timed out waiting for the queue 1 consumers to receive all messages");
+        }
+        if(DEBUG){
+            System.out.println("");
+            System.out.println("Queue2 Size = "+proxyQueue2.getQueueSize());
+            System.out.println("Queue2 Memory % Used = "+proxyQueue2.getMemoryPercentUsage());
+            System.out.println("Queue2 Memory Available = "+proxyQueue2.getMemoryLimit());
+        }
+
+        // stage two: listeners on queue 2 have no producer and only consume
+        for(int ix=0; ix<NUM_CONSUMERS; ix++){
+            consumerConnections2[ix] = getConsumerConnection(factory);
+            Session consumerSession = consumerConnections2[ix].createSession(TRANSACTED, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = consumerSession.createConsumer(session.createQueue(QUEUE_2_NAME));
+            consumer.setMessageListener(new SessionAwareMessageListener(consumerSession, latch2, consumerList2));
+        }
+
+        // poll latch2 in 10 second slices (printing statistics in DEBUG mode) for up to 300 seconds
+        boolean success = Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                boolean done = latch2.await(10, TimeUnit.SECONDS);
+                if(DEBUG){
+                    System.out.println("");
+                    System.out.println("Queue1 Size = "+proxyQueue1.getQueueSize());
+                    System.out.println("Queue1 Memory % Used = "+proxyQueue1.getMemoryPercentUsage());
+                    System.out.println("Queue2 Size = "+proxyQueue2.getQueueSize());
+                    System.out.println("Queue2 Memory % Used = "+proxyQueue2.getMemoryPercentUsage());
+                    System.out.println("Queue2 Memory Available = "+proxyQueue2.getMemoryLimit());
+                }
+                return done;
+            }
+        }, 300 * 1000);
+        if (!success) {
+            dumpAllThreads("blocked waiting on 2");
+        }
+        assertTrue("got all expected messages on 2", success);
+
+        producerConnection.close();
+        for(int ix=0; ix<NUM_CONSUMERS; ix++){
+            consumerConnections1[ix].close();
+            consumerConnections2[ix].close();
+            producerConnections2[ix].close();
+        }
+
+        //let the consumer statistics on queue2 have time to update
+        Thread.sleep(500);
+
+        if(DEBUG){
+            System.out.println("");
+            System.out.println("Queue1 Size = "+proxyQueue1.getQueueSize());
+            System.out.println("Queue1 Memory % Used = "+proxyQueue1.getMemoryPercentUsage());
+            System.out.println("Queue2 Size = "+proxyQueue2.getQueueSize());
+            System.out.println("Queue2 Memory % Used = "+proxyQueue2.getMemoryPercentUsage());
+        }
+
+        // the wait results are intentionally unused: the assertEquals that follows each wait
+        // is what fails the test when the size never reaches zero
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 0 == proxyQueue1.getQueueSize();
+            }});
+        assertEquals("Queue1 has gone negative,",0, proxyQueue1.getQueueSize());
+
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 0 == proxyQueue2.getQueueSize();
+            }});
+        assertEquals("Queue2 has gone negative,",0, proxyQueue2.getQueueSize());
+        proxyConnection.close();
+
+    }
+
+    /**
+     * Creates a JMX proxy for the management view of the given queue on the test broker.
+     *
+     * @param queue the queue whose name is appended to the broker's queue object-name prefix
+     * @return a {@link QueueViewMBean} proxy obtained from the broker's management context
+     * @throws MalformedObjectNameException if the resulting object name is not valid
+     * @throws JMSException if the queue name cannot be read
+     */
+    private QueueViewMBean getProxyToQueueViewMBean(Queue queue) throws MalformedObjectNameException, JMSException {
+        final ObjectName mbeanName = new ObjectName(
+            "org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="
+                + queue.getQueueName());
+
+        return (QueueViewMBean) broker.getManagementContext()
+            .newProxyInstance(mbeanName, QueueViewMBean.class, true);
+    }
+
+   protected Connection getConsumerConnection(ConnectionFactory fac) throws JMSException {
+        Connection connection = fac.createConnection();
+        connection.start();
+        return connection;
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        if (broker == null) {
+            broker = createBroker();
+        }
+        super.setUp();
+    }
+
+    /**
+     * Runs the inherited tear-down and then stops the broker, waiting until it has stopped.
+     * <p>
+     * The broker is stopped in a {@code finally} block so that a failure in the inherited
+     * tear-down cannot leave a running broker behind for subsequent tests.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            super.tearDown();
+        } finally {
+            if (broker != null) {
+                broker.stop();
+                broker.waitUntilStopped();
+            }
+        }
+    }
+
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(bindAddress);
+        Properties props = new Properties();
+        props.setProperty("prefetchPolicy.durableTopicPrefetch", "" + PREFETCH_SIZE);
+        props.setProperty("prefetchPolicy.optimizeDurableTopicPrefetch", "" + PREFETCH_SIZE);
+        props.setProperty("prefetchPolicy.queuePrefetch", "" + PREFETCH_SIZE);
+        cf.setProperties(props);
+        return cf;
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        configureBroker(answer);
+        answer.start();
+        answer.waitUntilStarted();
+        bindAddress = answer.getTransportConnectors().get(0).getConnectUri().toString();
+        return answer;
+    }
+
+    protected void configureBroker(BrokerService answer) throws Exception {
+        PolicyEntry policy = new PolicyEntry();
+        policy.setMemoryLimit(QUEUE_MEMORY_LIMIT);
+        policy.setPendingQueuePolicy(new StorePendingQueueMessageStoragePolicy());
+
+        // disable the cache to be sure setBatch is the problem
+        // will get lots of duplicates
+        // real problem is sync between cursor and store add - leads to out or order messages
+        // in the cursor so setBatch can break.
+        // policy.setUseCache(false);
+
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+        answer.setDestinationPolicy(pMap);
+        answer.setDeleteAllMessagesOnStartup(true);
+        answer.addConnector("tcp://localhost:0");
+
+        MemoryUsage memoryUsage = new MemoryUsage();
+        memoryUsage.setLimit(MEMORY_USAGE);
+        memoryUsage.setPercentUsageMinDelta(20);
+
+        TempUsage tempUsage = new TempUsage();
+        tempUsage.setLimit(TEMP_USAGE);
+
+        StoreUsage storeUsage = new StoreUsage();
+        storeUsage.setLimit(STORE_USAGE);
+
+        SystemUsage systemUsage = new SystemUsage();
+        systemUsage.setMemoryUsage(memoryUsage);
+        systemUsage.setTempUsage(tempUsage);
+        systemUsage.setStoreUsage(storeUsage);
+        answer.setSystemUsage(systemUsage);
+    }
+
+    /**
+     * Message listener that is given the Session for transacted consumers.
+     * <p>
+     * When constructed with a producer connection, every received message is forwarded to the
+     * named output queue. When constructed without one, the listener sleeps briefly per message
+     * instead. In both modes the message is recorded in the shared list, and the latch is
+     * released once that list holds {@code MESSAGE_COUNT} messages.
+     */
+    class SessionAwareMessageListener implements MessageListener{
+        private final List<Message> consumerList;
+        private final CountDownLatch latch;
+        private final Session consumerSession;
+        private final Session producerSession;
+        private final MessageProducer producer;
+
+        /**
+         * Creates a consume-only listener (no forwarding).
+         *
+         * @param consumerSession session the consumer belongs to; committed per message when transacted
+         * @param latch latch released when {@code consumerList} reaches {@code MESSAGE_COUNT} entries
+         * @param consumerList list shared between listeners; also used as the lock guarding it
+         */
+        public SessionAwareMessageListener(Session consumerSession, CountDownLatch latch, List<Message> consumerList){
+            this(null, consumerSession, null, latch, consumerList);
+        }
+
+        /**
+         * Creates a listener that forwards to {@code outQueueName} when {@code producerConnection}
+         * is not {@code null}.
+         *
+         * @param producerConnection connection used to create the forwarding session, or {@code null}
+         * @param consumerSession session the consumer belongs to; committed per message when transacted
+         * @param outQueueName name of the queue to forward to; only used with a producer connection
+         * @param latch latch released when {@code consumerList} reaches {@code MESSAGE_COUNT} entries
+         * @param consumerList list shared between listeners; also used as the lock guarding it
+         * @throws IllegalStateException if the forwarding session or producer cannot be created
+         */
+        public SessionAwareMessageListener(Connection producerConnection, Session consumerSession, String outQueueName,
+                CountDownLatch latch, List<Message> consumerList){
+            this.consumerList = consumerList;
+            this.latch = latch;
+            this.consumerSession = consumerSession;
+
+            Session forwardSession = null;
+            MessageProducer forwardProducer = null;
+            if(producerConnection != null){
+                try {
+                    forwardSession = producerConnection.createSession(TRANSACTED, Session.AUTO_ACKNOWLEDGE);
+                    Destination queue = forwardSession.createQueue(outQueueName);
+                    forwardProducer = forwardSession.createProducer(queue);
+                } catch (JMSException e) {
+                    // Fail fast: continuing with a null producer would silently turn this
+                    // forwarding listener into a consume-only one.
+                    throw new IllegalStateException("Unable to create producer for queue " + outQueueName, e);
+                }
+            }
+            this.producerSession = forwardSession;
+            this.producer = forwardProducer;
+        }
+
+        /**
+         * Forwards (or, without a producer, delays on) the message, records it, releases the
+         * latch when the expected count is reached, and commits the consumer session when
+         * running transacted.
+         */
+        @Override
+        public void onMessage(Message msg) {
+            try {
+                if(producer == null){
+                    // sleep to act as a slow consumer
+                    // which will force a mix of direct and polled dispatching
+                    // using the cursor on the broker
+                    Thread.sleep(50);
+                }else{
+                    producer.send(msg);
+                    if(TRANSACTED) producerSession.commit();
+                }
+            } catch (InterruptedException e) {
+                // keep the interrupt visible to the thread that invoked this listener
+                Thread.currentThread().interrupt();
+                e.printStackTrace();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+
+            synchronized(consumerList){
+                consumerList.add(msg);
+                // progress ticker: prints one digit (0-9, cycling) per 100 messages received
+                if(DEBUG && consumerList.size()%100 == 0) {
+                    int index = consumerList.size()/100;
+                    System.out.print(index-((index/10)*10));
+                }
+                if (consumerList.size() == MESSAGE_COUNT) {
+                    latch.countDown();
+                }
+            }
+            if(TRANSACTED){
+                try {
+                    consumerSession.commit();
+                } catch (JMSException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java
new file mode 100644
index 0000000..79d7e6c
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java
@@ -0,0 +1,444 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.util.IdGenerator;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link OrderedPendingList}: insertion order, size and emptiness tracking,
+ * removal, lookup, the values view, and bulk addition from another {@link PendingList}.
+ */
+public class OrderPendingListTest {
+
+    /** Messages added at the head must be iterated in reverse insertion order. */
+    @Test
+    public void testAddMessageFirst() throws Exception {
+
+        OrderedPendingList list = new OrderedPendingList();
+
+        list.addMessageFirst(new TestMessageReference(1));
+        list.addMessageFirst(new TestMessageReference(2));
+        list.addMessageFirst(new TestMessageReference(3));
+        list.addMessageFirst(new TestMessageReference(4));
+        list.addMessageFirst(new TestMessageReference(5));
+
+        assertEquals(5, list.size());
+        assertEquals(5, list.getAsList().size());
+
+        Iterator<MessageReference> iter = list.iterator();
+        int lastId = list.size();
+        while (iter.hasNext()) {
+            assertEquals(lastId--, iter.next().getMessageId().getProducerSequenceId());
+        }
+    }
+
+    /** Messages added at the tail must be iterated in insertion order. */
+    @Test
+    public void testAddMessageLast() throws Exception {
+
+        OrderedPendingList list = new OrderedPendingList();
+
+        list.addMessageLast(new TestMessageReference(1));
+        list.addMessageLast(new TestMessageReference(2));
+        list.addMessageLast(new TestMessageReference(3));
+        list.addMessageLast(new TestMessageReference(4));
+        list.addMessageLast(new TestMessageReference(5));
+
+        assertEquals(5, list.size());
+        assertEquals(5, list.getAsList().size());
+
+        Iterator<MessageReference> iter = list.iterator();
+        int lastId = 1;
+        while (iter.hasNext()) {
+            assertEquals(lastId++, iter.next().getMessageId().getProducerSequenceId());
+        }
+    }
+
+    /** Clearing must empty the list and leave it usable for further additions. */
+    @Test
+    public void testClear() throws Exception {
+        OrderedPendingList list = new OrderedPendingList();
+
+        list.addMessageFirst(new TestMessageReference(1));
+        list.addMessageFirst(new TestMessageReference(2));
+        list.addMessageFirst(new TestMessageReference(3));
+        list.addMessageFirst(new TestMessageReference(4));
+        list.addMessageFirst(new TestMessageReference(5));
+
+        assertFalse(list.isEmpty());
+        assertEquals(5, list.size());
+        assertEquals(5, list.getAsList().size());
+
+        list.clear();
+
+        assertTrue(list.isEmpty());
+        assertEquals(0, list.size());
+        assertEquals(0, list.getAsList().size());
+
+        list.addMessageFirst(new TestMessageReference(1));
+        list.addMessageLast(new TestMessageReference(2));
+        list.addMessageLast(new TestMessageReference(3));
+        list.addMessageFirst(new TestMessageReference(4));
+        list.addMessageLast(new TestMessageReference(5));
+
+        assertFalse(list.isEmpty());
+        assertEquals(5, list.size());
+        assertEquals(5, list.getAsList().size());
+    }
+
+    /** {@code isEmpty()} must track additions and {@code clear()}. */
+    @Test
+    public void testIsEmpty() throws Exception {
+        OrderedPendingList list = new OrderedPendingList();
+        assertTrue(list.isEmpty());
+
+        list.addMessageFirst(new TestMessageReference(1));
+        list.addMessageFirst(new TestMessageReference(2));
+        list.addMessageFirst(new TestMessageReference(3));
+        list.addMessageFirst(new TestMessageReference(4));
+        list.addMessageFirst(new TestMessageReference(5));
+
+        assertFalse(list.isEmpty());
+        list.clear();
+        assertTrue(list.isEmpty());
+    }
+
+    /** {@code size()} must grow by one per addition at either end and reset on {@code clear()}. */
+    @Test
+    public void testSize() {
+        OrderedPendingList list = new OrderedPendingList();
+        assertTrue(list.isEmpty());
+
+        assertEquals(0, list.size());
+        list.addMessageFirst(new TestMessageReference(1));
+        assertEquals(1, list.size());
+        list.addMessageLast(new TestMessageReference(2));
+        assertEquals(2, list.size());
+        list.addMessageFirst(new TestMessageReference(3));
+        assertEquals(3, list.size());
+        list.addMessageLast(new TestMessageReference(4));
+        assertEquals(4, list.size());
+        list.addMessageFirst(new TestMessageReference(5));
+        assertEquals(5, list.size());
+
+        assertFalse(list.isEmpty());
+        list.clear();
+        assertTrue(list.isEmpty());
+        assertEquals(0, list.size());
+    }
+
+    /**
+     * Removing a present element shrinks the list; removing it again, or removing {@code null},
+     * must leave the remaining elements and their order untouched.
+     */
+    @Test
+    public void testRemove() throws Exception {
+
+        OrderedPendingList list = new OrderedPendingList();
+
+        TestMessageReference toRemove = new TestMessageReference(6);
+
+        list.addMessageFirst(new TestMessageReference(1));
+        list.addMessageFirst(new TestMessageReference(2));
+        list.addMessageFirst(new TestMessageReference(3));
+        list.addMessageFirst(new TestMessageReference(4));
+        list.addMessageFirst(new TestMessageReference(5));
+
+        assertEquals(5, list.size());
+        assertEquals(5, list.getAsList().size());
+
+        list.addMessageLast(toRemove);
+        list.remove(toRemove);
+
+        assertEquals(5, list.size());
+        assertEquals(5, list.getAsList().size());
+
+        // second removal of the same reference: nothing left to remove
+        list.remove(toRemove);
+
+        assertEquals(5, list.size());
+        assertEquals(5, list.getAsList().size());
+
+        Iterator<MessageReference> iter = list.iterator();
+        int lastId = list.size();
+        while (iter.hasNext()) {
+            assertEquals(lastId--, iter.next().getMessageId().getProducerSequenceId());
+        }
+
+        // passes as long as no exception is thrown
+        list.remove(null);
+    }
+
+    /** {@code contains()} must reflect additions and removals and reject {@code null}. */
+    @Test
+    public void testContains() throws Exception {
+
+        OrderedPendingList list = new OrderedPendingList();
+
+        TestMessageReference toRemove = new TestMessageReference(6);
+
+        assertFalse(list.contains(toRemove));
+        assertFalse(list.contains(null));
+
+        list.addMessageFirst(new TestMessageReference(1));
+        list.addMessageFirst(new TestMessageReference(2));
+        list.addMessageFirst(new TestMessageReference(3));
+        list.addMessageFirst(new TestMessageReference(4));
+        list.addMessageFirst(new TestMessageReference(5));
+
+        assertEquals(5, list.size());
+        assertEquals(5, list.getAsList().size());
+
+        list.addMessageLast(toRemove);
+        assertEquals(6, list.size());
+        assertTrue(list.contains(toRemove));
+        list.remove(toRemove);
+        assertFalse(list.contains(toRemove));
+
+        assertEquals(5, list.size());
+        assertEquals(5, list.getAsList().size());
+    }
+
+    /** {@code values()} must expose exactly the references currently held by the list. */
+    @Test
+    public void testValues() throws Exception {
+
+        OrderedPendingList list = new OrderedPendingList();
+
+        TestMessageReference toRemove = new TestMessageReference(6);
+
+        assertFalse(list.contains(toRemove));
+
+        list.addMessageFirst(new TestMessageReference(1));
+        list.addMessageFirst(new TestMessageReference(2));
+        list.addMessageFirst(new TestMessageReference(3));
+        list.addMessageFirst(new TestMessageReference(4));
+        list.addMessageFirst(new TestMessageReference(5));
+
+        Collection<MessageReference> values = list.values();
+        assertEquals(5, values.size());
+
+        for (MessageReference msg : values) {
+            assertTrue(values.contains(msg));
+            // cross-check against the list itself; the check above alone is always true
+            assertTrue(list.contains(msg));
+        }
+
+        assertFalse(values.contains(toRemove));
+
+        list.addMessageLast(toRemove);
+        values = list.values();
+        assertEquals(6, values.size());
+        for (MessageReference msg : values) {
+            assertTrue(values.contains(msg));
+            assertTrue(list.contains(msg));
+        }
+
+        assertTrue(values.contains(toRemove));
+    }
+
+    /** {@code addAll()} must copy every reference from the source and tolerate {@code null}. */
+    @Test
+    public void testAddAll() throws Exception {
+        OrderedPendingList list = new OrderedPendingList();
+        TestPendingList source = new TestPendingList();
+
+        source.addMessageFirst(new TestMessageReference(1));
+        source.addMessageFirst(new TestMessageReference(2));
+        source.addMessageFirst(new TestMessageReference(3));
+        source.addMessageFirst(new TestMessageReference(4));
+        source.addMessageFirst(new TestMessageReference(5));
+
+        assertTrue(list.isEmpty());
+        assertEquals(5, source.size());
+        list.addAll(source);
+        assertEquals(5, list.size());
+
+        for (MessageReference message : source) {
+            assertTrue(list.contains(message));
+        }
+
+        // passes as long as no exception is thrown
+        list.addAll(null);
+    }
+
+    /**
+     * Minimal {@link PendingList} backed by a {@link LinkedList}, used as the source
+     * collection for {@link #testAddAll()}.
+     */
+    static class TestPendingList implements PendingList {
+
+        private final LinkedList<MessageReference> theList = new LinkedList<MessageReference>();
+
+        @Override
+        public boolean isEmpty() {
+            return theList.isEmpty();
+        }
+
+        @Override
+        public void clear() {
+            theList.clear();
+        }
+
+        @Override
+        public PendingNode addMessageFirst(MessageReference message) {
+            theList.addFirst(message);
+            return new PendingNode(null, message);
+        }
+
+        @Override
+        public PendingNode addMessageLast(MessageReference message) {
+            theList.addLast(message);
+            return new PendingNode(null, message);
+        }
+
+        /** Returns a node wrapping the message when it was present, {@code null} otherwise. */
+        @Override
+        public PendingNode remove(MessageReference message) {
+            if (theList.remove(message)) {
+                return new PendingNode(null, message);
+            } else {
+                return null;
+            }
+        }
+
+        @Override
+        public int size() {
+            return theList.size();
+        }
+
+        @Override
+        public Iterator<MessageReference> iterator() {
+            return theList.iterator();
+        }
+
+        @Override
+        public boolean contains(MessageReference message) {
+            return theList.contains(message);
+        }
+
+        /** Returns the backing list itself, not a copy. */
+        @Override
+        public Collection<MessageReference> values() {
+            return theList;
+        }
+
+        /** Appends every reference of the given list; a {@code null} argument is ignored. */
+        @Override
+        public void addAll(PendingList pendingList) {
+            if (pendingList == null) {
+                return;
+            }
+            for(MessageReference messageReference : pendingList) {
+                theList.add(messageReference);
+            }
+        }
+
+        /** Linear search by message id; returns {@code null} when no reference matches. */
+        @Override
+        public MessageReference get(MessageId messageId) {
+            for(MessageReference messageReference : theList) {
+                if (messageReference.getMessageId().equals(messageId)) {
+                    return messageReference;
+                }
+            }
+            return null;
+        }
+    }
+
+    /**
+     * Stub {@link MessageReference} that carries only a generated {@link MessageId} with the
+     * given producer sequence id and a simple reference counter; every other accessor returns
+     * a fixed default.
+     */
+    static class TestMessageReference implements MessageReference {
+
+        private static final IdGenerator id = new IdGenerator();
+
+        private final MessageId messageId;
+        private int referenceCount = 0;
+
+        /**
+         * @param sequenceId producer sequence id stored in the generated message id
+         */
+        public TestMessageReference(int sequenceId) {
+            messageId = new MessageId(id.generateId() + ":1", sequenceId);
+        }
+
+        @Override
+        public MessageId getMessageId() {
+            return messageId;
+        }
+
+        @Override
+        public Message getMessageHardRef() {
+            return null;
+        }
+
+        @Override
+        public Message getMessage() {
+            return null;
+        }
+
+        @Override
+        public boolean isPersistent() {
+            return false;
+        }
+
+        @Override
+        public Destination getRegionDestination() {
+            return null;
+        }
+
+        @Override
+        public int getRedeliveryCounter() {
+            return 0;
+        }
+
+        @Override
+        public void incrementRedeliveryCounter() {
+        }
+
+        @Override
+        public int getReferenceCount() {
+            return this.referenceCount;
+        }
+
+        /** Returns the count after incrementing, so it agrees with {@link #getReferenceCount()}. */
+        @Override
+        public int incrementReferenceCount() {
+            return ++this.referenceCount;
+        }
+
+        /** Returns the count after decrementing, so it agrees with {@link #getReferenceCount()}. */
+        @Override
+        public int decrementReferenceCount() {
+            return --this.referenceCount;
+        }
+
+        @Override
+        public ConsumerId getTargetConsumerId() {
+            return null;
+        }
+
+        @Override
+        public int getSize() {
+            return 1;
+        }
+
+        @Override
+        public long getExpiration() {
+            return 0;
+        }
+
+        @Override
+        public String getGroupID() {
+            return null;
+        }
+
+        @Override
+        public int getGroupSequence() {
+            return 0;
+        }
+
+        @Override
+        public boolean isExpired() {
+            return false;
+        }
+
+        @Override
+        public boolean isDropped() {
+            return false;
+        }
+
+        @Override
+        public boolean isAdvisory() {
+            return false;
+        }
+    }
+}


Mime
View raw message