Hi David, Yes - actually - I tried it a few days ago. I haven't committed it because message throughput is generally lower. I will look at making it optional via a destination policy cheers, Rob On 6 Mar 2008, at 05:54, David Sitsky wrote: > Hi Rob, > > I know its been a couple of weeks. I've been using my changes for a > while and I see nice CPU and memory usage on the broker, and good > messaging performance for my application. Have you had a chance to > try it out? > > Cheers, > David > > Rob Davies wrote: >> Hi David, >> thanks for the great feedback - will try your patch and see how it >> works! >> cheers, >> Rob >> On 20 Feb 2008, at 06:31, David Sitsky wrote: >>> Hi Rob, >>> >>> I like the new changes, but with the changes as they are, for my >>> application for one of my benchmarks, it takes twice as long to >>> complete. >>> >>> I believe the culprit for this is that when the new code can't >>> find a consumer which is not full, the broker chooses the consumer >>> with the lowest dispatch queue size. >>> >>> In my application, since I have a prefetch size of 1, and have >>> longish-running transactions, the dispatch queue size is not >>> indicative of the current load for that consumer. As a result, I >>> think this is what is responsible for poor load-balancing in my >>> case. >>> >>> For applications which commit() after each processed message, I am >>> sure this wouldn't be the case. In some ways, reverting to the >>> old behaviour of adding the pending message to all consumers might >>> lead to better load balancing with this code. >>> >>> However - I think it is better if the consumers can decide when >>> they want more messages rather than the broker pushing messages at >>> them? I've attached a patch which demonstrates this. When >>> LAZY_DISPATCH is set to true (set via a system property for now >>> for testing purposes) this changes the behaviour slightly. >>> >>> The basic idea is pageInMessages() only pages in the minimum >>> number of messages that can be dispatched immediately to non-full >>> consumers. Whenever a consumer acks a message, which updates its >>> prefetch size, we make sure Queue.wakeup() is called so that the >>> consumer will receive new messages. >>> >>> With this change in effect - I see slightly faster or almost the >>> same times with the previous benchmark. However memory usage on >>> the broker is far better, as the pending queues for each consumer >>> is either 0 or very small. >>> >>> What do you think? I guess there are better ways of doing this. >>> >>> I am doing a large overnight run with 16 consumers, so we'll see >>> how the performance goes. >>> >>> You'll also notice in the patch, that in Queue.addSubscriber(), I >>> thought there didn't seem to be any need for adding a message to a >>> new consumer if the message has already been locked by another >>> consumer? >>> >>> Cheers, >>> David >>> >>> Rob Davies wrote: >>>> Hi David, >>>> please let us know if these changes helps/hinders your app! >>>> cheers, >>>> Rob >>>> On 19 Feb 2008, at 08:32, David Sitsky wrote: >>>>>>> If what I said above is true, then the immediately above if >>>>>>> statement needs to be moved outside its enclosing if - >>>>>>> otherwise it only gets executed when targets != null. We'd >>>>>>> want this to execute if we found a matching target wouldn't we? >>>>>> Don't think so? We only want the message going to one >>>>>> subscription? I may have misunderstood what you mean! >>>>> Yes - ignore what I said, I had my wires crossed. >>>>> >>>>> Cheers, >>>>> David >>>>> >>> >>> >>> -- >>> Cheers, >>> David >>> >>> Nuix Pty Ltd >>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2 >>> 9280 0699 >>> Web: http://www.nuix.com Fax: +61 2 >>> 9212 6902 >>> Index: activemq-core/src/main/java/org/apache/activemq/broker/ >>> region/PrefetchSubscription.java >>> =================================================================== >>> --- activemq-core/src/main/java/org/apache/activemq/broker/region/ >>> PrefetchSubscription.java (revision 628917) >>> +++ activemq-core/src/main/java/org/apache/activemq/broker/region/ >>> PrefetchSubscription.java (working copy) >>> @@ -160,6 +160,8 @@ >>> public void acknowledge(final ConnectionContext context,final >>> MessageAck ack) throws Exception { >>> // Handle the standard acknowledgment case. >>> boolean callDispatchMatched = false; >>> + Queue queue = null; >>> + synchronized(dispatchLock) { >>> if (ack.isStandardAck()) { >>> // Acknowledge all dispatched messages up till the >>> message id of >>> @@ -223,8 +225,12 @@ >>> prefetchExtension = Math.max(0, >>> prefetchExtension - (index >>> + 1)); >>> } >>> + if (queue == null) >>> + { >>> + queue = (Queue)node.getRegionDestination(); >>> + } >>> callDispatchMatched = true; >>> - break; >>> + break; >>> } >>> } >>> } >>> @@ -253,6 +259,10 @@ >>> if >>> (ack.getLastMessageId().equals(node.getMessageId())) { >>> prefetchExtension = >>> Math.max(prefetchExtension, >>> index + 1); >>> + if (queue == null) >>> + { >>> + queue = >>> (Queue)node.getRegionDestination(); >>> + } >>> callDispatchMatched = true; >>> break; >>> } >>> @@ -279,6 +289,10 @@ >>> if (inAckRange) { >>> node.incrementRedeliveryCounter(); >>> if >>> (ack.getLastMessageId().equals(messageId)) { >>> + if (queue == null) >>> + { >>> + queue = (Queue)node.getRegionDestination(); >>> + } >>> callDispatchMatched = true; >>> break; >>> } >>> @@ -320,6 +334,10 @@ >>> if >>> (ack.getLastMessageId().equals(messageId)) { >>> prefetchExtension = Math.max(0, >>> prefetchExtension >>> - (index + 1)); >>> + if (queue == null) >>> + { >>> + queue = (Queue)node.getRegionDestination(); >>> + } >>> callDispatchMatched = true; >>> break; >>> } >>> @@ -336,6 +354,9 @@ >>> } >>> } >>> if (callDispatchMatched) { >>> + if (Queue.LAZY_DISPATCH) { >>> + queue.wakeup(); >>> + } >>> dispatchPending(); >>> } else { >>> if (isSlave()) { >>> Index: activemq-core/src/main/java/org/apache/activemq/broker/ >>> region/Queue.java >>> =================================================================== >>> --- activemq-core/src/main/java/org/apache/activemq/broker/region/ >>> Queue.java (revision 628917) >>> +++ activemq-core/src/main/java/org/apache/activemq/broker/region/ >>> Queue.java (working copy) >>> @@ -75,6 +75,8 @@ >>> * @version $Revision: 1.28 $ >>> */ >>> public class Queue extends BaseDestination implements Task { >>> + public static final boolean LAZY_DISPATCH = >>> + >>> Boolean.parseBoolean(System.getProperty("activemq.lazy.dispatch", >>> "true")); >>> private final Log log; >>> private final List consumers = new >>> ArrayList(50); >>> private PendingMessageCursor messages; >>> @@ -212,12 +214,12 @@ >>> synchronized (pagedInMessages) { >>> // Add all the matching messages in the queue to the >>> // subscription. >>> - >>> for (Iterator i = >>> pagedInMessages.values() >>> .iterator(); i.hasNext();) { >>> QueueMessageReference node = >>> (QueueMessageReference) i >>> .next(); >>> - if (!node.isDropped() && !node.isAcked() && (! >>> node.isDropped() ||sub.getConsumerInfo().isBrowser())) { >>> + if ((!node.isDropped() || >>> sub.getConsumerInfo().isBrowser()) && !node.isAcked() && >>> + node.getLockOwner() == null) { >>> msgContext.setMessageReference(node); >>> if (sub.matches(node, msgContext)) { >>> sub.add(node); >>> @@ -940,7 +945,11 @@ >>> dispatchLock.lock(); >>> try{ >>> >>> - final int toPageIn = getMaxPageSize() - >>> pagedInMessages.size(); >>> + int toPageIn = getMaxPageSize() - >>> pagedInMessages.size(); >>> + if (LAZY_DISPATCH) { >>> + // Only page in the minimum number of messages which can >>> be dispatched immediately. >>> + toPageIn = Math.min(getConsumerMessageCountBeforeFull(), >>> toPageIn); >>> + } >>> if ((force || !consumers.isEmpty()) && toPageIn > 0) { >>> messages.setMaxBatchSize(toPageIn); >>> int count = 0; >>> @@ -976,12 +985,25 @@ >>> } >>> return result; >>> } >>> + >>> + private int getConsumerMessageCountBeforeFull() throws >>> Exception { >>> + int total = 0; >>> + synchronized (consumers) { >>> + for (Subscription s : consumers) { >>> + if (s instanceof PrefetchSubscription) { >>> + total += ((PrefetchSubscription)s).countBeforeFull(); >>> + } >>> + } >>> + } >>> + return total; >>> + } >>> >>> private void doDispatch(List list) throws >>> Exception { >>> >>> if (list != null) { >>> synchronized (consumers) { >>> for (MessageReference node : list) { >>> + >>> Subscription target = null; >>> List targets = null; >>> for (Subscription s : consumers) { > > > -- > Cheers, > David > > Nuix Pty Ltd > Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2 9280 > 0699 > Web: http://www.nuix.com Fax: +61 2 9212 > 6902