activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cshan...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6288
Date Thu, 12 May 2016 14:39:41 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 2e64abc38 -> c8a6171d0


https://issues.apache.org/jira/browse/AMQ-6288

Switch the checkpoint lock from a write lock to a read lock when forwarding
acks, so that other journal updates are not blocked while acks are forwarded.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c8a6171d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c8a6171d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c8a6171d

Branch: refs/heads/master
Commit: c8a6171d04303da18d9e19ded30146643d7cbad6
Parents: 2e64abc
Author: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Authored: Thu May 12 14:37:51 2016 +0000
Committer: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Committed: Thu May 12 14:39:12 2016 +0000

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  | 28 +++++++++++++++-----
 1 file changed, 21 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/c8a6171d/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 21530c2..92310a8 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -1888,6 +1888,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             int journalToAdvance = -1;
             Set<Integer> journalLogsReferenced = new HashSet<Integer>();
 
+            //flag to know whether the ack forwarding completed without an exception
+            boolean forwarded = false;
+
             try {
                 //acquire the checkpoint lock to prevent other threads from
                 //running a checkpoint while this is running
@@ -1903,7 +1906,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 //In the future it might be better to just remove the checkpointLock entirely
                 //and only use the executor but this would need to be examined for any unintended
                 //consequences
-                checkpointLock.writeLock().lock();
+                checkpointLock.readLock().lock();
 
                 try {
 
@@ -1937,18 +1940,29 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 try {
                     // Background rewrite of the old acks
                     forwardAllAcks(journalToAdvance, journalLogsReferenced);
-
-                    // Checkpoint with changes from the ackMessageFileMap
-                    checkpointUpdate(false);
+                    forwarded = true;
                 } catch (IOException ioe) {
-                    LOG.error("Checkpoint failed", ioe);
+                    LOG.error("Forwarding of acks failed", ioe);
                     brokerService.handleIOException(ioe);
                 } catch (Throwable e) {
-                    LOG.error("Checkpoint failed", e);
+                    LOG.error("Forwarding of acks failed", e);
                     brokerService.handleIOException(IOExceptionSupport.create(e));
                 }
             } finally {
-                checkpointLock.writeLock().unlock();
+                checkpointLock.readLock().unlock();
+            }
+
+            try {
+                if (forwarded) {
+                    // Checkpoint with changes from the ackMessageFileMap
+                    checkpointUpdate(false);
+                }
+            } catch (IOException ioe) {
+                LOG.error("Checkpoint failed", ioe);
+                brokerService.handleIOException(ioe);
+            } catch (Throwable e) {
+                LOG.error("Checkpoint failed", e);
+                brokerService.handleIOException(IOExceptionSupport.create(e));
             }
         }
     }


Mime
View raw message