activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r692502 - in /activemq/sandbox/kahadb/src/main: java/org/apache/kahadb/journal/ java/org/apache/kahadb/store/ proto/
Date Fri, 05 Sep 2008 18:07:10 GMT
Author: chirino
Date: Fri Sep  5 11:07:09 2008
New Revision: 692502

URL: http://svn.apache.org/viewvc?rev=692502&view=rev
Log:
Implemented the topic side of the MesageSTore.


Modified:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Location.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/Visitor.java
    activemq/sandbox/kahadb/src/main/proto/journal-data.proto

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Location.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Location.java?rev=692502&r1=692501&r2=692502&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Location.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Location.java Fri Sep
 5 11:07:09 2008
@@ -42,13 +42,18 @@
     public Location() {
     }
 
-    Location(Location item) {
+    public Location(Location item) {
         this.dataFileId = item.dataFileId;
         this.offset = item.offset;
         this.size = item.size;
         this.type = item.type;
     }
 
+    public Location(int dataFileId, int offset) {
+        this.dataFileId=dataFileId;
+        this.offset=offset;
+    }
+
     boolean isValid() {
         return dataFileId != NOT_SET;
     }

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java?rev=692502&r1=692501&r2=692502&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java
(original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java
Fri Sep  5 11:07:09 2008
@@ -22,6 +22,7 @@
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
@@ -53,9 +54,12 @@
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.wireformat.WireFormat;
+import org.apache.kahadb.StringMarshaller;
+import org.apache.kahadb.index.BTreeIndex;
 import org.apache.kahadb.journal.Location;
 import org.apache.kahadb.page.Transaction;
 import org.apache.kahadb.store.MessageDatabase.AddOpperation;
+import org.apache.kahadb.store.MessageDatabase.LocationMarshaller;
 import org.apache.kahadb.store.MessageDatabase.Operation;
 import org.apache.kahadb.store.MessageDatabase.StoredDestination;
 import org.apache.kahadb.store.data.KahaAddMessageCommand;
@@ -68,22 +72,18 @@
 import org.apache.kahadb.store.data.KahaRemoveMessageCommand;
 import org.apache.kahadb.store.data.KahaRollbackCommand;
 import org.apache.kahadb.store.data.KahaTransactionInfo;
+import org.apache.kahadb.store.data.KahaSubscriptionCommand;
 import org.apache.kahadb.store.data.KahaXATransactionId;
 import org.apache.kahadb.store.data.KahaDestination.DestinationType;
 
 public class KahaDBPersistenceAdaptor extends MessageDatabase implements PersistenceAdapter
{
 
-    private String brokerName;
-    private SystemUsage usageManager;
     private WireFormat wireFormat = new OpenWireFormat();
     private AtomicBoolean started = new AtomicBoolean();
 
     public void setBrokerName(String brokerName) {
-        this.brokerName = brokerName;
     }
-    
     public void setUsageManager(SystemUsage usageManager) {
-        this.usageManager = usageManager;
     }
 
     public void start() throws Exception {
@@ -141,9 +141,9 @@
         };
     }
 
-    class KahaDBMessageStore implements MessageStore {
+    public class KahaDBMessageStore implements MessageStore {
         private final ActiveMQDestination destination;
-        private KahaDestination dest;
+        protected KahaDestination dest;
 
         public KahaDBMessageStore(ActiveMQDestination destination) {
             this.destination = destination;
@@ -252,8 +252,7 @@
                         if( entry!=null ) {
                             // Copy the location, cause the iterator gives us the key by
reference.. changing it
                             // would mess up the index.
-                            cursorPos = new Location();
-                            cursorPos.setDataFileId(entry.getKey().getDataFileId());
+                            cursorPos = new Location(entry.getKey());
                             cursorPos.setOffset(entry.getKey().getOffset()+1 );
                         }
                     }
@@ -273,50 +272,184 @@
         }
         
     }
-    
+        
     class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore
{
         public KahaDBTopicMessageStore(ActiveMQTopic destination) {
             super(destination);
         }
-
+        
         public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
MessageId messageId) throws IOException {
+            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
+            command.setDestination(dest);
+            command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
+            command.setMessageId(messageId.toString());
+            // We are not passed a transaction info.. so we can't participate in a transaction.
+            // Looks like a design issue with the TopicMessageStore interface.  Also we can't
recover the original ack
+            // to pass back to the XA recover method.
+            // command.setTransactionInfo();
+            store(command, true);
         }
 
         public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive)
throws IOException {
+            String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo.getSubscriptionName());
+            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
+            command.setDestination(dest);
+            command.setSubscriptionKey(subscriptionKey);
+            command.setRetroactive(retroactive);
+            ByteSequence packet = wireFormat.marshal(subscriptionInfo);
+            command.setSubscriptionInfo(ByteString.copyFrom(packet.getData(), packet.getOffset(),
packet.getLength()));
+            store(command, true);
         }
 
         public void deleteSubscription(String clientId, String subscriptionName) throws IOException
{
+            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
+            command.setDestination(dest);
+            command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
+            store(command, true);
         }
 
         public SubscriptionInfo[] getAllSubscriptions() throws IOException {
-            return null;
-        }
+            
+            final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
+            synchronized(indexMutex) {
+                pageFile.tx().execute(new Transaction.Closure<IOException>(){
+                    public void execute(Transaction tx) throws IOException {
+                        StoredDestination sd = getStoredDestination(dest, tx);
+                        for (Iterator<Entry<String, KahaSubscriptionCommand>>
iterator = sd.subscriptions.iterator(tx); iterator.hasNext();) {
+                            Entry<String, KahaSubscriptionCommand> entry = iterator.next();
+                            SubscriptionInfo info = (SubscriptionInfo)wireFormat.unmarshal(
new DataInputStream(entry.getValue().getSubscriptionInfo().newInput()) );
+                            subscriptions.add(info);
 
-        public int getMessageCount(String clientId, String subscriberName) throws IOException
{
-            return 0;
+                        }
+                    }
+                });
+            }
+            
+            SubscriptionInfo[]rc=new SubscriptionInfo[subscriptions.size()];
+            subscriptions.toArray(rc);
+            return rc;
         }
 
         public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName)
throws IOException {
-            return null;
+            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
+            synchronized(indexMutex) {
+                return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo,
IOException>(){
+                    public SubscriptionInfo execute(Transaction tx) throws IOException {
+                        StoredDestination sd = getStoredDestination(dest, tx);
+                        KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
+                        if( command ==null ) {
+                            return null;
+                        }
+                        return (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(command.getSubscriptionInfo().newInput())
);
+                    }
+                });
+            }
+        }
+       
+        public int getMessageCount(String clientId, String subscriptionName) throws IOException
{
+            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
+            synchronized(indexMutex) {
+                return pageFile.tx().execute(new Transaction.CallableClosure<Integer,
IOException>(){
+                    public Integer execute(Transaction tx) throws IOException {
+                        StoredDestination sd = getStoredDestination(dest, tx);
+                        Location cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
+                        if ( cursorPos==null ) {
+                            // The subscription might not exist.
+                            return 0;
+                        }
+                        cursorPos = new Location(cursorPos);
+                        cursorPos.setOffset(cursorPos.getOffset()+1 );
+                        
+                        int counter = 0;
+                        for (Iterator<Entry<Location, String>> iterator = sd.orderIndex.iterator(tx,
cursorPos); iterator.hasNext();) {
+                            iterator.next();
+                            counter++;
+                        }
+                        return counter;
+                    }
+                });
+            }        
         }
 
-        public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
MessageRecoveryListener listener) throws Exception {
+        public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener
listener) throws Exception {
+            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
+            synchronized(indexMutex) {
+                pageFile.tx().execute(new Transaction.Closure<Exception>(){
+                    public void execute(Transaction tx) throws Exception {
+                        StoredDestination sd = getStoredDestination(dest, tx);
+                        Location cursorPos = new Location(sd.subscriptionAcks.get(tx, subscriptionKey));
+                        cursorPos.setOffset(cursorPos.getOffset()+1 );
+                        
+                        for (Iterator<Entry<Location, String>> iterator = sd.orderIndex.iterator(tx,
cursorPos); iterator.hasNext();) {
+                            Entry<Location, String> entry = iterator.next();
+                            listener.recoverMessage( loadMessage(entry.getKey() ) );
+                        }
+                    }
+                });
+            }
         }
 
-        public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener
listener) throws Exception {
+        public void recoverNextMessages(String clientId, String subscriptionName, final int
maxReturned, final MessageRecoveryListener listener) throws Exception {
+            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
+            synchronized(indexMutex) {
+                pageFile.tx().execute(new Transaction.Closure<Exception>(){
+                    public void execute(Transaction tx) throws Exception {
+                        StoredDestination sd = getStoredDestination(dest, tx);
+                        Location cursorPos = sd.subscriptionCursors.get(subscriptionKey);
+                        if( cursorPos == null ) {
+                            cursorPos = new Location(sd.subscriptionAcks.get(tx, subscriptionKey));
+                            cursorPos.setOffset(cursorPos.getOffset()+1 );
+                        }
+                        
+                        Entry<Location, String> entry=null;
+                        int counter = 0;
+                        for (Iterator<Entry<Location, String>> iterator = sd.orderIndex.iterator(tx,
cursorPos); iterator.hasNext();) {
+                            entry = iterator.next();
+                            listener.recoverMessage( loadMessage(entry.getKey() ) );
+                            counter++;
+                            if( counter >= maxReturned ) {
+                                break;
+                            }
+                        }
+                        if( entry!=null ) {
+                            // Copy the location, cause the iterator gives us the key by
reference.. changing it
+                            // would mess up the index.
+                            cursorPos = new Location(entry.getKey());
+                            cursorPos.setOffset(entry.getKey().getOffset()+1 );
+                            sd.subscriptionCursors.put(subscriptionKey, cursorPos);
+                        }
+                    }
+                });
+            }
         }
 
         public void resetBatching(String clientId, String subscriptionName) {
+            try {
+                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
+                synchronized(indexMutex) {
+                    pageFile.tx().execute(new Transaction.Closure<IOException>(){
+                        public void execute(Transaction tx) throws IOException {
+                            StoredDestination sd = getStoredDestination(dest, tx);
+                            sd.subscriptionCursors.remove(subscriptionKey);
+                        }
+                    });
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
         }
     }
 
+    String subscriptionKey(String clientId, String subscriptionName){
+        return clientId+":"+subscriptionName;
+    }
+    
     public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException
{
         return new KahaDBMessageStore(destination);
     }
 
     public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException
{
-        throw new IOException("Not yet implemented.");
-//        return new KahaDBTopicMessageStore(destination);
+        return new KahaDBTopicMessageStore(destination);
     }
     
     public void deleteAllMessages() throws IOException {
@@ -385,7 +518,7 @@
      * @return
      * @throws IOException
      */
-    private Message loadMessage(Location location) throws IOException {
+    Message loadMessage(Location location) throws IOException {
         KahaAddMessageCommand addMessage = (KahaAddMessageCommand)load(location);
         Message msg = (Message)wireFormat.unmarshal( new DataInputStream(addMessage.getMessage().newInput())
);
         return msg;
@@ -395,7 +528,7 @@
     // Internal conversion methods.
     ///////////////////////////////////////////////////////////////////
     
-    private KahaTransactionInfo createTransactionInfo(TransactionId txid) {
+    KahaTransactionInfo createTransactionInfo(TransactionId txid) {
         if( txid ==null ) {
             return null;
         }
@@ -424,14 +557,14 @@
         return rc;
     }
     
-    private KahaLocation convert(Location location) {
+    KahaLocation convert(Location location) {
         KahaLocation rc = new KahaLocation();
         rc.setLogId(location.getDataFileId());
         rc.setOffset(location.getOffset());
         return rc;
     }
     
-    private KahaDestination convert(ActiveMQDestination dest) {
+    KahaDestination convert(ActiveMQDestination dest) {
         KahaDestination rc = new KahaDestination();
         rc.setName(dest.getPhysicalName());
         switch( dest.getDestinationType() ) {
@@ -452,7 +585,7 @@
         }
     }
 
-    private ActiveMQDestination convert(String dest) {
+    ActiveMQDestination convert(String dest) {
         int p = dest.indexOf(":");
         if( p<0 ) {
             throw new IllegalArgumentException("Not in the valid destination format");

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=692502&r1=692501&r2=692502&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java Fri
Sep  5 11:07:09 2008
@@ -20,13 +20,21 @@
 import java.io.DataOutput;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.TreeMap;
+import java.util.Map.Entry;
 
 import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.XATransactionId;
 import org.apache.commons.logging.Log;
@@ -48,6 +56,7 @@
 import org.apache.kahadb.store.data.KahaRemoveDestinationCommand;
 import org.apache.kahadb.store.data.KahaRemoveMessageCommand;
 import org.apache.kahadb.store.data.KahaRollbackCommand;
+import org.apache.kahadb.store.data.KahaSubscriptionCommand;
 import org.apache.kahadb.store.data.KahaTraceCommand;
 import org.apache.kahadb.store.data.KahaTransactionInfo;
 import org.apache.kahadb.store.data.KahaXATransactionId;
@@ -68,15 +77,21 @@
         protected Page<Metadata> page;
         protected int state;
         protected BTreeIndex<String, StoredDestination> destinations;
-
+        protected Location lastUpdate;
+        protected Location firstInProgressTransactionLocation;
+        
         public void read(DataInput is) throws IOException {
             state = is.readInt();
             destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong());
+            lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
+            firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is);
         }
 
         public void write(DataOutput os) throws IOException {
             os.writeInt(state);
             os.writeLong(destinations.getPageId());
+            LocationMarshaller.INSTANCE.writePayload(lastUpdate, os);
+            LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation,
os);
         }
     }
 
@@ -160,6 +175,9 @@
                     metadata.page = page;
                     metadata.state = CLOSED_STATE;
                     metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile,
tx.allocate().getPageId());
+                    metadata.lastUpdate = new Location(0,0);
+                    metadata.firstInProgressTransactionLocation = new Location(0,0);
+                    
                     tx.store(metadata.page, metadataMarshaller, true);
                 } else {
                     Page<Metadata> page = tx.load(0, metadataMarshaller);
@@ -245,9 +263,10 @@
         DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
         os.writeByte(data.type().getNumber());
         data.writeTo(os);
-        Location result = asyncDataManager.write(os.getByteSequence(), sync);
-        process(data, result);
-        return result;
+        Location location = asyncDataManager.write(os.getByteSequence(), sync);
+        process(data, location);
+        metadata.lastUpdate = location;
+        return location;
     }
 
 
@@ -304,6 +323,11 @@
             public void visit(KahaRemoveDestinationCommand command) throws IOException {
                 process(command, location);
             }
+            
+            @Override
+            public void visit(KahaSubscriptionCommand command) throws IOException {
+                process(command, location);
+            }
         });
     }
 
@@ -348,6 +372,16 @@
         }
     }
     
+    protected void process(final KahaSubscriptionCommand command, final Location location)
throws IOException {
+        synchronized(indexMutex) {
+            pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                public void execute(Transaction tx) throws IOException {
+                    updateIndex(tx, command, location);
+                }
+            });
+        }
+    }
+    
     protected void process(KahaCommitCommand command, Location location) throws IOException
{
         TransactionId key = key(command.getTransactionInfo());
         ArrayList<Operation> inflightTx = inflightTransactions.remove(key);
@@ -385,7 +419,7 @@
             preparedTransactions.remove(key);
         }
     }
-
+    
     ///////////////////////////////////////////////////////////////////
     // These methods do the actual index updates.
     ///////////////////////////////////////////////////////////////////
@@ -394,16 +428,43 @@
     
     private void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location)
throws IOException {
         StoredDestination sd = getStoredDestination(command.getDestination(), tx);
+
+        // Skip adding the message to the index if this is a topic and there are no subscriptions.
+        if( sd.subscriptions!=null && sd.ackLocations.isEmpty() ) {
+            return;
+        }
+        
+        // Add the message.
         sd.orderIndex.put(tx, location, command.getMessageId());
         sd.messageIdIndex.put(tx, command.getMessageId(), location);
     }
 
-    private void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location location)
throws IOException {
+    private void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation)
throws IOException {
         StoredDestination sd = getStoredDestination(command.getDestination(), tx);
-        Location l = sd.messageIdIndex.remove(tx, command.getMessageId());
-        if( l!=null ) {
-            sd.orderIndex.remove(tx, l);
+        if( !command.hasSubscriptionKey() ) {
+            // In the queue case we just remove the message from the index..
+            Location messageLocation = sd.messageIdIndex.remove(tx, command.getMessageId());
+            if( messageLocation!=null ) {
+                sd.orderIndex.remove(tx, messageLocation);
+            }
+        } else {
+            // In the topic case we need remove the message once it's been acked by all the
subs
+            Location messageLocation = sd.messageIdIndex.get(tx, command.getMessageId());
+            
+            // Make sure it's a valid message id...
+            if( messageLocation!=null ) {
+                String subscriptionKey = command.getSubscriptionKey();
+                Location prev = sd.subscriptionAcks.put(tx, subscriptionKey, messageLocation);
+                
+                // The following method handles deleting un-referenced messages.
+                removeAckLocation(tx, sd, subscriptionKey, prev);
+                
+                // Add it to the new location set.
+                addAckLocation(sd, messageLocation, subscriptionKey);
+            }
+            
         }
+        metadata.lastUpdate = ackLocation;
     }
     
     private void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location
location) throws IOException {
@@ -415,10 +476,47 @@
         tx.free(sd.orderIndex.getPageId());
         tx.free(sd.messageIdIndex.getPageId());
 
+        if( sd.subscriptions!=null ) {
+            sd.subscriptions.clear(tx);
+            sd.subscriptionAcks.clear(tx);
+            sd.subscriptions.unload();
+            sd.subscriptionAcks.unload();
+            tx.free(sd.subscriptions.getPageId());
+            tx.free(sd.subscriptionAcks.getPageId());
+        }
+        
         String key = key(command.getDestination());
         storedDestinations.remove(key);
         metadata.destinations.remove(tx, key);
     }
+    
+    private void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location)
throws IOException {
+        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
+        
+        // If set then we are creating it.. otherwise we are destroying the sub
+        if( command.hasSubscriptionInfo() ) {
+            String subscriptionKey = command.getSubscriptionKey();
+            sd.subscriptions.put(tx, subscriptionKey, command);
+            Location ackLocation;
+            if( command.getRetroactive() ) {
+                ackLocation = new Location(0,0);
+            } else {
+                ackLocation = location;
+            }
+            
+            sd.subscriptionAcks.put(tx, subscriptionKey, ackLocation);
+            addAckLocation(sd, ackLocation, subscriptionKey);
+        } else {
+            // delete the sub...
+            String subscriptionKey = command.getSubscriptionKey();
+            sd.subscriptions.remove(tx, subscriptionKey);
+            Location prev = sd.subscriptionAcks.remove(tx, subscriptionKey);
+            removeAckLocation(tx, sd, subscriptionKey, prev);
+        }
+
+    }
+
+    
 
     ///////////////////////////////////////////////////////////////////
     // StoredDestination related implementation methods.
@@ -426,9 +524,22 @@
 
     private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String,
StoredDestination>();
 
+    class StoredSubscription {
+        SubscriptionInfo subscriptionInfo;
+        String lastAckId;
+        Location lastAckLocation;
+        Location cursor;
+    }
+
     static class StoredDestination {
         BTreeIndex<Location, String> orderIndex;
         BTreeIndex<String, Location> messageIdIndex;
+
+        // These bits are only set for Topics
+        BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
+        BTreeIndex<String, Location> subscriptionAcks;
+        HashMap<String, Location> subscriptionCursors;
+        TreeMap<Location, HashSet<String>> ackLocations;
     }
     
     protected class StoredDestinationMarshaller implements Marshaller<StoredDestination>
{
@@ -437,15 +548,27 @@
         }
 
         public StoredDestination readPayload(DataInput dataIn) throws IOException {
-            StoredDestination rc = new StoredDestination();
-            rc.orderIndex = new BTreeIndex<Location, String>(pageFile, dataIn.readLong());
-            rc.messageIdIndex = new BTreeIndex<String, Location>(pageFile, dataIn.readLong());
-            return rc;
-        }
-
-        public void writePayload(StoredDestination object, DataOutput dataOut) throws IOException
{
-            dataOut.writeLong(object.orderIndex.getPageId());
-            dataOut.writeLong(object.messageIdIndex.getPageId());
+            StoredDestination value = new StoredDestination();
+            value.orderIndex = new BTreeIndex<Location, String>(pageFile, dataIn.readLong());
+            value.messageIdIndex = new BTreeIndex<String, Location>(pageFile, dataIn.readLong());
+            
+            if( dataIn.readBoolean() ) {
+                value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile,
dataIn.readLong());
+                value.subscriptionAcks = new BTreeIndex<String, Location>(pageFile,
dataIn.readLong());
+            }
+            return value;
+        }
+
+        public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException
{
+            dataOut.writeLong(value.orderIndex.getPageId());
+            dataOut.writeLong(value.messageIdIndex.getPageId());
+            if( value.subscriptions !=null ) {
+                dataOut.writeBoolean(true);
+                dataOut.writeLong(value.subscriptions.getPageId());
+                dataOut.writeLong(value.subscriptionAcks.getPageId());
+            } else {
+                dataOut.writeBoolean(false);
+            }
         }
     }
     
@@ -468,6 +591,24 @@
             dataOut.writeInt(object.getOffset());
         }
     }
+    
+    static class KahaSubscriptionCommandMarshaller implements Marshaller<KahaSubscriptionCommand>
{
+        final static KahaSubscriptionCommandMarshaller INSTANCE = new  KahaSubscriptionCommandMarshaller();
+        
+        public Class<KahaSubscriptionCommand> getType() {
+            return KahaSubscriptionCommand.class;
+        }
+
+        public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
+            KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
+            rc.mergeFrom((InputStream)dataIn);
+            return rc;
+        }
+
+        public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws
IOException {
+            object.writeTo((OutputStream)dataOut);
+        }
+    }
 
     protected StoredDestination getStoredDestination(KahaDestination destination, Transaction
tx) throws IOException {
         String key = key(destination);
@@ -480,16 +621,43 @@
                 rc = new StoredDestination();
                 rc.orderIndex = new BTreeIndex<Location, String>(pageFile, tx.allocate());
                 rc.messageIdIndex = new BTreeIndex<String, Location>(pageFile, tx.allocate());
+                
+                if( destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType()
== KahaDestination.DestinationType.TEMP_TOPIC ) {
+                    rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile,
tx.allocate());
+                    rc.subscriptionAcks = new BTreeIndex<String, Location>(pageFile,
tx.allocate());
+                }
             }
             
             // Configure the marshalers and load.
             rc.orderIndex.setKeyMarshaller(LocationMarshaller.INSTANCE);
             rc.orderIndex.setValueMarshaller(StringMarshaller.INSTANCE);
             rc.orderIndex.load();
+            
             rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
             rc.messageIdIndex.setValueMarshaller(LocationMarshaller.INSTANCE);
             rc.messageIdIndex.load();
             
+            // If it was a topic...
+            if( rc.subscriptions!=null ) {
+                
+                rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
+                rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
+                rc.subscriptions.load();
+                
+                rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
+                rc.subscriptionAcks.setValueMarshaller(LocationMarshaller.INSTANCE);
+                rc.subscriptionAcks.load();
+                
+                rc.ackLocations = new TreeMap<Location, HashSet<String>>();
+                rc.subscriptionCursors = new HashMap<String, Location>();
+                
+                for (Iterator<Entry<String, Location>> iterator = rc.subscriptionAcks.iterator(tx);
iterator.hasNext();) {
+                    Entry<String,Location> entry = iterator.next();
+                    addAckLocation(rc, entry.getValue(), entry.getKey());
+                }
+
+            }
+            
             // Cache it.  We may want to remove/unload destinations from the cache that are
not used for a while
             // to reduce memory usage.
             storedDestinations.put(key, rc);
@@ -497,6 +665,67 @@
         return rc;
     }
     
+    /**
+     * @param sd
+     * @param messageLocation
+     * @param subscriptionKey
+     */
+    private void addAckLocation(StoredDestination sd, Location messageLocation, String subscriptionKey)
{
+        HashSet<String> hs = sd.ackLocations.get(messageLocation);
+        if( hs == null ) {
+            hs = new HashSet<String>();
+            sd.ackLocations.put(messageLocation, hs);
+        }
+        hs.add(subscriptionKey);
+    }
+    
+    
+    /**
+     * @param tx
+     * @param sd
+     * @param subscriptionKey
+     * @param location
+     * @throws IOException
+     */
+    private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey,
Location location) throws IOException {
+        // Remove the sub from the previous location set..
+        if( location!=null ) {
+            HashSet<String> hs = sd.ackLocations.get(location);
+            if(hs!=null) {
+                hs.remove(subscriptionKey);
+                if( hs.isEmpty() ) {
+                    HashSet<String> firstSet = sd.ackLocations.values().iterator().next();
+                    sd.ackLocations.remove(location);
+                    
+                    // Did we just empty out the first set in the 
+                    // ordered list of ack locations?  Then it's time to 
+                    // delete some messages.
+                    if( hs==firstSet ) {
+
+                        
+                        // Find all the entries that need to get deleted.
+                        ArrayList<Entry<Location, String>> deletes = new ArrayList<Entry<Location,
String>>();
+                        for (Iterator<Entry<Location, String>> iterator = sd.orderIndex.iterator(tx);
iterator.hasNext();) {
+                            Entry<Location, String> entry = iterator.next();
+                            while( entry.getKey().compareTo(location) <= 0 ) {
+                                // We don't do the actually delete while we are iterating
the BTree since 
+                                // iterating would fail.
+                                deletes.add(entry);
+                            }
+                        }
+                        
+                        // Do the actual deletes.
+                        for (Entry<Location, String> entry : deletes) {
+                            sd.messageIdIndex.remove(tx, entry.getValue());
+                            sd.orderIndex.remove(tx, entry.getKey());
+                        }
+                        
+                    }
+                }
+            }
+        }
+    }
+
     private String key(KahaDestination destination) {
         return destination.getType().getNumber()+":"+destination.getName();
     }

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/Visitor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/Visitor.java?rev=692502&r1=692501&r2=692502&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/Visitor.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/Visitor.java Fri Sep  5
11:07:09 2008
@@ -25,6 +25,7 @@
 import org.apache.kahadb.store.data.KahaRemoveMessageCommand;
 import org.apache.kahadb.store.data.KahaRollbackCommand;
 import org.apache.kahadb.store.data.KahaTraceCommand;
+import org.apache.kahadb.store.data.KahaSubscriptionCommand;
 
 public class Visitor {
 
@@ -49,4 +50,7 @@
     public void visit(KahaRemoveDestinationCommand command) throws IOException {
     }
 
+    public void visit(KahaSubscriptionCommand kahaUpdateSubscriptionCommand) throws IOException
{
+    }
+
 }

Modified: activemq/sandbox/kahadb/src/main/proto/journal-data.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/proto/journal-data.proto?rev=692502&r1=692501&r2=692502&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/proto/journal-data.proto (original)
+++ activemq/sandbox/kahadb/src/main/proto/journal-data.proto Fri Sep  5 11:07:09 2008
@@ -28,6 +28,7 @@
   KAHA_COMMIT_COMMAND = 4;
   KAHA_ROLLBACK_COMMAND = 5;
   KAHA_REMOVE_DESTINATION_COMMAND = 6;
+  KAHA_SUBSCRIPTION_COMMAND = 7;
 }
 
 message KahaTraceCommand {
@@ -61,7 +62,8 @@
   optional KahaTransactionInfo transaction_info=1;
   required KahaDestination destination = 2;
   required string messageId = 3;
-  required bytes ack = 4;
+  optional bytes ack = 4;
+  optional string subscriptionKey = 5;  // Set if it is a topic ack.
 }
 
 message KahaPrepareCommand {
@@ -95,6 +97,18 @@
 
   required KahaDestination destination = 1;
 }
+
+message KahaSubscriptionCommand {
+  //| option java_implments = "org.apache.kahadb.store.JournalCommand<KahaSubscriptionCommand>";
+  //| option java_visitor = "org.apache.kahadb.store.Visitor:void:java.io.IOException";
+  //| option java_type_method = "KahaEntryType";
+
+  required KahaDestination destination = 1;
+  required string subscriptionKey = 2;
+  optional bool retroactive = 3;
+  optional bytes subscriptionInfo = 4;
+}
+
 message KahaDestination {
   enum DestinationType {
     QUEUE = 0;



Mime
View raw message