activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject activemq git commit: AMQ-7067 - ensure updates to ackMessageFileMap are protected by the index lock
Date Tue, 09 Oct 2018 11:55:48 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 57c793953 -> a311139bf


AMQ-7067 - ensure updates to ackMessageFileMap are protected by the index lock


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a311139b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a311139b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a311139b

Branch: refs/heads/master
Commit: a311139bfe2f2b3ffc0c84cfb1e9cec0c11830c7
Parents: 57c7939
Author: gtully <gary.tully@gmail.com>
Authored: Tue Oct 9 12:55:11 2018 +0100
Committer: gtully <gary.tully@gmail.com>
Committed: Tue Oct 9 12:55:11 2018 +0100

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  | 27 ++++++++++++++------
 1 file changed, 19 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/a311139b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 46696f4..d231a86 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -1401,6 +1401,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 public void execute(Transaction tx) throws IOException {
                     for (Operation op : messagingTx) {
                         op.execute(tx);
+                        recordAckMessageReferenceLocation(location, op.getLocation());
                     }
                 }
             });
@@ -1408,21 +1409,26 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         } finally {
             indexLock.writeLock().unlock();
         }
-        for (Operation op: inflightTx) {
-            recordAckMessageReferenceLocation(location, op.getLocation());
-        }
     }
 
     @SuppressWarnings("rawtypes")
     protected void process(KahaPrepareCommand command, Location location) {
         TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
+        List<Operation> tx = null;
         synchronized (inflightTransactions) {
-            List<Operation> tx = inflightTransactions.remove(key);
+            tx = inflightTransactions.remove(key);
             if (tx != null) {
                 preparedTransactions.put(key, tx);
-                for (Operation op: tx) {
+            }
+        }
+        if (tx != null && !tx.isEmpty()) {
+            indexLock.writeLock().lock();
+            try {
+                for (Operation op : tx) {
                     recordAckMessageReferenceLocation(location, op.getLocation());
                 }
+            } finally {
+                indexLock.writeLock().unlock();
             }
         }
     }
@@ -1437,9 +1443,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 updates = preparedTransactions.remove(key);
             }
         }
-        if (key.isXATransaction() && updates != null) {
-            for(Operation op : updates) {
-                recordAckMessageReferenceLocation(location, op.getLocation());
+        if (key.isXATransaction() && updates != null && !updates.isEmpty()) {
+            indexLock.writeLock().lock();
+            try {
+                for (Operation op : updates) {
+                    recordAckMessageReferenceLocation(location, op.getLocation());
+                }
+            } finally {
+                indexLock.writeLock().unlock();
             }
         }
     }


Mime
View raw message