activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Rob Davies <rajdav...@gmail.com>
Subject Re: Queue performance from recent changes
Date Fri, 07 Mar 2008 16:54:35 GMT
David,

you might like to try enabling the optimizeDispatch property on the  
Destination policy map - see http://activemq.apache.org/configure-version-5-brokers.html 
  from trunk, if you are using non-persistent messages

cheers,

Rob
On 6 Mar 2008, at 22:48, David Sitsky wrote:

> I am sure it will be application-dependent, so making it a policy  
> makes a lot of sense.  For my application, I only have a pending  
> size of 1 since each work item's processing requirements can vary  
> tremendously.
>
> Just curious - what kind of benchmarks did you run this against?   
> I'm curious to know what kind of performance degregation you saw..  
> it would be interesting to understand why.  I am using non- 
> persistent messaging, so perhaps that could make a difference, since  
> I am only paging a small number of messages in at a time.
>
> Cheers,
> David
>
> Rob Davies wrote:
>> Hi David,
>> Yes - actually - I tried it a few days ago. I haven't committed it  
>> because message throughput is generally lower. I will look at  
>> making it optional via a destination policy
>> cheers,
>> Rob
>> On 6 Mar 2008, at 05:54, David Sitsky wrote:
>>> Hi Rob,
>>>
>>> I know its been a couple of weeks.  I've been using my changes for  
>>> a while and I see nice CPU and memory usage on the broker, and  
>>> good messaging performance for my application.  Have you had a  
>>> chance to try it out?
>>>
>>> Cheers,
>>> David
>>>
>>> Rob Davies wrote:
>>>> Hi David,
>>>> thanks for the great feedback - will try your patch and see how  
>>>> it works!
>>>> cheers,
>>>> Rob
>>>> On 20 Feb 2008, at 06:31, David Sitsky wrote:
>>>>> Hi Rob,
>>>>>
>>>>> I like the new changes, but with the changes as they are, for my  
>>>>> application for one of my benchmarks, it takes twice as long to  
>>>>> complete.
>>>>>
>>>>> I believe the culprit for this is that when the new code can't  
>>>>> find a consumer which is not full, the broker chooses the  
>>>>> consumer with the lowest dispatch queue size.
>>>>>
>>>>> In my application, since I have a prefetch size of 1, and have  
>>>>> longish-running transactions, the dispatch queue size is not  
>>>>> indicative of the current load for that consumer.  As a result,  
>>>>> I think this is what is responsible for poor load-balancing in  
>>>>> my case.
>>>>>
>>>>> For applications which commit() after each processed message, I  
>>>>> am sure this wouldn't be the case.  In some ways, reverting to  
>>>>> the old behaviour of adding the pending message to all consumers  
>>>>> might lead to better load balancing with this code.
>>>>>
>>>>> However - I think it is better if the consumers can decide when  
>>>>> they want more messages rather than the broker pushing messages  
>>>>> at them? I've attached a patch which demonstrates this.  When  
>>>>> LAZY_DISPATCH is set to true (set via a system property for now  
>>>>> for testing purposes) this changes the behaviour slightly.
>>>>>
>>>>> The basic idea is pageInMessages() only pages in the minimum  
>>>>> number of messages that can be dispatched immediately to non- 
>>>>> full consumers. Whenever a consumer acks a message, which  
>>>>> updates its prefetch size, we make sure Queue.wakeup() is called  
>>>>> so that the consumer will receive new messages.
>>>>>
>>>>> With this change in effect - I see slightly faster or almost the  
>>>>> same times with the previous benchmark.  However memory usage on  
>>>>> the broker is far better, as the pending queues for each  
>>>>> consumer is either 0 or very small.
>>>>>
>>>>> What do you think?  I guess there are better ways of doing this.
>>>>>
>>>>> I am doing a large overnight run with 16 consumers, so we'll see  
>>>>> how the  performance goes.
>>>>>
>>>>> You'll also notice in the patch, that in Queue.addSubscriber(),  
>>>>> I thought there didn't seem to be any need for adding a message  
>>>>> to a new consumer if the message has already been locked by  
>>>>> another consumer?
>>>>>
>>>>> Cheers,
>>>>> David
>>>>>
>>>>> Rob Davies wrote:
>>>>>> Hi David,
>>>>>> please let us know if these changes helps/hinders your app!
>>>>>> cheers,
>>>>>> Rob
>>>>>> On 19 Feb 2008, at 08:32, David Sitsky wrote:
>>>>>>>>> If what I said above is true, then the immediately above
if  
>>>>>>>>> statement needs to be moved outside its enclosing if
-  
>>>>>>>>> otherwise it only gets executed when targets != null.
 We'd  
>>>>>>>>> want this to execute if we found a matching target wouldn't
 
>>>>>>>>> we?
>>>>>>>> Don't think so? We only want the message going to  one  
>>>>>>>> subscription? I may have misunderstood what you mean!
>>>>>>> Yes - ignore what I said, I had my wires crossed.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> David
>>>>>>>
>>>>>
>>>>>
>>>>> -- 
>>>>> Cheers,
>>>>> David
>>>>>
>>>>> Nuix Pty Ltd
>>>>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2  
>>>>> 9280 0699
>>>>> Web: http://www.nuix.com                            Fax: +61 2  
>>>>> 9212 6902
>>>>> Index: activemq-core/src/main/java/org/apache/activemq/broker/ 
>>>>> region/PrefetchSubscription.java
>>>>> = 
>>>>> ==================================================================
>>>>> --- activemq-core/src/main/java/org/apache/activemq/broker/ 
>>>>> region/PrefetchSubscription.java    (revision 628917)
>>>>> +++ activemq-core/src/main/java/org/apache/activemq/broker/ 
>>>>> region/PrefetchSubscription.java    (working copy)
>>>>> @@ -160,6 +160,8 @@
>>>>>   public  void acknowledge(final ConnectionContext context,final  
>>>>> MessageAck ack) throws Exception {
>>>>>       // Handle the standard acknowledgment case.
>>>>>       boolean callDispatchMatched = false;
>>>>> +    Queue queue = null;
>>>>> +           synchronized(dispatchLock) {
>>>>>           if (ack.isStandardAck()) {
>>>>>               // Acknowledge all dispatched messages up till the  
>>>>> message id of
>>>>> @@ -223,8 +225,12 @@
>>>>>                               prefetchExtension = Math.max(0,
>>>>>                                       prefetchExtension - (index  
>>>>> + 1));
>>>>>                           }
>>>>> +                if (queue == null)
>>>>> +                {
>>>>> +                queue = (Queue)node.getRegionDestination();
>>>>> +                }
>>>>>                           callDispatchMatched = true;
>>>>> -                            break;
>>>>> +                break;
>>>>>                       }
>>>>>                   }
>>>>>               }
>>>>> @@ -253,6 +259,10 @@
>>>>>                   if  
>>>>> (ack.getLastMessageId().equals(node.getMessageId())) {
>>>>>                       prefetchExtension =  
>>>>> Math.max(prefetchExtension,
>>>>>                               index + 1);
>>>>> +                        if (queue == null)
>>>>> +                        {
>>>>> +                            queue =  
>>>>> (Queue)node.getRegionDestination();
>>>>> +                        }
>>>>>                       callDispatchMatched = true;
>>>>>                       break;
>>>>>                   }
>>>>> @@ -279,6 +289,10 @@
>>>>>                   if (inAckRange) {
>>>>>                       node.incrementRedeliveryCounter();
>>>>>                       if  
>>>>> (ack.getLastMessageId().equals(messageId)) {
>>>>> +                if (queue == null)
>>>>> +                {
>>>>> +                queue = (Queue)node.getRegionDestination();
>>>>> +                }
>>>>>                           callDispatchMatched = true;
>>>>>                           break;
>>>>>                       }
>>>>> @@ -320,6 +334,10 @@
>>>>>                       if  
>>>>> (ack.getLastMessageId().equals(messageId)) {
>>>>>                           prefetchExtension = Math.max(0,  
>>>>> prefetchExtension
>>>>>                                   - (index + 1));
>>>>> +                if (queue == null)
>>>>> +                {
>>>>> +                queue = (Queue)node.getRegionDestination();
>>>>> +                }
>>>>>                           callDispatchMatched = true;
>>>>>                           break;
>>>>>                       }
>>>>> @@ -336,6 +354,9 @@
>>>>>           }
>>>>>       }
>>>>>       if (callDispatchMatched) {
>>>>> +        if (Queue.LAZY_DISPATCH) {
>>>>> +        queue.wakeup();
>>>>> +        }
>>>>>           dispatchPending();
>>>>>       } else {
>>>>>           if (isSlave()) {
>>>>> Index: activemq-core/src/main/java/org/apache/activemq/broker/ 
>>>>> region/Queue.java
>>>>> = 
>>>>> ==================================================================
>>>>> --- activemq-core/src/main/java/org/apache/activemq/broker/ 
>>>>> region/Queue.java    (revision 628917)
>>>>> +++ activemq-core/src/main/java/org/apache/activemq/broker/ 
>>>>> region/Queue.java    (working copy)
>>>>> @@ -75,6 +75,8 @@
>>>>> * @version $Revision: 1.28 $
>>>>> */
>>>>> public class Queue extends BaseDestination implements Task {
>>>>> +    public static final boolean LAZY_DISPATCH =
>>>>> +     
>>>>> Boolean 
>>>>> .parseBoolean(System.getProperty("activemq.lazy.dispatch",  
>>>>> "true"));
>>>>>   private final Log log;
>>>>>   private final List<Subscription> consumers = new  
>>>>> ArrayList<Subscription>(50);
>>>>>   private PendingMessageCursor messages;
>>>>> @@ -212,12 +214,12 @@
>>>>>           synchronized (pagedInMessages) {
>>>>>               // Add all the matching messages in the queue to the
>>>>>               // subscription.
>>>>> -
>>>>>               for (Iterator<MessageReference> i =  
>>>>> pagedInMessages.values()
>>>>>                       .iterator(); i.hasNext();) {
>>>>>                   QueueMessageReference node =  
>>>>> (QueueMessageReference) i
>>>>>                           .next();
>>>>> -                    if (!node.isDropped() && !node.isAcked()
&&  
>>>>> (!node.isDropped() ||sub.getConsumerInfo().isBrowser())) {
>>>>> +                    if ((!node.isDropped() ||  
>>>>> sub.getConsumerInfo().isBrowser()) && !node.isAcked() &&
>>>>> +            node.getLockOwner() == null) {
>>>>>                       msgContext.setMessageReference(node);
>>>>>                       if (sub.matches(node, msgContext)) {
>>>>>                           sub.add(node);
>>>>> @@ -940,7 +945,11 @@
>>>>>       dispatchLock.lock();
>>>>>       try{
>>>>>
>>>>> -            final int toPageIn = getMaxPageSize() -  
>>>>> pagedInMessages.size();
>>>>> +            int toPageIn = getMaxPageSize() -  
>>>>> pagedInMessages.size();
>>>>> +        if (LAZY_DISPATCH) {
>>>>> +        // Only page in the minimum number of messages which  
>>>>> can be dispatched immediately.
>>>>> +        toPageIn =  
>>>>> Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
>>>>> +        }
>>>>>           if ((force || !consumers.isEmpty()) && toPageIn >
0) {
>>>>>               messages.setMaxBatchSize(toPageIn);
>>>>>               int count = 0;
>>>>> @@ -976,12 +985,25 @@
>>>>>       }
>>>>>       return result;
>>>>>   }
>>>>> +
>>>>> +    private int getConsumerMessageCountBeforeFull() throws  
>>>>> Exception {
>>>>> +    int total = 0;
>>>>> +        synchronized (consumers) {
>>>>> +            for (Subscription s : consumers) {
>>>>> +        if (s instanceof PrefetchSubscription) {
>>>>> +            total += ((PrefetchSubscription)s).countBeforeFull();
>>>>> +        }
>>>>> +        }
>>>>> +    }
>>>>> +    return total;
>>>>> +    }
>>>>>
>>>>>   private void doDispatch(List<MessageReference> list) throws 

>>>>> Exception {
>>>>>
>>>>>       if (list != null) {
>>>>>           synchronized (consumers) {
>>>>>               for (MessageReference node : list) {
>>>>> +
>>>>>                   Subscription target = null;
>>>>>                   List<Subscription> targets = null;
>>>>>                   for (Subscription s : consumers) {
>>>
>>>
>>> -- 
>>> Cheers,
>>> David
>>>
>>> Nuix Pty Ltd
>>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2  
>>> 9280 0699
>>> Web: http://www.nuix.com                            Fax: +61 2  
>>> 9212 6902
>
>
> -- 
> Cheers,
> David
>
> Nuix Pty Ltd
> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280  
> 0699
> Web: http://www.nuix.com                            Fax: +61 2 9212  
> 6902


Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message