activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Rob Davies <rajdav...@gmail.com>
Subject Re: svn commit: r491346 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: PrefetchSubscription.java cursors/TopicStorePrefetch.java
Date Sun, 31 Dec 2006 08:49:13 GMT

On 31 Dec 2006, at 08:32, Hiram Chirino wrote:

> On 12/31/06, Rob Davies <rajdavies@gmail.com> wrote:
>>
>> On 31 Dec 2006, at 07:59, Hiram Chirino wrote:
>>
>> > On 12/31/06, Rob Davies <rajdavies@gmail.com> wrote:
>> >> Hey Hiram,
>> >>
>> >> this change breaks  org.apache.activemq.broker.RecoveryBrokerTest,
>> >> oorg.apache.activemq.broker.BrokerTest, etc   for me.
>> >>
>> >
>> > yeah I think I have fix for that. sorry I broke it.  I'm running  
>> the
>> > test suite again now.  Basically I think I need to default boolean
>> > empty=false;  So that an initial recovery of subscription is done.
>> >
>> >> also - I'm not sure I like TopicStorePrefetch possibly  
>> returning null
>> >> when a hasNext() has returned true
>> >>
>> >
>> > Yeah me neither :)  I did not fully understand why it was returning
>> > null when I expected it to return a value.  I was thinking it  
>> could be
>> > a timing issue with the MessageStore.
>> >
>> >> What was the problem in CursorDurableTest ? I hadn't seen that one
>> >>
>> >
>> > CursorDurableTest had a test that was failing due to out of
>> > order/duplicates showing up.  This was cause sometimes some  
>> messages
>> > were direct dispatched and at other times they are dispatched  
>> from the
>> > pending list.  But since the pending list's .next() was  
>> returning the
>> > items that were directly dispatched and not even added to the  
>> pending
>> > list.  This is when the dups and out of order issues would show up.
>> >
>> > The problem is that TopicStorePrefetch.next() was returning  
>> everything
>> > added to the durable subscription since it's backed by the
>> > MessageStore.  And that's not what we want.  We only want it to  
>> return
>> > things that are explicitly added to it since it's the pending list.
>>
>> I wonder if the real problem is then in PrefetchSubscription.add() -
>> because only if pending is empty (nothing in the store) should it
>> dispatch directly
>
> Could be an interaction.  I think TopicStorePrefetch still needs a
> little more work.  I think we need to recover the TopicStorePrefetch
> when the the durable subscription is created so that way we know if it
> is initially empty or not.

it does (or did) - when the subscriber is activated - it sees how  
many messages are outstanding in the store
>
>> >
>> >
>> >> cheers,
>> >>
>> >> Rob
>> >>
>> >> On 30 Dec 2006, at 23:49, chirino@apache.org wrote:
>> >>
>> >> > Author: chirino
>> >> > Date: Sat Dec 30 15:49:03 2006
>> >> > New Revision: 491346
>> >> >
>> >> > URL: http://svn.apache.org/viewvc?view=rev&rev=491346
>> >> > Log:
>> >> > Fix for CursorDurableTest.
>> >> > The TopicStorePrefetch was iterating items that were in the
>> >> > subscription but not added to the pending list.
>> >> >
>> >> > Modified:
>> >> >     incubator/activemq/trunk/activemq-core/src/main/java/org/
>> >> apache/
>> >> > activemq/broker/region/PrefetchSubscription.java
>> >> >     incubator/activemq/trunk/activemq-core/src/main/java/org/
>> >> apache/
>> >> > activemq/broker/region/cursors/TopicStorePrefetch.java
>> >> >
>> >> > Modified: incubator/activemq/trunk/activemq-core/src/main/ 
>> java/org/
>> >> > apache/activemq/broker/region/PrefetchSubscription.java
>> >> > URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/
>> >> activemq-
>> >> > core/src/main/java/org/apache/activemq/broker/region/
>> >> > PrefetchSubscription.java? 
>> view=diff&rev=491346&r1=491345&r2=491346
>> >> >
>> >>  
>> =====================================================================
>> >> =
>> >> > ========
>> >> > --- incubator/activemq/trunk/activemq-core/src/main/java/org/
>> >> apache/
>> >> > activemq/broker/region/PrefetchSubscription.java (original)
>> >> > +++ incubator/activemq/trunk/activemq-core/src/main/java/org/
>> >> apache/
>> >> > activemq/broker/region/PrefetchSubscription.java Sat Dec 30
>> >> > 15:49:03 2006
>> >> > @@ -406,7 +406,9 @@
>> >> >                              pending.reset();
>> >> >                              while(pending.hasNext()&&!isFull()
>> >> > &&count<numberToDispatch){
>> >> >                                  MessageReference
>> >> node=pending.next();
>> >> > -
>> >> > +                                if ( node == null )
>> >> > +                                     break;
>> >> > +
>> >> >                                  if(canDispatch(node)){
>> >> >                                      pending.remove();
>> >> >                                      // Message may have been
>> >> > sitting in the pending list a while
>> >> >
>> >> > Modified: incubator/activemq/trunk/activemq-core/src/main/ 
>> java/org/
>> >> > apache/activemq/broker/region/cursors/TopicStorePrefetch.java
>> >> > URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/
>> >> activemq-
>> >> > core/src/main/java/org/apache/activemq/broker/region/cursors/
>> >> > TopicStorePrefetch.java?view=diff&rev=491346&r1=491345&r2=491346
>> >> >
>> >>  
>> =====================================================================
>> >> =
>> >> > ========
>> >> > --- incubator/activemq/trunk/activemq-core/src/main/java/org/
>> >> apache/
>> >> > activemq/broker/region/cursors/TopicStorePrefetch.java  
>> (original)
>> >> > +++ incubator/activemq/trunk/activemq-core/src/main/java/org/
>> >> apache/
>> >> > activemq/broker/region/cursors/TopicStorePrefetch.java Sat  
>> Dec 30
>> >> > 15:49:03 2006
>> >> > @@ -20,7 +20,7 @@
>> >> >
>> >> >  import java.io.IOException;
>> >> >  import java.util.LinkedList;
>> >> > -import javax.jms.JMSException;
>> >> > +
>> >> >  import org.apache.activemq.broker.region.Destination;
>> >> >  import org.apache.activemq.broker.region.MessageReference;
>> >> >  import org.apache.activemq.broker.region.Topic;
>> >> > @@ -48,6 +48,10 @@
>> >> >      private String subscriberName;
>> >> >      private Destination regionDestination;
>> >> >
>> >> > +    boolean empty=true;
>> >> > +     private MessageId firstMessageId;
>> >> > +     private MessageId lastMessageId;
>> >> > +
>> >> >      /**
>> >> >       * @param topic
>> >> >       * @param clientId
>> >> > @@ -73,7 +77,7 @@
>> >> >       * @return true if there are no pending messages
>> >> >       */
>> >> >      public boolean isEmpty(){
>> >> > -        return batchList.isEmpty();
>> >> > +        return empty;
>> >> >      }
>> >> >
>> >> >      public synchronized int size(){
>> >> > @@ -86,27 +90,55 @@
>> >> >      }
>> >> >
>> >> >      public synchronized void addMessageLast(MessageReference  
>> node)
>> >> > throws Exception{
>> >> > -        if(node!=null){
>> >> > +             if(node!=null){
>> >> > +                     if( empty ) {
>> >> > +                             firstMessageId =  
>> node.getMessageId();
>> >> > +                             empty=false;
>> >> > +                     }
>> >> > +             lastMessageId = node.getMessageId();
>> >> >              node.decrementReferenceCount();
>> >> >          }
>> >> >      }
>> >> >
>> >> > -    public synchronized boolean hasNext(){
>> >> > -        if(isEmpty()){
>> >> > -            try{
>> >> > -                fillBatch();
>> >> > -            }catch(Exception e){
>> >> > -                log.error("Failed to fill batch",e);
>> >> > -                throw new RuntimeException(e);
>> >> > -            }
>> >> > -        }
>> >> > +    public synchronized boolean hasNext() {
>> >> >          return !isEmpty();
>> >> >      }
>> >> >
>> >> >      public synchronized MessageReference next(){
>> >> > -        Message result = (Message)batchList.removeFirst();
>> >> > -        result.setRegionDestination(regionDestination);
>> >> > -        return result;
>> >> > +
>> >> > +        if( empty ) {
>> >> > +             return null;
>> >> > +        } else {
>> >> > +
>> >> > +             // We may need to fill in the batch...
>> >> > +            if(batchList.isEmpty()){
>> >> > +                try{
>> >> > +                    fillBatch();
>> >> > +                }catch(Exception e){
>> >> > +                    log.error("Failed to fill batch",e);
>> >> > +                    throw new RuntimeException(e);
>> >> > +                }
>> >> > +                if( batchList.isEmpty()) {
>> >> > +                     return null;
>> >> > +                }
>> >> > +            }
>> >> > +
>> >> > +            Message result = (Message)batchList.removeFirst();
>> >> > +
>> >> > +             if( firstMessageId != null ) {
>> >> > +             // Skip messages until we get to the first  
>> message.
>> >> > +                     if( !result.getMessageId().equals
>> >> (firstMessageId) )
>> >> > +                             return null;
>> >> > +                     firstMessageId = null;
>> >> > +             }
>> >> > +             if( lastMessageId != null ) {
>> >> > +                     if( result.getMessageId().equals
>> >> (lastMessageId) ) {
>> >> > +                             empty=true;
>> >> > +                     }
>> >> > +             }
>> >> > +            result.setRegionDestination(regionDestination);
>> >> > +            return result;
>> >> > +        }
>> >> >      }
>> >> >
>> >> >      public void reset(){
>> >> > @@ -130,13 +162,7 @@
>> >> >
>> >> >      // implementation
>> >> >      protected void fillBatch() throws Exception{
>> >> > -        store.recoverNextMessages(clientId,subscriberName,
>> >> > -                maxBatchSize,this);
>> >> > -        // this will add more messages to the batch list
>> >> > -        if(!batchList.isEmpty()){
>> >> > -            Message message=(Message)batchList.getLast();
>> >> > -
>> >> > -        }
>> >> > +        store.recoverNextMessages
>> >> > (clientId,subscriberName,maxBatchSize,this);
>> >> >      }
>> >> >
>> >> >      public void gc() {
>> >> >
>> >> >
>> >>
>> >>
>> >
>> >
>> > --
>> > Regards,
>> > Hiram
>> >
>> > Blog: http://hiramchirino.com
>>
>>
>
>
> -- 
> Regards,
> Hiram
>
> Blog: http://hiramchirino.com


Mime
View raw message