activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r512640 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors: StoreDurableSubscriberCursor.java TopicStorePrefetch.java
Date Wed, 28 Feb 2007 07:23:38 GMT
Author: rajdavies
Date: Tue Feb 27 23:23:37 2007
New Revision: 512640

URL: http://svn.apache.org/viewvc?view=rev&rev=512640
Log:
tidied up the way messages are paged in for a durable subscriber

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?view=diff&rev=512640&r1=512639&r2=512640
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Tue Feb 27 23:23:37 2007
@@ -168,6 +168,10 @@
 
     public void clear(){
         pendingCount=0;
+        nonPersistent.clear();
+        for(PendingMessageCursor tsp: storePrefetches){
+            tsp.clear();
+        }
     }
 
     public synchronized boolean hasNext(){

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?view=diff&rev=512640&r1=512639&r2=512640
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Tue Feb 27 23:23:37 2007
@@ -27,7 +27,7 @@
 import org.apache.commons.logging.LogFactory;
 
 /**
- * perist pending messages pending message (messages awaiting disptach to a consumer) cursor
+ * perist pendingCount messages pendingCount message (messages awaiting disptach to a consumer) cursor
  * 
  * @version $Revision$
  */
@@ -39,15 +39,15 @@
     private String clientId;
     private String subscriberName;
     private Destination regionDestination;
-    boolean empty;
     private MessageId firstMessageId;
     private MessageId lastMessageId;
+    private int pendingCount;
+    private boolean started;
 
     /**
      * @param topic
      * @param clientId
      * @param subscriberName
-     * @throws IOException
      */
     public TopicStorePrefetch(Topic topic,String clientId,String subscriberName){
         this.regionDestination=topic;
@@ -56,49 +56,70 @@
         this.subscriberName=subscriberName;
     }
 
-    public synchronized void start() throws Exception{
-        if(batchList.isEmpty()){
+    public synchronized void start(){
+        if(!started){
+            started=true;
+            pendingCount=getStoreSize();
             try{
                 fillBatch();
             }catch(Exception e){
                 log.error("Failed to fill batch",e);
                 throw new RuntimeException(e);
             }
-            empty=batchList.isEmpty();
         }
     }
 
-    public synchronized void stop() throws Exception{
-        store.resetBatching(clientId,subscriberName);
-        gc();
+    public synchronized void stop(){
+        if(started){
+            started=false;
+            store.resetBatching(clientId,subscriberName);
+            gc();
+        }
     }
 
     /**
-     * @return true if there are no pending messages
+     * @return true if there are no pendingCount messages
      */
     public boolean isEmpty(){
-        return empty;
+        return pendingCount <= 0;
     }
 
     public synchronized int size(){
-        try{
-            return store.getMessageCount(clientId,subscriberName);
-        }catch(IOException e){
-            log.error(this+" Failed to get the outstanding message count from the store",e);
-            throw new RuntimeException(e);
-        }
+        return getPendingCount();
     }
 
     public synchronized void addMessageLast(MessageReference node) throws Exception{
         if(node!=null){
-            if(empty){
+            if(isEmpty() && started){
                 firstMessageId=node.getMessageId();
-                empty=false;
             }
             lastMessageId=node.getMessageId();
             node.decrementReferenceCount();
+            pendingCount++;
         }
     }
+    
+    public void addMessageFirst(MessageReference node) throws Exception{
+        if(node!=null){
+            if(started){
+                firstMessageId=node.getMessageId();
+            }
+            node.decrementReferenceCount();
+            pendingCount++;
+        }
+    }
+    
+    public synchronized void remove(){
+        pendingCount--;
+    }
+    
+    public synchronized void remove(MessageReference node){
+        pendingCount--;
+    }
+    
+    public synchronized void clear(){
+        pendingCount=0;
+    }
 
     public synchronized boolean hasNext(){
         return !isEmpty();
@@ -106,7 +127,7 @@
 
     public synchronized MessageReference next(){
         Message result=null;
-        if(!empty){
+        if(!isEmpty()){
             if(batchList.isEmpty()){
                 try{
                     fillBatch();
@@ -120,19 +141,12 @@
             }
             if(!batchList.isEmpty()){
                 result=batchList.removeFirst();
-                if(firstMessageId!=null){
-                    // Skip messages until we get to the first message.
-                    if(!result.getMessageId().equals(firstMessageId))
-                        result=null;
-                    firstMessageId=null;
-                }else{
-                    if(lastMessageId!=null){
-                        if(result.getMessageId().equals(lastMessageId)){
-                            empty=true;
-                        }
+                if(lastMessageId!=null){
+                    if(result.getMessageId().equals(lastMessageId)){
+                        //pendingCount=0;
                     }
-                    result.setRegionDestination(regionDestination);
                 }
+                result.setRegionDestination(regionDestination);
             }
         }
         return result;
@@ -161,8 +175,47 @@
 
     // implementation
     protected synchronized void fillBatch() throws Exception{
-        store.recoverNextMessages(clientId,subscriberName,maxBatchSize,this);
+        if(!isEmpty()){
+            store.recoverNextMessages(clientId,subscriberName,maxBatchSize,this);
+            if(firstMessageId!=null){
+                int pos=0;
+                for(Message msg:batchList){
+                    if(msg.getMessageId().equals(firstMessageId)){
+                        firstMessageId=null;
+                        break;
+                    }
+                    pos++;
+                }
+                if(pos>0){
+                    for(int i=0;i<pos&&!batchList.isEmpty();i++){
+                        batchList.removeFirst();
+                    }
+                    if(batchList.isEmpty()){
+                        log.warn("Refilling batch - haven't got past first message = " + firstMessageId);
+                        fillBatch();
+                    }
+                }
+            }
+        }
+    }
+    
+    protected synchronized int getPendingCount(){
+        if(pendingCount <= 0){
+            pendingCount = getStoreSize();
+        }
+        return pendingCount;
+    }
+    
+    protected synchronized int getStoreSize(){
+        try{
+            return store.getMessageCount(clientId,subscriberName);
+        }catch(IOException e){
+            log.error(this+" Failed to get the outstanding message count from the store",e);
+            throw new RuntimeException(e);
+        }
     }
+    
+    
 
     public synchronized void gc(){
         for(Message msg:batchList){



Mime
View raw message