From: Rob Davies
To: users@activemq.apache.org
Subject: Re: Message dispatch performance using ActiveMQ 5 on application using 4 quad-cores machines
Date: Thu, 20 Dec 2007 07:10:02 +0000
Message-Id: <75DD5DB4-6332-496C-88E5-3CCC78A6C0F8@gmail.com>
In-Reply-To: <476A11C4.1050802@nuix.com>

Hi David,

Your application should be I/O bound with a prefetch of 0 (effectively pulling every message from the broker), so something else is going on. Could you send a full stack trace?

thanks,

Rob

On Dec 20, 2007, at 6:55 AM, David Sitsky wrote:

> Hi,
>
> I have, simplistically speaking, an application using ActiveMQ 5
> which has a queue called work-items.
>
> A master JVM process puts work items onto the queue (this process
> also runs the embedded JMS broker), and external worker JVM
> processes read these items using receive() with a timeout of 2
> seconds.
>
> I have 4 quad-core machines running this application: one runs the
> master program as described, and the other three quad-core machines
> each run a worker JVM process, all connected with gigabit ethernet.
> All machines are running Vista and Java 1.6.
>
> I have 4 separate threads within each worker JVM processing data,
> each thread with two connections to the broker (one for sending
> data, one for receiving data), which at the time seemed to give the
> best results. Each connection has its own associated session
> object to avoid any "contention".
>
> I am using a prefetch size of 0, as I have found this gives the best
> performance in this application, since each work item's payload can
> have an unpredictable amount of processing time. Data is sent and
> received using transactions, and the messages are non-persistent.
>
> Using JMX, I often see the queue size grow to around
> 10,000 - 20,000 elements, which is as expected, but I often see the
> worker threads sitting waiting for messages to be dispatched to them
> (verified using thread dumps). I also see the master process
> consuming a lot of CPU, which perhaps explains why it is unable to
> keep up.
>
> I've read various documents on the wiki, and have tried a lot of
> different settings, but it seems that the broker is unable to
> deliver these messages fast enough to the consumers, or something
> else is holding it back.
>
> Here are some relevant thread snapshots when the broker is
> grinding... if anybody has some ideas on what I might be doing
> wrong, I'd love some suggestions. Have I gotten carried away with
> creating too many connections?
>
> I am a bit worried by PooledTaskRunner iterating over this large
> number of messages often...
>
> "ActiveMQ Transport: tcp:///192.168.222.75:49197" daemon prio=4 RUNNABLE
>     at org.apache.activemq.broker.region.RegionBroker.getBrokerService(RegionBroker.java:625)
>     at org.apache.activemq.broker.BrokerFilter.getBrokerService(BrokerFilter.java:245)
>     at org.apache.activemq.broker.BrokerFilter.getBrokerService(BrokerFilter.java:245)
>     at org.apache.activemq.broker.BrokerFilter.getBrokerService(BrokerFilter.java:245)
>     at org.apache.activemq.broker.MutableBrokerFilter.getBrokerService(MutableBrokerFilter.java:261)
>     at org.apache.activemq.broker.ConnectionContext.isSlave(ConnectionContext.java:265)
>     at org.apache.activemq.broker.region.AbstractSubscription.isSlave(AbstractSubscription.java:118)
>     at org.apache.activemq.broker.region.PrefetchSubscription.isFull(PrefetchSubscription.java:361)
>     at org.apache.activemq.broker.region.PrefetchSubscription.dispatchMatched(PrefetchSubscription.java:452)
>     at org.apache.activemq.broker.region.PrefetchSubscription.pullMessage(PrefetchSubscription.java:90)
>     at org.apache.activemq.broker.region.AbstractRegion.messagePull(AbstractRegion.java:348)
>     at org.apache.activemq.broker.region.RegionBroker.messagePull(RegionBroker.java:434)
>     at org.apache.activemq.broker.BrokerFilter.messagePull(BrokerFilter.java:77)
>     at org.apache.activemq.broker.BrokerFilter.messagePull(BrokerFilter.java:77)
>     at org.apache.activemq.broker.BrokerFilter.messagePull(BrokerFilter.java:77)
>     at org.apache.activemq.broker.MutableBrokerFilter.messagePull(MutableBrokerFilter.java:245)
>     at org.apache.activemq.broker.TransportConnection.processMessagePull(TransportConnection.java:444)
>     at org.apache.activemq.command.MessagePull.visit(MessagePull.java:43)
>     at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:280)
>     at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:177)
>     at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67)
>     at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:134)
>     at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:150)
>     at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
>     at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:185)
>     at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:172)
>     at java.lang.Thread.run(Unknown Source)
>
> "ActiveMQ Task" daemon prio=5 RUNNABLE
>     at org.apache.activemq.broker.region.PrefetchSubscription.isFull(PrefetchSubscription.java:361)
>     at org.apache.activemq.broker.region.PrefetchSubscription.dispatchMatched(PrefetchSubscription.java:452)
>     at org.apache.activemq.broker.region.PrefetchSubscription.add(PrefetchSubscription.java:141)
>     at org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy.dispatch(RoundRobinDispatchPolicy.java:70)
>     at org.apache.activemq.broker.region.Queue.doDispatch(Queue.java:1059)
>     at org.apache.activemq.broker.region.Queue.pageInMessages(Queue.java:1072)
>     at org.apache.activemq.broker.region.Queue.iterate(Queue.java:945)
>     at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:118)
>     at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:42)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>     at java.lang.Thread.run(Unknown Source)
>
> "ActiveMQ Transport: tcp:///192.168.222.74:49196" daemon prio=4 BLOCKED
>     at org.apache.activemq.broker.region.Queue.doMessageSend(Queue.java:449)
>     at org.apache.activemq.broker.region.Queue.send(Queue.java:441)
>     at org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:328)
>     at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:402)
>     at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:224)
>     at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:125)
>     at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:95)
>     at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:135)
>     at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:433)
>     at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:623)
>     at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:280)
>     at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:177)
>     at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67)
>     at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:134)
>     at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:150)
>     at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
>     at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:185)
>     at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:172)
>     at java.lang.Thread.run(Unknown Source)
>
> --
> Cheers,
> David
>
> Nuix Pty Ltd
> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia
> Ph: +61 2 9280 0699
> Web: http://www.nuix.com
> Fax: +61 2 9212 6902
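
For readers following the thread, the pull-style consumption David describes (four threads per worker, each taking one item at a time with a timed receive) can be modeled roughly as below. This is only a sketch under stated assumptions: it uses a plain java.util.concurrent.BlockingQueue as a stand-in for the work-items queue, so it involves no broker, no network, and no ActiveMQ API, and the class and method names (PullWorkerSketch, runWorkers) are made up for illustration. It shows the shape of the worker loop, not the broker-side dispatch path seen in the thread dumps.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PullWorkerSketch {

    // Runs `threads` workers that each pull one item at a time until a timed
    // poll comes back empty; returns how many items were processed in total.
    public static int runWorkers(int items, int threads, long timeoutMillis)
            throws InterruptedException {
        // Stand-in for the "work-items" queue. This is NOT an ActiveMQ class.
        BlockingQueue<Integer> workItems = new LinkedBlockingQueue<>();
        for (int i = 0; i < items; i++) {
            workItems.add(i);
        }

        AtomicInteger processed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                try {
                    Integer item;
                    // Loosely analogous to receive(timeout) with prefetch 0:
                    // the worker asks for exactly one item and gives up when
                    // nothing arrives within the timeout.
                    while ((item = workItems.poll(timeoutMillis, TimeUnit.MILLISECONDS)) != null) {
                        processed.incrementAndGet(); // real work would happen here
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // 4 threads as in each worker JVM; 2000 ms mirrors the 2-second timeout.
        System.out.println("processed " + runWorkers(1000, 4, 2000));
    }
}
```

Because each worker holds at most one item at a time, a slow item never strands other items behind it, which is the reason David gives for choosing a prefetch of 0. The cost is one request per item, so throughput depends on how quickly the queue side can answer each pull.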