activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Hiram Chirino" <hi...@hiramchirino.com>
Subject Re: svn commit: r491346 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: PrefetchSubscription.java cursors/TopicStorePrefetch.java
Date Sun, 31 Dec 2006 08:32:42 GMT
On 12/31/06, Rob Davies <rajdavies@gmail.com> wrote:
>
> On 31 Dec 2006, at 07:59, Hiram Chirino wrote:
>
> > On 12/31/06, Rob Davies <rajdavies@gmail.com> wrote:
> >> Hey Hiram,
> >>
> >> this change breaks  org.apache.activemq.broker.RecoveryBrokerTest,
> >> oorg.apache.activemq.broker.BrokerTest, etc   for me.
> >>
> >
> > yeah I think I have fix for that. sorry I broke it.  I'm running the
> > test suite again now.  Basically I think I need to default boolean
> > empty=false;  So that an initial recovery of subscription is done.
> >
> >> also - I'm not sure I like TopicStorePrefetch possibly returning null
> >> when a hasNext() has returned true
> >>
> >
> > Yeah me neither :)  I did not fully understand why it was returning
> > null when I expected it to return a value.  I was thinking it could be
> > a timing issue with the MessageStore.
> >
> >> What was the problem in CursorDurableTest ? I hadn't seen that one
> >>
> >
> > CursorDurableTest had a test that was failing due to out of
> > order/duplicates showing up.  This was cause sometimes some messages
> > were direct dispatched and at other times they are dispatched from the
> > pending list.  But since the pending list's .next() was returning the
> > items that were directly dispatched and not even added to the pending
> > list.  This is when the dups and out of order issues would show up.
> >
> > The problem is that TopicStorePrefetch.next() was returning everything
> > added to the durable subscription since it's backed by the
> > MessageStore.  And that's not what we want.  We only want it to return
> > things that are explicitly added to it since it's the pending list.
>
> I wonder if the real problem is then in PrefetchSubscription.add() -
> because only if pending is empty (nothing in the store) should it
> dispatch directly

Could be an interaction.  I think TopicStorePrefetch still needs a
little more work.  I think we need to recover the TopicStorePrefetch
when the the durable subscription is created so that way we know if it
is initially empty or not.

> >
> >
> >> cheers,
> >>
> >> Rob
> >>
> >> On 30 Dec 2006, at 23:49, chirino@apache.org wrote:
> >>
> >> > Author: chirino
> >> > Date: Sat Dec 30 15:49:03 2006
> >> > New Revision: 491346
> >> >
> >> > URL: http://svn.apache.org/viewvc?view=rev&rev=491346
> >> > Log:
> >> > Fix for CursorDurableTest.
> >> > The TopicStorePrefetch was iterating items that were in the
> >> > subscription but not added to the pending list.
> >> >
> >> > Modified:
> >> >     incubator/activemq/trunk/activemq-core/src/main/java/org/
> >> apache/
> >> > activemq/broker/region/PrefetchSubscription.java
> >> >     incubator/activemq/trunk/activemq-core/src/main/java/org/
> >> apache/
> >> > activemq/broker/region/cursors/TopicStorePrefetch.java
> >> >
> >> > Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/
> >> > apache/activemq/broker/region/PrefetchSubscription.java
> >> > URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/
> >> activemq-
> >> > core/src/main/java/org/apache/activemq/broker/region/
> >> > PrefetchSubscription.java?view=diff&rev=491346&r1=491345&r2=491346
> >> >
> >> =====================================================================
> >> =
> >> > ========
> >> > --- incubator/activemq/trunk/activemq-core/src/main/java/org/
> >> apache/
> >> > activemq/broker/region/PrefetchSubscription.java (original)
> >> > +++ incubator/activemq/trunk/activemq-core/src/main/java/org/
> >> apache/
> >> > activemq/broker/region/PrefetchSubscription.java Sat Dec 30
> >> > 15:49:03 2006
> >> > @@ -406,7 +406,9 @@
> >> >                              pending.reset();
> >> >                              while(pending.hasNext()&&!isFull()
> >> > &&count<numberToDispatch){
> >> >                                  MessageReference
> >> node=pending.next();
> >> > -
> >> > +                                if ( node == null )
> >> > +                                     break;
> >> > +
> >> >                                  if(canDispatch(node)){
> >> >                                      pending.remove();
> >> >                                      // Message may have been
> >> > sitting in the pending list a while
> >> >
> >> > Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/
> >> > apache/activemq/broker/region/cursors/TopicStorePrefetch.java
> >> > URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/
> >> activemq-
> >> > core/src/main/java/org/apache/activemq/broker/region/cursors/
> >> > TopicStorePrefetch.java?view=diff&rev=491346&r1=491345&r2=491346
> >> >
> >> =====================================================================
> >> =
> >> > ========
> >> > --- incubator/activemq/trunk/activemq-core/src/main/java/org/
> >> apache/
> >> > activemq/broker/region/cursors/TopicStorePrefetch.java (original)
> >> > +++ incubator/activemq/trunk/activemq-core/src/main/java/org/
> >> apache/
> >> > activemq/broker/region/cursors/TopicStorePrefetch.java Sat Dec 30
> >> > 15:49:03 2006
> >> > @@ -20,7 +20,7 @@
> >> >
> >> >  import java.io.IOException;
> >> >  import java.util.LinkedList;
> >> > -import javax.jms.JMSException;
> >> > +
> >> >  import org.apache.activemq.broker.region.Destination;
> >> >  import org.apache.activemq.broker.region.MessageReference;
> >> >  import org.apache.activemq.broker.region.Topic;
> >> > @@ -48,6 +48,10 @@
> >> >      private String subscriberName;
> >> >      private Destination regionDestination;
> >> >
> >> > +    boolean empty=true;
> >> > +     private MessageId firstMessageId;
> >> > +     private MessageId lastMessageId;
> >> > +
> >> >      /**
> >> >       * @param topic
> >> >       * @param clientId
> >> > @@ -73,7 +77,7 @@
> >> >       * @return true if there are no pending messages
> >> >       */
> >> >      public boolean isEmpty(){
> >> > -        return batchList.isEmpty();
> >> > +        return empty;
> >> >      }
> >> >
> >> >      public synchronized int size(){
> >> > @@ -86,27 +90,55 @@
> >> >      }
> >> >
> >> >      public synchronized void addMessageLast(MessageReference node)
> >> > throws Exception{
> >> > -        if(node!=null){
> >> > +             if(node!=null){
> >> > +                     if( empty ) {
> >> > +                             firstMessageId = node.getMessageId();
> >> > +                             empty=false;
> >> > +                     }
> >> > +             lastMessageId = node.getMessageId();
> >> >              node.decrementReferenceCount();
> >> >          }
> >> >      }
> >> >
> >> > -    public synchronized boolean hasNext(){
> >> > -        if(isEmpty()){
> >> > -            try{
> >> > -                fillBatch();
> >> > -            }catch(Exception e){
> >> > -                log.error("Failed to fill batch",e);
> >> > -                throw new RuntimeException(e);
> >> > -            }
> >> > -        }
> >> > +    public synchronized boolean hasNext() {
> >> >          return !isEmpty();
> >> >      }
> >> >
> >> >      public synchronized MessageReference next(){
> >> > -        Message result = (Message)batchList.removeFirst();
> >> > -        result.setRegionDestination(regionDestination);
> >> > -        return result;
> >> > +
> >> > +        if( empty ) {
> >> > +             return null;
> >> > +        } else {
> >> > +
> >> > +             // We may need to fill in the batch...
> >> > +            if(batchList.isEmpty()){
> >> > +                try{
> >> > +                    fillBatch();
> >> > +                }catch(Exception e){
> >> > +                    log.error("Failed to fill batch",e);
> >> > +                    throw new RuntimeException(e);
> >> > +                }
> >> > +                if( batchList.isEmpty()) {
> >> > +                     return null;
> >> > +                }
> >> > +            }
> >> > +
> >> > +            Message result = (Message)batchList.removeFirst();
> >> > +
> >> > +             if( firstMessageId != null ) {
> >> > +             // Skip messages until we get to the first message.
> >> > +                     if( !result.getMessageId().equals
> >> (firstMessageId) )
> >> > +                             return null;
> >> > +                     firstMessageId = null;
> >> > +             }
> >> > +             if( lastMessageId != null ) {
> >> > +                     if( result.getMessageId().equals
> >> (lastMessageId) ) {
> >> > +                             empty=true;
> >> > +                     }
> >> > +             }
> >> > +            result.setRegionDestination(regionDestination);
> >> > +            return result;
> >> > +        }
> >> >      }
> >> >
> >> >      public void reset(){
> >> > @@ -130,13 +162,7 @@
> >> >
> >> >      // implementation
> >> >      protected void fillBatch() throws Exception{
> >> > -        store.recoverNextMessages(clientId,subscriberName,
> >> > -                maxBatchSize,this);
> >> > -        // this will add more messages to the batch list
> >> > -        if(!batchList.isEmpty()){
> >> > -            Message message=(Message)batchList.getLast();
> >> > -
> >> > -        }
> >> > +        store.recoverNextMessages
> >> > (clientId,subscriberName,maxBatchSize,this);
> >> >      }
> >> >
> >> >      public void gc() {
> >> >
> >> >
> >>
> >>
> >
> >
> > --
> > Regards,
> > Hiram
> >
> > Blog: http://hiramchirino.com
>
>


-- 
Regards,
Hiram

Blog: http://hiramchirino.com

Mime
View raw message