activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1171743 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb: KahaDBStore.java MessageDatabase.java
Date Fri, 16 Sep 2011 19:31:30 GMT
Author: tabish
Date: Fri Sep 16 19:31:30 2011
New Revision: 1171743

URL: http://svn.apache.org/viewvc?rev=1171743&view=rev
Log:
fixes for https://issues.apache.org/jira/browse/AMQ-3467 


Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1171743&r1=1171742&r2=1171743&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Fri Sep 16 19:31:30 2011
@@ -157,7 +157,7 @@ public class KahaDBStore extends Message
     public boolean isConcurrentStoreAndDispatchTransactions() {
         return this.concurrentStoreAndDispatchTransactions;
     }
-    
+
     /**
      * @return the maxAsyncJobs
      */
@@ -361,7 +361,7 @@ public class KahaDBStore extends Message
             org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
             command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
             store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(),
null, null);
-            
+
         }
 
         public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException
{
@@ -479,7 +479,7 @@ public class KahaDBStore extends Message
             }
         }
 
-        
+
         public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener
listener) throws Exception {
             indexLock.readLock().lock();
             try {
@@ -534,10 +534,10 @@ public class KahaDBStore extends Message
                 // Hopefully one day the page file supports concurrent read
                 // operations... but for now we must
                 // externally synchronize...
-               
+
                 indexLock.writeLock().lock();
                 try {
-                        pageFile.tx().execute(new Transaction.Closure<IOException>()
{
+                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
                         public void execute(Transaction tx) throws IOException {
                             StoredDestination sd = getStoredDestination(dest, tx);
                             Long location = sd.messageIdIndex.get(tx, key);
@@ -546,14 +546,12 @@ public class KahaDBStore extends Message
                             }
                         }
                     });
-                }finally {
+                } finally {
                     indexLock.writeLock().unlock();
                 }
-                
             } finally {
                 unlockAsyncJobQueue();
             }
-
         }
 
         @Override
@@ -618,7 +616,7 @@ public class KahaDBStore extends Message
         public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
                                 MessageId messageId, MessageAck ack)
                 throws IOException {
-            String subscriptionKey = subscriptionKey(clientId, subscriptionName);
+            String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString();
             if (isConcurrentStoreAndDispatchTopics()) {
                 AsyncJobKey key = new AsyncJobKey(messageId, getDestination());
                 StoreTopicTask task = null;
@@ -660,7 +658,7 @@ public class KahaDBStore extends Message
                     .getSubscriptionName());
             KahaSubscriptionCommand command = new KahaSubscriptionCommand();
             command.setDestination(dest);
-            command.setSubscriptionKey(subscriptionKey);
+            command.setSubscriptionKey(subscriptionKey.toString());
             command.setRetroactive(retroactive);
             org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
             command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(),
packet.getLength()));
@@ -671,7 +669,7 @@ public class KahaDBStore extends Message
         public void deleteSubscription(String clientId, String subscriptionName) throws IOException
{
             KahaSubscriptionCommand command = new KahaSubscriptionCommand();
             command.setDestination(dest);
-            command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
+            command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName).toString());
             store(command, isEnableJournalDiskSyncs() && true, null, null);
             this.subscriptionCount.decrementAndGet();
         }
@@ -730,21 +728,13 @@ public class KahaDBStore extends Message
                 return pageFile.tx().execute(new Transaction.CallableClosure<Integer,
IOException>() {
                     public Integer execute(Transaction tx) throws IOException {
                         StoredDestination sd = getStoredDestination(dest, tx);
-                        LastAck cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
+                        LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
                         if (cursorPos == null) {
                             // The subscription might not exist.
                             return 0;
                         }
 
-                        int counter = 0;
-                        for (Iterator<Entry<Long, HashSet<String>>> iterator
=
-                                sd.ackPositions.iterator(tx, cursorPos.lastAckedSequence);
iterator.hasNext();) {
-                            Entry<Long, HashSet<String>> entry = iterator.next();
-                            if (entry.getValue().contains(subscriptionKey)) {
-                                counter++;
-                            }
-                        }
-                        return counter;
+                        return (int) getStoredMessageCount(tx, sd, subscriptionKey);
                     }
                 });
             }finally {
@@ -755,13 +745,14 @@ public class KahaDBStore extends Message
         public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener
listener)
                 throws Exception {
             final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
+            @SuppressWarnings("unused")
             final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
             indexLock.writeLock().lock();
             try {
                 pageFile.tx().execute(new Transaction.Closure<Exception>() {
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
-                        LastAck cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
+                        LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
                         sd.orderIndex.setBatch(tx, cursorPos);
                         for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx);
iterator
                                 .hasNext();) {
@@ -779,6 +770,7 @@ public class KahaDBStore extends Message
         public void recoverNextMessages(String clientId, String subscriptionName, final int
maxReturned,
                 final MessageRecoveryListener listener) throws Exception {
             final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
+            @SuppressWarnings("unused")
             final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
             indexLock.writeLock().lock();
             try {
@@ -788,7 +780,7 @@ public class KahaDBStore extends Message
                         sd.orderIndex.resetCursorPosition();
                         MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
                         if (moc == null) {
-                            LastAck pos = sd.subscriptionAcks.get(tx, subscriptionKey);
+                            LastAck pos = getLastAck(tx, sd, subscriptionKey);
                             if (pos == null) {
                                 // sub deleted
                                 return;
@@ -858,7 +850,7 @@ public class KahaDBStore extends Message
     /**
      * Cleanup method to remove any state associated with the given destination.
      * This method does not stop the message store (it might not be cached).
-     * 
+     *
      * @param destination
      *            Destination to forget
      */
@@ -868,7 +860,7 @@ public class KahaDBStore extends Message
     /**
      * Cleanup method to remove any state associated with the given destination
      * This method does not stop the message store (it might not be cached).
-     * 
+     *
      * @param destination
      *            Destination to forget
      */
@@ -920,7 +912,7 @@ public class KahaDBStore extends Message
     public long getLastMessageBrokerSequenceId() throws IOException {
         return 0;
     }
-    
+
     public long getLastProducerSequenceId(ProducerId id) {
         indexLock.readLock().lock();
         try {
@@ -1184,7 +1176,7 @@ public class KahaDBStore extends Message
 
         /**
          * add a key
-         * 
+         *
          * @param key
          * @return true if all acknowledgements received
          */

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1171743&r1=1171742&r2=1171743&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Fri Sep 16 19:31:30 2011
@@ -27,8 +27,23 @@ import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.OutputStream;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.Stack;
+import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -56,11 +71,9 @@ import org.apache.activemq.util.Callback
 import org.apache.activemq.util.IOHelper;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.util.ServiceSupport;
-import org.apache.kahadb.util.LocationMarshaller;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.kahadb.index.BTreeIndex;
 import org.apache.kahadb.index.BTreeVisitor;
+import org.apache.kahadb.index.ListIndex;
 import org.apache.kahadb.journal.DataFile;
 import org.apache.kahadb.journal.Journal;
 import org.apache.kahadb.journal.Location;
@@ -70,6 +83,7 @@ import org.apache.kahadb.page.Transactio
 import org.apache.kahadb.util.ByteSequence;
 import org.apache.kahadb.util.DataByteArrayInputStream;
 import org.apache.kahadb.util.DataByteArrayOutputStream;
+import org.apache.kahadb.util.LocationMarshaller;
 import org.apache.kahadb.util.LockFile;
 import org.apache.kahadb.util.LongMarshaller;
 import org.apache.kahadb.util.Marshaller;
@@ -77,6 +91,8 @@ import org.apache.kahadb.util.Sequence;
 import org.apache.kahadb.util.SequenceSet;
 import org.apache.kahadb.util.StringMarshaller;
 import org.apache.kahadb.util.VariableMarshaller;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
 
@@ -97,7 +113,7 @@ public class MessageDatabase extends Ser
     static final long NOT_ACKED = -1;
     static final long UNMATCHED_SEQ = -2;
 
-    static final int VERSION = 3;
+    static final int VERSION = 4;
 
     protected class Metadata {
         protected Page<Metadata> page;
@@ -513,6 +529,7 @@ public class MessageDatabase extends Ser
         }
     }
 
+    @SuppressWarnings("unused")
     private KahaTransactionInfo createLocalTransactionInfo(TransactionId tx) {
         return TransactionIdConversion.convertToLocal(tx);
     }
@@ -1185,7 +1202,7 @@ public class MessageDatabase extends Ser
 
             sd.ackPositions.clear(tx);
             sd.ackPositions.unload(tx);
-            tx.free(sd.ackPositions.getPageId());
+            tx.free(sd.ackPositions.getHeadPageId());
         }
 
         String key = key(command.getDestination());
@@ -1207,10 +1224,12 @@ public class MessageDatabase extends Ser
                 addAckLocationForRetroactiveSub(tx, sd, ackLocation, subscriptionKey);
             }
             sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation));
+            sd.subscriptionCache.add(subscriptionKey);
         } else {
             // delete the sub...
             sd.subscriptions.remove(tx, subscriptionKey);
             sd.subscriptionAcks.remove(tx, subscriptionKey);
+            sd.subscriptionCache.remove(subscriptionKey);
             removeAckLocationsForSub(tx, sd, subscriptionKey);
         }
     }
@@ -1468,7 +1487,11 @@ public class MessageDatabase extends Ser
         BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
         BTreeIndex<String, LastAck> subscriptionAcks;
         HashMap<String, MessageOrderCursor> subscriptionCursors;
-        BTreeIndex<Long, HashSet<String>> ackPositions;
+        ListIndex<String, SequenceSet> ackPositions;
+
+        // Transient data used to track which Messages are no longer needed.
+        final TreeMap<Long, Long> messageReferences = new TreeMap<Long, Long>();
+        final HashSet<String> subscriptionCache = new LinkedHashSet<String>();
     }
 
     protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination>
{
@@ -1483,15 +1506,43 @@ public class MessageDatabase extends Ser
                 value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile,
dataIn.readLong());
                 value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile,
dataIn.readLong());
                 if (metadata.version >= 3) {
-                    value.ackPositions = new BTreeIndex<Long, HashSet<String>>(pageFile,
dataIn.readLong());
+                    value.ackPositions = new ListIndex<String, SequenceSet>(pageFile,
dataIn.readLong());
                 } else {
                     // upgrade
                     pageFile.tx().execute(new Transaction.Closure<IOException>() {
                         public void execute(Transaction tx) throws IOException {
-                            value.ackPositions = new BTreeIndex<Long, HashSet<String>>(pageFile,
tx.allocate());
-                            value.ackPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
-                            value.ackPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
-                            value.ackPositions.load(tx);
+                            BTreeIndex<Long, HashSet<String>> oldAckPositions
=
+                                new BTreeIndex<Long, HashSet<String>>(pageFile,
tx.allocate());
+                            oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
+                            oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
+                            oldAckPositions.load(tx);
+
+                            LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<String,
SequenceSet>();
+
+                            // Do the initial build of the data in memory before writing
into the store
+                            // based Ack Positions List to avoid a lot of disk thrashing.
+                            Iterator<Entry<Long, HashSet<String>>> iterator
= oldAckPositions.iterator(tx);
+                            while (iterator.hasNext()) {
+                                Entry<Long, HashSet<String>> entry = iterator.next();
+
+                                for(String subKey : entry.getValue()) {
+                                    SequenceSet pendingAcks = temp.get(subKey);
+                                    if (pendingAcks == null) {
+                                        pendingAcks = new SequenceSet();
+                                        temp.put(subKey, pendingAcks);
+                                    }
+
+                                    pendingAcks.add(entry.getKey());
+                                }
+                            }
+
+                            // Now move the pending messages to ack data into the store backed
+                            // structure.
+                            value.ackPositions = new ListIndex<String, SequenceSet>(pageFile,
tx.allocate());
+                            for(String subscriptionKey : temp.keySet()) {
+                                value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey));
+                            }
+
                         }
                     });
                 }
@@ -1527,7 +1578,7 @@ public class MessageDatabase extends Ser
                 dataOut.writeBoolean(true);
                 dataOut.writeLong(value.subscriptions.getPageId());
                 dataOut.writeLong(value.subscriptionAcks.getPageId());
-                dataOut.writeLong(value.ackPositions.getPageId());
+                dataOut.writeLong(value.ackPositions.getHeadPageId());
             } else {
                 dataOut.writeBoolean(false);
             }
@@ -1594,7 +1645,7 @@ public class MessageDatabase extends Ser
             if (topic) {
                 rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile,
tx.allocate());
                 rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate());
-                rc.ackPositions = new BTreeIndex<Long, HashSet<String>>(pageFile,
tx.allocate());
+                rc.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
             }
             metadata.destinations.put(tx, key, rc);
         }
@@ -1624,8 +1675,8 @@ public class MessageDatabase extends Ser
             rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller());
             rc.subscriptionAcks.load(tx);
 
-            rc.ackPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
-            rc.ackPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
+            rc.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
+            rc.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
             rc.ackPositions.load(tx);
 
             rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>();
@@ -1645,6 +1696,27 @@ public class MessageDatabase extends Ser
                 }
             }
 
+            // Configure the message references index
+            Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx);
+            while (subscriptions.hasNext()) {
+                Entry<String, SequenceSet> subscription = subscriptions.next();
+                if (subscription.getValue() != null) {
+                    for(Long sequenceId : subscription.getValue()) {
+                        Long current = rc.messageReferences.get(sequenceId);
+                        if (current == null) {
+                            current = new Long(0);
+                        }
+                        rc.messageReferences.put(sequenceId, Long.valueOf(current.longValue() + 1));
+                    }
+                }
+            }
+
+            // Configure the subscription cache
+            for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx);
iterator.hasNext(); ) {
+                Entry<String, LastAck> entry = iterator.next();
+                rc.subscriptionCache.add(entry.getKey());
+            }
+
             if (rc.orderIndex.nextMessageId == 0) {
                 // check for existing durable sub all acked out - pull next seq from acks
as messages are gone
                 if (!rc.subscriptionAcks.isEmpty(tx)) {
@@ -1656,16 +1728,16 @@ public class MessageDatabase extends Ser
                 }
             } else {
                 // update based on ackPositions for unmatched, last entry is always the next
-                if (!rc.ackPositions.isEmpty(tx)) {
-                    Entry<Long,HashSet<String>> last = rc.ackPositions.getLast(tx);
+                if (!rc.messageReferences.isEmpty()) {
+                    Long nextMessageId = (Long) rc.messageReferences.keySet().toArray()[rc.messageReferences.size() - 1];
                     rc.orderIndex.nextMessageId =
-                        Math.max(rc.orderIndex.nextMessageId, last.getKey());
+                            Math.max(rc.orderIndex.nextMessageId, nextMessageId);
                 }
             }
 
         }
 
-        if (metadata.version < 3) {
+        if (metadata.version < VERSION) {
             // store again after upgrade
             metadata.destinations.put(tx, key, rc);
         }
@@ -1673,42 +1745,105 @@ public class MessageDatabase extends Ser
     }
 
     private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence,
String subscriptionKey) throws IOException {
-        HashSet<String> hs = sd.ackPositions.get(tx, messageSequence);
-        if (hs == null) {
-            hs = new HashSet<String>();
-        }
-        hs.add(subscriptionKey);
-        // every ack location addition needs to be a btree modification to get it stored
-        sd.ackPositions.put(tx, messageSequence, hs);
+        SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
+        if (sequences == null) {
+            sequences = new SequenceSet();
+            sequences.add(messageSequence);
+            sd.ackPositions.put(tx, subscriptionKey, sequences);
+        } else {
+            sequences.add(messageSequence);
+            sd.ackPositions.add(tx, subscriptionKey, sequences);
+        }
+
+        Long count = sd.messageReferences.get(messageSequence);
+        if (count == null) {
+            count = Long.valueOf(0L);
+        }
+        count = count.longValue() + 1;
+        sd.messageReferences.put(messageSequence, count);
     }
 
     // new sub is interested in potentially all existing messages
     private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, Long
messageSequence, String subscriptionKey) throws IOException {
-        for (Iterator<Entry<Long, HashSet<String>>> iterator = sd.ackPositions.iterator(tx,
messageSequence); iterator.hasNext(); ) {
-            Entry<Long, HashSet<String>> entry = iterator.next();
-            entry.getValue().add(subscriptionKey);
-            sd.ackPositions.put(tx, entry.getKey(), entry.getValue());
+        SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
+        if (sequences == null) {
+            sequences = new SequenceSet();
+            sequences.add(messageSequence);
+            sd.ackPositions.put(tx, subscriptionKey, sequences);
+        } else {
+            sequences.add(messageSequence);
+            sd.ackPositions.add(tx, subscriptionKey, sequences);
+        }
+
+        Long count = sd.messageReferences.get(messageSequence);
+        if (count == null) {
+            count = Long.valueOf(0L);
         }
+        count = count.longValue() + 1;
+        sd.messageReferences.put(messageSequence, count);
     }
 
-    final HashSet nextMessageIdMarker = new HashSet<String>();
     // on a new message add, all existing subs are interested in this message
     private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence)
throws IOException {
-        HashSet hs = new HashSet<String>();
-        for (Iterator<Entry<String, LastAck>> iterator = sd.subscriptionAcks.iterator(tx);
iterator.hasNext();) {
-            Entry<String, LastAck> entry = iterator.next();
-            hs.add(entry.getKey());
-        }
-        sd.ackPositions.put(tx, messageSequence, hs);
-        // add empty next to keep track of nextMessage
-        sd.ackPositions.put(tx, messageSequence+1, nextMessageIdMarker);
+        for(String subscriptionKey : sd.subscriptionCache) {
+            SequenceSet sequences = null;
+            sequences = sd.ackPositions.get(tx, subscriptionKey);
+            if (sequences == null) {
+                sequences = new SequenceSet();
+                sequences.add(new Sequence(messageSequence, messageSequence + 1));
+                sd.ackPositions.put(tx, subscriptionKey, sequences);
+            } else {
+                sequences.add(new Sequence(messageSequence, messageSequence + 1));
+                sd.ackPositions.add(tx, subscriptionKey, sequences);
+            }
+
+            Long count = sd.messageReferences.get(messageSequence);
+            if (count == null) {
+                count = Long.valueOf(0L);
+            }
+            count = count.longValue() + 1;
+            sd.messageReferences.put(messageSequence, count);
+            sd.messageReferences.put(messageSequence+1, Long.valueOf(0L));
+        }
     }
 
     private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey)
throws IOException {
         if (!sd.ackPositions.isEmpty(tx)) {
-            Long end = sd.ackPositions.getLast(tx).getKey();
-            for (Long sequence = sd.ackPositions.getFirst(tx).getKey(); sequence <= end;
sequence++) {
-                removeAckLocation(tx, sd, subscriptionKey, sequence);
+            SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey);
+            if (sequences == null || sequences.isEmpty()) {
+                return;
+            }
+
+            ArrayList<Long> unreferenced = new ArrayList<Long>();
+
+            for(Long sequenceId : sequences) {
+                long references = 0;
+                Long count = sd.messageReferences.get(sequenceId);
+                if (count != null) {
+                    references = count.longValue() - 1;
+                } else {
+                    continue;
+                }
+
+                if (references > 0) {
+                    sd.messageReferences.put(sequenceId, Long.valueOf(references));
+                } else {
+                    sd.messageReferences.remove(sequenceId);
+                    unreferenced.add(sequenceId);
+                }
+            }
+
+            for(Long sequenceId : unreferenced) {
+                // Find all the entries that need to get deleted.
+                ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long,
MessageKeys>>();
+                sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
+
+                // Do the actual deletes.
+                for (Entry<Long, MessageKeys> entry : deletes) {
+                    sd.locationIndex.remove(tx, entry.getValue().location);
+                    sd.messageIdIndex.remove(tx, entry.getValue().messageId);
+                    sd.orderIndex.remove(tx, entry.getKey());
+                }
             }
         }
     }
@@ -1723,31 +1858,54 @@ public class MessageDatabase extends Ser
     private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey,
Long sequenceId) throws IOException {
         // Remove the sub from the previous location set..
         if (sequenceId != null) {
-            HashSet<String> hs = sd.ackPositions.get(tx, sequenceId);
-            if (hs != null) {
-                hs.remove(subscriptionKey);
-                if (hs.isEmpty()) {
-                    HashSet<String> firstSet = sd.ackPositions.getFirst(tx).getValue();
-                    sd.ackPositions.remove(tx, sequenceId);
-
-                    // Find all the entries that need to get deleted.
-                    ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long,
MessageKeys>>();
-                    sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
-
-                    // Do the actual deletes.
-                    for (Entry<Long, MessageKeys> entry : deletes) {
-                        sd.locationIndex.remove(tx, entry.getValue().location);
-                        sd.messageIdIndex.remove(tx, entry.getValue().messageId);
-                        sd.orderIndex.remove(tx, entry.getKey());
-                    }
+            SequenceSet range = sd.ackPositions.get(tx, subscriptionKey);
+            if (range != null && !range.isEmpty()) {
+                range.remove(sequenceId);
+                if (!range.isEmpty()) {
+                    sd.ackPositions.put(tx, subscriptionKey, range);
                 } else {
-                    // update
-                    sd.ackPositions.put(tx, sequenceId, hs);
+                    sd.ackPositions.remove(tx, subscriptionKey);
+                }
+
+                // Check if the message is reference by any other subscription.
+                Long count = sd.messageReferences.get(sequenceId);
+                long references = count.longValue() - 1;
+                if (references > 0) {
+                    sd.messageReferences.put(sequenceId, Long.valueOf(references));
+                    return;
+                } else {
+                    sd.messageReferences.remove(sequenceId);
+                }
+
+                // Find all the entries that need to get deleted.
+                ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long,
MessageKeys>>();
+                sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
+
+                // Do the actual deletes.
+                for (Entry<Long, MessageKeys> entry : deletes) {
+                    sd.locationIndex.remove(tx, entry.getValue().location);
+                    sd.messageIdIndex.remove(tx, entry.getValue().messageId);
+                    sd.orderIndex.remove(tx, entry.getKey());
                 }
             }
         }
     }
 
+    public LastAck getLastAck(Transaction tx, StoredDestination sd, String subscriptionKey)
throws IOException {
+        return sd.subscriptionAcks.get(tx, subscriptionKey);
+    }
+
+    public long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey)
throws IOException {
+        SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
+        if (messageSequences != null) {
+            long result = messageSequences.rangeSize();
+            // if there's anything in the range the last value is always the nextMessage
marker, so remove 1.
+            return result > 0 ? result - 1 : 0;
+        }
+
+        return 0;
+    }
+
     private String key(KahaDestination destination) {
         return destination.getType().getNumber() + ":" + destination.getName();
     }
@@ -1799,6 +1957,7 @@ public class MessageDatabase extends Ser
         return tx;
     }
 
+    @SuppressWarnings("unused")
     private TransactionId key(KahaTransactionInfo transactionInfo) {
         return TransactionIdConversion.convert(transactionInfo);
     }
@@ -2452,6 +2611,7 @@ public class MessageDatabase extends Ser
             dataOut.write(data);
         }
 
+        @SuppressWarnings("unchecked")
         public HashSet<String> readPayload(DataInput dataIn) throws IOException {
             int dataLen = dataIn.readInt();
             byte[] data = new byte[dataLen];



Mime
View raw message