activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r547906 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor: KahaMessageStore.java KahaReferenceStore.java KahaTopicMessageStore.java KahaTopicReferenceStore.java TopicSubContainer.java
Date Sat, 16 Jun 2007 11:27:49 GMT
Author: rajdavies
Date: Sat Jun 16 04:27:48 2007
New Revision: 547906

URL: http://svn.apache.org/viewvc?view=rev&rev=547906
Log:
check we don't keep hold of a batch entry after it's been deleted

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java?view=diff&rev=547906&r1=547905&r2=547906
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
Sat Jun 16 04:27:48 2007
@@ -66,8 +66,8 @@
         return result;
     }
 
-    protected void recover(MessageRecoveryListener listener,Object msg) throws Exception{
-        listener.recoverMessage((Message)msg);
+    protected void recoverMessage(MessageRecoveryListener listener,Message msg) throws Exception{
+        listener.recoverMessage(msg);
     }
 
     public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException{
@@ -89,7 +89,7 @@
     public synchronized void recover(MessageRecoveryListener listener) throws Exception{
         for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){
             Message msg=(Message)messageContainer.getValue(entry);
-            recover(listener,msg);
+            recoverMessage(listener,msg);
         }
         listener.finished();
     }
@@ -158,9 +158,9 @@
         if(entry!=null){
             int count=0;
             do{
-                Object msg=messageContainer.getValue(entry);
+                Message msg=messageContainer.getValue(entry);
                 if(msg!=null){
-                    recover(listener,msg);
+                    recoverMessage(listener,msg);
                     count++;
                 }
                 batchEntry=entry;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?view=diff&rev=547906&r1=547905&r2=547906
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
Sat Jun 16 04:27:48 2007
@@ -15,7 +15,6 @@
 package org.apache.activemq.store.kahadaptor;
 
 import java.io.IOException;
-import java.util.Set;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
@@ -33,6 +32,7 @@
     protected final MapContainer<MessageId,ReferenceRecord> messageContainer;
     protected KahaReferenceStoreAdapter adapter;
     private StoreEntry batchEntry=null;
+    private String lastBatchId=null;
 
     public KahaReferenceStore(KahaReferenceStoreAdapter adapter,MapContainer container,ActiveMQDestination destination) throws IOException{
         this.adapter = adapter;
@@ -58,15 +58,14 @@
         throw new RuntimeException("Use addMessageReference instead");
     }
 
-    protected void recover(MessageRecoveryListener listener,Object msg) throws Exception{
-        ReferenceRecord record=(ReferenceRecord)msg;
+    protected final void recoverReference(MessageRecoveryListener listener,ReferenceRecord record) throws Exception{
         listener.recoverMessageReference(new MessageId(record.getMessageId()));
     }
 
     public synchronized void recover(MessageRecoveryListener listener) throws Exception{
         for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){
             ReferenceRecord record=messageContainer.getValue(entry);
-            recover(listener,new MessageId(record.getMessageId()));
+            recoverReference(listener,record);
         }
         listener.finished();
     }
@@ -77,17 +76,20 @@
             entry=messageContainer.getFirst();
         }else{
             entry=messageContainer.refresh(entry);
-            if (entry != null) {
-            entry=messageContainer.getNext(entry);
+            if(entry!=null){
+                entry=messageContainer.getNext(entry);
             }
         }
         if(entry!=null){
             int count=0;
             do{
-                Object msg=messageContainer.getValue(entry);
+                ReferenceRecord msg=messageContainer.getValue(entry);
                 if(msg!=null){
-                    recover(listener,msg);
+                    recoverReference(listener,msg);
                     count++;
+                    lastBatchId=msg.getMessageId();
+                }else{
+                    lastBatchId=null;
                 }
                 batchEntry=entry;
                 entry=messageContainer.getNext(entry);
@@ -96,14 +98,14 @@
         listener.finished();
     }
 
-    public void addMessageReference(ConnectionContext context,MessageId messageId,ReferenceData data)
+    public synchronized void addMessageReference(ConnectionContext context,MessageId messageId,ReferenceData data)
             throws IOException{
         ReferenceRecord record=new ReferenceRecord(messageId.toString(),data);
         messageContainer.put(messageId,record);
         addInterest(record);
     }
 
-    public ReferenceData getMessageReference(MessageId identity) throws IOException{
+    public synchronized ReferenceData getMessageReference(MessageId identity) throws IOException{
         ReferenceRecord result=messageContainer.get(identity);
         if(result==null)
             return null;
@@ -127,7 +129,8 @@
             ReferenceRecord rr=messageContainer.remove(msgId);
             if(rr!=null){
                 removeInterest(rr);
-                if(messageContainer.isEmpty()||(batchEntry!=null&&batchEntry.equals(entry))){
+                if(messageContainer.isEmpty()||(lastBatchId!=null&&lastBatchId.equals(msgId.toString()))
+                        ||(batchEntry!=null&&batchEntry.equals(entry))){
                     resetBatching();
                 }
             }
@@ -148,6 +151,7 @@
 
     public void resetBatching(){
         batchEntry=null;
+        lastBatchId=null;
     }
 
     public int getMessageCount(){

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?view=diff&rev=547906&r1=547905&r2=547906
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
Sat Jun 16 04:27:48 2007
@@ -146,9 +146,9 @@
         if(container!=null){
             for(Iterator i=container.iterator();i.hasNext();){
                 ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
-                Object msg=messageContainer.get(ref.getMessageEntry());
+                Message msg=messageContainer.get(ref.getMessageEntry());
                 if(msg!=null){
-                	recover(listener, msg);
+                	recoverMessage(listener, msg);
                 }
             }
         }
@@ -173,12 +173,15 @@
             if(entry!=null){
                 do{
                     ConsumerMessageRef consumerRef=container.get(entry);
-                    Object msg=messageContainer.getValue(consumerRef.getMessageEntry());
+                    Message msg=messageContainer.getValue(consumerRef.getMessageEntry());
                     if(msg!=null){
-                    	recover(listener, msg);
+                    	recoverMessage(listener, msg);
                         count++;
+                        container.setBatchEntry(msg.getMessageId().toString(),entry);
+                    }else {
+                        container.reset();
                     }
-                    container.setBatchEntry(entry);
+                    
                     entry=container.getNextEntry(entry);
                 }while(entry!=null&&count<maxReturned&&listener.hasSpace());
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?view=diff&rev=547906&r1=547905&r2=547906
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
Sat Jun 16 04:27:48 2007
@@ -63,11 +63,7 @@
         throw new RuntimeException("Use addMessageReference instead");
     }
 
-    protected void recover(MessageRecoveryListener listener,Object msg) throws Exception{
-        ReferenceRecord record=(ReferenceRecord)msg;
-        listener.recoverMessageReference(new MessageId(record.getMessageId()));
-    }
-
+    
     public void addMessageReference(final ConnectionContext context,final MessageId messageId,final ReferenceData data){
         final ReferenceRecord record=new ReferenceRecord(messageId.toString(),data);
         final int subscriberCount=subscriberMessages.size();
@@ -193,7 +189,7 @@
         return (SubscriptionInfo)subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName));
     }
 
-    public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
+    public synchronized void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
             MessageRecoveryListener listener) throws Exception{
         String key=getSubscriptionKey(clientId,subscriptionName);
         TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
@@ -208,15 +204,19 @@
                     entry=container.getNextEntry(entry);
                 }
             }
+           
             if(entry!=null){
                 do{
                     ConsumerMessageRef consumerRef=container.get(entry);
-                    Object msg=messageContainer.getValue(consumerRef.getMessageEntry());
+                    ReferenceRecord msg=messageContainer.getValue(consumerRef.getMessageEntry());
                     if(msg!=null){
-                        recover(listener,msg);
+                        recoverReference(listener,msg);
                         count++;
+                        container.setBatchEntry(msg.getMessageId().toString(),entry);
+                    }else {
+                        container.reset();
                     }
-                    container.setBatchEntry(entry);
+                    
                     entry=container.getNextEntry(entry);
                 }while(entry!=null&&count<maxReturned&&listener.hasSpace());
             }
@@ -232,9 +232,9 @@
         if(container!=null){
             for(Iterator i=container.iterator();i.hasNext();){
                 ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
-                Object msg=messageContainer.get(ref.getMessageEntry());
+                ReferenceRecord msg=messageContainer.get(ref.getMessageEntry());
                 if(msg!=null){
-                    recover(listener,msg);
+                    recoverReference(listener,msg);
                 }
             }
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java?view=diff&rev=547906&r1=547905&r2=547906
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
Sat Jun 16 04:27:48 2007
@@ -25,8 +25,9 @@
  * @version $Revision: 1.10 $
  */
 public class TopicSubContainer {
-    private ListContainer listContainer;
-    private StoreEntry batchEntry;
+    private transient ListContainer listContainer;
+    private transient StoreEntry batchEntry;
+    private transient String lastBatchId;
 
     public TopicSubContainer(ListContainer container) {
         this.listContainer = container;
@@ -42,11 +43,13 @@
     /**
      * @param batchEntry the batchEntry to set
      */
-    public void setBatchEntry(StoreEntry batchEntry) {
+    public void setBatchEntry(String id,StoreEntry batchEntry) {
+        this.lastBatchId=id;
         this.batchEntry = batchEntry;
     }
 
     public void reset() {
+        lastBatchId=null;
         batchEntry = null;
     }
 



Mime
View raw message