activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From David Sitsky <s...@nuix.com>
Subject Re: Queue performance from recent changes
Date Thu, 06 Mar 2008 05:54:58 GMT
Hi Rob,

I know its been a couple of weeks.  I've been using my changes for a 
while and I see nice CPU and memory usage on the broker, and good 
messaging performance for my application.  Have you had a chance to try 
it out?

Cheers,
David

Rob Davies wrote:
> Hi David,
> 
> thanks for the great feedback - will try your patch and see how it works!
> 
> cheers,
> 
> Rob
> On 20 Feb 2008, at 06:31, David Sitsky wrote:
> 
>> Hi Rob,
>>
>> I like the new changes, but with the changes as they are, for my 
>> application for one of my benchmarks, it takes twice as long to complete.
>>
>> I believe the culprit for this is that when the new code can't find a 
>> consumer which is not full, the broker chooses the consumer with the 
>> lowest dispatch queue size.
>>
>> In my application, since I have a prefetch size of 1, and have 
>> longish-running transactions, the dispatch queue size is not 
>> indicative of the current load for that consumer.  As a result, I 
>> think this is what is responsible for poor load-balancing in my case.
>>
>> For applications which commit() after each processed message, I am 
>> sure this wouldn't be the case.  In some ways, reverting to the old 
>> behaviour of adding the pending message to all consumers might lead to 
>> better load balancing with this code.
>>
>> However - I think it is better if the consumers can decide when they 
>> want more messages rather than the broker pushing messages at them? 
>> I've attached a patch which demonstrates this.  When LAZY_DISPATCH is 
>> set to true (set via a system property for now for testing purposes) 
>> this changes the behaviour slightly.
>>
>> The basic idea is pageInMessages() only pages in the minimum number of 
>> messages that can be dispatched immediately to non-full consumers. 
>> Whenever a consumer acks a message, which updates its prefetch size, 
>> we make sure Queue.wakeup() is called so that the consumer will 
>> receive new messages.
>>
>> With this change in effect - I see slightly faster or almost the same 
>> times with the previous benchmark.  However memory usage on the broker 
>> is far better, as the pending queues for each consumer is either 0 or 
>> very small.
>>
>> What do you think?  I guess there are better ways of doing this.
>>
>> I am doing a large overnight run with 16 consumers, so we'll see how 
>> the  performance goes.
>>
>> You'll also notice in the patch, that in Queue.addSubscriber(), I 
>> thought there didn't seem to be any need for adding a message to a new 
>> consumer if the message has already been locked by another consumer?
>>
>> Cheers,
>> David
>>
>> Rob Davies wrote:
>>> Hi David,
>>> please let us know if these changes helps/hinders your app!
>>> cheers,
>>> Rob
>>> On 19 Feb 2008, at 08:32, David Sitsky wrote:
>>>>>> If what I said above is true, then the immediately above if 
>>>>>> statement needs to be moved outside its enclosing if - otherwise

>>>>>> it only gets executed when targets != null.  We'd want this to 
>>>>>> execute if we found a matching target wouldn't we?
>>>>> Don't think so? We only want the message going to  one 
>>>>> subscription? I may have misunderstood what you mean!
>>>> Yes - ignore what I said, I had my wires crossed.
>>>>
>>>> Cheers,
>>>> David
>>>>
>>
>>
>> -- 
>> Cheers,
>> David
>>
>> Nuix Pty Ltd
>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 0699
>> Web: http://www.nuix.com                            Fax: +61 2 9212 6902
>> Index: 
>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java

>>
>> ===================================================================
>> --- 
>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
   
>> (revision 628917)
>> +++ 
>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
   
>> (working copy)
>> @@ -160,6 +160,8 @@
>>     public  void acknowledge(final ConnectionContext context,final 
>> MessageAck ack) throws Exception {
>>         // Handle the standard acknowledgment case.
>>         boolean callDispatchMatched = false;
>> +    Queue queue = null;
>> +   
>>         synchronized(dispatchLock) {
>>             if (ack.isStandardAck()) {
>>                 // Acknowledge all dispatched messages up till the 
>> message id of
>> @@ -223,8 +225,12 @@
>>                                 prefetchExtension = Math.max(0,
>>                                         prefetchExtension - (index + 1));
>>                             }
>> +                if (queue == null)
>> +                {
>> +                queue = (Queue)node.getRegionDestination();
>> +                }
>>                             callDispatchMatched = true;
>> -                            break;
>> +                break;
>>                         }
>>                     }
>>                 }
>> @@ -253,6 +259,10 @@
>>                     if 
>> (ack.getLastMessageId().equals(node.getMessageId())) {
>>                         prefetchExtension = Math.max(prefetchExtension,
>>                                 index + 1);
>> +                        if (queue == null)
>> +                        {
>> +                            queue = (Queue)node.getRegionDestination();
>> +                        }
>>                         callDispatchMatched = true;
>>                         break;
>>                     }
>> @@ -279,6 +289,10 @@
>>                     if (inAckRange) {
>>                         node.incrementRedeliveryCounter();
>>                         if (ack.getLastMessageId().equals(messageId)) {
>> +                if (queue == null)
>> +                {
>> +                queue = (Queue)node.getRegionDestination();
>> +                }
>>                             callDispatchMatched = true;
>>                             break;
>>                         }
>> @@ -320,6 +334,10 @@
>>                         if (ack.getLastMessageId().equals(messageId)) {
>>                             prefetchExtension = Math.max(0, 
>> prefetchExtension
>>                                     - (index + 1));
>> +                if (queue == null)
>> +                {
>> +                queue = (Queue)node.getRegionDestination();
>> +                }
>>                             callDispatchMatched = true;
>>                             break;
>>                         }
>> @@ -336,6 +354,9 @@
>>             }
>>         }
>>         if (callDispatchMatched) {
>> +        if (Queue.LAZY_DISPATCH) {
>> +        queue.wakeup();
>> +        }
>>             dispatchPending();
>>         } else {
>>             if (isSlave()) {
>> Index: 
>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
>> ===================================================================
>> --- 
>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java    
>> (revision 628917)
>> +++ 
>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java    
>> (working copy)
>> @@ -75,6 +75,8 @@
>>  * @version $Revision: 1.28 $
>>  */
>> public class Queue extends BaseDestination implements Task {
>> +    public static final boolean LAZY_DISPATCH =
>> +    Boolean.parseBoolean(System.getProperty("activemq.lazy.dispatch", 
>> "true"));
>>     private final Log log;
>>     private final List<Subscription> consumers = new 
>> ArrayList<Subscription>(50);
>>     private PendingMessageCursor messages;
>> @@ -212,12 +214,12 @@
>>             synchronized (pagedInMessages) {
>>                 // Add all the matching messages in the queue to the
>>                 // subscription.
>> -
>>                 for (Iterator<MessageReference> i = 
>> pagedInMessages.values()
>>                         .iterator(); i.hasNext();) {
>>                     QueueMessageReference node = 
>> (QueueMessageReference) i
>>                             .next();
>> -                    if (!node.isDropped() && !node.isAcked() &&

>> (!node.isDropped() ||sub.getConsumerInfo().isBrowser())) {
>> +                    if ((!node.isDropped() || 
>> sub.getConsumerInfo().isBrowser()) && !node.isAcked() &&
>> +            node.getLockOwner() == null) {
>>                         msgContext.setMessageReference(node);
>>                         if (sub.matches(node, msgContext)) {
>>                             sub.add(node);
>> @@ -940,7 +945,11 @@
>>         dispatchLock.lock();
>>         try{
>>
>> -            final int toPageIn = getMaxPageSize() - 
>> pagedInMessages.size();
>> +            int toPageIn = getMaxPageSize() - pagedInMessages.size();
>> +        if (LAZY_DISPATCH) {
>> +        // Only page in the minimum number of messages which can be 
>> dispatched immediately.
>> +        toPageIn = Math.min(getConsumerMessageCountBeforeFull(), 
>> toPageIn);
>> +        }
>>             if ((force || !consumers.isEmpty()) && toPageIn > 0) {
>>                 messages.setMaxBatchSize(toPageIn);
>>                 int count = 0;
>> @@ -976,12 +985,25 @@
>>         }
>>         return result;
>>     }
>> +
>> +    private int getConsumerMessageCountBeforeFull() throws Exception {
>> +    int total = 0;
>> +        synchronized (consumers) {
>> +            for (Subscription s : consumers) {
>> +        if (s instanceof PrefetchSubscription) {
>> +            total += ((PrefetchSubscription)s).countBeforeFull();
>> +        }
>> +        }
>> +    }
>> +    return total;
>> +    }
>>
>>     private void doDispatch(List<MessageReference> list) throws 
>> Exception {
>>
>>         if (list != null) {
>>             synchronized (consumers) {
>>                 for (MessageReference node : list) {
>> +
>>                     Subscription target = null;
>>                     List<Subscription> targets = null;
>>                     for (Subscription s : consumers) {


-- 
Cheers,
David

Nuix Pty Ltd
Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 0699
Web: http://www.nuix.com                            Fax: +61 2 9212 6902

Mime
View raw message