activemq-dev mailing list archives

From: Rob Davies <rajdav...@gmail.com>
Subject: Re: Optimising PrefetchSubscription.dispatchPending() ideas
Date: Wed, 13 Feb 2008 06:39:10 GMT
Hi David,

I think this is a valid patch. What I'm looking at at the moment is
only adding messages to a QueueSubscriber's pending list that it can
actually dispatch - rather than doing lots of checks afterwards to see
whether it is able to dispatch them.
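
Something along these lines - just a sketch with a made-up entry point
(addPendingIfDispatchable() isn't the real API; in the broker the
enqueue path goes through Queue and the cursors):

    // Sketch only: filter once, at the point a message is offered to
    // the subscription, instead of re-testing the same node on every
    // dispatchPending() pass.  addPendingIfDispatchable() is a
    // hypothetical name.
    protected void addPendingIfDispatchable(MessageReference node)
            throws Exception {
        synchronized (pendingLock) {
            // Skip nodes this subscription can never deliver: dropped
            // messages and messages it is not allowed to dispatch.
            if (!isDropped(node) && canDispatch(node)) {
                pending.addMessageLast(node);
            }
        }
    }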

cheers,

Rob
On Feb 13, 2008, at 6:22 AM, David Sitsky wrote:

> Hi Rob,
>
> I changed the condition for when to check the "trash" list to:
>
>               if (count > 0 || trash.size() > 1000)
>
> this gave much better performance across a range of application data.
> I've re-attached the patch to avoid confusion.  As I said before, the
> broker consumes far less CPU than it did, so I am able to add a lot
> more consumers.
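>
> In outline, the clean-up is gated like this (sweepTrash() here is just
> a stand-in name for the loop in the attached patch):
>
>     // Only sweep when this pass actually dispatched something
>     // (count > 0), or when the trash list has grown past a fixed
>     // bound, so idle passes don't pay for a full scan of the trash.
>     if (count > 0 || trash.size() > 1000) {
>         sweepTrash();  // stand-in for the loop in the patch: drops
>                        // deleted nodes, re-injects freed ones
>     }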
>
> Any thoughts?  Are there better ways of implementing this?
>
> Cheers,
> David
>
> David Sitsky wrote:
>> Hi Rob,
>> I was using a version that did have your most recent changes.
>> To give you a better idea of what I meant, I hacked up some changes
>> which you can see in the attached patch.
>> The idea is, instead of going through the pending list performing
>> the same computations over and over again on messages which have
>> already been handled by other subscriptions, to move those messages
>> to another list.
>> For a particular run, this reduced my application's run-time from
>> 47 minutes to 38 minutes.
>> I'm sure there are better ways of implementing this - but do you see
>> what I mean?
>> Cheers,
>> David
>> Rob Davies wrote:
>>> David,
>>>
>>> Which release are you working on?  There was a change last night in
>>> Queue that might affect the CPU usage.
>>> On Feb 8, 2008, at 5:11 PM, David Sitsky wrote:
>>>
>>>> In my application, I have noticed that with 20 consumers, the
>>>> broker's CPU usage goes through the roof, with many threads in
>>>> PrefetchSubscription.dispatchPending().  With my consumers, it
>>>> might be 500-1000 messages dispatched before a commit() can be
>>>> called.  With 20 consumers, this means there can be a build-up of
>>>> 20,000 uncommitted messages lying around the system, let alone the
>>>> new messages which are being pumped into the system at a furious
>>>> rate.  Not nice, I know, but I don't have much choice about it at
>>>> the moment, for application-specific reasons.
>>>>
>>>> As you can imagine, I can have some very big pending queues -
>>>> sometimes 100,000 messages in size.
>>>>
>>>> I am experimenting with different prefetch sizes, which may help,
>>>> but I suspect that every time a prefetch thread tries to dispatch
>>>> a message, it might have to iterate through very large numbers of
>>>> deleted messages, or messages which have been claimed by other
>>>> subscribers, before it finds a matching message.  Multiply this by
>>>> 20, and there is a lot of CPU being consumed.  This worries me for
>>>> scalability reasons, if I want to keep bumping up the number of
>>>> consumers.
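>>>>
>>>> (To put rough numbers on that: with pending lists around 100,000
>>>> nodes and 20 consumers each re-scanning on dispatch, a single full
>>>> pass over every subscription's cursor is on the order of
>>>> 20 x 100,000 = 2,000,000 node checks - and dispatchPending() runs
>>>> far more often than once.)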
>>>>
>>>> I'm not sure what the best way of improving this is... is it
>>>> possible, when we call dispatchPending(), not to call
>>>> pendingMessageCursor.reset(), perhaps?
>>> reset() is a nop for the QueueStoreCursor :(
>>>>
>>>>
>>>> I'm trying to understand why we need to reset the cursor, when
>>>> presumably all of the messages we have gone over in a previous
>>>> dispatchPending() call are either deleted, dispatched or locked by
>>>> another node, and therefore don't need to be checked again (or we
>>>> only check them again if we reach the end of the cursor list)?
>>>>
>>>>
>>>> I realise that if a transaction is rolled back, a message that was
>>>> previously locked by another consumer may be freed.  There are
>>>> probably message ordering issues too.
>>>>
>>>> Is it possible, when we are iterating through the cursor and find
>>>> a node locked by another consumer, to move it to the end of the
>>>> cursor (or to another list) and check it only if we find no
>>>> matches?
>>>>
>>>> I'm sure there are a lot of complexities here I am not aware of -  
>>>> but I am curious what others think.
>>>>
>>>> Doing this sort of change should reduce the latencies and CPU
>>>> usage of the broker significantly.
>>>>
>>>> Cheers,
>>>> David
>>>>
>>>>
>
>
> -- 
> Cheers,
> David
>
> Nuix Pty Ltd
> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 0699
> Web: http://www.nuix.com                             Fax: +61 2 9212 6902
> Index: activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
> ===================================================================
> --- activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java	(revision 619666)
> +++ activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java	(working copy)
> @@ -19,6 +19,7 @@
>  import java.io.IOException;
>  import java.util.ArrayList;
>  import java.util.Iterator;
> +import java.util.LinkedList;
>  import java.util.List;
>  import java.util.concurrent.CopyOnWriteArrayList;
> 
> @@ -54,6 +55,7 @@
> 
>      private static final Log LOG = LogFactory.getLog(PrefetchSubscription.class);
>      protected PendingMessageCursor pending;
> +    protected List<MessageReference> trash = new LinkedList<MessageReference>();
>      protected final List<MessageReference> dispatched = new CopyOnWriteArrayList<MessageReference>();
>      protected int prefetchExtension;
>      protected long enqueueCounter;
> @@ -439,22 +449,41 @@
> 
>      protected void dispatchPending() throws IOException {
>          if (!isSlave()) {
> +           int count = 0;
>             synchronized(pendingLock) {
>                  try {
>                      int numberToDispatch = countBeforeFull();
>                      if (numberToDispatch > 0) {
>                          pending.setMaxBatchSize(numberToDispatch);
> -                        int count = 0;
>                          pending.reset();
>                          while (pending.hasNext() && !isFull()
>                                  && count < numberToDispatch) {
>                              MessageReference node = pending.next();
> +                            LockOwner lockOwner;
>                              if (node == null) {
>                                  break;
>                              }
>                              if(isDropped(node)) {
>                                  pending.remove();
>                              }
> +                            else if (node instanceof QueueMessageReference &&
> +                                     (((QueueMessageReference)node).isAcked()))
> +                            {
> +                                // Message has been acked.  Move it to the trash, since it
> +                                // is unlikely to be dispatched to this subscription.
> +                                pending.remove();
> +                                trash.add(node);
> +                            }
> +                            else if (node instanceof IndirectMessageReference &&
> +                                     (lockOwner = ((IndirectMessageReference)node).getLockOwner()) != null &&
> +                                     lockOwner != this)
> +                            {
> +                                // Message has been locked by another subscription.
> +                                // Move it to the trash, since it is unlikely to be
> +                                // dispatched to this subscription.
> +                                pending.remove();
> +                                trash.add(node);
> +                            }
>                              else if (canDispatch(node)) {
>                                  pending.remove();
>                                  // Message may have been sitting in the pending
> @@ -475,7 +504,40 @@
>                  } finally {
>                      pending.release();
>                  }
> -            }
> +
> +               // Check if any trash can be cleaned up or some messages need to be
> +               // placed back into the pending list.
> +               if (count > 0 || trash.size() > 1000)
> +               {
> +                   for (Iterator<MessageReference> iter = trash.iterator(); iter.hasNext();)
> +                   {
> +                       MessageReference node = iter.next();
> +                       if (isDropped(node))
> +                       {
> +                           // Message has been deleted, so it can be removed.
> +                           iter.remove();
> +                       }
> +                       else if ((node instanceof QueueMessageReference &&
> +                                 !((QueueMessageReference) node).isAcked()) ||
> +                                (node instanceof IndirectMessageReference &&
> +                                 ((IndirectMessageReference) node).getLockOwner() == null))
> +                       {
> +                           // Message is no longer acked, or it is not locked by anyone,
> +                           // probably due to a rolled-back transaction.  Re-inject it
> +                           // into the pending list.  This shouldn't be very common.
> +                           try
> +                           {
> +                               pending.addMessageLast(node);
> +                               iter.remove();
> +                           }
> +                           catch (Exception e)
> +                           {
> +                               throw new IOException("Unable to add message to pending list", e);
> +                           }
> +                       }
> +                   }
> +               }
> +           }
>          }
>      }
> 

