activemq-dev mailing list archives

From David Sitsky <s...@nuix.com>
Subject Re: Optimising PrefetchSubscription.dispatchPending() ideas
Date Wed, 13 Feb 2008 22:49:42 GMT
Hi Rob,

That sounds like another good optimisation.  I guess we probably still 
need both changes, since if a large number of messages are being 
injected into the system, we will still end up with a large number of 
messages on the pending lists for all subscribers.

Cheers,
David

Rob Davies wrote:
> Hi David,
> 
> I think this is a valid patch. What I'm looking at at the moment is only 
> adding messages to a QueueSubscriber's pending list which it can 
> dispatch - rather than lots of them checking to see if they are able to 
> dispatch it.
> 
> cheers,
> 
> Rob
> On Feb 13, 2008, at 6:22 AM, David Sitsky wrote:
> 
>> Hi Rob,
>>
>> I changed the condition for when to check the "trash" list to:
>>
>>               if (count > 0 || trash.size() > 1000)
>>
>> this gave much better performance for a range of application data.  
>> I've re-attached the patch again to avoid confusion.  As I said before 
>> - the broker consumes far less CPU now than before, so I am able to 
>> add a lot more consumers now.
>>
>> Any thoughts?  Are there better ways of implementing this?
>>
>> Cheers,
>> David
>>
>> David Sitsky wrote:
>>> Hi Rob,
>>> I was using a version that did have your most recent changes.
>>> To give you a better idea of what I meant, I hacked up some changes 
>>> which you can see from the attached patch.
>>> The idea is, instead of going through the pending list performing the 
>>> same computations over and over again on messages which have 
>>> already been handled by other subscriptions, to move them to another 
>>> list.
>>> For a particular run, this reduced my application run-time from 47 
>>> minutes to 38 minutes.
>>> I'm sure there are better ways of implementing this - but do you see 
>>> what I mean?
>>> Cheers,
>>> David
>>> Rob Davies wrote:
>>>> David,
>>>>
>>>> Which release are you working on? There was a change last night in 
>>>> Queue's that might affect the CPU usage.
>>>> On Feb 8, 2008, at 5:11 PM, David Sitsky wrote:
>>>>
>>>>> In my application, I have noticed with 20 consumers, the broker's 
>>>>> CPU is going through the roof, with many threads in 
>>>>> PrefetchSubscription.dispatchPending().  With my consumers, it 
>>>>> might be 500-1000 messages dispatched before a commit() can be 
>>>>> called.  With 20 consumers, this means there can be a build-up of 
>>>>> 20,000 uncommitted messages lying around the system, let alone the 
>>>>> new messages which are being pumped into the system at a furious 
>>>>> rate.  Not nice I know, but I don't have much choice about it at 
>>>>> the moment, for application-specific reasons.
>>>>>
>>>>> As you can imagine, I can have some very big pending queue sizes - 
>>>>> sometimes 100,000 in size.
>>>>>
>>>>> I am experimenting with different prefetch sizes which may help, 
>>>>> but I suspect every time a prefetch thread is trying to dispatch a 
>>>>> message, it might have to iterate through very large numbers of 
>>>>> deleted messages or messages which have been claimed by other 
>>>>> subscribers before it finds a matching message.  Multiply this by 
>>>>> 20, and there is a lot of CPU being consumed.  This worries me for 
>>>>> scalability reasons - if I want to keep bumping up the number of 
>>>>> consumers.
>>>>>
>>>>> I'm not sure what the best way of improving this is... is it 
>>>>> possible when we call dispatchPending() to not call 
>>>>> pendingMessageCursor.reset() perhaps?
>>>> reset() is a nop for the QueueStoreCursor :(
>>>>>
>>>>>
>>>>> I'm trying to understand why we need to reset the cursor, when 
>>>>> presumably all of the messages we have gone over before in a 
>>>>> previous dispatchPending() call are either deleted, dispatched or 
>>>>> locked by another node, and therefore don't need to be checked 
>>>>> again (or we check if we reach the end of the cursor list)?
>>>> I
>>>>>
>>>>>
>>>>> I realise if a transaction is rolled back, that a message that was 
>>>>> previously locked by another consumer may be freed.  There are 
>>>>> probably message ordering issues too.
>>>>>
>>>>> Is it possible when we are iterating through the cursor if we find 
>>>>> a node locked by another consumer to perhaps move it to the end of 
>>>>> the cursor (or another list) and check it only if we found no matches?
>>>>>
>>>>> I'm sure there are a lot of complexities here I am not aware of - 
>>>>> but I am curious what others think.
>>>>>
>>>>> Doing this sort of change should reduce the latencies and CPU usage 
>>>>> of the broker significantly.
>>>>>
>>>>> Cheers,
>>>>> David
>>>>>
>>>>>
>>
>>
>> -- 
>> Cheers,
>> David
>>
>> Nuix Pty Ltd
>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 0699
>> Web: http://www.nuix.com                            Fax: +61 2 9212 6902
>> Index: activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
>> ===================================================================
>> --- activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (revision 619666)
>> +++ activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (working copy)
>> @@ -19,6 +19,7 @@
>> import java.io.IOException;
>> import java.util.ArrayList;
>> import java.util.Iterator;
>> +import java.util.LinkedList;
>> import java.util.List;
>> import java.util.concurrent.CopyOnWriteArrayList;
>>
>> @@ -54,6 +55,7 @@
>>
>>     private static final Log LOG = LogFactory.getLog(PrefetchSubscription.class);
>>     protected PendingMessageCursor pending;
>> +    protected List<MessageReference> trash = new LinkedList<MessageReference>();
>>     protected final List<MessageReference> dispatched = new CopyOnWriteArrayList<MessageReference>();
>>     protected int prefetchExtension;
>>     protected long enqueueCounter;
>> @@ -439,22 +449,41 @@
>>
>>     protected void dispatchPending() throws IOException {
>>         if (!isSlave()) {
>> +           int count = 0;
>>            synchronized(pendingLock) {
>>                 try {
>>                     int numberToDispatch = countBeforeFull();
>>                     if (numberToDispatch > 0) {
>>                         pending.setMaxBatchSize(numberToDispatch);
>> -                        int count = 0;
>>                         pending.reset();
>>                         while (pending.hasNext() && !isFull()
>>                                 && count < numberToDispatch) {
>>                             MessageReference node = pending.next();
>> +                            LockOwner lockOwner;
>>                             if (node == null) {
>>                                 break;
>>                             }
>>                             if(isDropped(node)) {
>>                                 pending.remove();
>>                             }
>> +                            else if (node instanceof QueueMessageReference &&
>> +                                     (((QueueMessageReference)node).isAcked()))
>> +                            {
>> +                                // Message has been acked.  Move it to the trash, since it
>> +                                // is unlikely to be dispatched to this subscription.
>> +                                pending.remove();
>> +                                trash.add(node);
>> +                            }
>> +                            else if (node instanceof IndirectMessageReference &&
>> +                                     (lockOwner = ((IndirectMessageReference)node).getLockOwner()) != null &&
>> +                                     lockOwner != this)
>> +                            {
>> +                                // Message which has been locked by another subscription.
>> +                                // Move it to the trash, since it is unlikely to be
>> +                                // dispatched to this subscription.
>> +                                pending.remove();
>> +                                trash.add(node);
>> +                            }
>>                             else if (canDispatch(node)) {
>>                                 pending.remove();
>>                                 // Message may have been sitting in the pending
>> @@ -475,7 +504,40 @@
>>                 } finally {
>>                     pending.release();
>>                 }
>> -            }
>> +
>> +               // Check if any trash can be cleaned up or some messages need to be placed
>> +               // back into the pending list.
>> +               if (count > 0 || trash.size() > 1000)
>> +               {
>> +                   for (Iterator<MessageReference> iter = trash.iterator(); iter.hasNext();)
>> +                   {
>> +                       MessageReference node = iter.next();
>> +                       if (isDropped(node))
>> +                       {
>> +                           // Message has been deleted, so it can be removed.
>> +                           iter.remove();
>> +                       }
>> +                       else if ((node instanceof QueueMessageReference &&
>> +                               !((QueueMessageReference) node).isAcked()) ||
>> +                                (node instanceof IndirectMessageReference &&
>> +                               ((IndirectMessageReference) node).getLockOwner() == null))
>> +                       {
>> +                           // Message is no longer acked or it is not locked by anyone
>> +                           // probably due to a rolledback transaction.  Re-inject it into
>> +                           // the pending list again.  This shouldn't be very common.
>> +                           try
>> +                           {
>> +                               pending.addMessageLast(node);
>> +                               iter.remove();
>> +                           }
>> +                           catch (Exception e)
>> +                           {
>> +                               throw new IOException("Unable to add message to pending list", e);
>> +                           }
>> +                       }
>> +                   }
>> +               }
>> +           }
>>         }
>>     }
>>
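
For anyone reading the patch without the ActiveMQ source to hand, here is a stripped-down sketch of the same "trash list" idea using plain collections. It is only an illustration: Msg, TrashListSketch, sweepTrash() and SWEEP_THRESHOLD are made-up names, not ActiveMQ classes, and acked state, transactions and locking are left out. Messages held by another subscription are moved to a side list during the scan, and the side list is only revisited when something was dispatched or it has grown past a threshold (1000, as in the patch).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

/** Hypothetical stand-in for a message reference; not the ActiveMQ type. */
final class Msg {
    final int id;
    boolean dropped;   // true once the message has been deleted
    Object lockOwner;  // subscription currently holding the message, or null
    Msg(int id) { this.id = id; }
}

/** Hypothetical subscription with a pending list and a side ("trash") list. */
final class TrashListSketch {
    static final int SWEEP_THRESHOLD = 1000; // same constant as in the patch

    final LinkedList<Msg> pending = new LinkedList<>();
    final LinkedList<Msg> trash = new LinkedList<>();
    final List<Msg> dispatched = new ArrayList<>();

    /** Scans pending from the start and dispatches at most max messages. */
    int dispatchPending(int max) {
        int count = 0;
        Iterator<Msg> it = pending.iterator();
        while (it.hasNext() && count < max) {
            Msg m = it.next();
            it.remove(); // every branch below takes the message off pending
            if (m.dropped) {
                continue; // deleted: forget it entirely
            }
            if (m.lockOwner != null && m.lockOwner != this) {
                trash.add(m); // held by someone else: park it, do not rescan
            } else {
                m.lockOwner = this;
                dispatched.add(m);
                count++;
            }
        }
        if (count > 0 || trash.size() > SWEEP_THRESHOLD) {
            sweepTrash();
        }
        return count;
    }

    /** Drops deleted messages and re-queues any that have been released. */
    void sweepTrash() {
        Iterator<Msg> it = trash.iterator();
        while (it.hasNext()) {
            Msg m = it.next();
            if (m.dropped) {
                it.remove();
            } else if (m.lockOwner == null) {
                it.remove();
                pending.addLast(m); // e.g. released by a rolled-back transaction
            }
        }
    }

    public static void main(String[] args) {
        TrashListSketch a = new TrashListSketch();
        TrashListSketch b = new TrashListSketch();
        for (int i = 1; i <= 3; i++) {
            Msg m = new Msg(i);
            a.pending.add(m);
            b.pending.add(m);
        }
        a.dispatchPending(2);
        b.dispatchPending(10);
        System.out.println("b dispatched=" + b.dispatched.size()
                + " trash=" + b.trash.size());
    }
}
```

The trade-off is the one discussed in the thread: later scans skip the parked messages, at the cost of an occasional full pass over the side list and a possible change in ordering when a released message is appended to the end of pending.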


-- 
Cheers,
David

Nuix Pty Ltd
Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 0699
Web: http://www.nuix.com                            Fax: +61 2 9212 6902
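
Rob's suggestion at the top of the thread (only adding a message to a pending list whose subscriber can actually dispatch it) could look roughly like the sketch below. This is one possible reading of the idea, not what ActiveMQ does: Sub, TargetedEnqueueSketch, hasCapacity() and the backlog queue are all hypothetical names, and the round-robin choice of subscriber is an assumption made for the example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical subscriber with a bounded pending list. */
final class Sub {
    final String name;
    final int prefetch; // assumed cap on pending entries
    final Deque<Integer> pending = new ArrayDeque<>();

    Sub(String name, int prefetch) {
        this.name = name;
        this.prefetch = prefetch;
    }

    boolean hasCapacity() {
        return pending.size() < prefetch;
    }
}

/** Hypothetical queue that hands each message to exactly one subscriber. */
final class TargetedEnqueueSketch {
    final List<Sub> subs = new ArrayList<>();
    final Deque<Integer> backlog = new ArrayDeque<>(); // waits for free capacity
    private int next = 0; // index at which the round-robin search starts

    void enqueue(int messageId) {
        for (int i = 0; i < subs.size(); i++) {
            int idx = (next + i) % subs.size();
            Sub s = subs.get(idx);
            if (s.hasCapacity()) {
                s.pending.addLast(messageId);
                next = (idx + 1) % subs.size();
                return;
            }
        }
        // Nobody can take it now; keep it centrally instead of on every list.
        backlog.addLast(messageId);
    }

    public static void main(String[] args) {
        TargetedEnqueueSketch q = new TargetedEnqueueSketch();
        q.subs.add(new Sub("A", 2));
        q.subs.add(new Sub("B", 2));
        for (int id = 1; id <= 5; id++) {
            q.enqueue(id);
        }
        for (Sub s : q.subs) {
            System.out.println(s.name + " pending=" + s.pending);
        }
        System.out.println("backlog=" + q.backlog);
    }
}
```

With this shape each message appears on at most one pending list, so a subscriber never has to step over entries claimed by someone else. As noted above, the lists can still grow large when messages arrive faster than they are committed, which is why both changes may still be needed.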
