activemq-users mailing list archives

From David Klosowski <ccelax2...@LiveNation.com>
Subject RE: ActiveMQ Transport Threads Getting Blocked
Date Tue, 27 Aug 2013 08:46:52 GMT
It appears that socket.write will just block and never time out.  Socket.setSoTimeout() will
not affect it, since that timeout only applies to reads.  That explains how it can get stuck
indefinitely.  Why is another question.  I'm going to request that this machine be rebooted.
The other brokers in our broker networks aren't being affected.
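
As a minimal sketch of that behavior (not ActiveMQ code; the class and method names are made up for illustration), the following connects a plain socket to a loopback peer that never reads, sets SO_TIMEOUT on the writer, and checks whether the write is still stuck after a given wait. It assumes a plain TCP socket; an SSL socket adds its own locking on top.

```java
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BlockedWriteDemo {

    /** Returns true if a write to a peer that never reads is still stuck after waitMillis. */
    static boolean writeStillBlocked(long waitMillis) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "demo-writer");
            t.setDaemon(true);
            return t;
        });
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket client = new Socket(loopback, server.getLocalPort());
             Socket peer = server.accept()) {

            client.setSoTimeout(200); // SO_TIMEOUT bounds blocking reads, not writes

            Callable<Void> writer = () -> {
                OutputStream out = client.getOutputStream();
                byte[] chunk = new byte[64 * 1024];
                // Far more data than the kernel buffers can absorb; the peer never reads.
                for (int i = 0; i < 100_000; i++) {
                    out.write(chunk);
                }
                return null;
            };
            Future<Void> result = pool.submit(writer);
            try {
                result.get(waitMillis, TimeUnit.MILLISECONDS);
                return false; // the write completed, so it did not stay blocked
            } catch (TimeoutException stillWriting) {
                return true;  // no timeout fired inside write(); it is simply stuck
            }
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("write still blocked after 2s: " + writeStillBlocked(2000));
    }
}
```

Closing the sockets at the end of the try block is what finally releases the writer thread, which is also the only lever available from outside the blocked call.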

It looks like the MutexTransport instance is created per transport connection, and there are
no other threads blocked on that particular instance (no memory leak there).  The remaining
threads are blocked on a call to doMessageSend, which is synchronized, apparently to guarantee
dispatch ordering.  That is where the issue lies: no other thread can send a message through
that Topic while its monitor is held.
 

Here is the signature:

    synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message)
            throws IOException, Exception {
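
To illustrate why one stuck sender stalls every other caller of a synchronized method, here is a minimal sketch.  FakeTopic is a hypothetical stand-in, not ActiveMQ's Topic class, and the latch simulates the socket write that never returns.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class SynchronizedSendDemo {

    /** Hypothetical stand-in for a destination whose send path is synchronized. */
    static class FakeTopic {
        synchronized void doMessageSend(Runnable dispatch) {
            dispatch.run(); // the monitor is held for the whole dispatch
        }
    }

    /** Starts a stuck sender, then a second sender, and reports the second sender's state. */
    static Thread.State stateOfSecondSender() throws InterruptedException {
        FakeTopic topic = new FakeTopic();
        CountDownLatch insideMonitor = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        Thread stuck = new Thread(() -> topic.doMessageSend(() -> {
            insideMonitor.countDown();
            try {
                release.await(); // simulates a write that does not return
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }), "stuck-sender");
        Thread second = new Thread(() -> topic.doMessageSend(() -> { }), "second-sender");
        stuck.setDaemon(true);
        second.setDaemon(true);

        stuck.start();
        insideMonitor.await(); // the first sender now owns the FakeTopic monitor
        second.start();

        // Wait (bounded) until the second sender is parked on the monitor.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (second.getState() != Thread.State.BLOCKED && System.nanoTime() < deadline) {
            Thread.sleep(10);
        }
        Thread.State observed = second.getState();

        release.countDown(); // let both threads finish
        stuck.join();
        second.join();
        return observed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("second sender state: " + stateOfSecondSender());
    }
}
```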

Here is what the remaining blocked threads look like:

"ActiveMQ Transport: ssl:///10.73.103.145:55822" - Thread t@7223
   java.lang.Thread.State: BLOCKED
	at org.apache.activemq.broker.region.Topic.doMessageSend(Topic.java:444)
	- waiting to lock <3bfdff07> (a org.apache.activemq.broker.region.Topic) owned by "ActiveMQ Transport: ssl:///10.73.111.127:49523" t@7246
	at org.apache.activemq.broker.region.Topic.send(Topic.java:425)
	at org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:365)
	at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:518)
	at org.apache.activemq.advisory.AdvisoryBroker.fireAdvisory(AdvisoryBroker.java:491)
	at org.apache.activemq.advisory.AdvisoryBroker.fireAdvisory(AdvisoryBroker.java:422)
	at org.apache.activemq.advisory.AdvisoryBroker.addConsumer(AdvisoryBroker.java:119)
	at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:89)
	at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:89)
	at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:89)
	at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:89)
	at com.ticketmaster.activemq.security.TktmBrokerFilter.addConsumer(TktmBrokerFilter.java:499)
	at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:89)
	at com.ticketmaster.activemq.security.IdentityBrokerFilter.addConsumer(IdentityBrokerFilter.java:581)
	at com.ticketmaster.activemq.reliability.TempDestReliabilityFilter.addConsumer(TempDestReliabilityFilter.java:209)
	at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:89)
	at com.ticketmaster.activemq.hostgroups.HostGroupsFilter.addConsumer(HostGroupsFilter.java:82)
	at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:89)
	at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:89)
	at org.apache.activemq.broker.MutableBrokerFilter.addConsumer(MutableBrokerFilter.java:95)
	at org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:551)
	at org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:349)
	at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:312)
	at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:186)
	at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
	at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
	at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:228)
	- locked <1349bfa8> (a org.apache.activemq.transport.InactivityMonitor$1)
	at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
	at org.apache.activemq.transport.tcp.SslTransport.doConsume(SslTransport.java:91)
	at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:220)
	at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:202)
	at java.lang.Thread.run(Thread.java:662)

   Locked ownable synchronizers:
	- None
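
The "waiting to lock ... owned by" relationship in a dump like the one above can also be pulled out programmatically with the standard java.lang.management API.  The sketch below is only an illustration (the class and method names are made up): it maps each BLOCKED thread to the thread owning the lock it wants, and follows that chain to the thread at the root.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.LinkedHashMap;
import java.util.Map;

class BlockedThreadReport {

    /** Maps each BLOCKED thread's name to the name of the thread that owns the lock it wants. */
    static Map<String, String> blockedThreadOwners() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        Map<String, String> owners = new LinkedHashMap<>();
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            if (info != null
                    && info.getThreadState() == Thread.State.BLOCKED
                    && info.getLockOwnerName() != null) {
                owners.put(info.getThreadName(), info.getLockOwnerName());
            }
        }
        return owners;
    }

    /** Follows "is waiting for" edges from a thread until reaching one that is not blocked. */
    static String rootOwner(Map<String, String> owners, String thread) {
        String current = thread;
        // The hop limit keeps this from looping forever if the threads are deadlocked in a cycle.
        for (int hops = 0; owners.containsKey(current) && hops < owners.size(); hops++) {
            current = owners.get(current);
        }
        return current;
    }

    public static void main(String[] args) {
        Map<String, String> owners = blockedThreadOwners();
        for (String blocked : owners.keySet()) {
            System.out.println(blocked + " <- ultimately held up by " + rootOwner(owners, blocked));
        }
    }
}
```

Run inside the broker JVM (or adapted to a remote JMX connection), this would point the blocked transport threads back at the one thread sitting in the socket write.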


I took a look at the second call stack.

It seems that the thread is stuck in the flush at the end of TcpTransport.oneway:

    /**
     * A one way asynchronous send
     */
    public void oneway(Object command) throws IOException {
        checkStarted();
        wireFormat.marshal(command, dataOut);
        dataOut.flush();
    }

-----Original Message-----
From: David Klosowski 
Sent: Tuesday, August 27, 2013 12:37 AM
To: users@activemq.apache.org
Subject: RE: ActiveMQ Transport Threads Getting Blocked

The peculiar thing is that this pretty much locks up the whole broker.  There are 71 transport
threads that are blocked.

If it's due to sync sends being performed by the broker, there are two issues that I can see:

1)	Why would 71 transport threads get blocked on the broker trying to send back a response
message to a single producer connection?
2)	Wouldn't a socket timeout be reached at some point? (I don't see any such timeout being
hit, but I'm inspecting the code to see if any timeouts are set.)
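
On the second question: a blocking write has no built-in deadline, so one generic way to bound it is a watchdog that closes the socket when the write takes too long.  The sketch below shows that idea on a plain socket.  It is an assumption-laden illustration, not how ActiveMQ 5.4.2 behaves; WriteWatchdog and writeWithDeadline are hypothetical names, and closing an SSL socket from another thread may itself wait on the writer's lock.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class WriteWatchdog {

    /**
     * Writes data to the socket, closing the socket from a timer thread if the
     * write has not finished within timeoutMillis. A blocked write then fails
     * with an IOException instead of hanging indefinitely.
     */
    static void writeWithDeadline(Socket socket, byte[] data, long timeoutMillis,
                                  ScheduledExecutorService timer) throws IOException {
        ScheduledFuture<?> kill = timer.schedule(() -> {
            try {
                socket.close(); // wakes up a thread blocked in write() on this socket
            } catch (IOException ignored) {
                // nothing useful to do; the socket is being abandoned anyway
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        try {
            OutputStream out = socket.getOutputStream();
            out.write(data);
            out.flush();
        } finally {
            // Best effort: if the timer fires just as the write completes,
            // the socket is still closed.
            kill.cancel(false);
        }
    }
}
```

The cost of this approach is that the connection is lost whenever the deadline is hit, so the timeout has to be generous enough for a merely slow consumer.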



-----Original Message-----
From: Christian Posta [mailto:christian.posta@gmail.com]
Sent: Monday, August 26, 2013 8:03 PM
To: users@activemq.apache.org
Subject: Re: ActiveMQ Transport Threads Getting Blocked

Yep, looks like you might have some slow topic consumers.


On Mon, Aug 26, 2013 at 7:50 PM, davidk <davidklosowski@livenation.com> wrote:

> I'm having issues with one of our activemq servers.  Many of the 
> transport threads are blocking after the server comes up and clients connect to it.
>
> The ActiveMQ version is 5.4.2 with quite a few back ported patches.
>
> There are many blocked transport threads; the snippet below shows just
> one blocked thread and the running thread that is blocking it.
> Judging by the snippet below, it looks like the Transport thread on
> ssl:///10.73.216.103:51640 has the transport mutex and is not
> releasing it.
> It looks to be writing to a socket as a result of a dispatch, but is
> somehow stuck.
>
> Does anyone have suggestions as to what may be causing this?
>
> "ActiveMQ Transport: ssl:///10.73.111.127:49523" - Thread t@7246
>    java.lang.Thread.State: BLOCKED
>         at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
>         - waiting to lock <7fed4b35> (a java.lang.Object) owned by "ActiveMQ Transport: ssl:///10.73.216.103:51640" t@663
>         at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1284)
>         at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:822)
>         at org.apache.activemq.broker.TransportConnection.dispatchSync(TransportConnection.java:782)
>         at org.apache.activemq.broker.region.TopicSubscription.dispatch(TopicSubscription.java:525)
>         at org.apache.activemq.broker.region.TopicSubscription.add(TopicSubscription.java:99)
>         at org.apache.activemq.broker.region.policy.SimpleDispatchPolicy.dispatch(SimpleDispatchPolicy.java:48)
>         at org.apache.activemq.broker.region.Topic.dispatch(Topic.java:657)
>         at org.apache.activemq.broker.region.Topic.doMessageSend(Topic.java:489)
>         - locked <3bfdff07> (a org.apache.activemq.broker.region.Topic)
>         at org.apache.activemq.broker.region.Topic.send(Topic.java:425)
>         at org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:365)
>         at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:518)
>         at org.apache.activemq.advisory.AdvisoryBroker.fireAdvisory(AdvisoryBroker.java:491)
>         at org.apache.activemq.advisory.AdvisoryBroker.fireAdvisory(AdvisoryBroker.java:422)
>         at org.apache.activemq.advisory.AdvisoryBroker.fireAdvisory(AdvisoryBroker.java:417)
>         at org.apache.activemq.advisory.AdvisoryBroker.addDestinationInfo(AdvisoryBroker.java:186)
>         at org.apache.activemq.broker.BrokerFilter.addDestinationInfo(BrokerFilter.java:221)
>         at org.apache.activemq.broker.BrokerFilter.addDestinationInfo(BrokerFilter.java:221)
>         at org.apache.activemq.broker.BrokerFilter.addDestinationInfo(BrokerFilter.java:221)
>         at org.apache.activemq.broker.BrokerFilter.addDestinationInfo(BrokerFilter.java:221)
>         at org.apache.activemq.broker.BrokerFilter.addDestinationInfo(BrokerFilter.java:221)
>         at com.ticketmaster.activemq.security.IdentityBrokerFilter.addDestinationInfo(IdentityBrokerFilter.java:473)
>         at org.apache.activemq.broker.BrokerFilter.addDestinationInfo(BrokerFilter.java:221)
>         at org.apache.activemq.broker.BrokerFilter.addDestinationInfo(BrokerFilter.java:221)
>         at org.apache.activemq.broker.BrokerFilter.addDestinationInfo(BrokerFilter.java:221)
>         at org.apache.activemq.broker.BrokerFilter.addDestinationInfo(BrokerFilter.java:221)
>         at org.apache.activemq.broker.MutableBrokerFilter.addDestinationInfo(MutableBrokerFilter.java:227)
>         at org.apache.activemq.broker.TransportConnection.processAddDestination(TransportConnection.java:485)
>         at org.apache.activemq.command.DestinationInfo.visit(DestinationInfo.java:122)
>         at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:312)
>         at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:186)
>         at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
>         at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
>         at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:228)
>         - locked <5f718b0b> (a org.apache.activemq.transport.InactivityMonitor$1)
>         at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
>         at org.apache.activemq.transport.tcp.SslTransport.doConsume(SslTransport.java:91)
>         at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:220)
>         at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:202)
>         at java.lang.Thread.run(Thread.java:662)
>
>    Locked ownable synchronizers:
>         - None
>
>
> "ActiveMQ Transport: ssl:///10.73.216.103:51640" - Thread t@663
>    java.lang.Thread.State: RUNNABLE
>         at java.net.SocketOutputStream.socketWrite0(Native Method)
>         at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92)
>         at java.net.SocketOutputStream.write(SocketOutputStream.java:136)
>         at com.sun.net.ssl.internal.ssl.OutputRecord.writeBuffer(OutputRecord.java:358)
>         at com.sun.net.ssl.internal.ssl.OutputRecord.write(OutputRecord.java:346)
>         at com.sun.net.ssl.internal.ssl.SSLSocketImpl.writeRecordInternal(SSLSocketImpl.java:781)
>         at com.sun.net.ssl.internal.ssl.SSLSocketImpl.writeRecord(SSLSocketImpl.java:753)
>         at com.sun.net.ssl.internal.ssl.AppOutputStream.write(AppOutputStream.java:100)
>         - locked <48a49bda> (a com.sun.net.ssl.internal.ssl.AppOutputStream)
>         at org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush(TcpBufferedOutputStream.java:115)
>         at java.io.DataOutputStream.flush(DataOutputStream.java:106)
>         at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:182)
>         at org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:255)
>         - locked <2ef42a78> (a java.util.concurrent.atomic.AtomicBoolean)
>         at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:85)
>         at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:104)
>         at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
>         - locked <7fed4b35> (a java.lang.Object)
>         at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1284)
>         at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:822)
>         at org.apache.activemq.broker.TransportConnection.dispatchSync(TransportConnection.java:782)
>         at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:188)
>         at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
>         at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
>         at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:228)
>         - locked <2010445d> (a org.apache.activemq.transport.InactivityMonitor$1)
>         at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
>         at org.apache.activemq.transport.tcp.SslTransport.doConsume(SslTransport.java:91)
>         at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:220)
>         at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:202)
>         at java.lang.Thread.run(Thread.java:662)
>
>    Locked ownable synchronizers:
>         - locked <bf8461d> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
>
>
>
>
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/ActiveMQ-Transport-Threads-Getting-Blocked-tp4670703.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>



--
*Christian Posta*
http://www.christianposta.com/blog
twitter: @christianposta
