activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject git commit: rework https://issues.apache.org/jira/browse/AMQ-3305 in the context of https://issues.apache.org/jira/browse/AMQ-4952 dlq processing of duplicates, also possible missed dispatch with interleaved xa completion and new messages - recovered tra
Date Wed, 05 Mar 2014 14:51:26 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk a7ff5975e -> cfe099d1c


rework https://issues.apache.org/jira/browse/AMQ-3305 in the context of https://issues.apache.org/jira/browse/AMQ-4952
dlq processing of duplicates, also possible missed dispatch with interleaved xa completion
and new messages - recovered transactions are now individually tracked by the store and replayed
in the next batch, which negates the need to flush the cursor and avoids duplicates


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/cfe099d1
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/cfe099d1
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/cfe099d1

Branch: refs/heads/trunk
Commit: cfe099d1cccf6655894eceaa29ce6a54ceaf21be
Parents: a7ff597
Author: gtully <gary.tully@gmail.com>
Authored: Wed Mar 5 14:51:05 2014 +0000
Committer: gtully <gary.tully@gmail.com>
Committed: Wed Mar 5 14:51:05 2014 +0000

----------------------------------------------------------------------
 .../activemq/broker/region/BaseDestination.java |  17 ---
 .../apache/activemq/broker/region/Queue.java    |  10 +-
 .../cursors/AbstractPendingMessageCursor.java   |   3 +
 .../region/cursors/AbstractStoreCursor.java     |   9 +-
 .../region/cursors/PendingMessageCursor.java    |   4 +-
 .../broker/region/cursors/StoreQueueCursor.java |   7 ++
 .../store/memory/MemoryTransactionStore.java    |  20 ++-
 .../activemq/state/ConnectionStateTracker.java  |   2 +-
 .../activemq/store/jdbc/JDBCMessageStore.java   |  20 ++-
 .../store/jdbc/JdbcMemoryTransactionStore.java  |  25 +++-
 .../activemq/store/kahadb/KahaDBStore.java      |  31 ++++-
 .../store/kahadb/KahaDBTransactionStore.java    |   8 +-
 .../activemq/store/kahadb/MessageDatabase.java  |  15 ++-
 .../activemq/broker/XARecoveryBrokerTest.java   | 123 ++++++++++++++++++-
 .../bugs/TrapMessageInJDBCStoreTest.java        |   2 +-
 .../failover/FailoverTransactionTest.java       |   2 +-
 16 files changed, 258 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/cfe099d1/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
index 7bbe360..3f925b4 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
@@ -794,23 +794,6 @@ public abstract class BaseDestination implements Destination {
 
     public void duplicateFromStore(Message message, Subscription durableSub) {
         ConnectionContext connectionContext = createConnectionContext();
-
-        TransactionId transactionId = message.getTransactionId();
-        if (transactionId != null && transactionId.isXATransaction()) {
-            try {
-                List<TransactionId> preparedTx = Arrays.asList(broker.getRoot().getPreparedTransactions(connectionContext));
-                getLog().trace("processing duplicate in {}, prepared {} ", transactionId,
preparedTx);
-                if (!preparedTx.contains(transactionId)) {
-                    // duplicates from past transactions expected after org.apache.activemq.broker.region.Destination#clearPendingMessages
-                    // till they are acked
-                    getLog().debug("duplicate message from store {}, from historical transaction
{}, ignoring", message.getMessageId(), transactionId);
-                    return;
-                }
-            } catch (Exception ignored) {
-                getLog().debug("failed to determine state of transaction {} on duplicate
message {}", transactionId, message.getMessageId(), ignored);
-            }
-        }
-
         getLog().warn("duplicate message from store {}, redirecting for dlq processing",
message.getMessageId());
         Throwable cause = new Throwable("duplicate from store for " + destination);
         message.setRegionDestination(this);

http://git-wip-us.apache.org/repos/asf/activemq/blob/cfe099d1/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 0ae4463..a8a864e 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1321,11 +1321,13 @@ public class Queue extends BaseDestination implements Task, UsageListener
{
     public void clearPendingMessages() {
         messagesLock.writeLock().lock();
         try {
-            if (store != null) {
-                store.resetBatching();
+            if (resetNeeded) {
+                messages.gc();
+                messages.reset();
+                resetNeeded = false;
+            } else {
+                messages.rebase();
             }
-            messages.gc();
-            messages.reset();
             asyncWakeup();
         } finally {
             messagesLock.writeLock().unlock();

http://git-wip-us.apache.org/repos/asf/activemq/blob/cfe099d1/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
index b962a1a..e9c3c75 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
@@ -335,4 +335,7 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs
     public synchronized void setCacheEnabled(boolean val) {
         cacheEnabled = val;
     }
+
+    public void rebase() {
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/cfe099d1/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
index bb77afb..d0b1a39 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
@@ -69,6 +69,11 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor
i
         this.storeHasMessages=this.size > 0;
     }
 
+    @Override
+    public void rebase() {
+        resetSize();
+    }
+
     public final synchronized void stop() throws Exception {
         resetBatch();
         super.stop();
@@ -289,7 +294,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor
i
         return !batchList.isEmpty();
     }
 
-    
+
     public final synchronized int size() {
         if (size < 0) {
             this.size = getStoreSize();
@@ -307,7 +312,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor
i
     protected abstract void doFillBatch() throws Exception;
     
     protected abstract void resetBatch();
-    
+
     protected abstract int getStoreSize();
     
     protected abstract boolean isStoreEmpty();

http://git-wip-us.apache.org/repos/asf/activemq/blob/cfe099d1/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
index 52e5174..1fecb95 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
@@ -299,5 +299,7 @@ public interface PendingMessageCursor extends Service {
      * @return true if cache is being used
      */
     public boolean isCacheEnabled();
-   
+
+    public void rebase();
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/cfe099d1/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
index 24f31c7..b3f4261 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
@@ -309,4 +309,11 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
         }
         return cacheEnabled;
     }
+
+    @Override
+    public void rebase() {
+        persistent.rebase();
+        reset();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/cfe099d1/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
index a5de7fa..7e02694 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
@@ -33,7 +33,10 @@ import org.apache.activemq.store.TransactionStore;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
 
@@ -46,7 +49,7 @@ import java.util.concurrent.Future;
 public class MemoryTransactionStore implements TransactionStore {
 
     protected ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object,
Tx>();
-    protected ConcurrentHashMap<TransactionId, Tx> preparedTransactions = new ConcurrentHashMap<TransactionId,
Tx>();
+    protected Map<TransactionId, Tx> preparedTransactions = Collections.synchronizedMap(new
LinkedHashMap<TransactionId, Tx>());
     protected final PersistenceAdapter persistenceAdapter;
 
     private boolean doingRecover;
@@ -117,6 +120,8 @@ public class MemoryTransactionStore implements TransactionStore {
         MessageStore getMessageStore();
 
         void run(ConnectionContext context) throws IOException;
+
+        void setMessageStore(MessageStore messageStore);
     }
 
     public interface RemoveMessageCommand {
@@ -132,7 +137,7 @@ public class MemoryTransactionStore implements TransactionStore {
     }
 
     public MessageStore proxy(MessageStore messageStore) {
-        return new ProxyMessageStore(messageStore) {
+        ProxyMessageStore proxyMessageStore = new ProxyMessageStore(messageStore) {
             @Override
             public void addMessage(ConnectionContext context, final Message send) throws
IOException {
                 MemoryTransactionStore.this.addMessage(getDelegate(), send);
@@ -165,6 +170,11 @@ public class MemoryTransactionStore implements TransactionStore {
                 MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
             }
         };
+        onProxyQueueStore(proxyMessageStore);
+        return proxyMessageStore;
+    }
+
+    protected void onProxyQueueStore(ProxyMessageStore proxyMessageStore) {
     }
 
     public TopicMessageStore proxy(TopicMessageStore messageStore) {
@@ -309,6 +319,7 @@ public class MemoryTransactionStore implements TransactionStore {
         if (message.getTransactionId() != null) {
             Tx tx = getTx(message.getTransactionId());
             tx.add(new AddMessageCommand() {
+                MessageStore messageStore = destination;
                 public Message getMessage() {
                     return message;
                 }
@@ -322,6 +333,11 @@ public class MemoryTransactionStore implements TransactionStore {
                     destination.addMessage(ctx, message);
                 }
 
+                @Override
+                public void setMessageStore(MessageStore messageStore) {
+                    this.messageStore = messageStore;
+                }
+
             });
         } else {
             destination.addMessage(null, message);

http://git-wip-us.apache.org/repos/asf/activemq/blob/cfe099d1/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
b/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
index 41b4577..effbc83 100755
--- a/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
+++ b/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
@@ -301,7 +301,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
                 }
             }
             if (LOG.isDebugEnabled()) {
-                LOG.debug("restore consumer: " + infoToSend.getConsumerId());
+                LOG.debug("consumer: " + infoToSend.getConsumerId());
             }
             transport.oneway(infoToSend);
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/cfe099d1/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
index c4c3406..0ee9823 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
@@ -18,6 +18,9 @@ package org.apache.activemq.store.jdbc;
 
 import java.io.IOException;
 import java.sql.SQLException;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.ActiveMQMessageAudit;
@@ -65,7 +68,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
     protected final JDBCPersistenceAdapter persistenceAdapter;
     protected AtomicLong lastRecoveredSequenceId = new AtomicLong(-1);
     protected AtomicLong lastRecoveredPriority = new AtomicLong(Byte.MAX_VALUE -1);
-
+    final Set<Long> recoveredAdditions = new LinkedHashSet<Long>();
     protected ActiveMQMessageAudit audit;
     
     public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter,
WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) throws
IOException {
@@ -136,6 +139,9 @@ public class JDBCMessageStore extends AbstractMessageStore {
     }
 
     protected void onAdd(MessageId messageId, long sequenceId, byte priority) {
+        if (lastRecoveredSequenceId.get() > 0 && sequenceId < lastRecoveredSequenceId.get())
{
+            recoveredAdditions.add(sequenceId);
+        }
     }
 
     public void addMessageReference(ConnectionContext context, MessageId messageId, long
expirationTime, String messageRef) throws IOException {
@@ -275,6 +281,18 @@ public class JDBCMessageStore extends AbstractMessageStore {
     public void recoverNextMessages(int maxReturned, final MessageRecoveryListener listener)
throws Exception {
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
+            if (!recoveredAdditions.isEmpty()) {
+                for (Iterator<Long> iterator = recoveredAdditions.iterator(); iterator.hasNext();
)  {
+                    Long sequenceId = iterator.next();
+                    iterator.remove();
+                    maxReturned--;
+                    if (sequenceId <= lastRecoveredSequenceId.get()) {
+                        Message msg = (Message)wireFormat.unmarshal(new ByteSequence(adapter.doGetMessageById(c,
sequenceId)));
+                        LOG.trace("recovered add {} {}", this, msg.getMessageId());
+                        listener.recoverMessage(msg);
+                    }
+                }
+            }
             adapter.doRecoverNextMessages(c, destination, lastRecoveredSequenceId.get(),
lastRecoveredPriority.get(),
                     maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener()
{
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/cfe099d1/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
index 520ff16..135528e 100644
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
@@ -28,6 +28,7 @@ import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.XATransactionId;
 import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.ProxyMessageStore;
 import org.apache.activemq.store.ProxyTopicMessageStore;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.TransactionRecoveryListener;
@@ -48,6 +49,7 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
 
 
     private HashMap<ActiveMQDestination, MessageStore> topicStores = new HashMap<ActiveMQDestination,
MessageStore>();
+    private HashMap<ActiveMQDestination, MessageStore> queueStores = new HashMap<ActiveMQDestination,
MessageStore>();
 
     public JdbcMemoryTransactionStore(JDBCPersistenceAdapter jdbcPersistenceAdapter) {
         super(jdbcPersistenceAdapter);
@@ -110,6 +112,11 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore
{
                             message.getPriority());
 
                 }
+
+                @Override
+                public void setMessageStore(MessageStore messageStore) {
+                    throw new RuntimeException("MessageStore already known");
+                }
             });
         }
         tx.messages = updateFromPreparedStateCommands;
@@ -166,6 +173,7 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore
{
         message.getMessageId().setEntryLocator(id);
         Tx tx = getPreparedTx(message.getTransactionId());
         tx.add(new AddMessageCommand() {
+            MessageStore messageStore;
             @Override
             public Message getMessage() {
                 return message;
@@ -173,12 +181,18 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore
{
 
             @Override
             public MessageStore getMessageStore() {
-                return null;
+                return messageStore;
             }
 
             @Override
             public void run(ConnectionContext context) throws IOException {
                 ((JDBCPersistenceAdapter)persistenceAdapter).commitAdd(null, message.getMessageId());
+                ((JDBCMessageStore)messageStore).onAdd(message.getMessageId(), ((Long)message.getMessageId().getEntryLocator()).longValue(),
message.getPriority());
+            }
+
+            @Override
+            public void setMessageStore(MessageStore messageStore) {
+                this.messageStore = messageStore;
             }
 
         });
@@ -290,6 +304,11 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore
{
     }
 
     @Override
+    protected void onProxyQueueStore(ProxyMessageStore proxyQueueMessageStore) {
+        queueStores.put(proxyQueueMessageStore.getDestination(), proxyQueueMessageStore.getDelegate());
+    }
+
+    @Override
     protected void onRecovered(Tx tx) {
         for (RemoveMessageCommand removeMessageCommand: tx.acks) {
             if (removeMessageCommand instanceof LastAckCommand) {
@@ -304,6 +323,10 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore
{
                 ((JDBCPersistenceAdapter)persistenceAdapter).getBrokerService().getRegionBroker().getDestinationMap().get(removeMessageCommand.getMessageAck().getDestination()).getDestinationStatistics().getMessages().increment();
             }
         }
+        for (AddMessageCommand addMessageCommand : tx.messages) {
+            ActiveMQDestination destination = addMessageCommand.getMessage().getDestination();
+            addMessageCommand.setMessageStore(destination.isQueue() ? queueStores.get(destination)
: topicStores.get(destination));
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/cfe099d1/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index 3095a0d..9255c00 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -562,6 +562,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter
{
                     @Override
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
+                        recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener);
                         sd.orderIndex.resetCursorPosition();
                         for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx);
listener.hasSpace() && iterator
                                 .hasNext(); ) {
@@ -589,7 +590,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter
{
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         Entry<Long, MessageKeys> entry = null;
-                        int counter = 0;
+                        int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener);
                         for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx);
iterator.hasNext(); ) {
                             entry = iterator.next();
                             if (ackedAndPrepared.contains(entry.getValue().messageId)) {
@@ -610,6 +611,31 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter
{
             }
         }
 
+        protected int recoverRolledBackAcks(StoredDestination sd, Transaction tx, int maxReturned,
MessageRecoveryListener listener) throws Exception {
+            int counter = 0;
+            String id;
+            for (Iterator<String> iterator = rolledBackAcks.iterator(); iterator.hasNext();
) {
+                id = iterator.next();
+                iterator.remove();
+                Long sequence = sd.messageIdIndex.get(tx, id);
+                if (sequence != null) {
+                    if (sd.orderIndex.alreadyDispatched(sequence)) {
+                        listener.recoverMessage(loadMessage(sd.orderIndex.get(tx, sequence).location));
+                        counter++;
+                        if (counter >= maxReturned) {
+                            break;
+                        }
+                    } else {
+                        LOG.info("rolledback ack message {} with seq {} will be picked up
in future batch {}", id, sequence, sd.orderIndex.cursor);
+                    }
+                } else {
+                    LOG.warn("Failed to locate rolled back ack message {} in {}", id, sd);
+                }
+            }
+            return counter;
+        }
+
+
         @Override
         public void resetBatching() {
             if (pageFile.isLoaded()) {
@@ -875,6 +901,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter
{
                         StoredDestination sd = getStoredDestination(dest, tx);
                         LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
                         sd.orderIndex.setBatch(tx, cursorPos);
+                        recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener);
                         for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx);
iterator
                                 .hasNext();) {
                             Entry<Long, MessageKeys> entry = iterator.next();
@@ -918,7 +945,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter
{
                         }
 
                         Entry<Long, MessageKeys> entry = null;
-                        int counter = 0;
+                        int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener);
                         for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx,
moc); iterator
                                 .hasNext();) {
                             entry = iterator.next();

http://git-wip-us.apache.org/repos/asf/activemq/blob/cfe099d1/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
index 7bcb99a..1ca1def 100755
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
@@ -295,7 +295,7 @@ public class KahaDBTransactionStore implements TransactionStore {
             } else {
                 KahaTransactionInfo info = getTransactionInfo(txid);
                 theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, preCommit,
postCommit);
-                forgetRecoveredAcks(txid);
+                forgetRecoveredAcks(txid, false);
             }
         }else {
            LOG.error("Null transaction passed on commit");
@@ -310,16 +310,16 @@ public class KahaDBTransactionStore implements TransactionStore {
         if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()
== false) {
             KahaTransactionInfo info = getTransactionInfo(txid);
             theStore.store(new KahaRollbackCommand().setTransactionInfo(info), false, null,
null);
-            forgetRecoveredAcks(txid);
+            forgetRecoveredAcks(txid, true);
         } else {
             inflightTransactions.remove(txid);
         }
     }
 
-    protected void forgetRecoveredAcks(TransactionId txid) throws IOException {
+    protected void forgetRecoveredAcks(TransactionId txid, boolean isRollback) throws IOException
{
         if (txid.isXATransaction()) {
             XATransactionId xaTid = ((XATransactionId) txid);
-            theStore.forgetRecoveredAcks(xaTid.getPreparedAcks());
+            theStore.forgetRecoveredAcks(xaTid.getPreparedAcks(), isRollback);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/cfe099d1/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index b185d69..78e26a9 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -2302,6 +2302,7 @@ public abstract class MessageDatabase extends ServiceSupport implements
BrokerSe
     @SuppressWarnings("rawtypes")
     protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions
= new LinkedHashMap<TransactionId, List<Operation>>();
     protected final Set<String> ackedAndPrepared = new HashSet<String>();
+    protected final Set<String> rolledBackAcks = new HashSet<String>();
 
     // messages that have prepared (pending) acks cannot be re-dispatched unless the outcome
is rollback,
     // till then they are skipped by the store.
@@ -2317,12 +2318,16 @@ public abstract class MessageDatabase extends ServiceSupport implements
BrokerSe
         }
     }
 
-    public void forgetRecoveredAcks(ArrayList<MessageAck> acks) throws IOException
{
+    public void forgetRecoveredAcks(ArrayList<MessageAck> acks, boolean rollback) throws
IOException {
         if (acks != null) {
             this.indexLock.writeLock().lock();
             try {
                 for (MessageAck ack : acks) {
-                    ackedAndPrepared.remove(ack.getLastMessageId().toProducerKey());
+                    final String id = ack.getLastMessageId().toProducerKey();
+                    ackedAndPrepared.remove(id);
+                    if (rollback) {
+                        rolledBackAcks.add(id);
+                    }
                 }
             } finally {
                 this.indexLock.writeLock().unlock();
@@ -2945,6 +2950,12 @@ public abstract class MessageDatabase extends ServiceSupport implements
BrokerSe
             return lastGetPriority;
         }
 
+        public boolean alreadyDispatched(Long sequence) {
+            return (cursor.highPriorityCursorPosition > 0 && cursor.highPriorityCursorPosition
>= sequence) ||
+                    (cursor.defaultCursorPosition > 0 && cursor.defaultCursorPosition
>= sequence) ||
+                    (cursor.lowPriorityCursorPosition > 0 && cursor.lowPriorityCursorPosition
>= sequence);
+        }
+
         class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{
             Iterator<Entry<Long, MessageKeys>>currentIterator;
             final Iterator<Entry<Long, MessageKeys>>highIterator;

http://git-wip-us.apache.org/repos/asf/activemq/blob/cfe099d1/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
index fb570b2..49fe817 100755
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
@@ -30,6 +30,7 @@ import org.apache.activemq.broker.jmx.DestinationViewMBean;
 import org.apache.activemq.broker.jmx.PersistenceAdapterViewMBean;
 import org.apache.activemq.broker.jmx.RecoveredXATransactionViewMBean;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
 import org.apache.activemq.command.*;
 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
 import org.apache.activemq.util.JMXSupport;
@@ -233,17 +234,137 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport
{
 
         // Commit the prepared transactions.
         for (int i = 0; i < dar.getData().length; i++) {
-            connection.request(createCommitTransaction2Phase(connectionInfo, (TransactionId)
dar.getData()[i]));
+            TransactionId transactionId = (TransactionId) dar.getData()[i];
+            LOG.info("commit: " + transactionId);
+            connection.request(createCommitTransaction2Phase(connectionInfo, transactionId));
         }
 
         // We should get the committed transactions.
         final int countToReceive = expectedMessageCount(4, destination);
         for (int i = 0; i < countToReceive ; i++) {
             Message m = receiveMessage(connection, TimeUnit.SECONDS.toMillis(10));
+            LOG.info("received: " + m);
+            assertNotNull("Got non null message: " + i, m);
+        }
+
+        assertNoMessagesLeft(connection);
+        assertEmptyDLQ();
+    }
+
+    private void assertEmptyDLQ() throws Exception {
+        try {
+            DestinationViewMBean destinationView = getProxyToDestination(new ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME));
+            assertEquals("nothing on dlq", 0, destinationView.getQueueSize());
+            assertEquals("nothing added to dlq", 0, destinationView.getEnqueueCount());
+        } catch (java.lang.reflect.UndeclaredThrowableException maybeOk) {
+            if (maybeOk.getUndeclaredThrowable() instanceof javax.management.InstanceNotFoundException)
{
+                // perfect no dlq
+            } else {
+                throw maybeOk;
+            }
+        }
+    }
+
+    public void testPreparedInterleavedTransactionRecoveredOnRestart() throws Exception {
+
+        ActiveMQDestination destination = createDestination();
+
+        // Setup the producer and send the message.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+
+        // Prepare 4 message sends.
+        for (int i = 0; i < 4; i++) {
+            // Begin the transaction.
+            XATransactionId txid = createXATransaction(sessionInfo);
+            connection.send(createBeginTransaction(connectionInfo, txid));
+
+            Message message = createMessage(producerInfo, destination);
+            message.setPersistent(true);
+            message.setTransactionId(txid);
+            connection.send(message);
+
+            // Prepare
+            connection.send(createPrepareTransaction(connectionInfo, txid));
+        }
+
+        // Since prepared but not committed.. they should not get delivered.
+        assertNull(receiveMessage(connection));
+        assertNoMessagesLeft(connection);
+
+        // send non tx message
+        Message message = createMessage(producerInfo, destination);
+        message.setPersistent(true);
+        connection.request(message);
+
+        connection.request(closeConnectionInfo(connectionInfo));
+
+        // restart the broker.
+        restartBroker();
+
+        // Setup the consumer and try receive the message.
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+
+        // consume non transacted message, but don't ack
+        int countToReceive = expectedMessageCount(1, destination);
+        for (int i=0; i< countToReceive; i++) {
+            Message m = receiveMessage(connection, TimeUnit.SECONDS.toMillis(10));
+            LOG.info("received: " + m);
+            assertNotNull("got non tx message after prepared", m);
+        }
+
+        // Since prepared but not committed.. they should not get delivered.
+        assertNull(receiveMessage(connection));
+        assertNoMessagesLeft(connection);
+
+        Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(),
null, TransactionInfo.RECOVER));
+        assertNotNull(response);
+        DataArrayResponse dar = (DataArrayResponse)response;
+        assertEquals(4, dar.getData().length);
+
+        // ensure we can close a connection with prepared transactions
+        connection.request(closeConnectionInfo(connectionInfo));
+
+        // open again  to deliver outcome
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+
+        // Commit the prepared transactions.
+        for (int i = 0; i < dar.getData().length; i++) {
+            TransactionId transactionId = (TransactionId) dar.getData()[i];
+            LOG.info("commit: " + transactionId);
+            connection.request(createCommitTransaction2Phase(connectionInfo, transactionId));
+        }
+
+        consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+
+        // We should get the committed transactions and the non tx message
+        countToReceive = expectedMessageCount(5, destination);
+        for (int i = 0; i < countToReceive ; i++) {
+            Message m = receiveMessage(connection, TimeUnit.SECONDS.toMillis(10));
+            LOG.info("received: " + m);
             assertNotNull("Got non null message: " + i, m);
         }
 
         assertNoMessagesLeft(connection);
+        assertEmptyDLQ();
     }
 
     public void testTopicPreparedTransactionRecoveredOnRestart() throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq/blob/cfe099d1/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java
index 13bd755..29822f1 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java
@@ -292,7 +292,7 @@ public class TrapMessageInJDBCStoreTest extends TestCase {
             super.executeBatch();
 
             if (throwSQLException){
-                throw new SQLException("TEST SQL EXCEPTION from executeBatch");
+                throw new SQLException("TEST SQL EXCEPTION from executeBatch after super.
execution");
             }
 
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/cfe099d1/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
index 54a8a01..c365d37 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
@@ -225,7 +225,7 @@ public class FailoverTransactionTest extends TestSupport {
         setDefaultPersistenceAdapter(broker);
         broker.start();
 
-        assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
+        assertTrue("tx committed through failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
 
         // new transaction
         Message msg = consumer.receive(20000);


Mime
View raw message