activemq-commits mailing list archives

From gtu...@apache.org
Subject [5/6] git commit: https://issues.apache.org/jira/browse/AMQ-5266 https://issues.apache.org/jira/browse/AMQ-4485 - storeHasMessages must be aware of pending adds; also kahadb setBatch for async sends. Additional tests and tidy-up of cursor sync with store to reflect async/sync additions
Date Thu, 16 Oct 2014 22:54:37 GMT
https://issues.apache.org/jira/browse/AMQ-5266 https://issues.apache.org/jira/browse/AMQ-4485 - storeHasMessages must be aware of pending adds; also kahadb setBatch for async sends. Additional tests and tidy-up of cursor sync with store to reflect async/sync additions


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/9c2b1d25
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/9c2b1d25
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/9c2b1d25

Branch: refs/heads/trunk
Commit: 9c2b1d257288fb85138a37e30e1216251ca13eaf
Parents: 243db1c
Author: gtully <gary.tully@gmail.com>
Authored: Thu Oct 16 23:32:55 2014 +0100
Committer: gtully <gary.tully@gmail.com>
Committed: Thu Oct 16 23:35:18 2014 +0100

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    |  19 +-
 .../region/cursors/AbstractStoreCursor.java     |  95 +--
 .../region/cursors/QueueStorePrefetch.java      |   8 +-
 .../activemq/store/ProxyMessageStore.java       |   5 +
 .../activemq/store/kahadb/KahaDBStore.java      |   8 +
 .../activemq/store/kahadb/MessageDatabase.java  |   9 +-
 activemq-unit-tests/pom.xml                     |   1 +
 .../cursors/StoreQueueCursorOrderTest.java      | 517 +++++++++++++++
 .../bugs/AMQ5266StarvedConsumerTest.java        | 641 +++++++++++++++++++
 9 files changed, 1249 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/9c2b1d25/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 6df48da..21d7522 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -771,19 +771,24 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
                     candidate = indexOrderedCursorUpdates.peek();
                 }
             }
-            for (MessageContext messageContext : orderedUpdates) {
-                if (!cursorAdd(messageContext.message)) {
-                    // cursor suppressed a duplicate
-                    messageContext.duplicate = true;
+            messagesLock.writeLock().lock();
+            try {
+                for (MessageContext messageContext : orderedUpdates) {
+                    if (!messages.addMessageLast(messageContext.message)) {
+                        // cursor suppressed a duplicate
+                        messageContext.duplicate = true;
+                    }
+                    if (messageContext.onCompletion != null) {
+                        messageContext.onCompletion.run();
+                    }
                 }
+            } finally {
+                messagesLock.writeLock().unlock();
             }
         } finally {
             sendLock.unlock();
         }
         for (MessageContext messageContext : orderedUpdates) {
-            if (messageContext.onCompletion != null) {
-                messageContext.onCompletion.run();
-            }
             if (!messageContext.duplicate) {
                 messageSent(messageContext.context, messageContext.message);
             }

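The Queue.java hunk above moves the ordered cursor adds, and the onCompletion callbacks that report the store result, inside the messagesLock write lock, so a concurrent reader of the cursor cannot see an add without its completion (or the completion without the add); only the messageSent notifications for non-duplicates stay outside the lock. A minimal sketch of that locking pattern, using illustrative types (Update, Cursor) rather than the broker's own classes:

    import java.util.List;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Sketch only: apply ordered updates to the cursor and run each completion
    // callback while the same write lock is held, so the add and its store
    // completion are observed as one step.
    class OrderedUpdateSketch {
        static final class Update {
            final String message;
            final Runnable onCompletion; // may be null
            boolean duplicate;

            Update(String message, Runnable onCompletion) {
                this.message = message;
                this.onCompletion = onCompletion;
            }
        }

        interface Cursor {
            boolean addMessageLast(String message); // false => duplicate suppressed
        }

        private final ReentrantReadWriteLock messagesLock = new ReentrantReadWriteLock();

        void applyOrderedUpdates(List<Update> orderedUpdates, Cursor cursor) {
            messagesLock.writeLock().lock();
            try {
                for (Update update : orderedUpdates) {
                    if (!cursor.addMessageLast(update.message)) {
                        update.duplicate = true; // cursor audit suppressed a duplicate
                    }
                    if (update.onCompletion != null) {
                        update.onCompletion.run(); // completes while the lock is still held
                    }
                }
            } finally {
                messagesLock.writeLock().unlock();
            }
        }
    }
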
http://git-wip-us.apache.org/repos/asf/activemq/blob/9c2b1d25/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
index 19864b7..c4bf985 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
@@ -20,6 +20,8 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.ListIterator;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
@@ -90,6 +92,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
 
     public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception {
         boolean recovered = false;
+        storeHasMessages = true;
         if (recordUniqueId(message.getMessageId())) {
             if (!cached) {
                 message.setRegionDestination(regionDestination);
@@ -101,12 +104,13 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
             batchList.addMessageLast(message);
             clearIterator(true);
             recovered = true;
-            storeHasMessages = true;
         } else if (!cached) {
             // a duplicate from the store (!cached) - needs to be removed/acked - otherwise it will get re dispatched on restart
             if (message.isRecievedByDFBridge()) {
                 // expected for messages pending acks with kahadb.concurrentStoreAndDispatchQueues=true
-                LOG.trace("{} store replayed pending message due to concurrentStoreAndDispatchQueues {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong());
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("{} store replayed pending message due to concurrentStoreAndDispatchQueues {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong());
+                }
             } else {
                 LOG.warn("{} - cursor got duplicate from store {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong());
                 duplicate(message);
@@ -201,7 +205,9 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
         boolean disableCache = false;
         if (hasSpace()) {
             if (!isCacheEnabled() && size==0 && isStarted() && useCache) {
-                LOG.trace("{} - enabling cache for empty store {} {}", this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong());
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("{} - enabling cache for empty store {} {}", this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong());
+                }
                 setCacheEnabled(true);
             }
             if (isCacheEnabled()) {
@@ -217,64 +223,48 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
         }
 
         if (disableCache && isCacheEnabled()) {
-            LOG.trace("{} - disabling cache on add {} {}", this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong());
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("{} - disabling cache on add {} {}", this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong());
+            }
+            syncWithStore(node.getMessage());
             setCacheEnabled(false);
-            syncWithStore();
         }
         this.storeHasMessages = true;
         size++;
         return true;
     }
 
-    private void syncWithStore() throws Exception {
+    private void syncWithStore(Message currentAdd) throws Exception {
+        pruneLastCached();
         if (lastCachedIds[SYNC_ADD] == null) {
-            // only async adds, lets wait on the potential last add and reset from there
+            // possibly only async adds, lets wait on the potential last add and reset from there
             for (ListIterator<MessageId> it = pendingCachedIds.listIterator(pendingCachedIds.size()); it.hasPrevious(); ) {
-                MessageId lastStored = it.previous();
-                Object futureOrLong = lastStored.getFutureOrSequenceLong();
+                MessageId lastPending = it.previous();
+                Object futureOrLong = lastPending.getFutureOrSequenceLong();
                 if (futureOrLong instanceof Future) {
                     Future future = (Future) futureOrLong;
                     if (future.isCancelled()) {
                         continue;
-                    } else {
-                        try {
-                            future.get();
-                            setLastCachedId(ASYNC_ADD, lastStored);
-                        } catch (Exception ignored) {}
                     }
+                    try {
+                        future.get(5, TimeUnit.SECONDS);
+                        setLastCachedId(ASYNC_ADD, lastPending);
+                    } catch (TimeoutException potentialDeadlock) {
+                        LOG.warn("{} timed out waiting for async add", this, potentialDeadlock);
+                    } catch (Exception cancelledOrTimeOutOrErrorWorstCaseWeReplay) {cancelledOrTimeOutOrErrorWorstCaseWeReplay.printStackTrace();}
+                } else {
+                    setLastCachedId(ASYNC_ADD, lastPending);
                 }
+                break;
             }
             if (lastCachedIds[ASYNC_ADD] != null) {
-                setBatch(lastCachedIds[ASYNC_ADD]);
-            }
-        } else {
-            // mix of async and sync - async can exceed sync only if next in sequence
-            for (Iterator<MessageId> it = pendingCachedIds.iterator(); it.hasNext(); ) {
-                MessageId candidate = it.next();
-                final Object futureOrLong = candidate.getFutureOrSequenceLong();
-                if (futureOrLong instanceof Future) {
-                    Future future = (Future) futureOrLong;
-                    if (future.isCancelled()) {
-                        it.remove();
-                    } else {
-                        try {
-                            future.get();
-                            long next = 1 + (Long)lastCachedIds[SYNC_ADD].getFutureOrSequenceLong();
-                            if (Long.compare(((Long) candidate.getFutureOrSequenceLong()), next) == 0) {
-                                setLastCachedId(SYNC_ADD, candidate);
-                            } else {
-                                // out of sequence, revert to sync state
-                                LOG.trace("{} cursor order out of sync at seq {}, audit must suppress potential replay of {} messages from the store", this, next, pendingCachedIds.size());
-                                break;
-                            }
-                        } catch (Exception ignored) {}
-                    }
+                // ensure we don't skip current possibly sync add b/c we waited on the future
+                if (currentAdd.isRecievedByDFBridge() || Long.compare(((Long) currentAdd.getMessageId().getFutureOrSequenceLong()), ((Long) lastCachedIds[ASYNC_ADD].getFutureOrSequenceLong())) > 0) {
+                    setBatch(lastCachedIds[ASYNC_ADD]);
                 }
             }
-            if (lastCachedIds[SYNC_ADD] != null) {
-                setBatch(lastCachedIds[SYNC_ADD]);
-            }
-
+        } else {
+            setBatch(lastCachedIds[SYNC_ADD]);
         }
         // cleanup
         lastCachedIds[SYNC_ADD] = lastCachedIds[ASYNC_ADD] = null;
@@ -282,7 +272,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     }
 
     private void trackLastCached(MessageReference node) {
-        if (node.getMessageId().getFutureOrSequenceLong() instanceof Future) {
+        if (node.getMessageId().getFutureOrSequenceLong() instanceof Future || node.getMessage().isRecievedByDFBridge()) {
             pruneLastCached();
             pendingCachedIds.add(node.getMessageId());
         } else {
@@ -305,6 +295,19 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
             } else {
                 // complete
                 setLastCachedId(ASYNC_ADD, candidate);
+
+                // keep lock step with sync adds while order is preserved
+                if (lastCachedIds[SYNC_ADD] != null) {
+                    long next = 1 + (Long)lastCachedIds[SYNC_ADD].getFutureOrSequenceLong();
+                    if (Long.compare((Long)futureOrLong, next) == 0) {
+                        setLastCachedId(SYNC_ADD, candidate);
+                    } else {
+                        // out of sequence, revert to sync state
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("{} cursor order out of sync at seq {}, audit must suppress potential replay of {} messages from the store", this, next, pendingCachedIds.size());
+                        }
+                    }
+                }
                 it.remove();
             }
         }
@@ -374,13 +377,17 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
             this.batchResetNeeded = false;
         }
         if (this.batchList.isEmpty() && this.storeHasMessages && this.size >0) {
+            // avoid repeated  trips to the store if there is nothing of interest
+            this.storeHasMessages = false;
             try {
                 doFillBatch();
             } catch (Exception e) {
                 LOG.error("{} - Failed to fill batch", this, e);
                 throw new RuntimeException(e);
             }
-            this.storeHasMessages = !this.batchList.isEmpty() || !hadSpace;
+            if (!this.storeHasMessages && (!this.batchList.isEmpty() || !hadSpace)) {
+                this.storeHasMessages = true;
+            }
         }
     }
     

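In the AbstractStoreCursor hunk above, disabling the cache now syncs with the store before the flag is flipped, and the wait on the newest outstanding async add is bounded (future.get(5, TimeUnit.SECONDS)) so the cursor cannot block indefinitely against the store. A minimal sketch of that "find the last stored sequence" step, assuming each pending entry carries either the assigned sequence (a Long) or a store Future whose result is that sequence; the names are illustrative, not the cursor's actual fields:

    import java.util.Deque;
    import java.util.Iterator;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    // Sketch only: walk pending ids newest-first, skip cancelled store adds,
    // and wait (briefly) on the first live one to learn where the store got to.
    final class LastAsyncAddSketch {

        static Long lastStoredSequence(Deque<Object> pendingFutureOrSequence) throws Exception {
            Iterator<Object> it = pendingFutureOrSequence.descendingIterator();
            while (it.hasNext()) {
                Object futureOrLong = it.next();
                if (futureOrLong instanceof Future) {
                    Future<?> future = (Future<?>) futureOrLong;
                    if (future.isCancelled()) {
                        continue; // the async store add was cancelled, try the previous one
                    }
                    try {
                        // bounded wait: an unbounded get() could block forever if the
                        // store task is itself waiting on this cursor
                        return (Long) future.get(5, TimeUnit.SECONDS);
                    } catch (TimeoutException potentialDeadlock) {
                        return null; // give up; worst case the audit suppresses a replay
                    }
                }
                return (Long) futureOrLong; // already stored, sequence is known
            }
            return null;
        }
    }

The patch then resets the store batch to that position only when the current add's own sequence is strictly greater (or the message came via the store-and-dispatch bridge), so a sync add that completed while the future was being awaited is not skipped.
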
http://git-wip-us.apache.org/repos/asf/activemq/blob/9c2b1d25/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
index 94dc817..9fb73c5 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
@@ -94,7 +94,9 @@ class QueueStorePrefetch extends AbstractStoreCursor {
     
     @Override
     protected void setBatch(MessageId messageId) throws Exception {
-        LOG.trace("{}  setBatch {} seq: {}, loc: {}", this, messageId, messageId.getFutureOrSequenceLong(), messageId.getEntryLocator());
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("{}  setBatch {} seq: {}, loc: {}", this, messageId, messageId.getFutureOrSequenceLong(), messageId.getEntryLocator());
+        }
         store.setBatch(messageId);
         batchResetNeeded = false;
     }
@@ -109,4 +111,8 @@ class QueueStorePrefetch extends AbstractStoreCursor {
         }
     }
 
+    @Override
+    public String toString(){
+        return super.toString() + ",store=" + store;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/9c2b1d25/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
index 8c747e8..901c769 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
@@ -165,4 +165,9 @@ public class ProxyMessageStore implements MessageStore {
     public void registerIndexListener(IndexListener indexListener) {
         delegate.registerIndexListener(indexListener);
     }
+
+    @Override
+    public String toString() {
+        return delegate.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/9c2b1d25/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index eb5d1c4..a18071b 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -665,6 +665,10 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         Long location = sd.messageIdIndex.get(tx, key);
                         if (location != null) {
+                            Long pending = sd.orderIndex.minPendingAdd();
+                            if (pending != null) {
+                                location = Math.min(location, pending-1);
+                            }
                             sd.orderIndex.setBatch(tx, location);
                         } else {
                             LOG.warn("{} {} setBatch failed, location for {} not found in messageId index for {}", this, dest.getName(), identity.getFutureOrSequenceLong(), identity);
@@ -714,6 +718,10 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
             this.localDestinationSemaphore.release();
         }
 
+        @Override
+        public String toString(){
+            return "permits:" + this.localDestinationSemaphore.availablePermits() + ",sd=" + storedDestinations.get(key(dest));
+        }
     }
 
     class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {

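The KahaDBStore.setBatch change above clamps the recovery location with orderIndex.minPendingAdd(): if an async add is still in flight, the batch position must stay just below the smallest pending sequence, otherwise those messages would be skipped on the next batch fill. A small self-contained sketch of the clamp (method and names are illustrative, not the store's API):

    // Sketch only: never let the resume position pass a still-pending async add.
    final class SetBatchClampSketch {

        static long clampBatchLocation(long requestedLocation, Long minPendingAdd) {
            if (minPendingAdd != null) {
                return Math.min(requestedLocation, minPendingAdd - 1);
            }
            return requestedLocation; // nothing pending, resume where asked
        }

        public static void main(String[] args) {
            // the cursor asks to resume after sequence 10, but sequence 7 is still
            // being stored asynchronously: recovery restarts from 6 so 7 and above
            // are still recovered
            System.out.println(clampBatchLocation(10L, 7L));   // 6
            System.out.println(clampBatchLocation(10L, null)); // 10
        }
    }
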
http://git-wip-us.apache.org/repos/asf/activemq/blob/9c2b1d25/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 554f1d3..4de5f16 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -1767,7 +1767,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     // StoredDestination related implementation methods.
     // /////////////////////////////////////////////////////////////////
 
-    private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
+    protected final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
 
     static class MessageKeys {
         final String messageId;
@@ -1886,6 +1886,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         public void trackPendingAddComplete(Long seq) {
             orderIndex.trackPendingAddComplete(seq);
         }
+
+        @Override
+        public String toString() {
+            return "nextSeq:" + orderIndex.nextMessageId + ",lastRet:" + orderIndex.cursor + ",pending:" + orderIndex.pendingAdditions.size();
+        }
     }
 
     protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
@@ -2337,7 +2342,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         return 0;
     }
 
-    private String key(KahaDestination destination) {
+    protected String key(KahaDestination destination) {
         return destination.getType().getNumber() + ":" + destination.getName();
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/9c2b1d25/activemq-unit-tests/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/pom.xml b/activemq-unit-tests/pom.xml
index 1333412..4735144 100755
--- a/activemq-unit-tests/pom.xml
+++ b/activemq-unit-tests/pom.xml
@@ -1033,6 +1033,7 @@
                 <exclude>org/apache/activemq/store/kahadb/disk/index/HashIndexTest.*</exclude>
                 <exclude>org/apache/activemq/store/kahadb/disk/index/ListIndexTest.*</exclude>
                 <exclude>org/apache/activemq/store/kahadb/disk/util/DataByteArrayInputStreamTest.*</exclude>
+                <exclude>org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.*</exclude>
               </excludes>
             </configuration>
           </plugin>

http://git-wip-us.apache.org/repos/asf/activemq/blob/9c2b1d25/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
new file mode 100644
index 0000000..f8fab10
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
@@ -0,0 +1,517 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.region.cursors;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.AbstractMessageStore;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.usage.SystemUsage;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class StoreQueueCursorOrderTest {
+    private static final Logger LOG = LoggerFactory.getLogger(StoreQueueCursorOrderTest.class);
+
+    ActiveMQQueue destination = new ActiveMQQueue("queue-"
+            + StoreQueueCursorOrderTest.class.getSimpleName());
+    BrokerService brokerService;
+
+    final static String mesageIdRoot = "11111:22222:0:";
+    final int messageBytesSize = 1024;
+    final String text = new String(new byte[messageBytesSize]);
+
+    @Before
+    public void setUp() throws Exception {
+        brokerService = createBroker();
+        brokerService.setUseJmx(false);
+        brokerService.deleteAllMessages();
+        brokerService.start();
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        return new BrokerService();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        brokerService.stop();
+    }
+
+    @Test
+    public void tesBlockedFuture() throws Exception {
+        final int count = 2;
+        final Message[] messages = new Message[count];
+        final TestMessageStore queueMessageStore = new TestMessageStore(messages, destination);
+        final ConsumerInfo consumerInfo = new ConsumerInfo();
+        final DestinationStatistics destinationStatistics = new DestinationStatistics();
+        consumerInfo.setExclusive(true);
+
+        final Queue queue = new Queue(brokerService, destination,
+                queueMessageStore, destinationStatistics, null);
+
+        queueMessageStore.start();
+        queueMessageStore.registerIndexListener(null);
+
+        QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker());
+        SystemUsage systemUsage = new SystemUsage();
+        // ensure memory limit is reached
+        systemUsage.getMemoryUsage().setLimit(messageBytesSize * 1);
+        underTest.setSystemUsage(systemUsage);
+        underTest.setEnableAudit(false);
+        underTest.start();
+        assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled());
+
+        ActiveMQTextMessage msg = getMessage(0);
+        messages[1] = msg;
+        msg.setMemoryUsage(systemUsage.getMemoryUsage());
+        msg.setRecievedByDFBridge(true);
+        FutureTask<Long> future = new FutureTask<Long>(new Runnable() {
+            @Override
+            public void run() {
+            }
+        }, 2l) {};
+        msg.getMessageId().setFutureOrSequenceLong(future);
+        underTest.addMessageLast(msg);
+
+        assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled());
+
+        // second message will flip the cache but will be stored before the future task
+        msg = getMessage(1);
+        messages[0] = msg;
+        msg.setMemoryUsage(systemUsage.getMemoryUsage());
+        msg.getMessageId().setFutureOrSequenceLong(1l);
+        underTest.addMessageLast(msg);
+
+
+        assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled());
+        assertEquals("setBatch unset", 0l, queueMessageStore.batch.get());
+
+        int dequeueCount = 0;
+
+        underTest.setMaxBatchSize(2);
+        underTest.reset();
+        while (underTest.hasNext() && dequeueCount < count) {
+            MessageReference ref = underTest.next();
+            ref.decrementReferenceCount();
+            underTest.remove();
+            LOG.info("Received message: {} with body: {}",
+                     ref.getMessageId(), ((ActiveMQTextMessage)ref.getMessage()).getText());
+            assertEquals(dequeueCount++, ref.getMessageId().getProducerSequenceId());
+        }
+        underTest.release();
+        assertEquals(count, dequeueCount);
+    }
+
+    @Test
+    public void testNoSetBatchWithUnOrderedFutureCurrentSync() throws Exception {
+        final int count = 2;
+        final Message[] messages = new Message[count];
+        final TestMessageStore queueMessageStore = new TestMessageStore(messages, destination);
+        final ConsumerInfo consumerInfo = new ConsumerInfo();
+        final DestinationStatistics destinationStatistics = new DestinationStatistics();
+        consumerInfo.setExclusive(true);
+
+        final Queue queue = new Queue(brokerService, destination,
+                queueMessageStore, destinationStatistics, null);
+
+        queueMessageStore.start();
+        queueMessageStore.registerIndexListener(null);
+
+        QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker());
+        SystemUsage systemUsage = new SystemUsage();
+        // ensure memory limit is reached
+        systemUsage.getMemoryUsage().setLimit(messageBytesSize * 1);
+        underTest.setSystemUsage(systemUsage);
+        underTest.setEnableAudit(false);
+        underTest.start();
+        assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled());
+
+        ActiveMQTextMessage msg = getMessage(0);
+        messages[1] = msg;
+        msg.setMemoryUsage(systemUsage.getMemoryUsage());
+        msg.setRecievedByDFBridge(true);
+        final ActiveMQTextMessage msgRef = msg;
+        FutureTask<Long> future = new FutureTask<Long>(new Runnable() {
+            @Override
+            public void run() {
+                msgRef.getMessageId().setFutureOrSequenceLong(1l);
+            }
+        }, 1l) {};
+        msg.getMessageId().setFutureOrSequenceLong(future);
+        Executors.newSingleThreadExecutor().submit(future);
+        underTest.addMessageLast(msg);
+
+        assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled());
+
+        // second message will flip the cache but will be stored before the future task
+        msg = getMessage(1);
+        messages[0] = msg;
+        msg.setMemoryUsage(systemUsage.getMemoryUsage());
+        msg.getMessageId().setFutureOrSequenceLong(1l);
+        underTest.addMessageLast(msg);
+
+
+        assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled());
+        assertEquals("setBatch unset", 0l, queueMessageStore.batch.get());
+
+        int dequeueCount = 0;
+
+        underTest.setMaxBatchSize(2);
+        underTest.reset();
+        while (underTest.hasNext() && dequeueCount < count) {
+            MessageReference ref = underTest.next();
+            ref.decrementReferenceCount();
+            underTest.remove();
+            LOG.info("Received message: {} with body: {}",
+                     ref.getMessageId(), ((ActiveMQTextMessage)ref.getMessage()).getText());
+            assertEquals(dequeueCount++, ref.getMessageId().getProducerSequenceId());
+        }
+        underTest.release();
+        assertEquals(count, dequeueCount);
+    }
+
+    @Test
+    public void testSetBatchWithOrderedFutureCurrentFuture() throws Exception {
+        final int count = 2;
+        final Message[] messages = new Message[count];
+        final TestMessageStore queueMessageStore = new TestMessageStore(messages, destination);
+        final ConsumerInfo consumerInfo = new ConsumerInfo();
+        final DestinationStatistics destinationStatistics = new DestinationStatistics();
+        consumerInfo.setExclusive(true);
+
+        final Queue queue = new Queue(brokerService, destination,
+                queueMessageStore, destinationStatistics, null);
+
+        queueMessageStore.start();
+        queueMessageStore.registerIndexListener(null);
+
+        QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker());
+        SystemUsage systemUsage = new SystemUsage();
+        // ensure memory limit is reached
+        systemUsage.getMemoryUsage().setLimit(messageBytesSize * 1);
+        underTest.setSystemUsage(systemUsage);
+        underTest.setEnableAudit(false);
+        underTest.start();
+        assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled());
+
+        ActiveMQTextMessage msg = getMessage(0);
+        messages[0] = msg;
+        msg.setMemoryUsage(systemUsage.getMemoryUsage());
+        msg.setRecievedByDFBridge(true);
+        final ActiveMQTextMessage msgRef = msg;
+        FutureTask<Long> future = new FutureTask<Long>(new Runnable() {
+            @Override
+            public void run() {
+                msgRef.getMessageId().setFutureOrSequenceLong(0l);
+            }
+        }, 0l) {};
+        msg.getMessageId().setFutureOrSequenceLong(future);
+        Executors.newSingleThreadExecutor().submit(future);
+        underTest.addMessageLast(msg);
+
+        assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled());
+
+        // second message will flip the cache but will be stored before the future task
+        msg = getMessage(1);
+        messages[1] = msg;
+        msg.setMemoryUsage(systemUsage.getMemoryUsage());
+        msg.setRecievedByDFBridge(true);
+        final ActiveMQTextMessage msgRe2f = msg;
+        FutureTask<Long> future2 = new FutureTask<Long>(new Runnable() {
+            @Override
+            public void run() {
+                msgRe2f.getMessageId().setFutureOrSequenceLong(1l);
+            }
+        }, 1l) {};
+        msg.getMessageId().setFutureOrSequenceLong(future2);
+        Executors.newSingleThreadExecutor().submit(future2);
+        underTest.addMessageLast(msg);
+
+
+        assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled());
+        assertEquals("setBatch set", 1l, queueMessageStore.batch.get());
+
+        int dequeueCount = 0;
+
+        underTest.setMaxBatchSize(2);
+        underTest.reset();
+        while (underTest.hasNext() && dequeueCount < count) {
+            MessageReference ref = underTest.next();
+            ref.decrementReferenceCount();
+            underTest.remove();
+            LOG.info("Received message: {} with body: {}",
+                     ref.getMessageId(), ((ActiveMQTextMessage)ref.getMessage()).getText());
+            assertEquals(dequeueCount++, ref.getMessageId().getProducerSequenceId());
+        }
+        underTest.release();
+        assertEquals(count, dequeueCount);
+    }
+
+    @Test
+    public void testSetBatchWithFuture() throws Exception {
+        final int count = 4;
+        final Message[] messages = new Message[count];
+        final TestMessageStore queueMessageStore = new TestMessageStore(messages, destination);
+        final ConsumerInfo consumerInfo = new ConsumerInfo();
+        final DestinationStatistics destinationStatistics = new DestinationStatistics();
+        consumerInfo.setExclusive(true);
+
+        final Queue queue = new Queue(brokerService, destination,
+                queueMessageStore, destinationStatistics, null);
+
+        queueMessageStore.start();
+        queueMessageStore.registerIndexListener(null);
+
+        QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker());
+        SystemUsage systemUsage = new SystemUsage();
+        // ensure memory limit is reached
+        systemUsage.getMemoryUsage().setLimit(messageBytesSize * (count + 6));
+        underTest.setSystemUsage(systemUsage);
+        underTest.setEnableAudit(false);
+        underTest.start();
+        assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled());
+
+        ActiveMQTextMessage msg = getMessage(0);
+        messages[0] = msg;
+        msg.setMemoryUsage(systemUsage.getMemoryUsage());
+        msg.setRecievedByDFBridge(true);
+        final ActiveMQTextMessage msgRef = msg;
+        FutureTask<Long> future0 = new FutureTask<Long>(new Runnable() {
+            @Override
+            public void run() {
+                msgRef.getMessageId().setFutureOrSequenceLong(0l);
+            }
+        }, 0l) {};
+        msg.getMessageId().setFutureOrSequenceLong(future0);
+        underTest.addMessageLast(msg);
+        Executors.newSingleThreadExecutor().submit(future0);
+
+
+        msg = getMessage(1);
+        messages[3] = msg;
+        msg.setMemoryUsage(systemUsage.getMemoryUsage());
+        msg.setRecievedByDFBridge(true);
+        final ActiveMQTextMessage msgRef1 = msg;
+        FutureTask<Long> future1 = new FutureTask<Long>(new Runnable() {
+            @Override
+            public void run() {
+                msgRef1.getMessageId().setFutureOrSequenceLong(3l);
+            }
+        }, 3l) {};
+        msg.getMessageId().setFutureOrSequenceLong(future1);
+        underTest.addMessageLast(msg);
+
+
+        msg = getMessage(2);
+        messages[1] = msg;
+        msg.setMemoryUsage(systemUsage.getMemoryUsage());
+        msg.getMessageId().setFutureOrSequenceLong(1l);
+        underTest.addMessageLast(msg);
+
+        assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled());
+
+        // out of order future
+        Executors.newSingleThreadExecutor().submit(future1);
+
+        // sync add to flip cache
+        msg = getMessage(3);
+        messages[2] = msg;
+        msg.setMemoryUsage(systemUsage.getMemoryUsage());
+        msg.getMessageId().setFutureOrSequenceLong(3l);
+        underTest.addMessageLast(msg);
+
+
+        assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled());
+        assertEquals("setBatch set", 2l, queueMessageStore.batch.get());
+
+        int dequeueCount = 0;
+
+        underTest.setMaxBatchSize(count);
+        underTest.reset();
+        while (underTest.hasNext() && dequeueCount < count) {
+            MessageReference ref = underTest.next();
+            ref.decrementReferenceCount();
+            underTest.remove();
+            LOG.info("Received message: {} with body: {}",
+                     ref.getMessageId(), ((ActiveMQTextMessage)ref.getMessage()).getText());
+            assertEquals(dequeueCount++, ref.getMessageId().getProducerSequenceId());
+        }
+        underTest.release();
+        assertEquals(count, dequeueCount);
+    }
+
+    @Test
+    public void testSetBatch() throws Exception {
+        final int count = 3;
+        final Message[] messages = new Message[count];
+        final TestMessageStore queueMessageStore = new TestMessageStore(messages, destination);
+        final ConsumerInfo consumerInfo = new ConsumerInfo();
+        final DestinationStatistics destinationStatistics = new DestinationStatistics();
+        consumerInfo.setExclusive(true);
+
+        final Queue queue = new Queue(brokerService, destination,
+                queueMessageStore, destinationStatistics, null);
+
+        queueMessageStore.start();
+        queueMessageStore.registerIndexListener(null);
+
+        QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker());
+        SystemUsage systemUsage = new SystemUsage();
+        // ensure memory limit is reached
+        systemUsage.getMemoryUsage().setLimit(messageBytesSize * 5);
+        underTest.setSystemUsage(systemUsage);
+        underTest.setEnableAudit(false);
+        underTest.start();
+        assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled());
+
+
+        ActiveMQTextMessage msg = getMessage(0);
+        messages[0] = msg;
+        msg.setMemoryUsage(systemUsage.getMemoryUsage());
+        msg.getMessageId().setFutureOrSequenceLong(0l);
+        underTest.addMessageLast(msg);
+
+        msg = getMessage(1);
+        messages[1] = msg;
+        msg.setMemoryUsage(systemUsage.getMemoryUsage());
+        msg.getMessageId().setFutureOrSequenceLong(1l);
+        underTest.addMessageLast(msg);
+
+        assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled());
+
+        msg = getMessage(2);
+        messages[2] = msg;
+        msg.setMemoryUsage(systemUsage.getMemoryUsage());
+        msg.getMessageId().setFutureOrSequenceLong(2l);
+        underTest.addMessageLast(msg);
+
+
+        assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled());
+        assertEquals("setBatch set", 2l, queueMessageStore.batch.get());
+
+        int dequeueCount = 0;
+
+        underTest.setMaxBatchSize(2);
+        underTest.reset();
+        while (underTest.hasNext() && dequeueCount < count) {
+            MessageReference ref = underTest.next();
+            ref.decrementReferenceCount();
+            underTest.remove();
+            LOG.info("Received message: {} with body: {}",
+                     ref.getMessageId(), ((ActiveMQTextMessage)ref.getMessage()).getText());
+            assertEquals(dequeueCount++, ref.getMessageId().getProducerSequenceId());
+        }
+        underTest.release();
+        assertEquals(count, dequeueCount);
+    }
+
+    private ActiveMQTextMessage getMessage(int i) throws Exception {
+        ActiveMQTextMessage message = new ActiveMQTextMessage();
+        MessageId id = new MessageId(mesageIdRoot + i);
+        id.setBrokerSequenceId(i);
+        id.setProducerSequenceId(i);
+        message.setMessageId(id);
+        message.setDestination(destination);
+        message.setPersistent(true);
+        message.setResponseRequired(true);
+        message.setText("Msg:" + i + " " + text);
+        assertEquals(message.getMessageId().getProducerSequenceId(), i);
+        return message;
+    }
+
+    class TestMessageStore extends AbstractMessageStore {
+        final Message[] messages;
+        public AtomicLong batch = new AtomicLong();
+
+        public TestMessageStore(Message[] messages, ActiveMQDestination dest) {
+            super(dest);
+            this.messages = messages;
+        }
+
+        @Override
+        public void addMessage(ConnectionContext context, Message message) throws IOException {
+
+        }
+
+        @Override
+        public Message getMessage(MessageId identity) throws IOException {
+            return null;
+        }
+
+        @Override
+        public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
+
+        }
+
+        @Override
+        public void removeAllMessages(ConnectionContext context) throws IOException {
+
+        }
+
+        @Override
+        public void recover(MessageRecoveryListener container) throws Exception {
+
+        }
+
+        @Override
+        public int getMessageCount() throws IOException {
+            return 0;
+        }
+
+        @Override
+        public void resetBatching() {
+
+        }
+        @Override
+        public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
+            for (int i=batch.intValue();i<messages.length;i++) {
+                LOG.info("recovered index:" + i);
+                listener.recoverMessage(messages[i]);
+            }
+        }
+
+        @Override
+        public void setBatch(MessageId message) {
+            batch.set((Long)message.getFutureOrSequenceLong());
+            batch.incrementAndGet();
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/9c2b1d25/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java
new file mode 100644
index 0000000..300bec1
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java
@@ -0,0 +1,641 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.junit.Assert.assertEquals;
+
+/*
+ * pause producers if consumers stall and verify broker drained before resume
+ */
+@RunWith(Parameterized.class)
+public class AMQ5266StarvedConsumerTest {
+    static Logger LOG = LoggerFactory.getLogger(AMQ5266StarvedConsumerTest.class);
+    String activemqURL;
+    BrokerService brokerService;
+    private EmbeddedDataSource dataSource;
+
+    public int messageSize = 1000;
+
+    @Parameterized.Parameter(0)
+    public int publisherMessagesPerThread = 1000;
+
+    @Parameterized.Parameter(1)
+    public int publisherThreadCount = 20;
+
+    @Parameterized.Parameter(2)
+    public int consumerThreadsPerQueue = 5;
+
+    @Parameterized.Parameter(3)
+    public int destMemoryLimit = 50 * 1024;
+
+    @Parameterized.Parameter(4)
+    public boolean useCache = true;
+
+    @Parameterized.Parameter(5)
+    public boolean useDefaultStore = false;
+
+    @Parameterized.Parameter(6)
+    public boolean optimizeDispatch = false;
+    private  AtomicBoolean didNotReceive = new AtomicBoolean(false);
+
+    @Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},useDefaultStore:{5},optimizedDispatch:{6}")
+    public static Iterable<Object[]> parameters() {
+        return Arrays.asList(new Object[][]{
+                {1000, 40,  5,   1024*1024,  false,  false, true},
+        });
+    }
+
+    public int consumerBatchSize = 5;
+
+    @Before
+    public void startBroker() throws Exception {
+        brokerService = new BrokerService();
+
+        dataSource = new EmbeddedDataSource();
+        dataSource.setDatabaseName("target/derbyDb");
+        dataSource.setCreateDatabase("create");
+
+        JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
+        jdbcPersistenceAdapter.setDataSource(dataSource);
+        jdbcPersistenceAdapter.setUseLock(false);
+
+        if (!useDefaultStore) {
+            brokerService.setPersistenceAdapter(jdbcPersistenceAdapter);
+        } else {
+            KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
+            kahaDBPersistenceAdapter.setConcurrentStoreAndDispatchQueues(true);
+        }
+        brokerService.setDeleteAllMessagesOnStartup(true);
+        brokerService.setUseJmx(false);
+        brokerService.setAdvisorySupport(false);
+
+
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setUseConsumerPriority(false); // java.lang.IllegalArgumentException: Comparison method violates its general contract!
+        defaultEntry.setMaxAuditDepth(publisherThreadCount);
+        defaultEntry.setEnableAudit(true);
+        defaultEntry.setUseCache(useCache);
+        defaultEntry.setMaxPageSize(1000);
+        defaultEntry.setOptimizedDispatch(optimizeDispatch);
+        defaultEntry.setMemoryLimit(destMemoryLimit);
+        defaultEntry.setExpireMessagesPeriod(0);
+        policyMap.setDefaultEntry(defaultEntry);
+        brokerService.setDestinationPolicy(policyMap);
+
+        brokerService.getSystemUsage().getMemoryUsage().setLimit(512 * 1024 * 1024);
+
+        TransportConnector transportConnector = brokerService.addConnector("tcp://0.0.0.0:0");
+        brokerService.start();
+        activemqURL = transportConnector.getPublishableConnectString();
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+        }
+        try {
+            dataSource.setShutdownDatabase("shutdown");
+            dataSource.getConnection();
+        } catch (Exception ignored) {}
+    }
+
+    CyclicBarrier globalProducerHalt = new CyclicBarrier(publisherThreadCount, new Runnable() {
+        @Override
+        public void run() {
+            // wait for queue size to go to zero
+            try {
+                while (((RegionBroker)brokerService.getRegionBroker()).getDestinationStatistics().getMessages().getCount() > 0) {
+                    LOG.info("Total messageCount: " + ((RegionBroker)brokerService.getRegionBroker()).getDestinationStatistics().getMessages().getCount());
+                    TimeUnit.SECONDS.sleep(5);
+                }
+            } catch (Exception ignored) {
+                ignored.printStackTrace();
+            }
+        }
+    });
+
+    @Test(timeout = 30 * 60 * 1000)
+    public void test() throws Exception {
+
+        String activemqQueues = "activemq,activemq2,activemq3,activemq4";//,activemq5,activemq6,activemq7,activemq8,activemq9";
+
+        int consumerWaitForConsumption = 5 * 60 * 1000;
+
+        ExportQueuePublisher publisher = null;
+        ExportQueueConsumer consumer = null;
+
+        LOG.info("Publisher will publish " + (publisherMessagesPerThread * publisherThreadCount) + " messages to each queue specified.");
+        LOG.info("\nBuilding Publisher...");
+
+        publisher = new ExportQueuePublisher(activemqURL, activemqQueues, publisherMessagesPerThread, publisherThreadCount);
+
+        LOG.info("Building Consumer...");
+
+        consumer = new ExportQueueConsumer(activemqURL, activemqQueues, consumerThreadsPerQueue, consumerBatchSize, publisherMessagesPerThread * publisherThreadCount);
+
+
+        LOG.info("Starting Publisher...");
+
+        publisher.start();
+
+        LOG.info("Starting Consumer...");
+
+        consumer.start();
+
+        int distinctPublishedCount = 0;
+
+
+        LOG.info("Waiting For Publisher Completion...");
+
+        publisher.waitForCompletion();
+
+        List publishedIds = publisher.getIDs();
+        distinctPublishedCount = new TreeSet(publishedIds).size();
+
+        LOG.info("Publisher Complete. Published: " + publishedIds.size() + ", Distinct IDs Published: " + distinctPublishedCount);
+
+
+        long endWait = System.currentTimeMillis() + consumerWaitForConsumption;
+        while (!consumer.completed() && System.currentTimeMillis() < endWait) {
+            try {
+                int secs = (int) (endWait - System.currentTimeMillis()) / 1000;
+                LOG.info("Waiting For Consumer Completion. Time left: " + secs + " secs");
+                if (!useDefaultStore) {
+                    DefaultJDBCAdapter.dumpTables(dataSource.getConnection());
+                }
+                Thread.sleep(10000);
+            } catch (Exception e) {
+            }
+        }
+
+        LOG.info("\nConsumer Complete: " + consumer.completed() +", Shutting Down.");
+
+        consumer.shutdown();
+
+        TimeUnit.SECONDS.sleep(2);
+        LOG.info("DB Contents START");
+        if (!useDefaultStore) {
+            DefaultJDBCAdapter.dumpTables(dataSource.getConnection());
+        }
+        LOG.info("DB Contents END");
+
+        LOG.info("Consumer Stats:");
+
+        for (Map.Entry<String, List<String>> entry : consumer.getIDs().entrySet()) {
+
+            List<String> idList = entry.getValue();
+
+            int distinctConsumed = new TreeSet<String>(idList).size();
+
+            StringBuilder sb = new StringBuilder();
+            sb.append("   Queue: " + entry.getKey() +
+                    " -> Total Messages Consumed: " + idList.size() +
+                    ", Distinct IDs Consumed: " + distinctConsumed);
+
+            int diff = distinctPublishedCount - distinctConsumed;
+            sb.append(" ( " + (diff > 0 ? diff : "NO") + " STUCK MESSAGES " + " ) ");
+            LOG.info(sb.toString());
+
+            assertEquals("expect to get all messages!", 0, diff);
+
+        }
+    }
+
+    public class ExportQueuePublisher {
+
+        private final String amqUser = ActiveMQConnection.DEFAULT_USER;
+        private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD;
+        private ActiveMQConnectionFactory connectionFactory = null;
+        private String activemqURL = null;
+        private String activemqQueues = null;
+        // Collection of distinct IDs that the publisher has published.
+        // After a message is published, its UUID will be written to this list for tracking.
+        // This list of IDs (or distinct count) will be used to compare to the consumed list of IDs.
+        //private Set<String> ids = Collections.synchronizedSet(new TreeSet<String>());
+        private List<String> ids = Collections.synchronizedList(new ArrayList<String>());
+        private List<PublisherThread> threads;
+
+        public ExportQueuePublisher(String activemqURL, String activemqQueues, int messagesPerThread, int threadCount) throws Exception {
+
+            this.activemqURL = activemqURL;
+            this.activemqQueues = activemqQueues;
+
+            threads = new ArrayList<PublisherThread>();
+
+            // Build the threads and tell them how many messages to publish
+            for (int i = 0; i < threadCount; i++) {
+                PublisherThread pt = new PublisherThread(messagesPerThread);
+                threads.add(pt);
+            }
+        }
+
+        public List<String> getIDs() {
+            return ids;
+        }
+
+        // Kick off threads
+        public void start() throws Exception {
+
+            for (PublisherThread pt : threads) {
+                pt.start();
+            }
+        }
+
+        // Wait for threads to complete. They will complete once they've published all of their messages.
+        public void waitForCompletion() throws Exception {
+
+            for (PublisherThread pt : threads) {
+                pt.join();
+                pt.close();
+            }
+        }
+
+        private Session newSession(QueueConnection queueConnection) throws Exception {
+            return queueConnection.createSession(true, Session.SESSION_TRANSACTED);
+        }
+
+        private synchronized QueueConnection newQueueConnection() throws Exception {
+
+            if (connectionFactory == null) {
+                connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL);
+                connectionFactory.setWatchTopicAdvisories(false);
+            }
+
+            // Set the redelivery count to -1 (infinite), or else messages will start dropping
+            // after the queue has had a certain number of failures (default is 6)
+            RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy();
+            policy.setMaximumRedeliveries(-1);
+
+            QueueConnection amqConnection = connectionFactory.createQueueConnection();
+            amqConnection.start();
+            return amqConnection;
+        }
+
+        private class PublisherThread extends Thread {
+
+            private int count;
+            private QueueConnection qc;
+            private Session session;
+            private MessageProducer mp;
+            private Queue q;
+
+            private PublisherThread(int count) throws Exception {
+
+                this.count = count;
+
+                // Each Thread has its own Connection and Session, so no sync worries
+                qc = newQueueConnection();
+                session = newSession(qc);
+
+                // In our code, when publishing to multiple queues,
+                // we're using composite destinations like below
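+                // e.g. "queueA,queueB" (illustrative names): ActiveMQQueue treats a comma-separated list
+                // as a composite destination, so a single send fans out to every named queue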
+                q = new ActiveMQQueue(activemqQueues);
+                mp = session.createProducer(null);
+            }
+
+            public void run() {
+
+                try {
+
+                    // Loop until we've published enough messages
+                    while (count-- > 0) {
+
+                        TextMessage tm = session.createTextMessage(getMessageText());
+                        String id = UUID.randomUUID().toString();
+                        tm.setStringProperty("KEY", id);
+                        ids.add(id);                            // keep track of the key to compare against consumer
+
+                        mp.send(q, tm);
+                        session.commit();
+
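+                        // Once a consumer reports a missed receive, pause this producer on the
+                        // shared globalProducerHalt (defined earlier in the test)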
+                        if (didNotReceive.get()) {
+                            globalProducerHalt.await();
+                        }
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+
+            // Called by waitForCompletion
+            public void close() {
+
+                try {
+                    mp.close();
+                } catch (Exception e) {
+                    // ignore - best-effort cleanup
+                }
+
+                try {
+                    session.close();
+                } catch (Exception e) {
+                    // ignore - best-effort cleanup
+                }
+
+                try {
+                    qc.close();
+                } catch (Exception e) {
+                    // ignore - best-effort cleanup
+                }
+            }
+        }
+
+    }
+
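+    // Shared message payload, built lazily via double-checked locking: a string of 'X' characters of length messageSize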
+    volatile String messageText;
+    private String getMessageText() {
+
+        if (messageText == null) {
+
+            synchronized (this) {
+
+                if (messageText == null) {
+
+                    StringBuilder sb = new StringBuilder();
+                    for (int i = 0; i < messageSize; i++) {
+                        sb.append("X");
+                    }
+                    messageText = sb.toString();
+                }
+            }
+        }
+
+        return messageText;
+    }
+
+
+    public class ExportQueueConsumer {
+
+        private final String amqUser = ActiveMQConnection.DEFAULT_USER;
+        private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD;
+        private final int totalToExpect;
+        private ActiveMQConnectionFactory connectionFactory = null;
+        private String activemqURL = null;
+        private String activemqQueues = null;
+        private String[] queues = null;
+        // Map of IDs that were consumed, keyed by queue name.
+        // We'll compare these against what was published to know if any got stuck or dropped.
+        private Map<String, List<String>> idsByQueue = new HashMap<String, List<String>>();
+        private Map<String, List<ConsumerThread>> threads;
+
+        public ExportQueueConsumer(String activemqURL, String activemqQueues, int threadsPerQueue, int batchSize, int totalToExpect) throws Exception {
+
+            this.activemqURL = activemqURL;
+            this.activemqQueues = activemqQueues;
+            this.totalToExpect = totalToExpect;
+
+            queues = this.activemqQueues.split(",");
+
+            for (int i = 0; i < queues.length; i++) {
+                queues[i] = queues[i].trim();
+            }
+
+            threads = new HashMap<String, List<ConsumerThread>>();
+
+            // For each queue, create a list of threads and set up the list of ids
+            for (String q : queues) {
+
+                List<ConsumerThread> list = new ArrayList<ConsumerThread>();
+
+                idsByQueue.put(q, Collections.synchronizedList(new ArrayList<String>()));
+
+                for (int i = 0; i < threadsPerQueue; i++) {
+                    list.add(new ConsumerThread(q, batchSize));
+                }
+
+                threads.put(q, list);
+            }
+        }
+
+        public Map<String, List<String>> getIDs() {
+            return idsByQueue;
+        }
+
+        // Start the threads
+        public void start() throws Exception {
+
+            for (List<ConsumerThread> list : threads.values()) {
+
+                for (ConsumerThread ct : list) {
+
+                    ct.start();
+                }
+            }
+        }
+
+        // Tell the threads to stop
+        // Then wait for them to stop
+        public void shutdown() throws Exception {
+
+            for (List<ConsumerThread> list : threads.values()) {
+
+                for (ConsumerThread ct : list) {
+
+                    ct.shutdown();
+                }
+            }
+
+            for (List<ConsumerThread> list : threads.values()) {
+
+                for (ConsumerThread ct : list) {
+
+                    ct.join();
+                }
+            }
+        }
+
+        private Session newSession(QueueConnection queueConnection) throws Exception {
+            return queueConnection.createSession(true, Session.SESSION_TRANSACTED);
+        }
+
+        private synchronized QueueConnection newQueueConnection() throws Exception {
+
+            if (connectionFactory == null) {
+                connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL);
+                connectionFactory.setWatchTopicAdvisories(false);
+            }
+
+            // Set the maximum redeliveries to -1 (infinite); otherwise redelivered messages are routed
+            // to the DLQ once they exceed the redelivery limit (the default is 6 attempts)
+            RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy();
+            policy.setMaximumRedeliveries(-1);
+
+            QueueConnection amqConnection = connectionFactory.createQueueConnection();
+            amqConnection.start();
+            return amqConnection;
+        }
+
+        public boolean completed() {
+            for (List<ConsumerThread> list : threads.values()) {
+
+                for (ConsumerThread ct : list) {
+
+                    if (ct.isAlive()) {
+                        LOG.info("thread for {} is still alive.", ct.qName);
+                        return false;
+                    }
+                }
+            }
+            return true;
+        }
+
+        private class ConsumerThread extends Thread {
+
+            private int batchSize;
+            private QueueConnection qc;
+            private Session session;
+            private MessageConsumer mc;
+            private List<String> idList;
+            private volatile boolean shutdown = false;
+            private String qName;
+
+            private ConsumerThread(String queueName, int batchSize) throws Exception {
+
+                this.batchSize = batchSize;
+
+                // Each thread has its own connection and session
+                qName = queueName;
+                qc = newQueueConnection();
+                session = newSession(qc);
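+                // The consumer.prefetchSize destination option caps this consumer's prefetch at the batch size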
+                Queue q = session.createQueue(queueName + "?consumer.prefetchSize=" + batchSize);
+                mc = session.createConsumer(q);
+
+                idList = idsByQueue.get(queueName);
+            }
+
+            public void run() {
+
+                try {
+
+                    int count = 0;
+
+                    // Keep reading until told to shut down
+                    while (!shutdown) {
+
+                        if (idList.size() >= totalToExpect) {
+                            LOG.info("Got {} for q: {}", +idList.size(), qName);
+                            session.commit();
+                            break;
+                        }
+                        Message m = mc.receive(4000);
+
+                        if (m != null) {
+
+                            // We received a non-null message, add the ID to our list
+
+                            idList.add(m.getStringProperty("KEY"));
+
+                            count++;
+
+                            // If we've reached our batch size, commit the batch and reset the count
+
+                            if (count == batchSize) {
+                                session.commit();
+                                count = 0;
+                            }
+                        } else {
+
+                            // We didn't receive anything this time, commit any current batch and reset the count
+
+                            session.commit();
+                            count = 0;
+
+                            // No sleep needed here - the 4 second receive timeout already throttles the loop.
+                            // Flag the miss so the publishers pause on globalProducerHalt.
+
+                            if (idList.size() < totalToExpect) {
+                                LOG.info("did not receive on {}, current count: {}", qName, idList.size());
+                                didNotReceive.set(true);
+                            }
+                        }
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                } finally {
+
+                    // Once we exit, close everything
+                    close();
+                }
+            }
+
+            public void shutdown() {
+                shutdown = true;
+            }
+
+            public void close() {
+
+                try {
+                    mc.close();
+                } catch (Exception e) {
+                    // ignore - best-effort cleanup
+                }
+
+                try {
+                    session.close();
+                } catch (Exception e) {
+                    // ignore - best-effort cleanup
+                }
+
+                try {
+                    qc.close();
+                } catch (Exception e) {
+                    // ignore - best-effort cleanup
+                }
+            }
+        }
+    }
+}

