activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Hiram Chirino" <hi...@hiramchirino.com>
Subject Re: svn commit: r491346 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: PrefetchSubscription.java cursors/TopicStorePrefetch.java
Date Sun, 31 Dec 2006 07:59:45 GMT
On 12/31/06, Rob Davies <rajdavies@gmail.com> wrote:
> Hey Hiram,
>
> this change breaks  org.apache.activemq.broker.RecoveryBrokerTest,
> oorg.apache.activemq.broker.BrokerTest, etc   for me.
>

yeah I think I have fix for that. sorry I broke it.  I'm running the
test suite again now.  Basically I think I need to default boolean
empty=false;  So that an initial recovery of subscription is done.

> also - I'm not sure I like TopicStorePrefetch possibly returning null
> when a hasNext() has returned true
>

Yeah me neither :)  I did not fully understand why it was returning
null when I expected it to return a value.  I was thinking it could be
a timing issue with the MessageStore.

> What was the problem in CursorDurableTest ? I hadn't seen that one
>

CursorDurableTest had a test that was failing due to out of
order/duplicates showing up.  This was cause sometimes some messages
were direct dispatched and at other times they are dispatched from the
pending list.  But since the pending list's .next() was returning the
items that were directly dispatched and not even added to the pending
list.  This is when the dups and out of order issues would show up.

The problem is that TopicStorePrefetch.next() was returning everything
added to the durable subscription since it's backed by the
MessageStore.  And that's not what we want.  We only want it to return
things that are explicitly added to it since it's the pending list.


> cheers,
>
> Rob
>
> On 30 Dec 2006, at 23:49, chirino@apache.org wrote:
>
> > Author: chirino
> > Date: Sat Dec 30 15:49:03 2006
> > New Revision: 491346
> >
> > URL: http://svn.apache.org/viewvc?view=rev&rev=491346
> > Log:
> > Fix for CursorDurableTest.
> > The TopicStorePrefetch was iterating items that were in the
> > subscription but not added to the pending list.
> >
> > Modified:
> >     incubator/activemq/trunk/activemq-core/src/main/java/org/apache/
> > activemq/broker/region/PrefetchSubscription.java
> >     incubator/activemq/trunk/activemq-core/src/main/java/org/apache/
> > activemq/broker/region/cursors/TopicStorePrefetch.java
> >
> > Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/
> > apache/activemq/broker/region/PrefetchSubscription.java
> > URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-
> > core/src/main/java/org/apache/activemq/broker/region/
> > PrefetchSubscription.java?view=diff&rev=491346&r1=491345&r2=491346
> > ======================================================================
> > ========
> > --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/
> > activemq/broker/region/PrefetchSubscription.java (original)
> > +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/
> > activemq/broker/region/PrefetchSubscription.java Sat Dec 30
> > 15:49:03 2006
> > @@ -406,7 +406,9 @@
> >                              pending.reset();
> >                              while(pending.hasNext()&&!isFull()
> > &&count<numberToDispatch){
> >                                  MessageReference node=pending.next();
> > -
> > +                                if ( node == null )
> > +                                     break;
> > +
> >                                  if(canDispatch(node)){
> >                                      pending.remove();
> >                                      // Message may have been
> > sitting in the pending list a while
> >
> > Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/
> > apache/activemq/broker/region/cursors/TopicStorePrefetch.java
> > URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-
> > core/src/main/java/org/apache/activemq/broker/region/cursors/
> > TopicStorePrefetch.java?view=diff&rev=491346&r1=491345&r2=491346
> > ======================================================================
> > ========
> > --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/
> > activemq/broker/region/cursors/TopicStorePrefetch.java (original)
> > +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/
> > activemq/broker/region/cursors/TopicStorePrefetch.java Sat Dec 30
> > 15:49:03 2006
> > @@ -20,7 +20,7 @@
> >
> >  import java.io.IOException;
> >  import java.util.LinkedList;
> > -import javax.jms.JMSException;
> > +
> >  import org.apache.activemq.broker.region.Destination;
> >  import org.apache.activemq.broker.region.MessageReference;
> >  import org.apache.activemq.broker.region.Topic;
> > @@ -48,6 +48,10 @@
> >      private String subscriberName;
> >      private Destination regionDestination;
> >
> > +    boolean empty=true;
> > +     private MessageId firstMessageId;
> > +     private MessageId lastMessageId;
> > +
> >      /**
> >       * @param topic
> >       * @param clientId
> > @@ -73,7 +77,7 @@
> >       * @return true if there are no pending messages
> >       */
> >      public boolean isEmpty(){
> > -        return batchList.isEmpty();
> > +        return empty;
> >      }
> >
> >      public synchronized int size(){
> > @@ -86,27 +90,55 @@
> >      }
> >
> >      public synchronized void addMessageLast(MessageReference node)
> > throws Exception{
> > -        if(node!=null){
> > +             if(node!=null){
> > +                     if( empty ) {
> > +                             firstMessageId = node.getMessageId();
> > +                             empty=false;
> > +                     }
> > +             lastMessageId = node.getMessageId();
> >              node.decrementReferenceCount();
> >          }
> >      }
> >
> > -    public synchronized boolean hasNext(){
> > -        if(isEmpty()){
> > -            try{
> > -                fillBatch();
> > -            }catch(Exception e){
> > -                log.error("Failed to fill batch",e);
> > -                throw new RuntimeException(e);
> > -            }
> > -        }
> > +    public synchronized boolean hasNext() {
> >          return !isEmpty();
> >      }
> >
> >      public synchronized MessageReference next(){
> > -        Message result = (Message)batchList.removeFirst();
> > -        result.setRegionDestination(regionDestination);
> > -        return result;
> > +
> > +        if( empty ) {
> > +             return null;
> > +        } else {
> > +
> > +             // We may need to fill in the batch...
> > +            if(batchList.isEmpty()){
> > +                try{
> > +                    fillBatch();
> > +                }catch(Exception e){
> > +                    log.error("Failed to fill batch",e);
> > +                    throw new RuntimeException(e);
> > +                }
> > +                if( batchList.isEmpty()) {
> > +                     return null;
> > +                }
> > +            }
> > +
> > +            Message result = (Message)batchList.removeFirst();
> > +
> > +             if( firstMessageId != null ) {
> > +             // Skip messages until we get to the first message.
> > +                     if( !result.getMessageId().equals(firstMessageId) )
> > +                             return null;
> > +                     firstMessageId = null;
> > +             }
> > +             if( lastMessageId != null ) {
> > +                     if( result.getMessageId().equals(lastMessageId) ) {
> > +                             empty=true;
> > +                     }
> > +             }
> > +            result.setRegionDestination(regionDestination);
> > +            return result;
> > +        }
> >      }
> >
> >      public void reset(){
> > @@ -130,13 +162,7 @@
> >
> >      // implementation
> >      protected void fillBatch() throws Exception{
> > -        store.recoverNextMessages(clientId,subscriberName,
> > -                maxBatchSize,this);
> > -        // this will add more messages to the batch list
> > -        if(!batchList.isEmpty()){
> > -            Message message=(Message)batchList.getLast();
> > -
> > -        }
> > +        store.recoverNextMessages
> > (clientId,subscriberName,maxBatchSize,this);
> >      }
> >
> >      public void gc() {
> >
> >
>
>


-- 
Regards,
Hiram

Blog: http://hiramchirino.com

Mime
View raw message