From dev-return-9779-apmail-activemq-dev-archive=activemq.apache.org@activemq.apache.org Mon Mar 10 06:07:58 2008
Message-ID: <47D4D02B.4060509@nuix.com>
Date: Mon, 10 Mar 2008 17:07:39 +1100
From: David Sitsky
Organization: NUIX Pty Ltd
To: dev@activemq.apache.org
Subject: Re: Queue performance from recent changes
References: <20080218094428.843041A9832@eris.apache.org> <47BA6480.4020604@nuix.com> <2C159E97-AD73-4BB1-879C-0505B6B6E801@gmail.com> <47BA9424.9040701@nuix.com> <93C79F5E-8F0C-4B65-B7A9-21687FD00CD0@gmail.com> <47BBC94B.8010302@nuix.com>
 <47CF8732.7020600@nuix.com> <0AA2DC4F-55F6-400B-BD40-FE17472C1A7B@gmail.com> <47D074BC.2010407@nuix.com> <9232BEE7-FAB1-41C9-B509-229EC2351B7D@gmail.com>
In-Reply-To: <9232BEE7-FAB1-41C9-B509-229EC2351B7D@gmail.com>
Content-Type: text/plain; charset=ISO-8859-1; format=flowed
Content-Transfer-Encoding: 7bit

Hi Rob,

I'll give it a go in the next day or two when I get some spare time.
>From what I can see, we call iterate() inline rather than delegating it
to the dedicated "wakeup" thread when optimizeDispatch is set.  From
memory, this caused contention with synchronisation blocks last time -
as many consumers would end up calling iterate().  Would this still not
be an issue?  I must admit I'd need to check the code closely.

Cheers,
David

Rob Davies wrote:
> David,
>
> you might like to try enabling the optimizeDispatch property on the
> Destination policy map - see
> http://activemq.apache.org/configure-version-5-brokers.html from trunk,
> if you are using non-persistent messages
>
> cheers,
>
> Rob
> On 6 Mar 2008, at 22:48, David Sitsky wrote:
>
>> I am sure it will be application-dependent, so making it a policy
>> makes a lot of sense.  For my application, I only have a pending size
>> of 1 since each work item's processing requirements can vary
>> tremendously.
>>
>> Just curious - what kind of benchmarks did you run this against?  I'm
>> curious to know what kind of performance degradation you saw... it
>> would be interesting to understand why.  I am using non-persistent
>> messaging, so perhaps that could make a difference, since I am only
>> paging a small number of messages in at a time.
>>
>> Cheers,
>> David
>>
>> Rob Davies wrote:
>>> Hi David,
>>> Yes - actually - I tried it a few days ago. I haven't committed it
>>> because message throughput is generally lower. I will look at making
>>> it optional via a destination policy
>>> cheers,
>>> Rob
>>> On 6 Mar 2008, at 05:54, David Sitsky wrote:
>>>> Hi Rob,
>>>>
>>>> I know it's been a couple of weeks.  I've been using my changes for a
>>>> while and I see nice CPU and memory usage on the broker, and good
>>>> messaging performance for my application.  Have you had a chance to
>>>> try it out?
>>>>
>>>> Cheers,
>>>> David
>>>>
>>>> Rob Davies wrote:
>>>>> Hi David,
>>>>> thanks for the great feedback - will try your patch and see how it
>>>>> works!
>>>>> cheers,
>>>>> Rob
>>>>> On 20 Feb 2008, at 06:31, David Sitsky wrote:
>>>>>> Hi Rob,
>>>>>>
>>>>>> I like the new changes, but with the changes as they are, for my
>>>>>> application for one of my benchmarks, it takes twice as long to
>>>>>> complete.
>>>>>>
>>>>>> I believe the culprit for this is that when the new code can't
>>>>>> find a consumer which is not full, the broker chooses the consumer
>>>>>> with the lowest dispatch queue size.
>>>>>>
>>>>>> In my application, since I have a prefetch size of 1, and have
>>>>>> longish-running transactions, the dispatch queue size is not
>>>>>> indicative of the current load for that consumer.  As a result, I
>>>>>> think this is what is responsible for poor load-balancing in my case.
>>>>>>
>>>>>> For applications which commit() after each processed message, I am
>>>>>> sure this wouldn't be the case.  In some ways, reverting to the
>>>>>> old behaviour of adding the pending message to all consumers might
>>>>>> lead to better load balancing with this code.
>>>>>>
>>>>>> However - I think it is better if the consumers can decide when
>>>>>> they want more messages rather than the broker pushing messages at
>>>>>> them?  I've attached a patch which demonstrates this.  When
>>>>>> LAZY_DISPATCH is set to true (set via a system property for now
>>>>>> for testing purposes) this changes the behaviour slightly.
>>>>>>
>>>>>> The basic idea is pageInMessages() only pages in the minimum
>>>>>> number of messages that can be dispatched immediately to non-full
>>>>>> consumers.  Whenever a consumer acks a message, which updates its
>>>>>> prefetch size, we make sure Queue.wakeup() is called so that the
>>>>>> consumer will receive new messages.
>>>>>>
>>>>>> With this change in effect - I see slightly faster or almost the
>>>>>> same times with the previous benchmark.  However memory usage on
>>>>>> the broker is far better, as the pending queue size for each
>>>>>> consumer is either 0 or very small.
>>>>>>
>>>>>> What do you think?  I guess there are better ways of doing this.
>>>>>>
>>>>>> I am doing a large overnight run with 16 consumers, so we'll see
>>>>>> how the performance goes.
>>>>>>
>>>>>> You'll also notice in the patch, that in Queue.addSubscriber(), I
>>>>>> thought there didn't seem to be any need for adding a message to a
>>>>>> new consumer if the message has already been locked by another
>>>>>> consumer?
>>>>>>
>>>>>> Cheers,
>>>>>> David
>>>>>>
>>>>>> Rob Davies wrote:
>>>>>>> Hi David,
>>>>>>> please let us know if these changes help/hinder your app!
>>>>>>> cheers,
>>>>>>> Rob
>>>>>>> On 19 Feb 2008, at 08:32, David Sitsky wrote:
>>>>>>>>>> If what I said above is true, then the immediately above if
>>>>>>>>>> statement needs to be moved outside its enclosing if -
>>>>>>>>>> otherwise it only gets executed when targets != null.  We'd
>>>>>>>>>> want this to execute if we found a matching target wouldn't we?
>>>>>>>>> Don't think so? We only want the message going to one
>>>>>>>>> subscription? I may have misunderstood what you mean!
>>>>>>>> Yes - ignore what I said, I had my wires crossed.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> David
>>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Cheers,
>>>>>> David
>>>>>>
>>>>>> Nuix Pty Ltd
>>>>>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 0699
>>>>>> Web: http://www.nuix.com                            Fax: +61 2 9212 6902
>>>>>> Index: activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
>>>>>> ===================================================================
>>>>>> --- activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (revision 628917)
>>>>>> +++ activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (working copy)
>>>>>> @@ -160,6 +160,8 @@
>>>>>>      public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception {
>>>>>>          // Handle the standard acknowledgment case.
>>>>>>          boolean callDispatchMatched = false;
>>>>>> +        Queue queue = null;
>>>>>> +        synchronized(dispatchLock) {
>>>>>>              if (ack.isStandardAck()) {
>>>>>>                  // Acknowledge all dispatched messages up till the message id of
>>>>>> @@ -223,8 +225,12 @@
>>>>>>                              prefetchExtension = Math.max(0,
>>>>>>                                      prefetchExtension - (index + 1));
>>>>>>                          }
>>>>>> +                        if (queue == null)
>>>>>> +                        {
>>>>>> +                            queue = (Queue)node.getRegionDestination();
>>>>>> +                        }
>>>>>>                          callDispatchMatched = true;
>>>>>> -                        break;
>>>>>> +                        break;
>>>>>>                      }
>>>>>>                  }
>>>>>>              }
>>>>>> @@ -253,6 +259,10 @@
>>>>>>                      if (ack.getLastMessageId().equals(node.getMessageId())) {
>>>>>>                          prefetchExtension = Math.max(prefetchExtension,
>>>>>>                                  index + 1);
>>>>>> +                        if (queue == null)
>>>>>> +                        {
>>>>>> +                            queue = (Queue)node.getRegionDestination();
>>>>>> +                        }
>>>>>>                          callDispatchMatched = true;
>>>>>>                          break;
>>>>>>                      }
>>>>>> @@ -279,6 +289,10 @@
>>>>>>                      if (inAckRange) {
>>>>>>                          node.incrementRedeliveryCounter();
>>>>>>                          if (ack.getLastMessageId().equals(messageId)) {
>>>>>> +                            if (queue == null)
>>>>>> +                            {
>>>>>> +                                queue = (Queue)node.getRegionDestination();
>>>>>> +                            }
>>>>>>                              callDispatchMatched = true;
>>>>>>                              break;
>>>>>>                          }
>>>>>> @@ -320,6 +334,10 @@
>>>>>>                          if (ack.getLastMessageId().equals(messageId)) {
>>>>>>                              prefetchExtension = Math.max(0,
>>>>>>                                      prefetchExtension - (index + 1));
>>>>>> +                            if (queue == null)
>>>>>> +                            {
>>>>>> +                                queue = (Queue)node.getRegionDestination();
>>>>>> +                            }
>>>>>>                              callDispatchMatched = true;
>>>>>>                              break;
>>>>>>                          }
>>>>>> @@ -336,6 +354,9 @@
>>>>>>                  }
>>>>>>              }
>>>>>>              if (callDispatchMatched) {
>>>>>> +                if (Queue.LAZY_DISPATCH) {
>>>>>> +                    queue.wakeup();
>>>>>> +                }
>>>>>>                  dispatchPending();
>>>>>>              } else {
>>>>>>                  if (isSlave()) {
>>>>>> Index: activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
>>>>>> ===================================================================
>>>>>> --- activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (revision 628917)
>>>>>> +++ activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (working copy)
>>>>>> @@ -75,6 +75,8 @@
>>>>>>   * @version $Revision: 1.28 $
>>>>>>   */
>>>>>>  public class Queue extends BaseDestination implements Task {
>>>>>> +    public static final boolean LAZY_DISPATCH =
>>>>>> +        Boolean.parseBoolean(System.getProperty("activemq.lazy.dispatch", "true"));
>>>>>>      private final Log log;
>>>>>>      private final List consumers = new ArrayList(50);
>>>>>>      private PendingMessageCursor messages;
>>>>>> @@ -212,12 +214,12 @@
>>>>>>          synchronized (pagedInMessages) {
>>>>>>              // Add all the matching messages in the queue to the
>>>>>>              // subscription.
>>>>>> -
>>>>>>              for (Iterator i = pagedInMessages.values()
>>>>>>                      .iterator(); i.hasNext();) {
>>>>>>                  QueueMessageReference node = (QueueMessageReference) i
>>>>>>                          .next();
>>>>>> -                if (!node.isDropped() && !node.isAcked() && (!node.isDropped() ||sub.getConsumerInfo().isBrowser())) {
>>>>>> +                if ((!node.isDropped() || sub.getConsumerInfo().isBrowser()) && !node.isAcked() &&
>>>>>> +                        node.getLockOwner() == null) {
>>>>>>                      msgContext.setMessageReference(node);
>>>>>>                      if (sub.matches(node, msgContext)) {
>>>>>>                          sub.add(node);
>>>>>> @@ -940,7 +945,11 @@
>>>>>>          dispatchLock.lock();
>>>>>>          try{
>>>>>>
>>>>>> -            final int toPageIn = getMaxPageSize() - pagedInMessages.size();
>>>>>> +            int toPageIn = getMaxPageSize() - pagedInMessages.size();
>>>>>> +            if (LAZY_DISPATCH) {
>>>>>> +                // Only page in the minimum number of messages which can be dispatched immediately.
>>>>>> +                toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
>>>>>> +            }
>>>>>>              if ((force || !consumers.isEmpty()) && toPageIn > 0) {
>>>>>>                  messages.setMaxBatchSize(toPageIn);
>>>>>>                  int count = 0;
>>>>>> @@ -976,12 +985,25 @@
>>>>>>          }
>>>>>>          return result;
>>>>>>      }
>>>>>> +
>>>>>> +    private int getConsumerMessageCountBeforeFull() throws Exception {
>>>>>> +        int total = 0;
>>>>>> +        synchronized (consumers) {
>>>>>> +            for (Subscription s : consumers) {
>>>>>> +                if (s instanceof PrefetchSubscription) {
>>>>>> +                    total += ((PrefetchSubscription)s).countBeforeFull();
>>>>>> +                }
>>>>>> +            }
>>>>>> +        }
>>>>>> +        return total;
>>>>>> +    }
>>>>>>
>>>>>>      private void doDispatch(List list) throws Exception {
>>>>>>
>>>>>>          if (list != null) {
>>>>>>              synchronized (consumers) {
>>>>>>                  for (MessageReference node : list) {
>>>>>> +
>>>>>>                      Subscription target = null;
>>>>>>                      List targets = null;
>>>>>>                      for (Subscription s : consumers) {
>>>>
>>>>
>>>> --
>>>> Cheers,
>>>> David
>>>>
>>>> Nuix Pty Ltd
>>>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 0699
>>>> Web: http://www.nuix.com                            Fax: +61 2 9212 6902
>>
>>
>> --
>> Cheers,
>> David
>>
>> Nuix Pty Ltd
>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 0699
>> Web: http://www.nuix.com                            Fax: +61 2 9212 6902
>
>

--
Cheers,
David

Nuix Pty Ltd
Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 0699
Web: http://www.nuix.com                            Fax: +61 2 9212 6902
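
For anyone following the thread without the broker source to hand, here is a
rough, standalone sketch of the lazy page-in calculation from the quoted patch:
the number of messages to page in is capped by how many the non-full consumers
could accept right now. The class LazyPageInSketch, the Consumer type and its
simplified countBeforeFull() (prefetch size minus dispatched count) are
hypothetical stand-ins for illustration only; they are not the ActiveMQ
classes, and the real PrefetchSubscription may compute its remaining capacity
differently.

```java
import java.util.Arrays;
import java.util.List;

public class LazyPageInSketch {

    /** Hypothetical stand-in for a prefetch-limited consumer. */
    public static class Consumer {
        final int prefetchSize; // maximum unacknowledged messages allowed
        final int dispatched;   // messages sent but not yet acknowledged

        public Consumer(int prefetchSize, int dispatched) {
            this.prefetchSize = prefetchSize;
            this.dispatched = dispatched;
        }

        /** Assumed simplification: remaining room before the consumer is full. */
        int countBeforeFull() {
            return Math.max(0, prefetchSize - dispatched);
        }
    }

    /**
     * Page in no more than the consumers can take immediately, and no more
     * than the room left in the page. Clamped at zero for convenience; the
     * patch instead checks toPageIn > 0 before paging.
     */
    public static int toPageIn(int maxPageSize, int pagedIn, List<Consumer> consumers) {
        int capacity = 0;
        for (Consumer c : consumers) {
            capacity += c.countBeforeFull();
        }
        int room = maxPageSize - pagedIn;
        return Math.max(0, Math.min(capacity, room));
    }

    public static void main(String[] args) {
        // Three consumers with a prefetch size of 1; one is busy with a message.
        List<Consumer> consumers = Arrays.asList(
                new Consumer(1, 1), new Consumer(1, 0), new Consumer(1, 0));
        System.out.println(toPageIn(200, 0, consumers)); // prints 2
    }
}
```

With a prefetch size of 1 and long-running work items, this keeps the paged-in
set close to the number of idle consumers, which is consistent with the low
broker memory usage described above.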