activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1311513 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Date Mon, 09 Apr 2012 22:56:15 GMT
Author: tabish
Date: Mon Apr  9 22:56:15 2012
New Revision: 1311513

URL: http://svn.apache.org/viewvc?rev=1311513&view=rev
Log:
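Additional fix for https://issues.apache.org/jira/browse/AMQ-3775

Ensure that the messageReferences are calculated correctly when loading a stored destination: the trailing empty entry kept for the next position to start from must not be counted as a message reference on reload.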


Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1311513&r1=1311512&r2=1311513&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Mon Apr  9 22:56:15 2012
@@ -1337,6 +1337,64 @@ public abstract class MessageDatabase ex
                 sd.messageIdIndex.clear(tx);
                 sd.locationIndex.clear(tx);
                 sd.orderIndex.clear(tx);
+            } else {
+
+                if (sd.subscriptionCache.size() == 1) {
+
+                    int messageIdCount = 0;
+                    TreeSet<Long> msgIdContents = new TreeSet<Long>();
+                    Iterator<Entry<String, Long>> iterator1 = sd.messageIdIndex.iterator(tx);
+                    while (iterator1.hasNext()) {
+                        Entry<String, Long> entry = iterator1.next();
+                        messageIdCount++;
+                        msgIdContents.add(entry.getValue());
+                    }
+
+                    int locationCount = 0;
+                    TreeSet<Long> locationContents = new TreeSet<Long>();
+                    Iterator<Entry<Location, Long>> iterator2 = sd.locationIndex.iterator(tx);
+                    while (iterator2.hasNext()) {
+                        Entry<Location, Long> entry = iterator2.next();
+                        locationCount++;
+                        locationContents.add(entry.getValue());
+                    }
+
+                    LOG.info("Size of sd.messageIdIndex = " + messageIdCount);
+                    LOG.info("Size of sd.locationIndex = " + locationCount);
+                    LOG.info("Size of sd.ackPositions = " + sd.ackPositions.size());
+                    LOG.info("Size of sd.messageReferences = " + sd.messageReferences.size());
+
+                    Iterator<Entry<String, SequenceSet>> iterator3 = sd.ackPositions.iterator(tx);
+                    while (iterator3.hasNext()) {
+                        Entry<String, SequenceSet> entry = iterator3.next();
+                        StringBuilder logEntry = new StringBuilder();
+                        logEntry.append("Subscription["+entry.getKey()+"] references: ");
+                        for (Long sequenceId : entry.getValue()) {
+                            logEntry.append(sequenceId + " ");
+                        }
+                        LOG.info(logEntry.toString());
+                    }
+
+                    StringBuilder msgIdLog = new StringBuilder();
+                    msgIdLog.append("sd.messageIdIndex contains [");
+                    for(Long sequenceId : msgIdContents) {
+                        msgIdLog.append(sequenceId + " ");
+                    }
+                    msgIdLog.append("]");
+                    LOG.info(msgIdLog.toString());
+                    StringBuilder locationLog = new StringBuilder();
+                    locationLog.append("sd.locationIndex contains [");
+                    for(Long sequenceId : locationContents) {
+                        locationLog.append(sequenceId + " ");
+                    }
+                    locationLog.append("]");
+                    LOG.info(locationLog.toString());
+
+                    LOG.info("Order index last default key: " + sd.orderIndex.lastDefaultKey);
+                    LOG.info("Order index last high key: " + sd.orderIndex.lastHighKey);
+                    LOG.info("Order index last low key: " + sd.orderIndex.lastLowKey);
+                }
+
             }
         }
     }
@@ -1829,13 +1887,22 @@ public abstract class MessageDatabase ex
             Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx);
             while (subscriptions.hasNext()) {
                 Entry<String, SequenceSet> subscription = subscriptions.next();
-                if (subscription.getValue() != null) {
-                    for(Long sequenceId : subscription.getValue()) {
+                SequenceSet pendingAcks = subscription.getValue();
+                if (pendingAcks != null && !pendingAcks.isEmpty()) {
+                    Long lastPendingAck = pendingAcks.getTail().getLast();
+                    for(Long sequenceId : pendingAcks) {
                         Long current = rc.messageReferences.get(sequenceId);
                         if (current == null) {
                             current = new Long(0);
                         }
-                        rc.messageReferences.put(sequenceId, Long.valueOf(current.longValue() + 1));
+
+                        // We always add a trailing empty entry for the next position to start from
+                        // so we need to ensure we don't count that as a message reference on reload.
+                        if (!sequenceId.equals(lastPendingAck)) {
+                            current = current.longValue() + 1;
+                        }
+
+                        rc.messageReferences.put(sequenceId, current);
                     }
                 }
             }



Mime
View raw message