activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r515630 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor: ConsumerMessageRef.java ConsumerMessageRefMarshaller.java KahaTopicMessageStore.java KahaTopicReferenceStore.java TopicSubContainer.java
Date Wed, 07 Mar 2007 16:15:09 GMT
Author: rajdavies
Date: Wed Mar  7 08:15:08 2007
New Revision: 515630

URL: http://svn.apache.org/viewvc?view=rev&rev=515630
Log:
check messageId when removing ConsumerMessageRef (pesky redo logs can add/delete things -
breaking the assumption we always get a nice ordered behaviour for durable subs)

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRef.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRefMarshaller.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRef.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRef.java?view=diff&rev=515630&r1=515629&r2=515630
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRef.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRef.java
Wed Mar  7 08:15:08 2007
@@ -14,6 +14,7 @@
 
 package org.apache.activemq.store.kahadaptor;
 
+import org.apache.activemq.command.MessageId;
 import org.apache.activemq.kaha.StoreEntry;
 
 /**
@@ -23,6 +24,7 @@
  */
 public class ConsumerMessageRef{
 
+    private MessageId messageId;
     private StoreEntry messageEntry;
     private StoreEntry ackEntry;
     
@@ -52,6 +54,22 @@
      */
     public void setMessageEntry(StoreEntry messageEntry){
         this.messageEntry=messageEntry;
+    }
+
+    
+    /**
+     * @return the messageId
+     */
+    public MessageId getMessageId(){
+        return this.messageId;
+    }
+
+    
+    /**
+     * @param messageId the messageId to set
+     */
+    public void setMessageId(MessageId messageId){
+        this.messageId=messageId;
     }
 
        

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRefMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRefMarshaller.java?view=diff&rev=515630&r1=515629&r2=515630
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRefMarshaller.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRefMarshaller.java
Wed Mar  7 08:15:08 2007
@@ -20,6 +20,7 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import org.apache.activemq.command.MessageId;
 import org.apache.activemq.kaha.Marshaller;
 import org.apache.activemq.kaha.impl.index.IndexItem;
 
@@ -39,6 +40,7 @@
      */
     public void writePayload(Object object,DataOutput dataOut) throws IOException{
        ConsumerMessageRef ref = (ConsumerMessageRef) object;
+       dataOut.writeUTF(ref.getMessageId().toString());
        IndexItem item = (IndexItem)ref.getMessageEntry();
        dataOut.writeLong(item.getOffset());
        item.write(dataOut);
@@ -46,6 +48,7 @@
        dataOut.writeLong(item.getOffset());
        item.write(dataOut);
        
+       
     }
 
     /**
@@ -56,6 +59,7 @@
      */
     public Object readPayload(DataInput dataIn) throws IOException{
         ConsumerMessageRef ref = new ConsumerMessageRef();
+        ref.setMessageId(new MessageId(dataIn.readUTF()));
         IndexItem item = new IndexItem();
         item.setOffset(dataIn.readLong());
         item.read(dataIn);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?view=diff&rev=515630&r1=515629&r2=515630
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
Wed Mar  7 08:15:08 2007
@@ -58,7 +58,8 @@
     public synchronized void addMessage(ConnectionContext context,Message message) throws
IOException{
         int subscriberCount=subscriberMessages.size();
         if(subscriberCount>0){
-            StoreEntry messageEntry=messageContainer.place(message.getMessageId(),message);
+            MessageId id = message.getMessageId();
+            StoreEntry messageEntry=messageContainer.place(id,message);
             TopicSubAck tsa=new TopicSubAck();
             tsa.setCount(subscriberCount);
             tsa.setMessageEntry(messageEntry);
@@ -68,6 +69,7 @@
                 ConsumerMessageRef ref=new ConsumerMessageRef();
                 ref.setAckEntry(ackEntry);
                 ref.setMessageEntry(messageEntry);
+                ref.setMessageId(id);
                 container.add(ref);
             }
         }
@@ -78,7 +80,7 @@
         String subcriberId=getSubscriptionKey(clientId,subscriptionName);
         TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(subcriberId);
         if(container!=null){
-            ConsumerMessageRef ref=container.remove();
+            ConsumerMessageRef ref=container.remove(messageId);
             if(container.isEmpty()){
                 container.reset();
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?view=diff&rev=515630&r1=515629&r2=515630
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
Wed Mar  7 08:15:08 2007
@@ -68,30 +68,29 @@
         listener.recoverMessageReference(new MessageId(record.getMessageId()));
     }
 
-    public void addMessageReference(ConnectionContext context,MessageId messageId,ReferenceData
data)
-            throws IOException{
-        ReferenceRecord record=new ReferenceRecord(messageId.toString(),data);
-        int subscriberCount=subscriberMessages.size();
+    public void addMessageReference(final ConnectionContext context,final MessageId messageId,final
ReferenceData data){
+        final ReferenceRecord record=new ReferenceRecord(messageId.toString(),data);
+        final int subscriberCount=subscriberMessages.size();
         if(subscriberCount>0){
-            StoreEntry messageEntry=messageContainer.place(messageId,record);
+            final StoreEntry messageEntry=messageContainer.place(messageId,record);
             addInterest(record);
-            TopicSubAck tsa=new TopicSubAck();
+            final TopicSubAck tsa=new TopicSubAck();
             tsa.setCount(subscriberCount);
             tsa.setMessageEntry(messageEntry);
-            StoreEntry ackEntry=ackContainer.placeLast(tsa);
-            for(Iterator i=subscriberMessages.values().iterator();i.hasNext();){
-                TopicSubContainer container=(TopicSubContainer)i.next();
-                ConsumerMessageRef ref=new ConsumerMessageRef();
+            final StoreEntry ackEntry=ackContainer.placeLast(tsa);
+            for(final Iterator i=subscriberMessages.values().iterator();i.hasNext();){
+                final TopicSubContainer container=(TopicSubContainer)i.next();
+                final ConsumerMessageRef ref=new ConsumerMessageRef();
                 ref.setAckEntry(ackEntry);
                 ref.setMessageEntry(messageEntry);
-                StoreEntry listEntry = container.add(ref);
-                
+                ref.setMessageId(messageId);
+                container.add(ref);
             }
         }
     }
 
-    public ReferenceData getMessageReference(MessageId identity) throws IOException{
-        ReferenceRecord result=messageContainer.get(identity);
+    public ReferenceData getMessageReference(final MessageId identity) throws IOException{
+        final ReferenceRecord result=messageContainer.get(identity);
         if(result==null)
             return null;
         return result.getData();
@@ -119,12 +118,10 @@
     public synchronized void acknowledge(ConnectionContext context,String clientId,String
subscriptionName,
             MessageId messageId) throws IOException{
         String key=getSubscriptionKey(clientId,subscriptionName);
+        
         TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
         if(container!=null){
-            ConsumerMessageRef ref=container.remove();
-            if(container.isEmpty()){
-                container.reset();
-            }
+            ConsumerMessageRef ref=container.remove(messageId);
             if(ref!=null){
                 TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
                 if(tsa!=null){

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java?view=diff&rev=515630&r1=515629&r2=515630
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
Wed Mar  7 08:15:08 2007
@@ -15,6 +15,7 @@
 package org.apache.activemq.store.kahadaptor;
 
 import java.util.Iterator;
+import org.apache.activemq.command.MessageId;
 import org.apache.activemq.kaha.ListContainer;
 import org.apache.activemq.kaha.StoreEntry;
 
@@ -58,14 +59,17 @@
         return listContainer.placeLast(ref);
     }
     
-    public ConsumerMessageRef remove(){
+    public ConsumerMessageRef remove(MessageId id){
         ConsumerMessageRef result=null;
         if(!listContainer.isEmpty()){
-            StoreEntry entry=listContainer.getFirst();
-            if(entry!=null){
-                result=(ConsumerMessageRef)listContainer.removeFirst();
-                if(listContainer.isEmpty()||(batchEntry!=null&&batchEntry.equals(entry))){
-                    reset();
+            for(StoreEntry entry=listContainer.getFirst();entry!=null;entry=listContainer.getNext(entry)){
+                ConsumerMessageRef ref=(ConsumerMessageRef)listContainer.get(entry);
+                if(ref!=null&&ref.getMessageId().equals(id)){
+                    listContainer.remove(entry);
+                    result=ref;
+                    if(listContainer.isEmpty()||batchEntry.equals(entry)){
+                        reset();
+                    }
                 }
             }
         }



Mime
View raw message