Message-ID: <47DE0B4E.3080301@nuix.com>
Date: Mon, 17 Mar 2008 17:10:22 +1100
From: David Sitsky
Organization: NUIX Pty Ltd
MIME-Version: 1.0
To: dev@activemq.apache.org
Subject: Re: Queue performance from recent changes
References: <20080218094428.843041A9832@eris.apache.org>
 <47BA6480.4020604@nuix.com>
 <2C159E97-AD73-4BB1-879C-0505B6B6E801@gmail.com>
 <47BA9424.9040701@nuix.com>
 <93C79F5E-8F0C-4B65-B7A9-21687FD00CD0@gmail.com>
 <47BBC94B.8010302@nuix.com>
 <47CF8732.7020600@nuix.com>
 <47D70C7A.4030204@nuix.com>
In-Reply-To: <47D70C7A.4030204@nuix.com>
Content-Type: text/plain;
 charset=ISO-8859-1; format=flowed
Content-Transfer-Encoding: 7bit
X-Virus-Checked: Checked by ClamAV on apache.org

Hi Rob,

I've finally had some time to run some benchmarks with the trunk checked
out today (637703) and the numbers look great!

Many thanks for checking in this code - I'm happy now that I don't have
to maintain any of my own private modifications to activemq.

Thanks again.

Cheers,
David

David Sitsky wrote:
> Many thanks Rob - I'll try and do a fresh checkout today and let you
> know how the performance looks using my standard benchmarks.
>
> Cheers,
> David
>
> Rob Davies wrote:
>> Hi David,
>>
>> the changes you suggested are now in and lazyDispatch can be set by a
>> destination policy - it's currently on by default
>>
>> cheers,
>>
>> Rob
>> On 6 Mar 2008, at 05:54, David Sitsky wrote:
>>
>>> Hi Rob,
>>>
>>> I know it's been a couple of weeks. I've been using my changes for a
>>> while and I see nice CPU and memory usage on the broker, and good
>>> messaging performance for my application. Have you had a chance to
>>> try it out?
>>>
>>> Cheers,
>>> David
>>>
>>> Rob Davies wrote:
>>>> Hi David,
>>>> thanks for the great feedback - will try your patch and see how it
>>>> works!
>>>> cheers,
>>>> Rob
>>>> On 20 Feb 2008, at 06:31, David Sitsky wrote:
>>>>> Hi Rob,
>>>>>
>>>>> I like the new changes, but with the changes as they are, for my
>>>>> application for one of my benchmarks, it takes twice as long to
>>>>> complete.
>>>>>
>>>>> I believe the culprit for this is that when the new code can't find
>>>>> a consumer which is not full, the broker chooses the consumer with
>>>>> the lowest dispatch queue size.
>>>>>
>>>>> In my application, since I have a prefetch size of 1, and have
>>>>> longish-running transactions, the dispatch queue size is not
>>>>> indicative of the current load for that consumer. As a result, I
>>>>> think this is what is responsible for poor load-balancing in my case.
>>>>>
>>>>> For applications which commit() after each processed message, I am
>>>>> sure this wouldn't be the case. In some ways, reverting to the old
>>>>> behaviour of adding the pending message to all consumers might lead
>>>>> to better load balancing with this code.
>>>>>
>>>>> However - I think it is better if the consumers can decide when
>>>>> they want more messages rather than the broker pushing messages at
>>>>> them? I've attached a patch which demonstrates this. When
>>>>> LAZY_DISPATCH is set to true (set via a system property for now for
>>>>> testing purposes) this changes the behaviour slightly.
>>>>>
>>>>> The basic idea is pageInMessages() only pages in the minimum number
>>>>> of messages that can be dispatched immediately to non-full
>>>>> consumers. Whenever a consumer acks a message, which updates its
>>>>> prefetch size, we make sure Queue.wakeup() is called so that the
>>>>> consumer will receive new messages.
>>>>>
>>>>> With this change in effect - I see slightly faster or almost the
>>>>> same times with the previous benchmark. However memory usage on
>>>>> the broker is far better, as the pending queues for each consumer
>>>>> are either 0 or very small.
>>>>>
>>>>> What do you think? I guess there are better ways of doing this.
>>>>>
>>>>> I am doing a large overnight run with 16 consumers, so we'll see
>>>>> how the performance goes.
>>>>>
>>>>> You'll also notice in the patch, that in Queue.addSubscriber(), I
>>>>> thought there didn't seem to be any need for adding a message to a
>>>>> new consumer if the message has already been locked by another
>>>>> consumer?
>>>>>
>>>>> Cheers,
>>>>> David
>>>>>
>>>>> Rob Davies wrote:
>>>>>> Hi David,
>>>>>> please let us know if these changes help/hinder your app!
>>>>>> cheers,
>>>>>> Rob
>>>>>> On 19 Feb 2008, at 08:32, David Sitsky wrote:
>>>>>>>>> If what I said above is true, then the immediately above if
>>>>>>>>> statement needs to be moved outside its enclosing if -
>>>>>>>>> otherwise it only gets executed when targets != null. We'd
>>>>>>>>> want this to execute if we found a matching target wouldn't we?
>>>>>>>> Don't think so? We only want the message going to one
>>>>>>>> subscription? I may have misunderstood what you mean!
>>>>>>> Yes - ignore what I said, I had my wires crossed.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> David
>>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Cheers,
>>>>> David
>>>>>
>>>>> Nuix Pty Ltd
>>>>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2 9280 0699
>>>>> Web: http://www.nuix.com Fax: +61 2 9212 6902
>>>>> Index: activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
>>>>> ===================================================================
>>>>> --- activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (revision 628917)
>>>>> +++ activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (working copy)
>>>>> @@ -160,6 +160,8 @@
>>>>>      public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception {
>>>>>          // Handle the standard acknowledgment case.
>>>>>          boolean callDispatchMatched = false;
>>>>> +        Queue queue = null;
>>>>> +        synchronized(dispatchLock) {
>>>>>          if (ack.isStandardAck()) {
>>>>>              // Acknowledge all dispatched messages up till the message id of
>>>>> @@ -223,8 +225,12 @@
>>>>>                          prefetchExtension = Math.max(0,
>>>>>                                  prefetchExtension - (index + 1));
>>>>>                      }
>>>>> +                    if (queue == null)
>>>>> +                    {
>>>>> +                        queue = (Queue)node.getRegionDestination();
>>>>> +                    }
>>>>>                      callDispatchMatched = true;
>>>>> -                    break;
>>>>> +                    break;
>>>>>                  }
>>>>>              }
>>>>>          }
>>>>> @@ -253,6 +259,10 @@
>>>>>                  if (ack.getLastMessageId().equals(node.getMessageId())) {
>>>>>                      prefetchExtension = Math.max(prefetchExtension,
>>>>>                              index + 1);
>>>>> +                    if (queue == null)
>>>>> +                    {
>>>>> +                        queue = (Queue)node.getRegionDestination();
>>>>> +                    }
>>>>>                      callDispatchMatched = true;
>>>>>                      break;
>>>>>                  }
>>>>> @@ -279,6 +289,10 @@
>>>>>                  if (inAckRange) {
>>>>>                      node.incrementRedeliveryCounter();
>>>>>                      if (ack.getLastMessageId().equals(messageId)) {
>>>>> +                        if (queue == null)
>>>>> +                        {
>>>>> +                            queue = (Queue)node.getRegionDestination();
>>>>> +                        }
>>>>>                          callDispatchMatched = true;
>>>>>                          break;
>>>>>                      }
>>>>> @@ -320,6 +334,10 @@
>>>>>                      if (ack.getLastMessageId().equals(messageId)) {
>>>>>                          prefetchExtension = Math.max(0, prefetchExtension
>>>>>                                  - (index + 1));
>>>>> +                        if (queue == null)
>>>>> +                        {
>>>>> +                            queue = (Queue)node.getRegionDestination();
>>>>> +                        }
>>>>>                          callDispatchMatched = true;
>>>>>                          break;
>>>>>                      }
>>>>> @@ -336,6 +354,9 @@
>>>>>              }
>>>>>          }
>>>>>          if (callDispatchMatched) {
>>>>> +            if (Queue.LAZY_DISPATCH) {
>>>>> +                queue.wakeup();
>>>>> +            }
>>>>>              dispatchPending();
>>>>>          } else {
>>>>>              if (isSlave()) {
>>>>> Index: activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
>>>>> ===================================================================
>>>>> --- activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (revision 628917)
>>>>> +++ activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (working copy)
>>>>> @@ -75,6 +75,8 @@
>>>>>   * @version $Revision: 1.28 $
>>>>>   */
>>>>>  public class Queue extends BaseDestination implements Task {
>>>>> +    public static final boolean LAZY_DISPATCH =
>>>>> +        Boolean.parseBoolean(System.getProperty("activemq.lazy.dispatch", "true"));
>>>>>      private final Log log;
>>>>>      private final List consumers = new ArrayList(50);
>>>>>      private PendingMessageCursor messages;
>>>>> @@ -212,12 +214,12 @@
>>>>>              synchronized (pagedInMessages) {
>>>>>                  // Add all the matching messages in the queue to the
>>>>>                  // subscription.
>>>>> -
>>>>>                  for (Iterator i = pagedInMessages.values()
>>>>>                          .iterator(); i.hasNext();) {
>>>>>                      QueueMessageReference node = (QueueMessageReference) i
>>>>>                              .next();
>>>>> -                    if (!node.isDropped() && !node.isAcked() && (!node.isDropped() ||sub.getConsumerInfo().isBrowser())) {
>>>>> +                    if ((!node.isDropped() || sub.getConsumerInfo().isBrowser()) && !node.isAcked() &&
>>>>> +                            node.getLockOwner() == null) {
>>>>>                          msgContext.setMessageReference(node);
>>>>>                          if (sub.matches(node, msgContext)) {
>>>>>                              sub.add(node);
>>>>> @@ -940,7 +945,11 @@
>>>>>          dispatchLock.lock();
>>>>>          try{
>>>>>
>>>>> -            final int toPageIn = getMaxPageSize() - pagedInMessages.size();
>>>>> +            int toPageIn = getMaxPageSize() - pagedInMessages.size();
>>>>> +            if (LAZY_DISPATCH) {
>>>>> +                // Only page in the minimum number of messages which can be dispatched immediately.
>>>>> +                toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
>>>>> +            }
>>>>>              if ((force || !consumers.isEmpty()) && toPageIn > 0) {
>>>>>                  messages.setMaxBatchSize(toPageIn);
>>>>>                  int count = 0;
>>>>> @@ -976,12 +985,25 @@
>>>>>          }
>>>>>          return result;
>>>>>      }
>>>>> +
>>>>> +    private int getConsumerMessageCountBeforeFull() throws Exception {
>>>>> +        int total = 0;
>>>>> +        synchronized (consumers) {
>>>>> +            for (Subscription s : consumers) {
>>>>> +                if (s instanceof PrefetchSubscription) {
>>>>> +                    total += ((PrefetchSubscription)s).countBeforeFull();
>>>>> +                }
>>>>> +            }
>>>>> +        }
>>>>> +        return total;
>>>>> +    }
>>>>>
>>>>>      private void doDispatch(List list) throws Exception {
>>>>>
>>>>>          if (list != null) {
>>>>>              synchronized (consumers) {
>>>>>                  for (MessageReference node : list) {
>>>>> +
>>>>>                      Subscription target = null;
>>>>>                      List targets = null;
>>>>>                      for (Subscription s : consumers) {
>>>
>>>
>>> --
>>> Cheers,
>>> David
>>>
>>> Nuix Pty Ltd
>>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2 9280 0699
>>> Web: http://www.nuix.com Fax: +61 2 9212 6902
>
>
--
Cheers,
David

Nuix Pty Ltd
Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2 9280 0699
Web: http://www.nuix.com Fax: +61 2 9212 6902
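
For readers skimming the thread, the page-in rule from the quoted 20 Feb message (page in only as many messages as non-full consumers can accept right now) can be sketched as a small standalone model. This is only an illustration under stated assumptions: LazyPageInSketch, Consumer and their fields are hypothetical stand-ins invented here, not the ActiveMQ Queue or PrefetchSubscription classes, and the clamp to zero is a convenience that the patch expresses as a "toPageIn > 0" check instead.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: hypothetical stand-ins, not the ActiveMQ classes in the patch.
class LazyPageInSketch {

    /** A consumer with a fixed prefetch limit and a count of unacked messages. */
    static final class Consumer {
        final int prefetchSize;
        int dispatched; // messages handed to this consumer and not yet acked

        Consumer(int prefetchSize) {
            this.prefetchSize = prefetchSize;
        }

        /** Free prefetch slots, i.e. how many more messages fit before it is full. */
        int countBeforeFull() {
            return Math.max(0, prefetchSize - dispatched);
        }
    }

    /** Sum of the free prefetch slots over all consumers. */
    static int consumerMessageCountBeforeFull(List<Consumer> consumers) {
        int total = 0;
        for (Consumer c : consumers) {
            total += c.countBeforeFull();
        }
        return total;
    }

    /** Messages to page in: page headroom, capped by free slots when lazy. */
    static int toPageIn(int maxPageSize, int pagedIn, List<Consumer> consumers, boolean lazy) {
        int n = maxPageSize - pagedIn;
        if (lazy) {
            n = Math.min(consumerMessageCountBeforeFull(consumers), n);
        }
        return Math.max(0, n); // clamp added for this sketch only
    }

    public static void main(String[] args) {
        // Three consumers with a prefetch size of 1, as in the benchmark described above.
        List<Consumer> consumers = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            consumers.add(new Consumer(1));
        }
        consumers.get(0).dispatched = 1; // busy in a long-running transaction

        System.out.println(toPageIn(200, 0, consumers, false)); // eager: full page headroom
        System.out.println(toPageIn(200, 0, consumers, true));  // lazy: only the free slots
        consumers.get(0).dispatched = 0;                        // the busy consumer acks
        System.out.println(toPageIn(200, 0, consumers, true));  // one more slot is free
    }
}
```

With a page size of 200, the eager call asks for 200 messages while the lazy calls ask for 2 and then 3. In this model that is why the pending queues stay near empty, and why an ack has to wake the queue so the freed slot gets refilled.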