activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject [2/5] git commit: rework cursor store sync w.r.t. index order. resolve issues with skipped dispatch and duplicate dispatch. https://issues.apache.org/jira/browse/AMQ-4485 https://issues.apache.org/jira/browse/AMQ-5266
Date Sat, 30 Aug 2014 22:53:42 GMT
rework cursor store sync w.r.t. index order. resolve issues with skipped dispatch and duplicate dispatch. https://issues.apache.org/jira/browse/AMQ-4485 https://issues.apache.org/jira/browse/AMQ-5266


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/54e2e3be
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/54e2e3be
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/54e2e3be

Branch: refs/heads/trunk
Commit: 54e2e3bef290d7455d9d1ba3420d12dc4805b339
Parents: b2afb8c
Author: gtully <gary.tully@gmail.com>
Authored: Fri Aug 29 22:24:38 2014 +0100
Committer: gtully <gary.tully@gmail.com>
Committed: Sat Aug 30 00:51:21 2014 +0100

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    | 212 +++------
 .../activemq/broker/region/RegionBroker.java    |   1 +
 .../cursors/AbstractPendingMessageCursor.java   |   6 +-
 .../region/cursors/AbstractStoreCursor.java     | 101 ++--
 .../cursors/FilePendingMessageCursor.java       |   4 +-
 .../region/cursors/PendingMessageCursor.java    |   4 +-
 .../region/cursors/QueueStorePrefetch.java      |   1 +
 .../cursors/StoreDurableSubscriberCursor.java   |   3 +-
 .../broker/region/cursors/StoreQueueCursor.java |   6 +-
 .../region/cursors/VMPendingMessageCursor.java  |   4 +-
 .../activemq/store/AbstractMessageStore.java    |   7 +
 .../apache/activemq/store/IndexListener.java    |  47 ++
 .../org/apache/activemq/store/MessageStore.java |   2 +
 .../activemq/store/ProxyMessageStore.java       |   5 +
 .../activemq/store/ProxyTopicMessageStore.java  |   5 +
 .../store/memory/MemoryMessageStore.java        |   6 +
 .../org/apache/activemq/command/MessageId.java  |  10 +
 .../activemq/store/jdbc/JDBCMessageStore.java   |  47 +-
 .../store/jdbc/JDBCPersistenceAdapter.java      |   4 +-
 .../store/jdbc/JdbcMemoryTransactionStore.java  |   8 +-
 .../store/jdbc/adapter/DefaultJDBCAdapter.java  |  12 +-
 .../activemq/store/kahadb/KahaDBStore.java      |  82 +++-
 .../store/kahadb/KahaDBTransactionStore.java    |  15 +-
 .../activemq/store/kahadb/MessageDatabase.java  | 190 +++++---
 .../store/kahadb/disk/index/BTreeIndex.java     |   6 +-
 .../store/kahadb/disk/index/BTreeNode.java      |  19 +-
 .../store/kahadb/disk/index/BTreeIndexTest.java |  29 ++
 .../org/apache/activemq/leveldb/DBManager.scala |   6 +
 .../apache/activemq/leveldb/LevelDBStore.scala  |  35 +-
 .../StoreQueueCursorNoDuplicateTest.java        |   1 +
 .../activemq/bugs/AMQ4485LowLimitTest.java      | 462 +++++++++++++++++++
 .../org/apache/activemq/bugs/AMQ5212Test.java   |  21 +
 .../org/apache/activemq/bugs/AMQ5266Test.java   | 134 +++++-
 .../network/DemandForwardingBridgeTest.java     |   1 -
 .../activemq/usecases/MemoryLimitTest.java      |   6 +-
 35 files changed, 1152 insertions(+), 350 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index af7cfe9..d1605e2 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -30,13 +30,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CancellationException;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -47,7 +45,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 import javax.jms.ResourceAllocationException;
-import javax.transaction.xa.XAException;
 
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
@@ -75,12 +72,12 @@ import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerAck;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.Response;
-import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
 import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.state.ProducerState;
+import org.apache.activemq.store.IndexListener;
 import org.apache.activemq.store.ListenableFuture;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
@@ -88,7 +85,6 @@ import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transaction.Synchronization;
-import org.apache.activemq.transaction.Transaction;
 import org.apache.activemq.usage.Usage;
 import org.apache.activemq.usage.UsageListener;
 import org.apache.activemq.util.BrokerSupport;
@@ -101,7 +97,7 @@ import org.slf4j.MDC;
  * The Queue is a List of MessageEntry objects that are dispatched to matching
  * subscriptions.
  */
-public class Queue extends BaseDestination implements Task, UsageListener {
+public class Queue extends BaseDestination implements Task, UsageListener, IndexListener {
     protected static final Logger LOG = LoggerFactory.getLogger(Queue.class);
     protected final TaskRunnerFactory taskFactory;
     protected TaskRunner taskRunner;
@@ -241,6 +237,9 @@ public class Queue extends BaseDestination implements Task, UsageListener {
         super(brokerService, store, destination, parentStats);
         this.taskFactory = taskFactory;
         this.dispatchSelector = new QueueDispatchSelector(destination);
+        if (store != null) {
+            store.registerIndexListener(this);
+        }
     }
 
     @Override
@@ -746,158 +745,81 @@ public class Queue extends BaseDestination implements Task, UsageListener {
         }
     }
 
-    final ConcurrentHashMap<Transaction, SendSync> sendSyncs = new ConcurrentHashMap<Transaction, SendSync>();
-    private final LinkedList<Transaction> orderIndexUpdates = new LinkedList<Transaction>();
-
-    // roll up all message sends
-    class SendSync extends Synchronization {
-
-        class MessageContext {
-            public Message message;
-            public ConnectionContext context;
-
-            public MessageContext(ConnectionContext context, Message message) {
-                this.context = context;
-                this.message = message;
-            }
-        }
-
-        final Transaction transaction;
-        List<MessageContext> additions = new ArrayList<MessageContext>();
-
-        public SendSync(Transaction transaction) {
-            this.transaction = transaction;
-        }
-
-        public void add(ConnectionContext context, Message message) {
-            additions.add(new MessageContext(context, message));
-        }
+    private final LinkedList<MessageContext> indexOrderedCursorUpdates = new LinkedList<>();
 
-        @Override
-        public void beforeCommit() throws Exception {
-            synchronized (orderIndexUpdates) {
-                orderIndexUpdates.addLast(transaction);
-            }
+    @Override
+    public void onAdd(MessageContext messageContext) {
+        synchronized (indexOrderedCursorUpdates) {
+            indexOrderedCursorUpdates.addLast(messageContext);
         }
+    }
 
-        @Override
-        public void afterCommit() throws Exception {
-            ArrayList<SendSync> syncs = new ArrayList<SendSync>(200);
-            sendLock.lockInterruptibly();
-            try {
-                synchronized (orderIndexUpdates) {
-                    Transaction next = orderIndexUpdates.peek();
-                    while( next!=null && next.isCommitted() ) {
-                        syncs.add(sendSyncs.remove(orderIndexUpdates.removeFirst()));
-                        next = orderIndexUpdates.peek();
+    private void doPendingCursorAdditions() throws Exception {
+        LinkedList<MessageContext> orderedUpdates = new LinkedList<>();
+        sendLock.lockInterruptibly();
+        try {
+            synchronized (indexOrderedCursorUpdates) {
+                MessageContext candidate = indexOrderedCursorUpdates.peek();
+                while (candidate != null && candidate.message.getMessageId().getFutureOrSequenceLong() != null) {
+                    candidate = indexOrderedCursorUpdates.removeFirst();
+                    // check for duplicate adds suppressed by the store
+                    if (candidate.message.getMessageId().getFutureOrSequenceLong() instanceof Long && ((Long)candidate.message.getMessageId().getFutureOrSequenceLong()).compareTo(-1l) == 0) {
+                        LOG.warn("{} messageStore indicated duplicate add attempt for {}, suppressing duplicate dispatch", this, candidate.message.getMessageId());
+                    } else {
+                        orderedUpdates.add(candidate);
                     }
+                    candidate = indexOrderedCursorUpdates.peek();
                 }
-                for (SendSync sync : syncs) {
-                    sync.processSend();
-                }
-            } finally {
-                sendLock.unlock();
-            }
-            for (SendSync sync : syncs) {
-                sync.processSent();
             }
-        }
-
-        // called with sendLock
-        private void processSend() throws Exception {
-
-            for (Iterator<MessageContext> iterator = additions.iterator(); iterator.hasNext(); ) {
-                MessageContext messageContext = iterator.next();
-                // It could take while before we receive the commit
-                // op, by that time the message could have expired..
-                if (broker.isExpired(messageContext.message)) {
-                    broker.messageExpired(messageContext.context, messageContext.message, null);
-                    destinationStatistics.getExpired().increment();
-                    iterator.remove();
-                    continue;
+            for (MessageContext messageContext : orderedUpdates) {
+                if (!cursorAdd(messageContext.message)) {
+                    // cursor suppressed a duplicate
+                    messageContext.duplicate = true;
                 }
-                sendMessage(messageContext.message);
-                messageContext.message.decrementReferenceCount();
             }
+        } finally {
+            sendLock.unlock();
         }
-
-        private void processSent() throws Exception {
-            for (MessageContext messageContext : additions) {
+        for (MessageContext messageContext : orderedUpdates) {
+            if (!messageContext.duplicate) {
                 messageSent(messageContext.context, messageContext.message);
             }
-        }
-
-        @Override
-        public void afterRollback() throws Exception {
-            try {
-                for (MessageContext messageContext : additions) {
-                    messageContext.message.decrementReferenceCount();
-                }
-            } finally {
-                sendSyncs.remove(transaction);
+            if (messageContext.onCompletion != null) {
+                messageContext.onCompletion.run();
             }
         }
+        orderedUpdates.clear();
     }
 
-    class OrderedNonTransactionWorkTx extends Transaction {
-
-        @Override
-        public void commit(boolean onePhase) throws XAException, IOException {
-        }
-
-        @Override
-        public void rollback() throws XAException, IOException {
-        }
-
-        @Override
-        public int prepare() throws XAException, IOException {
-            return 0;
-        }
+    final class CursorAddSync extends Synchronization {
 
-        @Override
-        public TransactionId getTransactionId() {
-            return null;
-        }
+        private final MessageContext messageContext;
 
-        @Override
-        public Logger getLog() {
-            return null;
+        CursorAddSync(MessageContext messageContext) {
+            this.messageContext = messageContext;
         }
 
         @Override
-        public boolean isCommitted() {
-            return true;
-        }
-
-        @Override
-        public void addSynchronization(Synchronization s) {
-            try {
-                s.beforeCommit();
-            } catch (Exception e) {
-                LOG.error("Failed to add not transactional message to orderedWork", e);
+        public void afterCommit() throws Exception {
+            if (store != null && messageContext.message.isPersistent()) {
+                doPendingCursorAdditions();
+            } else {
+                cursorAdd(messageContext.message);
+                messageSent(messageContext.context, messageContext.message);
             }
+            messageContext.message.decrementReferenceCount();
         }
-    }
 
-    // called while holding the sendLock
-    private void registerSendSync(Message message, ConnectionContext context) {
-        final Transaction transaction =
-                message.isInTransaction() ? context.getTransaction()
-                        : new OrderedNonTransactionWorkTx();
-        Queue.SendSync currentSync = sendSyncs.get(transaction);
-        if (currentSync == null) {
-            currentSync = new Queue.SendSync(transaction);
-            transaction.addSynchronization(currentSync);
-            sendSyncs.put(transaction, currentSync);
+        @Override
+        public void afterRollback() throws Exception {
+            messageContext.message.decrementReferenceCount();
         }
-        currentSync.add(context, message);
     }
 
     void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException,
             Exception {
         final ConnectionContext context = producerExchange.getConnectionContext();
         ListenableFuture<Object> result = null;
-        boolean needsOrderingWithTransactions = context.isInTransaction();
 
         producerExchange.incrementSend();
         checkUsage(context, producerExchange, message);
@@ -922,26 +844,11 @@ public class Queue extends BaseDestination implements Task, UsageListener {
                     throw e;
                 }
             }
-            // did a transaction commit beat us to the index?
-            synchronized (orderIndexUpdates) {
-                needsOrderingWithTransactions |= !orderIndexUpdates.isEmpty();
-            }
-            if (needsOrderingWithTransactions ) {
-                // If this is a transacted message.. increase the usage now so that
-                // a big TX does not blow up
-                // our memory. This increment is decremented once the tx finishes..
-                message.incrementReferenceCount();
-
-                registerSendSync(message, context);
-            } else {
-                // Add to the pending list, this takes care of incrementing the
-                // usage manager.
-                sendMessage(message);
-            }
+            orderedCursorAdd(message, context);
         } finally {
             sendLock.unlock();
         }
-        if (!needsOrderingWithTransactions) {
+        if (store == null || (!context.isInTransaction() && !message.isPersistent())) {
             messageSent(context, message);
         }
         if (result != null && message.isResponseRequired() && !result.isCancelled()) {
@@ -954,6 +861,17 @@ public class Queue extends BaseDestination implements Task, UsageListener {
         }
     }
 
+    private void orderedCursorAdd(Message message, ConnectionContext context) throws Exception {
+        if (context.isInTransaction()) {
+            context.getTransaction().addSynchronization(new CursorAddSync(new MessageContext(context, message, null)));
+        } else if (store != null && message.isPersistent()) {
+            doPendingCursorAdditions();
+        } else {
+            // no ordering issue with non persistent messages
+            cursorAdd(message);
+        }
+    }
+
     private void checkUsage(ConnectionContext context,ProducerBrokerExchange producerBrokerExchange, Message message) throws ResourceAllocationException, IOException, InterruptedException {
         if (message.isPersistent()) {
             if (store != null && systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
@@ -1860,10 +1778,10 @@ public class Queue extends BaseDestination implements Task, UsageListener {
         }
     }
 
-    final void sendMessage(final Message msg) throws Exception {
+    final boolean cursorAdd(final Message msg) throws Exception {
         messagesLock.writeLock().lock();
         try {
-            messages.addMessageLast(msg);
+            return messages.addMessageLast(msg);
         } finally {
             messagesLock.writeLock().unlock();
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
index 88d31b8..fb7d69e 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
@@ -748,6 +748,7 @@ public class RegionBroker extends EmptyBroker {
                         if (deadLetterStrategy.isSendToDeadLetterQueue(message)) {
                             // message may be inflight to other subscriptions so do not modify
                             message = message.copy();
+                            message.getMessageId().setFutureOrSequenceLong(null);
                             stampAsExpired(message);
                             message.setExpiration(0);
                             if (!message.isPersistent()) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
index e9c3c75..12ea104 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
@@ -82,12 +82,12 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs
     public void addMessageFirst(MessageReference node) throws Exception {
     }
 
-    public void addMessageLast(MessageReference node) throws Exception {
+    public boolean addMessageLast(MessageReference node) throws Exception {
+        return true;
     }
     
     public boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
-        addMessageLast(node);
-        return true;
+        return addMessageLast(node);
     }
 
     public void addRecoveredMessage(MessageReference node) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
index b6f9b7e..fad666c 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
@@ -18,12 +18,13 @@ package org.apache.activemq.broker.region.cursors;
 
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.Future;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,8 +41,8 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     protected boolean batchResetNeeded = false;
     private boolean storeHasMessages = false;
     protected int size;
-    private MessageId lastCachedId;
-    private TransactionId lastTx;
+    private LinkedList<MessageId> pendingCachedIds = new LinkedList<>();
+    MessageId lastCachedId = null;
     protected boolean hadSpace = false;
 
     protected AbstractStoreCursor(Destination destination) {
@@ -84,7 +85,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     public final boolean recoverMessage(Message message) throws Exception {
         return recoverMessage(message,false);
     }
-    
+
     public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception {
         boolean recovered = false;
         if (recordUniqueId(message.getMessageId())) {
@@ -100,13 +101,12 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
             recovered = true;
             storeHasMessages = true;
         } else {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug(this + " - cursor got duplicate: " + message.getMessageId() + "," + message.getPriority() + ", cached=" + cached, new Throwable("duplicate message detected"));
-            } else {
-                LOG.warn("{} - cursor got duplicate {}", regionDestination.getActiveMQDestination(), message.getMessageId());
-            }
-            if (!cached ||  message.getMessageId().getEntryLocator() != null) {
-                // came from the store or was added to the jdbc store
+            LOG.warn("{} - cursor got duplicate {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong());
+
+            // a duplicate from the store - needs to be removed/acked - otherwise it will get redispatched on restart
+            // jdbc store will store duplicates and will set entry locator to sequence long.
+            // REVISIT - this seems too hacky - see use case AMQ4952Test
+            if (!cached || message.getMessageId().getEntryLocator() instanceof Long) {
                 duplicate(message);
             }
         }
@@ -189,21 +189,24 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
         return result;
     }
     
-    
-    public final synchronized void addMessageLast(MessageReference node) throws Exception {
+    public final synchronized boolean addMessageLast(MessageReference node) throws Exception {
         boolean disableCache = false;
         if (hasSpace()) {
             if (!isCacheEnabled() && size==0 && isStarted() && useCache) {
-                LOG.trace("{} - enabling cache for empty store {}", this, node.getMessageId());
+                LOG.trace("{} - enabling cache for empty store {} {}", this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong());
                 setCacheEnabled(true);
             }
             if (isCacheEnabled()) {
                 if (recoverMessage(node.getMessage(),true)) {
-                    lastCachedId = node.getMessageId();
-                    lastTx = node.getMessage().getTransactionId();
+                    if (node.getMessageId().getFutureOrSequenceLong() instanceof Future) {
+                        pruneLastCached();
+                        pendingCachedIds.add(node.getMessageId());
+                    } else {
+                        setLastCachedId(node.getMessageId());
+                    }
                 } else {
                     dealWithDuplicates();
-                    return;
+                    return false;
                 }
             }
         } else {
@@ -213,16 +216,62 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
         if (disableCache && isCacheEnabled()) {
             setCacheEnabled(false);
             // sync with store on disabling the cache
-            if (lastCachedId != null) {
-                LOG.debug("{} - disabling cache, lastCachedId: {} last-tx: {} current node Id: {} node-tx: {} batchList size: {}",
-                        new Object[]{ this, lastCachedId, lastTx, node.getMessageId(), node.getMessage().getTransactionId(), batchList.size() });
-                setBatch(lastCachedId);
-                lastCachedId = null;
-                lastTx = null;
+            if (!pendingCachedIds.isEmpty() || lastCachedId != null) {
+                LOG.trace("{} - disabling cache. current Id: {} seq: {}, batchList size: {}",
+                            new Object[]{this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong(), batchList.size()});
+                collapseLastCachedIds();
+                if (lastCachedId != null) {
+                    setBatch(lastCachedId);
+                    lastCachedId = null;
+                }
             }
         }
         this.storeHasMessages = true;
         size++;
+        return true;
+    }
+
+
+    private void pruneLastCached() {
+        for (Iterator<MessageId> it = pendingCachedIds.iterator(); it.hasNext(); ) {
+            MessageId candidate = it.next();
+            final Object futureOrLong = candidate.getFutureOrSequenceLong();
+            if (futureOrLong instanceof Future) {
+                Future future = (Future) futureOrLong;
+                if (future.isCancelled()) {
+                    it.remove();
+                }
+            } else {
+                // store complete - track via lastCachedId
+                setLastCachedId(candidate);
+                it.remove();
+            }
+        }
+    }
+
+    private void collapseLastCachedIds() throws Exception {
+        for (MessageId candidate : pendingCachedIds) {
+            final Object futureOrLong = candidate.getFutureOrSequenceLong();
+            if (futureOrLong instanceof Future) {
+                Future future = (Future) futureOrLong;
+                try {
+                    future.get();
+                    // future should be replaced with sequence by this time
+                } catch (CancellationException ignored) {
+                    continue;
+                }
+            }
+            setLastCachedId(candidate);
+        }
+        pendingCachedIds.clear();
+    }
+
+    private void setLastCachedId(MessageId candidate) {
+        if (lastCachedId == null) {
+            lastCachedId = candidate;
+        } else if (Long.compare(((Long) candidate.getFutureOrSequenceLong()), ((Long) lastCachedId.getFutureOrSequenceLong())) > 0) {
+            lastCachedId = candidate;
+        }
     }
 
     protected void setBatch(MessageId messageId) throws Exception {
@@ -260,8 +309,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     
     
     public synchronized void gc() {
-        for (Iterator<MessageReference>i = batchList.iterator();i.hasNext();) {
-            MessageReference msg = i.next();
+        for (MessageReference msg : batchList) {
             rollback(msg.getMessageId());
             msg.decrementReferenceCount();
         }
@@ -272,7 +320,6 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     }
 
     protected final synchronized void fillBatch() {
-        //LOG.trace("{} - fillBatch", this);
         if (batchResetNeeded) {
             resetSize();
             setMaxBatchSize(Math.min(regionDestination.getMaxPageSize(), size));
@@ -313,7 +360,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     public String toString() {
         return super.toString() + ":" + regionDestination.getActiveMQDestination().getPhysicalName() + ",batchResetNeeded=" + batchResetNeeded
                     + ",storeHasMessages=" + this.storeHasMessages + ",size=" + this.size + ",cacheEnabled=" + isCacheEnabled()
-                    + ",maxBatchSize:" + maxBatchSize + ",hasSpace:" + hasSpace();
+                    + ",maxBatchSize:" + maxBatchSize + ",hasSpace:" + hasSpace() + ",pendingCachedIds.size:" + pendingCachedIds.size() + ",lastCachedId:" + lastCachedId;
     }
     
     protected abstract void doFillBatch() throws Exception;

http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
index 2769e68..7512e39 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
@@ -203,8 +203,8 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
      * @throws Exception 
      */
     @Override
-    public synchronized void addMessageLast(MessageReference node) throws Exception {
-        tryAddMessageLast(node, 0);
+    public synchronized boolean addMessageLast(MessageReference node) throws Exception {
+        return tryAddMessageLast(node, 0);
     }
     
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
index 1fecb95..06d59f1 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
@@ -81,10 +81,12 @@ public interface PendingMessageCursor extends Service {
      * add message to await dispatch
      * 
      * @param node
+     * @return boolean true if successful, false if cursor traps a duplicate
      * @throws IOException
      * @throws Exception
      */
-    void addMessageLast(MessageReference node) throws Exception;
+    boolean addMessageLast(MessageReference node) throws Exception;
+
     /**
      * add message to await dispatch - if it can
      * 

http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
index c89b648..1f42d57 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
@@ -94,6 +94,7 @@ class QueueStorePrefetch extends AbstractStoreCursor {
     
     @Override
     protected void setBatch(MessageId messageId) throws Exception {
+        LOG.trace("{}  setBatch {} loc: {}", this, messageId, messageId.getEntryLocator());
         store.setBatch(messageId);
         batchResetNeeded = false;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
index 6d03eeb..1820e7b 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
@@ -183,7 +183,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
     }
 
     @Override
-    public synchronized void addMessageLast(MessageReference node) throws Exception {
+    public synchronized boolean addMessageLast(MessageReference node) throws Exception {
         if (node != null) {
             Message msg = node.getMessage();
             if (isStarted()) {
@@ -206,6 +206,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
             }
 
         }
+        return true;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
index b3f4261..5b072a6 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
@@ -87,7 +87,8 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
         pendingCount = 0;
     }
 
-    public synchronized void addMessageLast(MessageReference node) throws Exception {
+    public synchronized boolean addMessageLast(MessageReference node) throws Exception {
+        boolean result = true;
         if (node != null) {
             Message msg = node.getMessage();
             if (started) {
@@ -97,9 +98,10 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
                 }
             }
             if (msg.isPersistent()) {
-                persistent.addMessageLast(node);
+                result = persistent.addMessageLast(node);
             }
         }
+        return result;
     }
 
     public synchronized void addMessageFirst(MessageReference node) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
index 9518981..15c61df 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
@@ -97,15 +97,15 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
      * @param node
      */
     
-    public synchronized void addMessageLast(MessageReference node) {
+    public synchronized boolean addMessageLast(MessageReference node) {
         node.incrementReferenceCount();
         list.addMessageLast(node);
+        return true;
     }
 
     /**
      * add message to await dispatch
      * 
-     * @param position
      * @param node
      */
     

http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
index df8658f..43713e6 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
@@ -29,6 +29,7 @@ abstract public class AbstractMessageStore implements MessageStore {
     public static final ListenableFuture<Object> FUTURE;
     protected final ActiveMQDestination destination;
     protected boolean prioritizedMessages;
+    protected IndexListener indexListener;
 
     public AbstractMessageStore(ActiveMQDestination destination) {
         this.destination = destination;
@@ -114,10 +115,16 @@ abstract public class AbstractMessageStore implements MessageStore {
         removeMessage(context, ack);
     }
 
+    @Override
     public void updateMessage(Message message) throws IOException {
         throw new IOException("update is not supported by: " + this);
     }
 
+    @Override
+    public void registerIndexListener(IndexListener indexListener) {
+        this.indexListener = indexListener;
+    }
+
     static {
        FUTURE = new InlineListenableFuture();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-broker/src/main/java/org/apache/activemq/store/IndexListener.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/IndexListener.java b/activemq-broker/src/main/java/org/apache/activemq/store/IndexListener.java
new file mode 100644
index 0000000..66902dc
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/IndexListener.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.Message;
+
+/**
+ * Callback invoked when the index is updated; allows ordered work to be seen by destinations.
+ */
+public interface IndexListener {
+
+    final class MessageContext {
+        public Message message;
+        public ConnectionContext context;
+        public Runnable onCompletion;
+        public boolean duplicate;
+
+        public MessageContext(ConnectionContext context, Message message, Runnable onCompletion) {
+            this.context = context;
+            this.message = message;
+            this.onCompletion = onCompletion;
+        }
+    }
+
+    /**
+     *  Called with some global index lock held so that a listener can do
+     *  order-dependent work. A non-null MessageContext.onCompletion is
+     *  called when that work is done.
+     */
+    public void onAdd(MessageContext messageContext);
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java
index 400245a..4cc472e 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java
@@ -195,4 +195,6 @@ public interface MessageStore extends Service {
     public boolean isPrioritizedMessages();
 
     void updateMessage(Message message) throws IOException;
+
+    void registerIndexListener(IndexListener indexListener);
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
index 3ddfadb..8c747e8 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
@@ -160,4 +160,9 @@ public class ProxyMessageStore implements MessageStore {
     public void updateMessage(Message message) throws IOException {
         delegate.updateMessage(message);
     }
+
+    @Override
+    public void registerIndexListener(IndexListener indexListener) {
+        delegate.registerIndexListener(indexListener);
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
index de4d195..0f47f61 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
@@ -208,4 +208,9 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
     public void updateMessage(Message message) throws IOException {
         delegate.updateMessage(message);
     }
+
+    @Override
+    public void registerIndexListener(IndexListener indexListener) {
+        delegate.registerIndexListener(indexListener);
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
index 836b388..7cdaa78 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
@@ -28,6 +28,7 @@ import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.IndexListener;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.AbstractMessageStore;
 
@@ -41,6 +42,7 @@ public class MemoryMessageStore extends AbstractMessageStore {
 
     protected final Map<MessageId, Message> messageTable;
     protected MessageId lastBatchId;
+    protected long sequenceId;
 
     public MemoryMessageStore(ActiveMQDestination destination) {
         this(destination, new LinkedHashMap<MessageId, Message>());
@@ -56,6 +58,10 @@ public class MemoryMessageStore extends AbstractMessageStore {
             messageTable.put(message.getMessageId(), message);
         }
         message.incrementReferenceCount();
+        message.getMessageId().setFutureOrSequenceLong(sequenceId++);
+        if (indexListener != null) {
+            indexListener.onAdd(new IndexListener.MessageContext(context, message, null));
+        }
     }
 
     // public void addMessageReference(ConnectionContext context,MessageId

http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java b/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java
index 5c3069f..de8cc12 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java
@@ -37,6 +37,7 @@ public class MessageId implements DataStructure, Comparable<MessageId> {
     private transient AtomicReference<Object> dataLocator = new AtomicReference<Object>();
     private transient Object entryLocator;
     private transient Object plistLocator;
+    private transient Object futureOrSequenceLong;
 
     public MessageId() {
         this.producerId = new ProducerId();
@@ -186,6 +187,7 @@ public class MessageId implements DataStructure, Comparable<MessageId> {
         copy.brokerSequenceId = brokerSequenceId;
         copy.dataLocator = dataLocator;
         copy.entryLocator = entryLocator;
+        copy.futureOrSequenceLong = futureOrSequenceLong;
         copy.plistLocator = plistLocator;
         copy.textView = textView;
         return copy;
@@ -219,6 +221,14 @@ public class MessageId implements DataStructure, Comparable<MessageId> {
         this.dataLocator.set(value);
     }
 
+    public Object getFutureOrSequenceLong() {
+        return futureOrSequenceLong;
+    }
+
+    public void setFutureOrSequenceLong(Object futureOrSequenceLong) {
+        this.futureOrSequenceLong = futureOrSequenceLong;
+    }
+
     public Object getEntryLocator() {
         return entryLocator;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
index c3d5594..9f53cc1 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
@@ -32,6 +32,7 @@ import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.store.AbstractMessageStore;
+import org.apache.activemq.store.IndexListener;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.ByteSequenceData;
@@ -101,7 +102,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
         }
     }
 
-    public void addMessage(ConnectionContext context, Message message) throws IOException {
+    public void addMessage(final ConnectionContext context, final Message message) throws IOException {
         MessageId messageId = message.getMessageId();
         if (audit != null && audit.isDuplicate(message)) {
             if (LOG.isDebugEnabled()) {
@@ -126,8 +127,26 @@ public class JDBCMessageStore extends AbstractMessageStore {
         long sequenceId;
         synchronized (pendingAdditions) {
             sequenceId = persistenceAdapter.getNextSequenceId();
-            if (message.isInTransaction()) {
-                trackPendingSequence(c, sequenceId);
+            final long sequence = sequenceId;
+            pendingAdditions.add(sequence);
+            c.onCompletion(new Runnable() {
+                public void run() {
+                    // message added to db
+                    message.getMessageId().setFutureOrSequenceLong(sequence);
+                    message.getMessageId().setEntryLocator(sequence);
+                }
+            });
+
+            if (indexListener != null) {
+                indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() {
+                    @Override
+                    public void run() {
+                        // cursor add complete
+                        synchronized (pendingAdditions) { pendingAdditions.remove(sequence); }
+                    }
+                }));
+            } else {
+                pendingAdditions.remove(sequence);
             }
         }
         try {
@@ -139,20 +158,10 @@ public class JDBCMessageStore extends AbstractMessageStore {
         } finally {
             c.close();
         }
-        message.getMessageId().setEntryLocator(sequenceId);
         onAdd(message, sequenceId, message.getPriority());
     }
 
     // jdbc commit order is random with concurrent connections - limit scan to lowest pending
-    private void trackPendingSequence(final TransactionContext transactionContext, final long sequenceId) {
-        synchronized (pendingAdditions) { pendingAdditions.add(sequenceId); }
-        transactionContext.onCompletion(new Runnable() {
-            public void run() {
-                synchronized (pendingAdditions) { pendingAdditions.remove(sequenceId); }
-            }
-        });
-    }
-
     private long minPendingSequeunceId() {
         synchronized (pendingAdditions) {
             if (!pendingAdditions.isEmpty()) {
@@ -237,8 +246,8 @@ public class JDBCMessageStore extends AbstractMessageStore {
 
     public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
 
-    	long seq = ack.getLastMessageId().getEntryLocator() != null ?
-                (Long) ack.getLastMessageId().getEntryLocator() :
+    	long seq = ack.getLastMessageId().getFutureOrSequenceLong() != null ?
+                (Long) ack.getLastMessageId().getFutureOrSequenceLong() :
                 persistenceAdapter.getStoreSequenceIdForMessageId(ack.getLastMessageId(), destination)[0];
 
         // Get a connection and remove the message from the DB
@@ -251,9 +260,9 @@ public class JDBCMessageStore extends AbstractMessageStore {
         } finally {
             c.close();
         }
-        if (context != null && context.getXid() != null) {
-            ack.getLastMessageId().setEntryLocator(seq);
-        }
+        //if (context != null && context.getXid() != null) {
+        //    ack.getLastMessageId().setEntryLocator(seq);
+        //}
     }
 
     public void recover(final MessageRecoveryListener listener) throws Exception {
@@ -341,7 +350,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
                 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
                         Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
                         msg.getMessageId().setBrokerSequenceId(sequenceId);
-                        msg.getMessageId().setEntryLocator(sequenceId);
+                        msg.getMessageId().setFutureOrSequenceLong(sequenceId);
                         listener.recoverMessage(msg);
                         lastRecoveredSequenceId.set(sequenceId);
                         lastRecoveredPriority.set(msg.getPriority());

http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
index b4fb5d5..0a6dde8 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
@@ -773,7 +773,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
     public void commitAdd(ConnectionContext context, MessageId messageId) throws IOException {
         TransactionContext c = getTransactionContext(context);
         try {
-            long sequence = (Long)messageId.getEntryLocator();
+            long sequence = (Long)messageId.getFutureOrSequenceLong();
             getAdapter().doCommitAddOp(c, sequence);
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
@@ -786,7 +786,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
     public void commitRemove(ConnectionContext context, MessageAck ack) throws IOException {
         TransactionContext c = getTransactionContext(context);
         try {
-            getAdapter().doRemoveMessage(c, (Long)ack.getLastMessageId().getEntryLocator(), null);
+            getAdapter().doRemoveMessage(c, (Long)ack.getLastMessageId().getFutureOrSequenceLong(), null);
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create("Failed to commit last ack: " + ack + ". Reason: " + e,e);

http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
index 4128eef..b2fedf7 100644
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
@@ -108,7 +108,7 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
                     jdbcPersistenceAdapter.commitAdd(context, message.getMessageId());
                     ((JDBCMessageStore)addMessageCommand.getMessageStore()).onAdd(
                             message,
-                            (Long)message.getMessageId().getEntryLocator(),
+                            (Long)message.getMessageId().getFutureOrSequenceLong(),
                             message.getPriority());
 
                 }
@@ -170,7 +170,7 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
 
     public void recoverAdd(long id, byte[] messageBytes) throws IOException {
         final Message message = (Message) ((JDBCPersistenceAdapter)persistenceAdapter).getWireFormat().unmarshal(new ByteSequence(messageBytes));
-        message.getMessageId().setEntryLocator(id);
+        message.getMessageId().setFutureOrSequenceLong(id);
         Tx tx = getPreparedTx(message.getTransactionId());
         tx.add(new AddMessageCommand() {
             MessageStore messageStore;
@@ -187,7 +187,7 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
             @Override
             public void run(ConnectionContext context) throws IOException {
                 ((JDBCPersistenceAdapter)persistenceAdapter).commitAdd(null, message.getMessageId());
-                ((JDBCMessageStore)messageStore).onAdd(message, ((Long)message.getMessageId().getEntryLocator()).longValue(), message.getPriority());
+                ((JDBCMessageStore)messageStore).onAdd(message, ((Long)message.getMessageId().getFutureOrSequenceLong()).longValue(), message.getPriority());
             }
 
             @Override
@@ -200,7 +200,7 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
 
     public void recoverAck(long id, byte[] xid, byte[] message) throws IOException {
         Message msg = (Message) ((JDBCPersistenceAdapter)persistenceAdapter).getWireFormat().unmarshal(new ByteSequence(message));
-        msg.getMessageId().setEntryLocator(id);
+        msg.getMessageId().setFutureOrSequenceLong(id);
         Tx tx = getPreparedTx(new XATransactionId(xid));
         final MessageAck ack = new MessageAck(msg, MessageAck.STANDARD_ACK_TYPE, 1);
         tx.add(new RemoveMessageCommand() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
index 0087ac9..970e0f8 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
@@ -396,7 +396,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
             if (this.batchStatements) {
                 s.addBatch();
             } else if (s.executeUpdate() != 1) {
-                throw new SQLException("Failed to remove message");
+                throw new SQLException("Failed to remove message seq: " + seq);
             }
         } finally {
             cleanupExclusiveLock.readLock().unlock();
@@ -935,7 +935,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
         this.batchStatements = batchStatements;
         // The next lines are deprecated and should be removed in a future release
         // and is here in case someone created their own
-        this.batchStatments = batchStatements;
+       // this.batchStatments = batchStatements;
     }
 
     // Note - remove batchStatment in future distributions.  Here for backward compatibility
@@ -1168,8 +1168,12 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
       printQuery(s,System.out); }
 
     public static void dumpTables(java.sql.Connection c) throws SQLException {
-        printQuery(c, "Select * from ACTIVEMQ_MSGS ORDER BY ID", System.out);
-        printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
+        printQuery(c, "SELECT COUNT(*) from ACTIVEMQ_MSGS", System.out);
+
+        //printQuery(c, "SELECT COUNT(*) from ACTIVEMQ_ACKS", System.out);
+
+        //printQuery(c, "Select * from ACTIVEMQ_MSGS ORDER BY ID", System.out);
+        //printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
     }
 
     public static void printQuery(java.sql.Connection c, String query, java.io.PrintStream out)

http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index 975cd05..54cfd7d 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -29,6 +29,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -57,6 +59,7 @@ import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.store.AbstractMessageStore;
+import org.apache.activemq.store.IndexListener;
 import org.apache.activemq.store.ListenableFuture;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
@@ -83,7 +86,7 @@ import org.apache.activemq.wireformat.WireFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
+public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, MessageDatabase.SerialExecution<Location> {
     static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class);
     private static final int MAX_ASYNC_JOBS = 10000;
 
@@ -121,6 +124,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
                 return txid;
             }
         };
+        serialExecutor = this;
     }
 
     @Override
@@ -207,7 +211,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
             // In case the recovered store used a different OpenWire version log a warning
             // to assist in determining why journal reads fail.
             if (metadata.openwireVersion != brokerService.getStoreOpenWireVersion()) {
-                LOG.warn("Receovered Store uses a different OpenWire version[{}] " +
+                LOG.warn("Recovered Store uses a different OpenWire version[{}] " +
                          "than the version configured[{}].",
                          metadata.openwireVersion, brokerService.getStoreOpenWireVersion());
             }
@@ -286,21 +290,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
         super.doStop(stopper);
     }
 
-    @Override
-    void rollbackStatsOnDuplicate(KahaDestination commandDestination) {
-        if (brokerService != null) {
-            RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
-            if (regionBroker != null) {
-                ActiveMQDestination activeMQDestination = convert(commandDestination);
-                Destination destination = regionBroker.getDestinationMap(activeMQDestination).get(activeMQDestination);
-                if (destination != null) {
-                    destination.getDestinationStatistics().getMessages().decrement();
-                    destination.getDestinationStatistics().getEnqueues().decrement();
-                }
-            }
-        }
-    }
-
     private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException {
         return pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() {
             @Override
@@ -358,6 +347,17 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
         this.forceRecoverIndex = forceRecoverIndex;
     }
 
+    @Override
+    public Location execute(Callable<Location> c) throws Exception {
+        if (isConcurrentStoreAndDispatchQueues()) {
+            FutureTask<Location> future = new FutureTask<>(c);
+            this.queueExecutor.execute(future);
+            return future.get();
+        } else {
+            return c.call();
+        }
+    }
+
     public class KahaDBMessageStore extends AbstractMessageStore {
         protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>();
         protected KahaDestination dest;
@@ -385,7 +385,25 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
                 StoreQueueTask result = new StoreQueueTask(this, context, message);
                 result.aquireLocks();
                 addQueueTask(this, result);
-                return result.getFuture();
+                final ListenableFuture<Object> future = result.getFuture();
+                if (indexListener != null) {
+                    // allow concurrent dispatch by setting entry locator,
+                    // wait for add completion to remove potential pending addition
+                    message.getMessageId().setFutureOrSequenceLong(future);
+                    indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() {
+                        @Override
+                        public void run() {
+                            try {
+                                future.get();
+                                trackPendingAddComplete(dest, (Long) message.getMessageId().getFutureOrSequenceLong());
+                            } catch (CancellationException okNothingToTrack) {
+                            } catch (Exception e) {
+                                LOG.warn("{} unexpected exception tracking completion of async add of {}", this, message.getMessageId(), e);
+                            }
+                        }
+                    }));
+                }
+                return future;
             } else {
                 return super.asyncAddQueueMessage(context, message);
             }
@@ -423,7 +441,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
         }
 
         @Override
-        public void addMessage(ConnectionContext context, Message message) throws IOException {
+        public void addMessage(final ConnectionContext context, final Message message) throws IOException {
             KahaAddMessageCommand command = new KahaAddMessageCommand();
             command.setDestination(dest);
             command.setMessageId(message.getMessageId().toProducerKey());
@@ -432,8 +450,25 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
             command.setPrioritySupported(isPrioritizedMessages());
             org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
             command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
-            store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null);
-
+            store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), new IndexAware() {
+                @Override
+                public void sequenceAssignedWithIndexLocked(final long sequence) {
+                    final Object possibleFuture = message.getMessageId().getFutureOrSequenceLong();
+                    message.getMessageId().setFutureOrSequenceLong(sequence);
+                    if (indexListener != null) {
+                        trackPendingAdd(dest, sequence);
+                        if (possibleFuture == null) {
+                            // sync add (for async future present from getFutureOrSequenceLong)
+                            indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() {
+                                @Override
+                                public void run() {
+                                    trackPendingAddComplete(dest, sequence);
+                                }
+                            }));
+                        }
+                    }
+                }
+            }, null);
         }
 
         @Override
@@ -582,6 +617,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
                                 continue;
                             }
                             Message msg = loadMessage(entry.getValue().location);
+                            msg.getMessageId().setFutureOrSequenceLong(entry.getKey());
                             listener.recoverMessage(msg);
                             counter++;
                             if (counter >= maxReturned) {
@@ -643,7 +679,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
         }
 
         @Override
-        public void setBatch(MessageId identity) throws IOException {
+        public void setBatch(final MessageId identity) throws IOException {
             try {
                 final String key = identity.toProducerKey();
                 lockAsyncJobQueue();
@@ -660,6 +696,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
                             Long location = sd.messageIdIndex.get(tx, key);
                             if (location != null) {
                                 sd.orderIndex.setBatch(tx, location);
+                            } else {
+                                LOG.warn("{} Location {} not found in order index for {}", this, identity.getFutureOrSequenceLong(), identity);
                             }
                         }
                     });

http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
index 47a9c34..7a79ddd 100755
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
@@ -42,9 +42,7 @@ import org.apache.activemq.store.ProxyTopicMessageStore;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.TransactionRecoveryListener;
 import org.apache.activemq.store.TransactionStore;
-import org.apache.activemq.store.kahadb.MessageDatabase.AddOpperation;
 import org.apache.activemq.store.kahadb.MessageDatabase.Operation;
-import org.apache.activemq.store.kahadb.MessageDatabase.RemoveOpperation;
 import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
 import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
 import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
@@ -254,7 +252,7 @@ public class KahaDBTransactionStore implements TransactionStore {
         return tx;
     }
 
-    public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit)
+    public void commit(TransactionId txid, boolean wasPrepared, final Runnable preCommit, Runnable postCommit)
             throws IOException {
         if (txid != null) {
             if (!txid.isXATransaction() && theStore.isConcurrentStoreAndDispatchTransactions()) {
@@ -294,7 +292,10 @@ public class KahaDBTransactionStore implements TransactionStore {
 
             } else {
                 KahaTransactionInfo info = getTransactionInfo(txid);
-                theStore.store(new KahaCommitCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), preCommit, postCommit);
+                if (preCommit != null) {
+                    preCommit.run();
+                }
+                theStore.store(new KahaCommitCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, postCommit);
                 forgetRecoveredAcks(txid, false);
             }
         }else {
@@ -336,13 +337,13 @@ public class KahaDBTransactionStore implements TransactionStore {
             ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();
 
             for (Operation op : entry.getValue()) {
-                if (op.getClass() == AddOpperation.class) {
-                    AddOpperation addOp = (AddOpperation) op;
+                if (op.getClass() == MessageDatabase.AddOperation.class) {
+                    MessageDatabase.AddOperation addOp = (MessageDatabase.AddOperation) op;
                     Message msg = (Message) wireFormat().unmarshal(new DataInputStream(addOp.getCommand().getMessage()
                             .newInput()));
                     messageList.add(msg);
                 } else {
-                    RemoveOpperation rmOp = (RemoveOpperation) op;
+                    MessageDatabase.RemoveOperation rmOp = (MessageDatabase.RemoveOperation) op;
                     Buffer ackb = rmOp.getCommand().getAck();
                     MessageAck ack = (MessageAck) wireFormat().unmarshal(new DataInputStream(ackb.newInput()));
                     ackList.add(ack);


Mime
View raw message