activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From David Sitsky <s...@nuix.com>
Subject Re: Queue performance from recent changes
Date Thu, 06 Mar 2008 22:48:28 GMT
I am sure it will be application-dependent, so making it a policy makes 
a lot of sense.  For my application, I only have a pending size of 1 
since each work item's processing requirements can vary tremendously.

Just curious - what kind of benchmarks did you run this against?  I'm 
curious to know what kind of performance degregation you saw.. it would 
be interesting to understand why.  I am using non-persistent messaging, 
so perhaps that could make a difference, since I am only paging a small 
number of messages in at a time.

Cheers,
David

Rob Davies wrote:
> Hi David,
> 
> Yes - actually - I tried it a few days ago. I haven't committed it 
> because message throughput is generally lower. I will look at making it 
> optional via a destination policy
> 
> cheers,
> 
> Rob
> On 6 Mar 2008, at 05:54, David Sitsky wrote:
> 
>> Hi Rob,
>>
>> I know its been a couple of weeks.  I've been using my changes for a 
>> while and I see nice CPU and memory usage on the broker, and good 
>> messaging performance for my application.  Have you had a chance to 
>> try it out?
>>
>> Cheers,
>> David
>>
>> Rob Davies wrote:
>>> Hi David,
>>> thanks for the great feedback - will try your patch and see how it 
>>> works!
>>> cheers,
>>> Rob
>>> On 20 Feb 2008, at 06:31, David Sitsky wrote:
>>>> Hi Rob,
>>>>
>>>> I like the new changes, but with the changes as they are, for my 
>>>> application for one of my benchmarks, it takes twice as long to 
>>>> complete.
>>>>
>>>> I believe the culprit for this is that when the new code can't find 
>>>> a consumer which is not full, the broker chooses the consumer with 
>>>> the lowest dispatch queue size.
>>>>
>>>> In my application, since I have a prefetch size of 1, and have 
>>>> longish-running transactions, the dispatch queue size is not 
>>>> indicative of the current load for that consumer.  As a result, I 
>>>> think this is what is responsible for poor load-balancing in my case.
>>>>
>>>> For applications which commit() after each processed message, I am 
>>>> sure this wouldn't be the case.  In some ways, reverting to the old 
>>>> behaviour of adding the pending message to all consumers might lead 
>>>> to better load balancing with this code.
>>>>
>>>> However - I think it is better if the consumers can decide when they 
>>>> want more messages rather than the broker pushing messages at them? 
>>>> I've attached a patch which demonstrates this.  When LAZY_DISPATCH 
>>>> is set to true (set via a system property for now for testing 
>>>> purposes) this changes the behaviour slightly.
>>>>
>>>> The basic idea is pageInMessages() only pages in the minimum number 
>>>> of messages that can be dispatched immediately to non-full 
>>>> consumers. Whenever a consumer acks a message, which updates its 
>>>> prefetch size, we make sure Queue.wakeup() is called so that the 
>>>> consumer will receive new messages.
>>>>
>>>> With this change in effect - I see slightly faster or almost the 
>>>> same times with the previous benchmark.  However memory usage on the 
>>>> broker is far better, as the pending queues for each consumer is 
>>>> either 0 or very small.
>>>>
>>>> What do you think?  I guess there are better ways of doing this.
>>>>
>>>> I am doing a large overnight run with 16 consumers, so we'll see how 
>>>> the  performance goes.
>>>>
>>>> You'll also notice in the patch, that in Queue.addSubscriber(), I 
>>>> thought there didn't seem to be any need for adding a message to a 
>>>> new consumer if the message has already been locked by another 
>>>> consumer?
>>>>
>>>> Cheers,
>>>> David
>>>>
>>>> Rob Davies wrote:
>>>>> Hi David,
>>>>> please let us know if these changes helps/hinders your app!
>>>>> cheers,
>>>>> Rob
>>>>> On 19 Feb 2008, at 08:32, David Sitsky wrote:
>>>>>>>> If what I said above is true, then the immediately above
if 
>>>>>>>> statement needs to be moved outside its enclosing if - otherwise

>>>>>>>> it only gets executed when targets != null.  We'd want this
to 
>>>>>>>> execute if we found a matching target wouldn't we?
>>>>>>> Don't think so? We only want the message going to  one 
>>>>>>> subscription? I may have misunderstood what you mean!
>>>>>> Yes - ignore what I said, I had my wires crossed.
>>>>>>
>>>>>> Cheers,
>>>>>> David
>>>>>>
>>>>
>>>>
>>>> -- 
>>>> Cheers,
>>>> David
>>>>
>>>> Nuix Pty Ltd
>>>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 
>>>> 0699
>>>> Web: http://www.nuix.com                            Fax: +61 2 9212 
>>>> 6902
>>>> Index: 
>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java

>>>>
>>>> ===================================================================
>>>> --- 
>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
   
>>>> (revision 628917)
>>>> +++ 
>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
   
>>>> (working copy)
>>>> @@ -160,6 +160,8 @@
>>>>    public  void acknowledge(final ConnectionContext context,final 
>>>> MessageAck ack) throws Exception {
>>>>        // Handle the standard acknowledgment case.
>>>>        boolean callDispatchMatched = false;
>>>> +    Queue queue = null;
>>>> +           synchronized(dispatchLock) {
>>>>            if (ack.isStandardAck()) {
>>>>                // Acknowledge all dispatched messages up till the 
>>>> message id of
>>>> @@ -223,8 +225,12 @@
>>>>                                prefetchExtension = Math.max(0,
>>>>                                        prefetchExtension - (index + 
>>>> 1));
>>>>                            }
>>>> +                if (queue == null)
>>>> +                {
>>>> +                queue = (Queue)node.getRegionDestination();
>>>> +                }
>>>>                            callDispatchMatched = true;
>>>> -                            break;
>>>> +                break;
>>>>                        }
>>>>                    }
>>>>                }
>>>> @@ -253,6 +259,10 @@
>>>>                    if 
>>>> (ack.getLastMessageId().equals(node.getMessageId())) {
>>>>                        prefetchExtension = Math.max(prefetchExtension,
>>>>                                index + 1);
>>>> +                        if (queue == null)
>>>> +                        {
>>>> +                            queue = 
>>>> (Queue)node.getRegionDestination();
>>>> +                        }
>>>>                        callDispatchMatched = true;
>>>>                        break;
>>>>                    }
>>>> @@ -279,6 +289,10 @@
>>>>                    if (inAckRange) {
>>>>                        node.incrementRedeliveryCounter();
>>>>                        if (ack.getLastMessageId().equals(messageId)) {
>>>> +                if (queue == null)
>>>> +                {
>>>> +                queue = (Queue)node.getRegionDestination();
>>>> +                }
>>>>                            callDispatchMatched = true;
>>>>                            break;
>>>>                        }
>>>> @@ -320,6 +334,10 @@
>>>>                        if (ack.getLastMessageId().equals(messageId)) {
>>>>                            prefetchExtension = Math.max(0, 
>>>> prefetchExtension
>>>>                                    - (index + 1));
>>>> +                if (queue == null)
>>>> +                {
>>>> +                queue = (Queue)node.getRegionDestination();
>>>> +                }
>>>>                            callDispatchMatched = true;
>>>>                            break;
>>>>                        }
>>>> @@ -336,6 +354,9 @@
>>>>            }
>>>>        }
>>>>        if (callDispatchMatched) {
>>>> +        if (Queue.LAZY_DISPATCH) {
>>>> +        queue.wakeup();
>>>> +        }
>>>>            dispatchPending();
>>>>        } else {
>>>>            if (isSlave()) {
>>>> Index: 
>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java

>>>>
>>>> ===================================================================
>>>> --- 
>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
   
>>>> (revision 628917)
>>>> +++ 
>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
   
>>>> (working copy)
>>>> @@ -75,6 +75,8 @@
>>>> * @version $Revision: 1.28 $
>>>> */
>>>> public class Queue extends BaseDestination implements Task {
>>>> +    public static final boolean LAZY_DISPATCH =
>>>> +    
>>>> Boolean.parseBoolean(System.getProperty("activemq.lazy.dispatch", 
>>>> "true"));
>>>>    private final Log log;
>>>>    private final List<Subscription> consumers = new 
>>>> ArrayList<Subscription>(50);
>>>>    private PendingMessageCursor messages;
>>>> @@ -212,12 +214,12 @@
>>>>            synchronized (pagedInMessages) {
>>>>                // Add all the matching messages in the queue to the
>>>>                // subscription.
>>>> -
>>>>                for (Iterator<MessageReference> i = 
>>>> pagedInMessages.values()
>>>>                        .iterator(); i.hasNext();) {
>>>>                    QueueMessageReference node = 
>>>> (QueueMessageReference) i
>>>>                            .next();
>>>> -                    if (!node.isDropped() && !node.isAcked() &&

>>>> (!node.isDropped() ||sub.getConsumerInfo().isBrowser())) {
>>>> +                    if ((!node.isDropped() || 
>>>> sub.getConsumerInfo().isBrowser()) && !node.isAcked() &&
>>>> +            node.getLockOwner() == null) {
>>>>                        msgContext.setMessageReference(node);
>>>>                        if (sub.matches(node, msgContext)) {
>>>>                            sub.add(node);
>>>> @@ -940,7 +945,11 @@
>>>>        dispatchLock.lock();
>>>>        try{
>>>>
>>>> -            final int toPageIn = getMaxPageSize() - 
>>>> pagedInMessages.size();
>>>> +            int toPageIn = getMaxPageSize() - pagedInMessages.size();
>>>> +        if (LAZY_DISPATCH) {
>>>> +        // Only page in the minimum number of messages which can be 
>>>> dispatched immediately.
>>>> +        toPageIn = Math.min(getConsumerMessageCountBeforeFull(), 
>>>> toPageIn);
>>>> +        }
>>>>            if ((force || !consumers.isEmpty()) && toPageIn > 0)
{
>>>>                messages.setMaxBatchSize(toPageIn);
>>>>                int count = 0;
>>>> @@ -976,12 +985,25 @@
>>>>        }
>>>>        return result;
>>>>    }
>>>> +
>>>> +    private int getConsumerMessageCountBeforeFull() throws Exception {
>>>> +    int total = 0;
>>>> +        synchronized (consumers) {
>>>> +            for (Subscription s : consumers) {
>>>> +        if (s instanceof PrefetchSubscription) {
>>>> +            total += ((PrefetchSubscription)s).countBeforeFull();
>>>> +        }
>>>> +        }
>>>> +    }
>>>> +    return total;
>>>> +    }
>>>>
>>>>    private void doDispatch(List<MessageReference> list) throws 
>>>> Exception {
>>>>
>>>>        if (list != null) {
>>>>            synchronized (consumers) {
>>>>                for (MessageReference node : list) {
>>>> +
>>>>                    Subscription target = null;
>>>>                    List<Subscription> targets = null;
>>>>                    for (Subscription s : consumers) {
>>
>>
>> -- 
>> Cheers,
>> David
>>
>> Nuix Pty Ltd
>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 0699
>> Web: http://www.nuix.com                            Fax: +61 2 9212 6902


-- 
Cheers,
David

Nuix Pty Ltd
Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 0699
Web: http://www.nuix.com                            Fax: +61 2 9212 6902

Mime
View raw message