activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Rob Davies <rajdav...@gmail.com>
Subject Re: svn commit: r491346 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: PrefetchSubscription.java cursors/TopicStorePrefetch.java
Date Sun, 31 Dec 2006 07:45:20 GMT
Hey Hiram,

This change breaks org.apache.activemq.broker.RecoveryBrokerTest,
org.apache.activemq.broker.BrokerTest, etc. for me.

Also, I'm not sure I like TopicStorePrefetch.next() possibly returning null
after hasNext() has returned true.

What was the problem in CursorDurableTest? I hadn't seen that one.

cheers,

Rob

On 30 Dec 2006, at 23:49, chirino@apache.org wrote:

> Author: chirino
> Date: Sat Dec 30 15:49:03 2006
> New Revision: 491346
>
> URL: http://svn.apache.org/viewvc?view=rev&rev=491346
> Log:
> Fix for CursorDurableTest.
> The TopicStorePrefetch was iterating items that were in the  
> subscription but not added to the pending list.
>
> Modified:
>     incubator/activemq/trunk/activemq-core/src/main/java/org/apache/ 
> activemq/broker/region/PrefetchSubscription.java
>     incubator/activemq/trunk/activemq-core/src/main/java/org/apache/ 
> activemq/broker/region/cursors/TopicStorePrefetch.java
>
> Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/ 
> apache/activemq/broker/region/PrefetchSubscription.java
> URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq- 
> core/src/main/java/org/apache/activemq/broker/region/ 
> PrefetchSubscription.java?view=diff&rev=491346&r1=491345&r2=491346
> ====================================================================== 
> ========
> --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/ 
> activemq/broker/region/PrefetchSubscription.java (original)
> +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/ 
> activemq/broker/region/PrefetchSubscription.java Sat Dec 30  
> 15:49:03 2006
> @@ -406,7 +406,9 @@
>                              pending.reset();
>                              while(pending.hasNext()&&!isFull() 
> &&count<numberToDispatch){
>                                  MessageReference node=pending.next();
> -
> +                                if ( node == null )
> +                                	break;
> +
>                                  if(canDispatch(node)){
>                                      pending.remove();
>                                      // Message may have been  
> sitting in the pending list a while
>
> Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/ 
> apache/activemq/broker/region/cursors/TopicStorePrefetch.java
> URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq- 
> core/src/main/java/org/apache/activemq/broker/region/cursors/ 
> TopicStorePrefetch.java?view=diff&rev=491346&r1=491345&r2=491346
> ====================================================================== 
> ========
> --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/ 
> activemq/broker/region/cursors/TopicStorePrefetch.java (original)
> +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/ 
> activemq/broker/region/cursors/TopicStorePrefetch.java Sat Dec 30  
> 15:49:03 2006
> @@ -20,7 +20,7 @@
>
>  import java.io.IOException;
>  import java.util.LinkedList;
> -import javax.jms.JMSException;
> +
>  import org.apache.activemq.broker.region.Destination;
>  import org.apache.activemq.broker.region.MessageReference;
>  import org.apache.activemq.broker.region.Topic;
> @@ -48,6 +48,10 @@
>      private String subscriberName;
>      private Destination regionDestination;
>
> +    boolean empty=true;
> +	private MessageId firstMessageId;
> +	private MessageId lastMessageId;
> +
>      /**
>       * @param topic
>       * @param clientId
> @@ -73,7 +77,7 @@
>       * @return true if there are no pending messages
>       */
>      public boolean isEmpty(){
> -        return batchList.isEmpty();
> +        return empty;
>      }
>
>      public synchronized int size(){
> @@ -86,27 +90,55 @@
>      }
>
>      public synchronized void addMessageLast(MessageReference node)  
> throws Exception{
> -        if(node!=null){
> +		if(node!=null){
> +			if( empty ) {
> +				firstMessageId = node.getMessageId();
> +				empty=false;
> +			}
> +	        lastMessageId = node.getMessageId();
>              node.decrementReferenceCount();
>          }
>      }
>
> -    public synchronized boolean hasNext(){
> -        if(isEmpty()){
> -            try{
> -                fillBatch();
> -            }catch(Exception e){
> -                log.error("Failed to fill batch",e);
> -                throw new RuntimeException(e);
> -            }
> -        }
> +    public synchronized boolean hasNext() {
>          return !isEmpty();
>      }
>
>      public synchronized MessageReference next(){
> -        Message result = (Message)batchList.removeFirst();
> -        result.setRegionDestination(regionDestination);
> -        return result;
> +    	    	
> +        if( empty ) {
> +        	return null;
> +        } else {
> +
> +        	// We may need to fill in the batch...
> +            if(batchList.isEmpty()){
> +                try{
> +                    fillBatch();
> +                }catch(Exception e){
> +                    log.error("Failed to fill batch",e);
> +                    throw new RuntimeException(e);
> +                }
> +                if( batchList.isEmpty()) {
> +                	return null;
> +                }
> +            }
> +
> +            Message result = (Message)batchList.removeFirst();
> +        	
> +        	if( firstMessageId != null ) {
> +            	// Skip messages until we get to the first message.
> +        		if( !result.getMessageId().equals(firstMessageId) )
> +        			return null;
> +        		firstMessageId = null;
> +        	}
> +        	if( lastMessageId != null ) {
> +        		if( result.getMessageId().equals(lastMessageId) ) {
> +        			empty=true;
> +        		}
> +        	}        	
> +            result.setRegionDestination(regionDestination);
> +            return result;
> +        }
>      }
>
>      public void reset(){
> @@ -130,13 +162,7 @@
>
>      // implementation
>      protected void fillBatch() throws Exception{
> -        store.recoverNextMessages(clientId,subscriberName,
> -                maxBatchSize,this);
> -        // this will add more messages to the batch list
> -        if(!batchList.isEmpty()){
> -            Message message=(Message)batchList.getLast();
> -
> -        }
> +        store.recoverNextMessages 
> (clientId,subscriberName,maxBatchSize,this);
>      }
>
>      public void gc() {
>
>


Mime
View raw message