activemq-users mailing list archives

From igah <weiqi...@mail.com>
Subject couldn't add consumers after existing consumers are killed (deadlock in broker)
Date Thu, 25 Jan 2007 18:53:58 GMT

I am using activemq-4.0.2. I am running a test with 6 producers continuously
sending STOMP messages to a queue, with 6 consumers on the same queue. If I
kill -9 all the consumers (which are all in one process) and restart them,
sometimes (about 30% of the time) the consumers cannot get any messages
anymore. If I inspect the broker, I see only one consumer registered.

After some digging, I found that
org.apache.activemq.broker.region.Queue.addSubscription is blocked at
dispatchValve.turnOff().

The reason is that when the previous consumers are killed,
Queue.removeSubscription on the broker side never reaches
dispatchValve.turnOn() (line 241). A bit more poking reveals that in all the
failed cases, line 233, dispatchPolicy.dispatch(context, node, msgContext,
consumers), invokes removeSubscription again on the same thread, which causes
the deadlock: the thread that is supposed to call dispatchValve.turnOn()
re-enters removeSubscription, which calls dispatchValve.turnOff() and blocks
there.
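To make the self-deadlock concrete, here is a minimal sketch of the pattern described above. It assumes the valve is non-reentrant, i.e. a second turnOff() waits until someone calls turnOn(). SimpleValve and ValveDeadlockSketch are hypothetical stand-ins, not the actual ActiveMQ classes, and a timeout is added to turnOff() only so the demo terminates; in the broker the nested call would presumably wait forever, so the outer call would never reach turnOn().

```java
final class ValveDeadlockSketch {

    /** Hypothetical, simplified stand-in for the broker's dispatch valve. */
    static final class SimpleValve {
        private boolean on = true;

        /** Waits until the valve is on, then turns it off; returns false if the wait times out. */
        synchronized boolean turnOff(long timeoutMillis) {
            long deadline = System.currentTimeMillis() + timeoutMillis;
            while (!on) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return false;
                }
                try {
                    wait(remaining);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            on = false;
            return true;
        }

        synchronized void turnOn() {
            on = true;
            notifyAll();
        }
    }

    private final SimpleValve dispatchValve = new SimpleValve();
    private boolean nestedCallBlocked = false;

    /** Mimics the shape described in the post: valve off, dispatch, valve on. */
    boolean removeSubscription(boolean dispatchFails) {
        if (!dispatchValve.turnOff(200)) {
            return false; // without the demo timeout, the thread would stay blocked here
        }
        if (dispatchFails) {
            // Dispatch hits a dead transport and the failure handling
            // re-enters removeSubscription on the same thread.
            nestedCallBlocked = !removeSubscription(false);
        }
        dispatchValve.turnOn();
        return true;
    }

    boolean wasNestedCallBlocked() {
        return nestedCallBlocked;
    }

    public static void main(String[] args) {
        ValveDeadlockSketch queue = new ValveDeadlockSketch();
        queue.removeSubscription(true);
        System.out.println("nested call blocked: " + queue.wasNestedCallBlocked());
    }
}
```

The nested call can only be unblocked by the turnOn() that sits after it in the outer call, which is why a thread in this position cannot make progress on its own.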

The second ("self") call is preceded by this transport exception in the log
(probably because I killed the consumers):

2007-01-25 13:29:46,662 [127.0.0.1:56489] DEBUG Transport - Transport failed: java.io.IOException: The transport tcp:///127.0.0.1:56486 of type: org.apache.activemq.transport.tcp.TcpTransport is not running.
java.io.IOException: The transport tcp:///127.0.0.1:56486 of type: org.apache.activemq.transport.tcp.TcpTransport is not running.
        at org.apache.activemq.transport.TransportSupport.checkStarted(TransportSupport.java:109)
        at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:117)
        at org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:142)
        at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:82)
        at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:87)
        at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:45)
        at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:59)
        at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:215)
        at org.apache.activemq.broker.AbstractConnection.processDispatch(AbstractConnection.java:722)
        at org.apache.activemq.broker.AbstractConnection.dispatchSync(AbstractConnection.java:699)
        at org.apache.activemq.broker.region.PrefetchSubscription.dispatch(PrefetchSubscription.java:311)
        at org.apache.activemq.broker.region.QueueSubscription.dispatch(QueueSubscription.java:152)
        at org.apache.activemq.broker.region.PrefetchSubscription.add(PrefetchSubscription.java:68)
        at org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy.dispatch(RoundRobinDispatchPolicy.java:54)
        at org.apache.activemq.broker.region.Queue.removeSubscription(Queue.java:233)
        at org.apache.activemq.broker.region.AbstractRegion.removeConsumer(AbstractRegion.java:208)
        at org.apache.activemq.broker.region.RegionBroker.removeConsumer(RegionBroker.java:317)
        at org.apache.activemq.broker.BrokerFilter.removeConsumer(BrokerFilter.java:102)
        at org.apache.activemq.advisory.AdvisoryBroker.removeConsumer(AdvisoryBroker.java:210)
        at org.apache.activemq.broker.BrokerFilter.removeConsumer(BrokerFilter.java:102)
        at org.apache.activemq.broker.MutableBrokerFilter.removeConsumer(MutableBrokerFilter.java:115)
        at org.apache.activemq.broker.AbstractConnection.processRemoveConsumer(AbstractConnection.java:553)
        at org.apache.activemq.broker.AbstractConnection.processRemoveSession(AbstractConnection.java:590)
        at org.apache.activemq.broker.AbstractConnection.processRemoveConnection(AbstractConnection.java:660)
        at org.apache.activemq.broker.AbstractConnection.stop(AbstractConnection.java:169)
        at org.apache.activemq.broker.TransportConnection.stop(TransportConnection.java:98)
        at org.apache.activemq.broker.jmx.ManagedTransportConnection.stop(ManagedTransportConnection.java:63)
        at org.apache.activemq.util.ServiceSupport.dispose(ServiceSupport.java:40)
        at org.apache.activemq.broker.AbstractConnection.serviceTransportException(AbstractConnection.java:183)
        at org.apache.activemq.broker.TransportConnection$1.onException(TransportConnection.java:67)
        at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:98)
        at org.apache.activemq.transport.ResponseCorrelator.onException(ResponseCorrelator.java:111)
        at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:98)
        at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:98)
        at org.apache.activemq.transport.WireFormatNegotiator.onException(WireFormatNegotiator.java:130)
        at org.apache.activemq.transport.InactivityMonitor.onException(InactivityMonitor.java:150)
        at org.apache.activemq.transport.TransportSupport.onException(TransportSupport.java:101)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:150)
        at java.lang.Thread.run(Thread.java:595)


I also printed the stack at the point where the "self" call to
removeSubscription happens:

        at org.apache.activemq.broker.region.Queue.removeSubscription(Queue.java:181)
        at org.apache.activemq.broker.region.AbstractRegion.removeConsumer(AbstractRegion.java:208)
        at org.apache.activemq.broker.region.RegionBroker.removeConsumer(RegionBroker.java:317)
        at org.apache.activemq.broker.BrokerFilter.removeConsumer(BrokerFilter.java:102)
        at org.apache.activemq.advisory.AdvisoryBroker.removeConsumer(AdvisoryBroker.java:210)
        at org.apache.activemq.broker.BrokerFilter.removeConsumer(BrokerFilter.java:102)
        at org.apache.activemq.broker.MutableBrokerFilter.removeConsumer(MutableBrokerFilter.java:115)
        at org.apache.activemq.broker.AbstractConnection.processRemoveConsumer(AbstractConnection.java:553)
        at org.apache.activemq.broker.AbstractConnection.processRemoveSession(AbstractConnection.java:590)
        at org.apache.activemq.broker.AbstractConnection.processRemoveConnection(AbstractConnection.java:660)
        at org.apache.activemq.broker.AbstractConnection.stop(AbstractConnection.java:169)
        at org.apache.activemq.broker.TransportConnection.stop(TransportConnection.java:98)
        at org.apache.activemq.broker.jmx.ManagedTransportConnection.stop(ManagedTransportConnection.java:63)
        at org.apache.activemq.util.ServiceSupport.dispose(ServiceSupport.java:40)
        at org.apache.activemq.broker.AbstractConnection.serviceTransportException(AbstractConnection.java:183)
        at org.apache.activemq.broker.AbstractConnection.serviceException(AbstractConnection.java:191)
        at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:218)
        at org.apache.activemq.broker.AbstractConnection.processDispatch(AbstractConnection.java:722)
        at org.apache.activemq.broker.AbstractConnection.dispatchSync(AbstractConnection.java:699)
        at org.apache.activemq.broker.region.PrefetchSubscription.dispatch(PrefetchSubscription.java:311)
        at org.apache.activemq.broker.region.QueueSubscription.dispatch(QueueSubscription.java:152)
        at org.apache.activemq.broker.region.PrefetchSubscription.add(PrefetchSubscription.java:68)
        at org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy.dispatch(RoundRobinDispatchPolicy.java:54)
        at org.apache.activemq.broker.region.Queue.removeSubscription(Queue.java:233)
        at org.apache.activemq.broker.region.AbstractRegion.removeConsumer(AbstractRegion.java:208)

It seems that serviceTransportException(AbstractConnection.java:183) is where
the self call is triggered.
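For anyone trying to reproduce this, one way to catch the re-entrant call is to count how many frames of the method are on the current thread's stack and dump a trace when there is more than one. This is only a sketch of that idea: ReentryCheck, countFrames and the stand-in removeSubscription below are hypothetical names, not broker code, and this is not necessarily how the trace above was captured.

```java
final class ReentryCheck {

    /** Highest nesting depth seen so far (kept only for the demo). */
    static int observedDepth = 0;

    /** Counts frames on the current thread's stack that match the given class and method. */
    static int countFrames(String className, String methodName) {
        int count = 0;
        for (StackTraceElement frame : Thread.currentThread().getStackTrace()) {
            if (frame.getClassName().equals(className)
                    && frame.getMethodName().equals(methodName)) {
                count++;
            }
        }
        return count;
    }

    /** Stand-in for the method under suspicion; nestedCalls simulates re-entry. */
    static void removeSubscription(int nestedCalls) {
        int depth = countFrames(ReentryCheck.class.getName(), "removeSubscription");
        observedDepth = Math.max(observedDepth, depth);
        if (depth > 1) {
            // The method is already on this thread's stack: print where we came from.
            new Throwable("re-entrant removeSubscription, depth " + depth).printStackTrace();
        }
        if (nestedCalls > 0) {
            removeSubscription(nestedCalls - 1);
        }
    }

    public static void main(String[] args) {
        removeSubscription(1); // prints one stack trace for the nested call
    }
}
```

Walking the stack on every call is slow, so a check like this is only suitable as temporary debugging code.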

Could someone look into this and let me know if there is a simple fix for this
deadlock condition? Thanks in advance.
-- 
View this message in context: http://www.nabble.com/couldn%27t-add-consumers-after-existing-consumers-are-killed-%28deadlock-in-broker%29-tf3118423.html#a8639042
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

