activemq-dev mailing list archives

From Rob Davies <rajdav...@gmail.com>
Subject Re: Queue performance from recent changes
Date Tue, 11 Mar 2008 19:58:28 GMT
Hi David,

the changes you suggested are now in, and lazyDispatch can be set by a  
destination policy - it's currently on by default

cheers,

Rob
On 6 Mar 2008, at 05:54, David Sitsky wrote:

> Hi Rob,
>
> I know it's been a couple of weeks.  I've been using my changes for a  
> while and I see nice CPU and memory usage on the broker, and good  
> messaging performance for my application.  Have you had a chance to  
> try it out?
>
> Cheers,
> David
>
> Rob Davies wrote:
>> Hi David,
>> thanks for the great feedback - will try your patch and see how it  
>> works!
>> cheers,
>> Rob
>> On 20 Feb 2008, at 06:31, David Sitsky wrote:
>>> Hi Rob,
>>>
>>> I like the new changes, but as they stand, one of my application's  
>>> benchmarks takes twice as long to complete.
>>>
>>> I believe the culprit for this is that when the new code can't  
>>> find a consumer which is not full, the broker chooses the consumer  
>>> with the lowest dispatch queue size.
>>>
>>> In my application, since I have a prefetch size of 1, and have  
>>> longish-running transactions, the dispatch queue size is not  
>>> indicative of the current load for that consumer.  As a result, I  
>>> think this is what is responsible for poor load-balancing in my  
>>> case.
>>>
>>> For applications which commit() after each processed message, I am  
>>> sure this wouldn't be the case.  In some ways, reverting to the  
>>> old behaviour of adding the pending message to all consumers might  
>>> lead to better load balancing with this code.
>>>
>>> However, I think it is better if the consumers decide when  
>>> they want more messages, rather than the broker pushing messages at  
>>> them. I've attached a patch which demonstrates this.  When  
>>> LAZY_DISPATCH is set to true (set via a system property for now  
>>> for testing purposes) this changes the behaviour slightly.
>>>
>>> The basic idea is pageInMessages() only pages in the minimum  
>>> number of messages that can be dispatched immediately to non-full  
>>> consumers. Whenever a consumer acks a message, which updates its  
>>> prefetch size, we make sure Queue.wakeup() is called so that the  
>>> consumer will receive new messages.
>>>
>>> With this change in effect - I see slightly faster or almost the  
>>> same times with the previous benchmark.  However memory usage on  
>>> the broker is far better, as the pending queues for each consumer  
>>> are either empty or very small.
>>>
>>> What do you think?  I guess there are better ways of doing this.
>>>
>>> I am doing a large overnight run with 16 consumers, so we'll see  
>>> how the performance goes.
>>>
>>> You'll also notice in the patch that Queue.addSubscriber() no  
>>> longer adds a message to a new consumer if the message has already  
>>> been locked by another consumer, as there didn't seem to be any  
>>> need for that.
>>>
>>> Cheers,
>>> David
>>>
>>> Rob Davies wrote:
>>>> Hi David,
>>>> please let us know if these changes help/hinder your app!
>>>> cheers,
>>>> Rob
>>>> On 19 Feb 2008, at 08:32, David Sitsky wrote:
>>>>>>> If what I said above is true, then the immediately above if  
>>>>>>> statement needs to be moved outside its enclosing if -  
>>>>>>> otherwise it only gets executed when targets != null.  We'd  
>>>>>>> want this to execute if we found a matching target wouldn't we?
>>>>>> Don't think so? We only want the message going to one  
>>>>>> subscription? I may have misunderstood what you mean!
>>>>> Yes - ignore what I said, I had my wires crossed.
>>>>>
>>>>> Cheers,
>>>>> David
>>>>>
>>>
>>>
>>> -- 
>>> Cheers,
>>> David
>>>
>>> Nuix Pty Ltd
>>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2  
>>> 9280 0699
>>> Web: http://www.nuix.com                            Fax: +61 2  
>>> 9212 6902
>>> Index: activemq-core/src/main/java/org/apache/activemq/broker/ 
>>> region/PrefetchSubscription.java
>>> ===================================================================
>>> --- activemq-core/src/main/java/org/apache/activemq/broker/region/ 
>>> PrefetchSubscription.java    (revision 628917)
>>> +++ activemq-core/src/main/java/org/apache/activemq/broker/region/ 
>>> PrefetchSubscription.java    (working copy)
>>> @@ -160,6 +160,8 @@
>>>    public  void acknowledge(final ConnectionContext context,final  
>>> MessageAck ack) throws Exception {
>>>        // Handle the standard acknowledgment case.
>>>        boolean callDispatchMatched = false;
>>> +    Queue queue = null;
>>> +           synchronized(dispatchLock) {
>>>            if (ack.isStandardAck()) {
>>>                // Acknowledge all dispatched messages up till the  
>>> message id of
>>> @@ -223,8 +225,12 @@
>>>                                prefetchExtension = Math.max(0,
>>>                                        prefetchExtension - (index  
>>> + 1));
>>>                            }
>>> +                if (queue == null)
>>> +                {
>>> +                queue = (Queue)node.getRegionDestination();
>>> +                }
>>>                            callDispatchMatched = true;
>>> -                            break;
>>> +                break;
>>>                        }
>>>                    }
>>>                }
>>> @@ -253,6 +259,10 @@
>>>                    if  
>>> (ack.getLastMessageId().equals(node.getMessageId())) {
>>>                        prefetchExtension =  
>>> Math.max(prefetchExtension,
>>>                                index + 1);
>>> +                        if (queue == null)
>>> +                        {
>>> +                            queue =  
>>> (Queue)node.getRegionDestination();
>>> +                        }
>>>                        callDispatchMatched = true;
>>>                        break;
>>>                    }
>>> @@ -279,6 +289,10 @@
>>>                    if (inAckRange) {
>>>                        node.incrementRedeliveryCounter();
>>>                        if  
>>> (ack.getLastMessageId().equals(messageId)) {
>>> +                if (queue == null)
>>> +                {
>>> +                queue = (Queue)node.getRegionDestination();
>>> +                }
>>>                            callDispatchMatched = true;
>>>                            break;
>>>                        }
>>> @@ -320,6 +334,10 @@
>>>                        if  
>>> (ack.getLastMessageId().equals(messageId)) {
>>>                            prefetchExtension = Math.max(0,  
>>> prefetchExtension
>>>                                    - (index + 1));
>>> +                if (queue == null)
>>> +                {
>>> +                queue = (Queue)node.getRegionDestination();
>>> +                }
>>>                            callDispatchMatched = true;
>>>                            break;
>>>                        }
>>> @@ -336,6 +354,9 @@
>>>            }
>>>        }
>>>        if (callDispatchMatched) {
>>> +        if (Queue.LAZY_DISPATCH) {
>>> +        queue.wakeup();
>>> +        }
>>>            dispatchPending();
>>>        } else {
>>>            if (isSlave()) {
>>> Index: activemq-core/src/main/java/org/apache/activemq/broker/ 
>>> region/Queue.java
>>> ===================================================================
>>> --- activemq-core/src/main/java/org/apache/activemq/broker/region/ 
>>> Queue.java    (revision 628917)
>>> +++ activemq-core/src/main/java/org/apache/activemq/broker/region/ 
>>> Queue.java    (working copy)
>>> @@ -75,6 +75,8 @@
>>> * @version $Revision: 1.28 $
>>> */
>>> public class Queue extends BaseDestination implements Task {
>>> +    public static final boolean LAZY_DISPATCH =
>>> +     
>>> Boolean.parseBoolean(System.getProperty("activemq.lazy.dispatch",  
>>> "true"));
>>>    private final Log log;
>>>    private final List<Subscription> consumers = new  
>>> ArrayList<Subscription>(50);
>>>    private PendingMessageCursor messages;
>>> @@ -212,12 +214,12 @@
>>>            synchronized (pagedInMessages) {
>>>                // Add all the matching messages in the queue to the
>>>                // subscription.
>>> -
>>>                for (Iterator<MessageReference> i =  
>>> pagedInMessages.values()
>>>                        .iterator(); i.hasNext();) {
>>>                    QueueMessageReference node =  
>>> (QueueMessageReference) i
>>>                            .next();
>>> -                    if (!node.isDropped() && !node.isAcked() && (! 
>>> node.isDropped() ||sub.getConsumerInfo().isBrowser())) {
>>> +                    if ((!node.isDropped() ||  
>>> sub.getConsumerInfo().isBrowser()) && !node.isAcked() &&
>>> +            node.getLockOwner() == null) {
>>>                        msgContext.setMessageReference(node);
>>>                        if (sub.matches(node, msgContext)) {
>>>                            sub.add(node);
>>> @@ -940,7 +945,11 @@
>>>        dispatchLock.lock();
>>>        try{
>>>
>>> -            final int toPageIn = getMaxPageSize() -  
>>> pagedInMessages.size();
>>> +            int toPageIn = getMaxPageSize() -  
>>> pagedInMessages.size();
>>> +        if (LAZY_DISPATCH) {
>>> +        // Only page in the minimum number of messages which can  
>>> be dispatched immediately.
>>> +        toPageIn = Math.min(getConsumerMessageCountBeforeFull(),  
>>> toPageIn);
>>> +        }
>>>            if ((force || !consumers.isEmpty()) && toPageIn > 0) {
>>>                messages.setMaxBatchSize(toPageIn);
>>>                int count = 0;
>>> @@ -976,12 +985,25 @@
>>>        }
>>>        return result;
>>>    }
>>> +
>>> +    private int getConsumerMessageCountBeforeFull() throws  
>>> Exception {
>>> +    int total = 0;
>>> +        synchronized (consumers) {
>>> +            for (Subscription s : consumers) {
>>> +        if (s instanceof PrefetchSubscription) {
>>> +            total += ((PrefetchSubscription)s).countBeforeFull();
>>> +        }
>>> +        }
>>> +    }
>>> +    return total;
>>> +    }
>>>
>>>    private void doDispatch(List<MessageReference> list) throws  
>>> Exception {
>>>
>>>        if (list != null) {
>>>            synchronized (consumers) {
>>>                for (MessageReference node : list) {
>>> +
>>>                    Subscription target = null;
>>>                    List<Subscription> targets = null;
>>>                    for (Subscription s : consumers) {
>
>
> -- 
> Cheers,
> David
>
> Nuix Pty Ltd
> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280  
> 0699
> Web: http://www.nuix.com                            Fax: +61 2 9212  
> 6902
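
The paging rule described in the quoted patch (page in only as many messages as the non-full consumers can accept immediately) can be illustrated with a minimal, self-contained Java sketch. It does not use the ActiveMQ classes: the `Consumer` class, its fields, and the body of `countBeforeFull()` are assumptions made for illustration, since the patch excerpt does not show how PrefetchSubscription.countBeforeFull() is implemented. The clamp to zero is also an addition; the patch instead relies on the later `toPageIn > 0` check.

```java
import java.util.Arrays;
import java.util.List;

public class LazyDispatchSketch {

    /** Hypothetical stand-in for a prefetching consumer; not the ActiveMQ class. */
    static final class Consumer {
        final int prefetchSize; // maximum number of unacknowledged messages
        final int dispatched;   // messages sent to the consumer and not yet acked

        Consumer(int prefetchSize, int dispatched) {
            this.prefetchSize = prefetchSize;
            this.dispatched = dispatched;
        }

        /** Assumed meaning of countBeforeFull(): free prefetch slots. */
        int countBeforeFull() {
            return Math.max(0, prefetchSize - dispatched);
        }
    }

    /** Sum of free prefetch slots over all consumers. */
    static int consumerMessageCountBeforeFull(List<Consumer> consumers) {
        int total = 0;
        for (Consumer c : consumers) {
            total += c.countBeforeFull();
        }
        return total;
    }

    /** Number of messages to page in for one dispatch pass. */
    static int toPageIn(int maxPageSize, int pagedIn,
                        List<Consumer> consumers, boolean lazyDispatch) {
        int toPageIn = maxPageSize - pagedIn;
        if (lazyDispatch) {
            // Only page in what can be dispatched immediately.
            toPageIn = Math.min(consumerMessageCountBeforeFull(consumers), toPageIn);
        }
        return Math.max(0, toPageIn); // clamp added for this sketch only
    }

    public static void main(String[] args) {
        // Three consumers with prefetch 1; the first is busy with a long transaction.
        List<Consumer> consumers = Arrays.asList(
                new Consumer(1, 1), new Consumer(1, 0), new Consumer(1, 0));
        System.out.println(toPageIn(200, 0, consumers, false)); // prints 200
        System.out.println(toPageIn(200, 0, consumers, true));  // prints 2
    }
}
```

With lazy dispatch off, the broker in this sketch pages in a full page of 200 messages even though only two consumers can take one message each. With it on, only two messages are paged in, which matches the lower broker memory usage reported in the thread.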

