activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r667752 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store: amq/AMQPersistenceAdapter.java kahadaptor/KahaTopicReferenceStore.java
Date Sat, 14 Jun 2008 06:59:36 GMT
Author: rajdavies
Date: Fri Jun 13 23:59:36 2008
New Revision: 667752

URL: http://svn.apache.org/viewvc?rev=667752&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1797

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=667752&r1=667751&r2=667752&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Fri Jun 13 23:59:36 2008
@@ -95,7 +95,7 @@
     private TaskRunnerFactory taskRunnerFactory;
     private WireFormat wireFormat = new OpenWireFormat();
     private SystemUsage usageManager;
-    private long checkpointInterval = 1000 * 60;
+    private long checkpointInterval = 1000 * 20;
     private int maxCheckpointMessageAddSize = 1024 * 4;
     private AMQTransactionStore transactionStore = new AMQTransactionStore(this);
     private TaskRunner checkpointTask;
@@ -375,12 +375,13 @@
                 LOG.debug("Checkpoint started.");
             }
 
-            Location newMark = null;
+            Location currentMark = asyncDataManager.getMark();
+            Location newMark = currentMark;
             Iterator<AMQMessageStore> queueIterator = queues.values().iterator();
             while (queueIterator.hasNext()) {
                 final AMQMessageStore ms = queueIterator.next();
                 Location mark = (Location)ms.getMark();
-                if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
+                if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) {
                     newMark = mark;
                 }
             }
@@ -388,12 +389,12 @@
             while (topicIterator.hasNext()) {
                 final AMQTopicMessageStore ms = topicIterator.next();
                 Location mark = (Location)ms.getMark();
-                if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
+                if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) {
                     newMark = mark;
                 }
             }
             try {
-                if (newMark != null) {
+                if (newMark != currentMark) {
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Marking journal at: " + newMark);
                     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?rev=667752&r1=667751&r2=667752&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java Fri Jun 13 23:59:36 2008
@@ -142,10 +142,10 @@
             if (container != null) {
                 ConsumerMessageRef ref = null;
                 if((ref = container.remove(messageId)) != null) {
-                    TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
+                    StoreEntry entry = ref.getAckEntry();
+                    TopicSubAck tsa = ackContainer.get(entry);
                     if (tsa != null) {
                         if (tsa.decrementCount() <= 0) {
-                            StoreEntry entry = ref.getAckEntry();
                             entry = ackContainer.refresh(entry);
                             ackContainer.remove(entry);
                             ReferenceRecord rr = messageContainer.get(messageId);
@@ -156,6 +156,8 @@
                                 removeInterest(rr);
                                 removeMessage = true;
                             }
+                        }else {
+                            ackContainer.update(entry,tsa);
                         }
                     }
                 }else{



Mime
View raw message