activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Richard Wagg (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AMQ-5077) Improve performance of ConcurrentStoreAndDispatch
Date Mon, 31 Mar 2014 16:53:16 GMT

    [ https://issues.apache.org/jira/browse/AMQ-5077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13955378#comment-13955378
] 

Richard Wagg commented on AMQ-5077:
-----------------------------------

Calling  
connectionFactory.setProducerWindowSize() 
With sizes varying from 10k to 10Mb has no effect on the throughput i can attain. All stack
traces i take of the producer catch it in code like:
{noformat}
"main" prio=10 tid=0x000000000bc3b000 nid=0x4109 runnable [0x0000000041ebe000]
   java.lang.Thread.State: RUNNABLE
        at java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
        at org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush(TcpBufferedOutputStream.java:115)
        at java.io.DataOutputStream.flush(DataOutputStream.java:123)
        at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:176)
        at org.apache.activemq.transport.AbstractInactivityMonitor.doOnewaySend(AbstractInactivityMonitor.java:304)
        at org.apache.activemq.transport.AbstractInactivityMonitor.oneway(AbstractInactivityMonitor.java:286)
        at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:85)
        at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:104)
        at org.apache.activemq.transport.failover.FailoverTransport.oneway(FailoverTransport.java:658)
        - locked <0x000000050f60c5e8> (a java.lang.Object)
        at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
        at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
        at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1321)
        at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1315)
        at org.apache.activemq.ActiveMQSession.send(ActiveMQSession.java:1853)
        - locked <0x000000050f60c668> (a java.lang.Object)
        at org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:289)
        at org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:224)
        at org.apache.activemq.ActiveMQMessageProducerSupport.send(ActiveMQMessageProducerSupport.java:269)

{noformat}


My understanding of flow control & the producer window size: 

Client side: 
- window size is set.
- Before each send, current size of all messages in flight is checked to see if window is
exceeded. 
- if producerWindow.waitForSpace() doesn't block, then the message is sent. 
- After the message is sent, the producer in flight size is incremented by the message size
(and decremented when the ack is received). 

Broker side:
- Each queue has a memory limit set, as well as overall memory limit and disk store limit.

- For each message dispatched for a given queue, each of these limits is checked. 
- if any limit is set and sendFailIfNoSpace is set to true, the producer should get an exception
sent back. 

In none of my tests have i caught any thread stuck inside the flow control handling logic.
In all cases they're inside network code - producer side as above, broker side in something
like: 
{noformat}
"ActiveMQ NIO Worker 29" daemon prio=10 tid=0x000000001775d000 nid=0x6a0d runnable [0x000000004473d000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000638b62430> (a org.apache.activemq.store.kahadb.KahaDBStore$StoreQueueTask$InnerFutureTask)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
        at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:425)
        at java.util.concurrent.FutureTask.get(FutureTask.java:187)
        at org.apache.activemq.broker.region.Queue.doMessageSend(Queue.java:942)
        at org.apache.activemq.broker.region.Queue.send(Queue.java:727)
        at org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:395)
        at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:441)
        at org.apache.activemq.broker.jmx.ManagedRegionBroker.send(ManagedRegionBroker.java:297)
        at org.apache.activemq.broker.region.virtual.CompositeDestinationFilter.send(CompositeDestinationFilter.java:86)
        at org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:395)
        at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:441)
        at org.apache.activemq.broker.jmx.ManagedRegionBroker.send(ManagedRegionBroker.java:297)
        at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:96)
        at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:307)
        at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:147)
        at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:147)
        at org.apache.activemq.security.AuthorizationBroker.send(AuthorizationBroker.java:206)
        at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:152)
        at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:496)
        at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:756)
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:294)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:148)
        at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
        at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
        at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:270)
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
        at org.apache.activemq.transport.nio.NIOTransport.serviceRead(NIOTransport.java:138)
        at org.apache.activemq.transport.nio.NIOTransport$1.onSelect(NIOTransport.java:69)
        at org.apache.activemq.transport.nio.SelectorSelection.onSelect(SelectorSelection.java:94)
        at org.apache.activemq.transport.nio.SelectorWorker$1.run(SelectorWorker.java:119)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)

   Locked ownable synchronizers:
        - <0x00000006723b8748> (a java.util.concurrent.ThreadPoolExecutor$Worker)
{noformat}


I think we're exhausting TCP level message in flight limits long before producer flow control
or window sizes become relevant. We can't take advantage a lot of the higher level settings
like async sends, producer message windows or async writes to the diskstore, because on the
broker side, all the overhead associated with persisting messages to a queue is done on the
NIO worker thread, which quickly becomes the blocking factor. 
Are there any options that i'm missing, to take more of the work done off the NIO threads?




> Improve performance of ConcurrentStoreAndDispatch
> -------------------------------------------------
>
>                 Key: AMQ-5077
>                 URL: https://issues.apache.org/jira/browse/AMQ-5077
>             Project: ActiveMQ
>          Issue Type: Wish
>          Components: Message Store
>    Affects Versions: 5.9.0
>         Environment: 5.9.0.redhat-610343
>            Reporter: Jason Shepherd
>            Assignee: Gary Tully
>         Attachments: Test combinations.xlsx, compDesPerf.tar.gz, topicRouting.zip
>
>
> We have publishers publishing to a topic which has 5 topic -> queue routings, and
gets a max message rate attainable of ~833 messages/sec, with each message around 5k in size.
> To test this i set up a JMS config with topic queues:
> Topic
> TopicRouted.1
> ...
> TopicRouted.11
> Each topic has an increasing number of routings to queues, and a client is set up to
subscribe to all the queues.
> Rough message rates:
> routings messages/sec
> 0 2500
> 1 1428
> 2 2000
> 3 1428
> 4 1111
> 5 833
> This occurs whether the broker config has producerFlowControl="false" set to true or
false , and KahaDB disk synching is turned off. We also tried experimenting with concurrentStoreAndDispatch,
but that didn't seem to help. LevelDB didn't give any notable performance improvement either.
> We also have asyncSend enabled on the producer, and have a requirement to use persistent
messages. We have also experimented with sending messages in a transaction, but that hasn't
really helped.
> It seems like producer throughput rate across all queue destinations, all connections
and all publisher machines is limited by something on the broker, through a mechanism which
is not producer flow control. I think the prime suspect is still contention on the index.
> We did some test with Yourkit profiler.
> Profiler was attached to broker at startup, allowed to run and then a topic publisher
was started, routing to 5 queues. 
> Profiler statistics were reset, the publisher allowed to run for 60 seconds, and then
profiling snapshot was taken. During that time, ~9600 messages were logged as being sent for
a rate of ~160/sec.
> This ties in roughly with the invocation counts recorded in the snapshot (i think) -
~43k calls. 
> From what i can work out, in the snapshot (filtering everything but org.apache.activemq.store.kahadb),

> For the 60 second sample period, 
> 24.8 seconds elapsed in org.apache.activemq.store.kahadb.KahaDbTransactionStore$1.removeAsyncMessage(ConnectionContext,
MessageAck).
> 18.3 seconds elapsed in org.apache.activemq.store.kahadb.KahaDbTransactionStore$1.asyncAddQueueMessage(ConnectionContext,
Message, boolean).
> From these, a further large portion of the time is spent inside MessageDatabase:
> org.apache.activemq.store.kahadb.MessageDatabase.process(KahaRemoveMessageCommand, Location)
- 10 secs elapsed
> org.apache.activemq.store.kahadb.MessageDatabase.process(KahaAddMessageCommand, Location)
- 8.5 secs elapsed.
> As both of these lock on indexLock.writeLock(), and both take place on the NIO transport
threads, i think this accounts for at least some of the message throughput limits. As messages
are added and removed from the index one by one, regardless of sync type settings, this adds
a fair amount of overhead. 
> While we're not synchronising on writes to disk, we are performing work on the NIO worker
thread which can block on locks, and could account for the behaviour we've seen client side.

> To Reproduce:
> 1. Install a broker and use the attached configuration.
> 2. Use the 5.8.0 example ant script to consume from the queues, TopicQueueRouted.1 -
5. eg:
>    ant consumer -Durl=tcp://localhost:61616 -Dsubject=TopicQueueRouted.1 -Duser=admin
-Dpassword=admin -Dmax=-1
> 3. Use the modified version of 5.8.0 example ant script (attached) to send messages to
topics, TopicRouted.1 - 5, eg:
>    ant producer -Durl='tcp://localhost:61616?jms.useAsyncSend=true&wireFormat.tightEncodingEnabled=false&keepAlive=true&wireFormat.maxInactivityDuration=60000&socketBufferSize=32768'
-Dsubject=TopicRouted.1 -Duser=admin -Dpassword=admin -Dmax=1 -Dtopic=true -DsleepTime=0 -Dmax=10000
-DmessageSize=5000
> This modified version of the script prints the number of messages per second and prints
it to the console.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Mime
View raw message