activemq-dev mailing list archives

From Rob Davies <rajdav...@gmail.com>
Subject Re: Queue performance from recent changes
Date Wed, 20 Feb 2008 11:13:53 GMT
Hi David,

Thanks for the great feedback - I will try your patch and see how it
works!

cheers,

Rob
On 20 Feb 2008, at 06:31, David Sitsky wrote:

> Hi Rob,
>
> I like the new changes, but as they stand, one of my application's
> benchmarks takes twice as long to complete.
>
> I believe the culprit for this is that when the new code can't find  
> a consumer which is not full, the broker chooses the consumer with  
> the lowest dispatch queue size.
>
> In my application, since I have a prefetch size of 1 and
> longish-running transactions, the dispatch queue size is not
> indicative of the current load for that consumer.  I think this is
> what causes the poor load balancing in my case.
>
> For applications which commit() after each processed message, I am  
> sure this wouldn't be the case.  In some ways, reverting to the old  
> behaviour of adding the pending message to all consumers might lead  
> to better load balancing with this code.
>
> However, I think it is better if the consumers decide when they
> want more messages, rather than having the broker push messages at
> them.  I've attached a patch which demonstrates this.  When
> LAZY_DISPATCH is set to true (set via a system property for now, for
> testing purposes), the behaviour changes slightly.
>
> The basic idea is pageInMessages() only pages in the minimum number  
> of messages that can be dispatched immediately to non-full  
> consumers. Whenever a consumer acks a message, which updates its  
> prefetch size, we make sure Queue.wakeup() is called so that the  
> consumer will receive new messages.
>
> With this change in effect, I see slightly faster or almost the
> same times as with the previous benchmark.  However, memory usage on
> the broker is far better, as the pending queue for each consumer is
> either empty or very small.
>
> What do you think?  I guess there are better ways of doing this.
>
> I am doing a large overnight run with 16 consumers, so we'll see how
> the performance goes.
>
> You'll also notice in the patch that, in Queue.addSubscriber(),
> there didn't seem to be any need to add a message to a new consumer
> if the message has already been locked by another consumer?
>
> Cheers,
> David
>
> Rob Davies wrote:
>> Hi David,
>> please let us know if these changes help or hinder your app!
>> cheers,
>> Rob
>> On 19 Feb 2008, at 08:32, David Sitsky wrote:
>>>>> If what I said above is true, then the immediately above if  
>>>>> statement needs to be moved outside its enclosing if - otherwise  
>>>>> it only gets executed when targets != null.  We'd want this to  
>>>>> execute if we found a matching target wouldn't we?
>>>> Don't think so? We only want the message going to one
>>>> subscription? I may have misunderstood what you mean!
>>> Yes - ignore what I said, I had my wires crossed.
>>>
>>> Cheers,
>>> David
>>>
>
>
> -- 
> Cheers,
> David
>
> Nuix Pty Ltd
> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 0699
> Web: http://www.nuix.com                            Fax: +61 2 9212 6902
> Index: activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
> ===================================================================
> --- activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java	(revision 628917)
> +++ activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java	(working copy)
> @@ -160,6 +160,8 @@
>     public  void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception {
>         // Handle the standard acknowledgment case.
>         boolean callDispatchMatched = false;
> +	Queue queue = null;
> +	
>         synchronized(dispatchLock) {
>             if (ack.isStandardAck()) {
>                 // Acknowledge all dispatched messages up till the message id of
> @@ -223,8 +225,12 @@
>                                 prefetchExtension = Math.max(0,
>                                         prefetchExtension - (index + 1));
>                             }
> +			    if (queue == null)
> +			    {
> +				queue = (Queue)node.getRegionDestination();
> +			    }
>                             callDispatchMatched = true;
> -                            break;
> +			    break;
>                         }
>                     }
>                 }
> @@ -253,6 +259,10 @@
>                     if (ack.getLastMessageId().equals(node.getMessageId())) {
>                         prefetchExtension = Math.max(prefetchExtension,
>                                 index + 1);
> +                        if (queue == null)
> +                        {
> +                            queue = (Queue)node.getRegionDestination();
> +                        }
>                         callDispatchMatched = true;
>                         break;
>                     }
> @@ -279,6 +289,10 @@
>                     if (inAckRange) {
>                         node.incrementRedeliveryCounter();
>                         if (ack.getLastMessageId().equals(messageId)) {
> +			    if (queue == null)
> +			    {
> +				queue = (Queue)node.getRegionDestination();
> +			    }
>                             callDispatchMatched = true;
>                             break;
>                         }
> @@ -320,6 +334,10 @@
>                         if (ack.getLastMessageId().equals(messageId)) {
>                             prefetchExtension = Math.max(0, prefetchExtension
>                                     - (index + 1));
> +			    if (queue == null)
> +			    {
> +				queue = (Queue)node.getRegionDestination();
> +			    }
>                             callDispatchMatched = true;
>                             break;
>                         }
> @@ -336,6 +354,9 @@
>             }
>         }
>         if (callDispatchMatched) {
> +	    if (Queue.LAZY_DISPATCH) {
> +		queue.wakeup();
> +	    }
>             dispatchPending();
>         } else {
>             if (isSlave()) {
> Index: activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
> ===================================================================
> --- activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java	(revision 628917)
> +++ activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java	(working copy)
> @@ -75,6 +75,8 @@
>  * @version $Revision: 1.28 $
>  */
> public class Queue extends BaseDestination implements Task {
> +    public static final boolean LAZY_DISPATCH =
> +	Boolean.parseBoolean(System.getProperty("activemq.lazy.dispatch", "true"));
>     private final Log log;
>     private final List<Subscription> consumers = new ArrayList<Subscription>(50);
>     private PendingMessageCursor messages;
> @@ -212,12 +214,12 @@
>             synchronized (pagedInMessages) {
>                 // Add all the matching messages in the queue to the
>                 // subscription.
> -
>                 for (Iterator<MessageReference> i = pagedInMessages.values()
>                         .iterator(); i.hasNext();) {
>                     QueueMessageReference node = (QueueMessageReference) i
>                             .next();
> -                    if (!node.isDropped() && !node.isAcked() && (!node.isDropped() ||sub.getConsumerInfo().isBrowser())) {
> +                    if ((!node.isDropped() || sub.getConsumerInfo().isBrowser()) && !node.isAcked() &&
> +			node.getLockOwner() == null) {
>                         msgContext.setMessageReference(node);
>                         if (sub.matches(node, msgContext)) {
>                             sub.add(node);
> @@ -940,7 +945,11 @@
>         dispatchLock.lock();
>         try{
>
> -            final int toPageIn = getMaxPageSize() - pagedInMessages.size();
> +            int toPageIn = getMaxPageSize() - pagedInMessages.size();
> +	    if (LAZY_DISPATCH) {
> +		// Only page in the minimum number of messages which can be dispatched immediately.
> +		toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
> +	    }
>             if ((force || !consumers.isEmpty()) && toPageIn > 0) {
>                 messages.setMaxBatchSize(toPageIn);
>                 int count = 0;
> @@ -976,12 +985,25 @@
>         }
>         return result;
>     }
> +
> +    private int getConsumerMessageCountBeforeFull() throws Exception {
> +	int total = 0;
> +        synchronized (consumers) {
> +            for (Subscription s : consumers) {
> +		if (s instanceof PrefetchSubscription) {
> +		    total += ((PrefetchSubscription)s).countBeforeFull();
> +		}
> +	    }
> +	}
> +	return total;
> +    }
>
>     private void doDispatch(List<MessageReference> list) throws Exception {
>
>         if (list != null) {
>             synchronized (consumers) {
>                 for (MessageReference node : list) {
> +
>                     Subscription target = null;
>                     List<Subscription> targets = null;
>                     for (Subscription s : consumers) {
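
The lazy-dispatch calculation in the quoted patch can be illustrated in
isolation. What follows is a minimal sketch, not the ActiveMQ code:
ConsumerSlot and LazyPagingSketch are hypothetical names, and
countBeforeFull() is assumed to mean "prefetch size minus dispatched but
unacknowledged messages", since the patch does not show its body.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a consumer with a prefetch limit.
// This is NOT ActiveMQ's PrefetchSubscription.
final class ConsumerSlot {
    final int prefetchSize;
    final int dispatched; // messages sent to the consumer but not yet acked

    ConsumerSlot(int prefetchSize, int dispatched) {
        this.prefetchSize = prefetchSize;
        this.dispatched = dispatched;
    }

    // Assumed meaning: how many more messages this consumer can take now.
    int countBeforeFull() {
        return Math.max(0, prefetchSize - dispatched);
    }
}

final class LazyPagingSketch {
    // Sum of free prefetch slots across all consumers.
    static int consumerCapacity(List<ConsumerSlot> consumers) {
        int total = 0;
        for (ConsumerSlot c : consumers) {
            total += c.countBeforeFull();
        }
        return total;
    }

    // Mirrors the patched toPageIn computation: the page-size limit still
    // applies, and in lazy mode the result is also capped by what the
    // consumers can accept immediately. The clamp to zero is added here
    // for clarity; the patch relies on a later "toPageIn > 0" check.
    static int toPageIn(int maxPageSize, int pagedIn,
                        List<ConsumerSlot> consumers, boolean lazyDispatch) {
        int toPageIn = maxPageSize - pagedIn;
        if (lazyDispatch) {
            toPageIn = Math.min(consumerCapacity(consumers), toPageIn);
        }
        return Math.max(0, toPageIn);
    }

    public static void main(String[] args) {
        // Three consumers with prefetch 1; one is busy in a long transaction.
        List<ConsumerSlot> consumers = Arrays.asList(
                new ConsumerSlot(1, 1),
                new ConsumerSlot(1, 0),
                new ConsumerSlot(1, 0));
        System.out.println("eager: " + toPageIn(200, 0, consumers, false));
        System.out.println("lazy:  " + toPageIn(200, 0, consumers, true));
    }
}
```

With a prefetch size of 1 and one consumer busy, lazy mode pages in only
as many messages as there are idle consumers, which is consistent with
the small pending queues reported in the message above.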

