activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r491346 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: PrefetchSubscription.java cursors/TopicStorePrefetch.java
Date Sat, 30 Dec 2006 23:49:04 GMT
Author: chirino
Date: Sat Dec 30 15:49:03 2006
New Revision: 491346

URL: http://svn.apache.org/viewvc?view=rev&rev=491346
Log:
Fix for CursorDurableTest.
The TopicStorePrefetch was iterating over items that were in the subscription but had not been added to the pending list.

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=491346&r1=491345&r2=491346
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Sat Dec 30 15:49:03 2006
@@ -406,7 +406,9 @@
                             pending.reset();
                             while(pending.hasNext()&&!isFull()&&count<numberToDispatch){
                                 MessageReference node=pending.next();
-                               
+                                if ( node == null )
+                                	break;
+                                
                                 if(canDispatch(node)){
                                     pending.remove();
                                     // Message may have been sitting in the pending list a while

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?view=diff&rev=491346&r1=491345&r2=491346
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Sat Dec 30 15:49:03 2006
@@ -20,7 +20,7 @@
 
 import java.io.IOException;
 import java.util.LinkedList;
-import javax.jms.JMSException;
+
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Topic;
@@ -48,6 +48,10 @@
     private String subscriberName;
     private Destination regionDestination;
 
+    boolean empty=true;
+	private MessageId firstMessageId;
+	private MessageId lastMessageId;
+
     /**
      * @param topic
      * @param clientId
@@ -73,7 +77,7 @@
      * @return true if there are no pending messages
      */
     public boolean isEmpty(){
-        return batchList.isEmpty();
+        return empty;
     }
     
     public synchronized int size(){
@@ -86,27 +90,55 @@
     }
     
     public synchronized void addMessageLast(MessageReference node) throws Exception{
-        if(node!=null){
+		if(node!=null){
+			if( empty ) {
+				firstMessageId = node.getMessageId();
+				empty=false;
+			}
+	        lastMessageId = node.getMessageId();
             node.decrementReferenceCount();
         }
     }
 
-    public synchronized boolean hasNext(){
-        if(isEmpty()){
-            try{
-                fillBatch();
-            }catch(Exception e){
-                log.error("Failed to fill batch",e);
-                throw new RuntimeException(e);
-            }
-        }
+    public synchronized boolean hasNext() {
         return !isEmpty();
     }
 
     public synchronized MessageReference next(){
-        Message result = (Message)batchList.removeFirst();
-        result.setRegionDestination(regionDestination);
-        return result;
+    	    	
+        if( empty ) {
+        	return null;
+        } else {
+
+        	// We may need to fill in the batch...
+            if(batchList.isEmpty()){
+                try{
+                    fillBatch();
+                }catch(Exception e){
+                    log.error("Failed to fill batch",e);
+                    throw new RuntimeException(e);
+                }
+                if( batchList.isEmpty()) {
+                	return null;
+                }
+            }
+
+            Message result = (Message)batchList.removeFirst();
+        	
+        	if( firstMessageId != null ) {
+            	// Skip messages until we get to the first message.
+        		if( !result.getMessageId().equals(firstMessageId) ) 
+        			return null;
+        		firstMessageId = null;
+        	}
+        	if( lastMessageId != null ) {
+        		if( result.getMessageId().equals(lastMessageId) ) {
+        			empty=true;
+        		}
+        	}        	
+            result.setRegionDestination(regionDestination);
+            return result;
+        }
     }
 
     public void reset(){
@@ -130,13 +162,7 @@
 
     // implementation
     protected void fillBatch() throws Exception{
-        store.recoverNextMessages(clientId,subscriberName,
-                maxBatchSize,this);
-        // this will add more messages to the batch list
-        if(!batchList.isEmpty()){
-            Message message=(Message)batchList.getLast();
-          
-        }
+        store.recoverNextMessages(clientId,subscriberName,maxBatchSize,this);
     }
     
     public void gc() {



Mime
View raw message