activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From David Sitsky <s...@nuix.com>
Subject Re: Queue performance from recent changes
Date Mon, 10 Mar 2008 06:07:39 GMT
Hi Rob,

I'll give it a go in the next day or two when I get some spare time. 
 From what I can see, we call iterate() inline rather than delegating it 
to the dedicated "wakeup" thread when optimizeDispatch is set.

 From memory, this caused contention with synchronisation blocks last 
time - as many consumers would end up calling iterate().  Would this 
still not be an issue?  I must admit I'd need to check the code closely.

Cheers,
David

Rob Davies wrote:
> David,
> 
> you might like to try enabling the optimizeDispatch property on the 
> Destination policy map - see 
> http://activemq.apache.org/configure-version-5-brokers.html from trunk, 
> if you are using non-persistent messages
> 
> cheers,
> 
> Rob
> On 6 Mar 2008, at 22:48, David Sitsky wrote:
> 
>> I am sure it will be application-dependent, so making it a policy 
>> makes a lot of sense.  For my application, I only have a pending size 
>> of 1 since each work item's processing requirements can vary 
>> tremendously.
>>
>> Just curious - what kind of benchmarks did you run this against?  I'm 
>> curious to know what kind of performance degregation you saw.. it 
>> would be interesting to understand why.  I am using non-persistent 
>> messaging, so perhaps that could make a difference, since I am only 
>> paging a small number of messages in at a time.
>>
>> Cheers,
>> David
>>
>> Rob Davies wrote:
>>> Hi David,
>>> Yes - actually - I tried it a few days ago. I haven't committed it 
>>> because message throughput is generally lower. I will look at making 
>>> it optional via a destination policy
>>> cheers,
>>> Rob
>>> On 6 Mar 2008, at 05:54, David Sitsky wrote:
>>>> Hi Rob,
>>>>
>>>> I know its been a couple of weeks.  I've been using my changes for a 
>>>> while and I see nice CPU and memory usage on the broker, and good 
>>>> messaging performance for my application.  Have you had a chance to 
>>>> try it out?
>>>>
>>>> Cheers,
>>>> David
>>>>
>>>> Rob Davies wrote:
>>>>> Hi David,
>>>>> thanks for the great feedback - will try your patch and see how it 
>>>>> works!
>>>>> cheers,
>>>>> Rob
>>>>> On 20 Feb 2008, at 06:31, David Sitsky wrote:
>>>>>> Hi Rob,
>>>>>>
>>>>>> I like the new changes, but with the changes as they are, for my

>>>>>> application for one of my benchmarks, it takes twice as long to 
>>>>>> complete.
>>>>>>
>>>>>> I believe the culprit for this is that when the new code can't 
>>>>>> find a consumer which is not full, the broker chooses the consumer

>>>>>> with the lowest dispatch queue size.
>>>>>>
>>>>>> In my application, since I have a prefetch size of 1, and have 
>>>>>> longish-running transactions, the dispatch queue size is not 
>>>>>> indicative of the current load for that consumer.  As a result, I

>>>>>> think this is what is responsible for poor load-balancing in my case.
>>>>>>
>>>>>> For applications which commit() after each processed message, I am

>>>>>> sure this wouldn't be the case.  In some ways, reverting to the 
>>>>>> old behaviour of adding the pending message to all consumers might

>>>>>> lead to better load balancing with this code.
>>>>>>
>>>>>> However - I think it is better if the consumers can decide when 
>>>>>> they want more messages rather than the broker pushing messages at

>>>>>> them? I've attached a patch which demonstrates this.  When 
>>>>>> LAZY_DISPATCH is set to true (set via a system property for now 
>>>>>> for testing purposes) this changes the behaviour slightly.
>>>>>>
>>>>>> The basic idea is pageInMessages() only pages in the minimum 
>>>>>> number of messages that can be dispatched immediately to non-full

>>>>>> consumers. Whenever a consumer acks a message, which updates its

>>>>>> prefetch size, we make sure Queue.wakeup() is called so that the

>>>>>> consumer will receive new messages.
>>>>>>
>>>>>> With this change in effect - I see slightly faster or almost the

>>>>>> same times with the previous benchmark.  However memory usage on

>>>>>> the broker is far better, as the pending queues for each consumer

>>>>>> is either 0 or very small.
>>>>>>
>>>>>> What do you think?  I guess there are better ways of doing this.
>>>>>>
>>>>>> I am doing a large overnight run with 16 consumers, so we'll see

>>>>>> how the  performance goes.
>>>>>>
>>>>>> You'll also notice in the patch, that in Queue.addSubscriber(), I

>>>>>> thought there didn't seem to be any need for adding a message to
a 
>>>>>> new consumer if the message has already been locked by another 
>>>>>> consumer?
>>>>>>
>>>>>> Cheers,
>>>>>> David
>>>>>>
>>>>>> Rob Davies wrote:
>>>>>>> Hi David,
>>>>>>> please let us know if these changes helps/hinders your app!
>>>>>>> cheers,
>>>>>>> Rob
>>>>>>> On 19 Feb 2008, at 08:32, David Sitsky wrote:
>>>>>>>>>> If what I said above is true, then the immediately
above if 
>>>>>>>>>> statement needs to be moved outside its enclosing
if - 
>>>>>>>>>> otherwise it only gets executed when targets != null.
 We'd 
>>>>>>>>>> want this to execute if we found a matching target
wouldn't we?
>>>>>>>>> Don't think so? We only want the message going to  one

>>>>>>>>> subscription? I may have misunderstood what you mean!
>>>>>>>> Yes - ignore what I said, I had my wires crossed.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> David
>>>>>>>>
>>>>>>
>>>>>>
>>>>>> -- 
>>>>>> Cheers,
>>>>>> David
>>>>>>
>>>>>> Nuix Pty Ltd
>>>>>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 
>>>>>> 9280 0699
>>>>>> Web: http://www.nuix.com                            Fax: +61 2 
>>>>>> 9212 6902
>>>>>> Index: 
>>>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java

>>>>>>
>>>>>> ===================================================================
>>>>>> --- 
>>>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
   
>>>>>> (revision 628917)
>>>>>> +++ 
>>>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
   
>>>>>> (working copy)
>>>>>> @@ -160,6 +160,8 @@
>>>>>>   public  void acknowledge(final ConnectionContext context,final

>>>>>> MessageAck ack) throws Exception {
>>>>>>       // Handle the standard acknowledgment case.
>>>>>>       boolean callDispatchMatched = false;
>>>>>> +    Queue queue = null;
>>>>>> +           synchronized(dispatchLock) {
>>>>>>           if (ack.isStandardAck()) {
>>>>>>               // Acknowledge all dispatched messages up till the

>>>>>> message id of
>>>>>> @@ -223,8 +225,12 @@
>>>>>>                               prefetchExtension = Math.max(0,
>>>>>>                                       prefetchExtension - (index
+ 
>>>>>> 1));
>>>>>>                           }
>>>>>> +                if (queue == null)
>>>>>> +                {
>>>>>> +                queue = (Queue)node.getRegionDestination();
>>>>>> +                }
>>>>>>                           callDispatchMatched = true;
>>>>>> -                            break;
>>>>>> +                break;
>>>>>>                       }
>>>>>>                   }
>>>>>>               }
>>>>>> @@ -253,6 +259,10 @@
>>>>>>                   if 
>>>>>> (ack.getLastMessageId().equals(node.getMessageId())) {
>>>>>>                       prefetchExtension = Math.max(prefetchExtension,
>>>>>>                               index + 1);
>>>>>> +                        if (queue == null)
>>>>>> +                        {
>>>>>> +                            queue = 
>>>>>> (Queue)node.getRegionDestination();
>>>>>> +                        }
>>>>>>                       callDispatchMatched = true;
>>>>>>                       break;
>>>>>>                   }
>>>>>> @@ -279,6 +289,10 @@
>>>>>>                   if (inAckRange) {
>>>>>>                       node.incrementRedeliveryCounter();
>>>>>>                       if (ack.getLastMessageId().equals(messageId))
{
>>>>>> +                if (queue == null)
>>>>>> +                {
>>>>>> +                queue = (Queue)node.getRegionDestination();
>>>>>> +                }
>>>>>>                           callDispatchMatched = true;
>>>>>>                           break;
>>>>>>                       }
>>>>>> @@ -320,6 +334,10 @@
>>>>>>                       if (ack.getLastMessageId().equals(messageId))
{
>>>>>>                           prefetchExtension = Math.max(0, 
>>>>>> prefetchExtension
>>>>>>                                   - (index + 1));
>>>>>> +                if (queue == null)
>>>>>> +                {
>>>>>> +                queue = (Queue)node.getRegionDestination();
>>>>>> +                }
>>>>>>                           callDispatchMatched = true;
>>>>>>                           break;
>>>>>>                       }
>>>>>> @@ -336,6 +354,9 @@
>>>>>>           }
>>>>>>       }
>>>>>>       if (callDispatchMatched) {
>>>>>> +        if (Queue.LAZY_DISPATCH) {
>>>>>> +        queue.wakeup();
>>>>>> +        }
>>>>>>           dispatchPending();
>>>>>>       } else {
>>>>>>           if (isSlave()) {
>>>>>> Index: 
>>>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java

>>>>>>
>>>>>> ===================================================================
>>>>>> --- 
>>>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
   
>>>>>> (revision 628917)
>>>>>> +++ 
>>>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
   
>>>>>> (working copy)
>>>>>> @@ -75,6 +75,8 @@
>>>>>> * @version $Revision: 1.28 $
>>>>>> */
>>>>>> public class Queue extends BaseDestination implements Task {
>>>>>> +    public static final boolean LAZY_DISPATCH =
>>>>>> +    
>>>>>> Boolean.parseBoolean(System.getProperty("activemq.lazy.dispatch",

>>>>>> "true"));
>>>>>>   private final Log log;
>>>>>>   private final List<Subscription> consumers = new 
>>>>>> ArrayList<Subscription>(50);
>>>>>>   private PendingMessageCursor messages;
>>>>>> @@ -212,12 +214,12 @@
>>>>>>           synchronized (pagedInMessages) {
>>>>>>               // Add all the matching messages in the queue to the
>>>>>>               // subscription.
>>>>>> -
>>>>>>               for (Iterator<MessageReference> i = 
>>>>>> pagedInMessages.values()
>>>>>>                       .iterator(); i.hasNext();) {
>>>>>>                   QueueMessageReference node = 
>>>>>> (QueueMessageReference) i
>>>>>>                           .next();
>>>>>> -                    if (!node.isDropped() && !node.isAcked()
&& 
>>>>>> (!node.isDropped() ||sub.getConsumerInfo().isBrowser())) {
>>>>>> +                    if ((!node.isDropped() || 
>>>>>> sub.getConsumerInfo().isBrowser()) && !node.isAcked() &&
>>>>>> +            node.getLockOwner() == null) {
>>>>>>                       msgContext.setMessageReference(node);
>>>>>>                       if (sub.matches(node, msgContext)) {
>>>>>>                           sub.add(node);
>>>>>> @@ -940,7 +945,11 @@
>>>>>>       dispatchLock.lock();
>>>>>>       try{
>>>>>>
>>>>>> -            final int toPageIn = getMaxPageSize() - 
>>>>>> pagedInMessages.size();
>>>>>> +            int toPageIn = getMaxPageSize() - 
>>>>>> pagedInMessages.size();
>>>>>> +        if (LAZY_DISPATCH) {
>>>>>> +        // Only page in the minimum number of messages which can

>>>>>> be dispatched immediately.
>>>>>> +        toPageIn = Math.min(getConsumerMessageCountBeforeFull(),

>>>>>> toPageIn);
>>>>>> +        }
>>>>>>           if ((force || !consumers.isEmpty()) && toPageIn
> 0) {
>>>>>>               messages.setMaxBatchSize(toPageIn);
>>>>>>               int count = 0;
>>>>>> @@ -976,12 +985,25 @@
>>>>>>       }
>>>>>>       return result;
>>>>>>   }
>>>>>> +
>>>>>> +    private int getConsumerMessageCountBeforeFull() throws 
>>>>>> Exception {
>>>>>> +    int total = 0;
>>>>>> +        synchronized (consumers) {
>>>>>> +            for (Subscription s : consumers) {
>>>>>> +        if (s instanceof PrefetchSubscription) {
>>>>>> +            total += ((PrefetchSubscription)s).countBeforeFull();
>>>>>> +        }
>>>>>> +        }
>>>>>> +    }
>>>>>> +    return total;
>>>>>> +    }
>>>>>>
>>>>>>   private void doDispatch(List<MessageReference> list) throws

>>>>>> Exception {
>>>>>>
>>>>>>       if (list != null) {
>>>>>>           synchronized (consumers) {
>>>>>>               for (MessageReference node : list) {
>>>>>> +
>>>>>>                   Subscription target = null;
>>>>>>                   List<Subscription> targets = null;
>>>>>>                   for (Subscription s : consumers) {
>>>>
>>>>
>>>> -- 
>>>> Cheers,
>>>> David
>>>>
>>>> Nuix Pty Ltd
>>>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 
>>>> 0699
>>>> Web: http://www.nuix.com                            Fax: +61 2 9212 
>>>> 6902
>>
>>
>> -- 
>> Cheers,
>> David
>>
>> Nuix Pty Ltd
>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 0699
>> Web: http://www.nuix.com                            Fax: +61 2 9212 6902
> 
> 


-- 
Cheers,
David

Nuix Pty Ltd
Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 0699
Web: http://www.nuix.com                            Fax: +61 2 9212 6902

Mime
View raw message