activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Timothy Bish (Closed) (JIRA)" <j...@apache.org>
Subject [jira] [Closed] (AMQ-3127) Network bridge causes deadlock on queue/topic when message dispatch and consumer registration overlap.
Date Thu, 09 Feb 2012 16:55:59 GMT

     [ https://issues.apache.org/jira/browse/AMQ-3127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Timothy Bish closed AMQ-3127.
-----------------------------

    Resolution: Cannot Reproduce

Ran the test repeatedly and could not reproduce using the latest trunk code.
                
> Network bridge causes deadlock on queue/topic when message dispatch and consumer registration
overlap.
> ------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-3127
>                 URL: https://issues.apache.org/jira/browse/AMQ-3127
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.4.2
>            Reporter: Stirling Chow
>            Priority: Critical
>         Attachments: AMQ-3127.diff, BridgeDeadlockTest.java
>
>
> Symptom
> =======
> We have an AMQ 5.3.1 production environment with 7 brokers networked over HTTP using
the DiscoveryNetworkConnector and SimpleDiscoveryAgent.  The brokers share a number of topics
and queues.  Periodically, we have a catastrophic (cause still uknown) network outage that
only affects the outbound bridges from one of the 7 brokers.  The affected broker detects
the outage, stops the existing 6 outbound bridges, and starts 6 new outbound bridges.  Frequently,
we find that the outbound bridges appear to be recreated properly, but messages produced by
the affected broker to *some* of its shared queues/topics are no longer dispatched to the
remote brokers.
> We have verified that the cause of this issue exists in AMQ 5.4.2.
> Cause
> =====
> Analysis of the affected broker's threads revealed a deadlock between one of the BrokerService
threads, which was dispatching a message across an outbound bridge, and a transport thread
(e.g., VMTransport or HTTP Reader) that was receiving a new subscriptions from the outbound
bridge:
> Daemon Thread [BrokerService[broker1] Task] (Suspended)	
> 	owns: Object  (id=104)	
> 	owns: Object  (id=105)	
> 	owns: Object  (id=106)	
> 	owns: Queue$3  (id=107)	
> 	waiting for: Object  (id=108)	
> 		owned by: Daemon Thread [VMTransport] (Running)	
> 	MutexTransport.oneway(Object) line: 40	
> 	ResponseCorrelator.oneway(Object) line: 60	
> 	DemandForwardingBridge(DemandForwardingBridgeSupport).serviceLocalCommand(Command) line:
738	
> 	DemandForwardingBridgeSupport$2.onCommand(Object) line: 161	
> 	ResponseCorrelator.onCommand(Object) line: 116	
> 	MutexTransport(TransportFilter).onCommand(Object) line: 69	
> 	VMTransport.dispatch(VMTransport, TransportListener, Object) line: 122	
> 	VMTransport.oneway(Object) line: 113	
> 	MutexTransport.oneway(Object) line: 40	
> 	ResponseCorrelator.oneway(Object) line: 60	
> 	ManagedTransportConnection(TransportConnection).dispatch(Command) line: 1249	
> 	ManagedTransportConnection(TransportConnection).processDispatch(Command) line: 810	
> 	ManagedTransportConnection(TransportConnection).dispatchSync(Command) line: 770	
> 	QueueSubscription(PrefetchSubscription).dispatch(MessageReference) line: 649	
> 	QueueSubscription(PrefetchSubscription).dispatchPending() line: 599	
> 	QueueSubscription(PrefetchSubscription).add(MessageReference) line: 156	
> 	Queue.doActualDispatch(List<QueueMessageReference>) line: 1798	
> 	Queue.doDispatch(List<QueueMessageReference>) line: 1745	
> 	Queue.pageInMessages(boolean) line: 1898	
> 	Queue.iterate() line: 1425	
> 	PooledTaskRunner.runTask() line: 122	
> 	PooledTaskRunner$1.run() line: 43	
> 	ThreadPoolExecutor$Worker.runTask(Runnable) line: 886	
> 	ThreadPoolExecutor$Worker.run() line: 908	
> 	Thread.run() line: 662	
> Daemon Thread [VMTransport] (Suspended)	
> 	owns: Object  (id=499)	
> 	owns: RegionBroker$1  (id=205)	
> 		waited by: Daemon Thread [VMTransport] (Running)	
> 		waited by: Daemon Thread [ActiveMQ Broker[broker1] Scheduler] (Running)	
> 	owns: Object  (id=108)	
> 		waited by: Daemon Thread [BrokerService[broker1] Task] (Suspended)	
> 	owns: URI  (id=500)	
> 	Unsafe.park(boolean, long) line: not available [native method]	
> 	LockSupport.park(Object) line: 158	
> 	ReentrantReadWriteLock$NonfairSync(AbstractQueuedSynchronizer).parkAndCheckInterrupt()
line: 811	
> 	ReentrantReadWriteLock$NonfairSync(AbstractQueuedSynchronizer).acquireQueued(AbstractQueuedSynchronizer$Node,
int) line: 842	
> 	ReentrantReadWriteLock$NonfairSync(AbstractQueuedSynchronizer).acquire(int) line: 1178

> 	ReentrantReadWriteLock$WriteLock.lock() line: 807	
> 	Queue.addSubscription(ConnectionContext, Subscription) line: 360	
> 	ManagedQueueRegion(AbstractRegion).addConsumer(ConnectionContext, ConsumerInfo) line:
290	
> 	ManagedRegionBroker(RegionBroker).addConsumer(ConnectionContext, ConsumerInfo) line:
444	
> 	ManagedRegionBroker.addConsumer(ConnectionContext, ConsumerInfo) line: 240	
> 	AdvisoryBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89	
> 	AdvisoryBroker.addConsumer(ConnectionContext, ConsumerInfo) line: 91	
> 	CompositeDestinationBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo)
line: 89	
> 	TransactionBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89

> 	BrokerService$3(MutableBrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line:
95	
> 	ManagedTransportConnection(TransportConnection).processAddConsumer(ConsumerInfo) line:
550	
> 	ConsumerInfo.visit(CommandVisitor) line: 349	
> 	
> Specifically, a message had been produced to one of the shared queues and was being dispatched
to a remote consumer by the BrokerService thread.  In so doing, BrokerService had acquired
the pagedInPendingDispatchLock lock from Queue.java:
>     private void doDispatch(List<QueueMessageReference> list) throws Exception
{
>         boolean doWakeUp = false;
>         pagedInPendingDispatchLock.writeLock().lock();
> 	
> BrokerService had sent the message to the remote broker was then acknowledging the local
transport in DemandForwardingBridgeSupport.java:
>     protected void serviceLocalCommand(Command command) {
>     ...
>                         if (!message.isResponseRequired()) {
>                             
>                             // If the message was originally sent using async
>                             // send, we will preserve that QOS
>                             // by bridging it using an async send (small chance
>                             // of message loss).
>                             try {
>                                 remoteBroker.oneway(message);
>                                 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE,
1));
> Since localBroker was a synchronous VMTransport, BrokerService had to then acquire the
write mutex in MutexTransport.java:
>     public void oneway(Object command) throws IOException {
>         synchronized (writeMutex) {
>             next.oneway(command);
>         }
>     }
> So the dispatching thread (BrokerService) had acquired Queue.pagedInPendingDispatchLock
was trying to acquire MutexTransport.writeMutex.
> At the same time, a new remote consumer was being registered through the same outbound
bridge through which the aforementioned dispatch was ocurring.  The bridge's remote transport
listener thread (in this example, VMTransport) was adding the subscription through DemandForwardingBridgeSupport.java:
>     protected void addSubscription(DemandSubscription sub) throws IOException {
>         if (sub != null) {
>             localBroker.oneway(sub.getLocalInfo());
>         }
>     }
> Again, localBroker is synchronous, so the VMTransport thread acquired MutexTransport.writeMutex.
 Registration of consumers to a queue is synchronized with the dispatching of messages, as
shown in Queue.java:
>     public void addSubscription(ConnectionContext context, Subscription sub) throws Exception
{
>         super.addSubscription(context, sub);
>         // synchronize with dispatch method so that no new messages are sent
>         // while setting up a subscription. avoid out of order messages,
>         // duplicates, etc.
>         pagedInPendingDispatchLock.writeLock().lock();
> So the remote transport listening thread (VMTransport) had acquired MutexTransport.writeMutex
and was trying to acquire Queue.pagedInPendingDispatchLock, thus creating a deadlock with
BrokerService.
> Solution
> ======
> Deadlock can be avoided by making the local transport asynchronous, which would allow
the remote transport listener thread to acquire the MutexTransport.writeMutex, but then offload
the acquisition of Queue.pagedInPendingDispatchLock to its peer listening thread.  We've included
a unit test that passes with this change.
> There is no clear reason why the local transport is asynchronous.  This is enforced by
BrokerService.java when it starts the network connectors:
>     protected void startAllConnectors() throws Exception {
> ....
>             URI uri = getVmConnectorURI();
>             Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
>             map.put("network", "true");
>             map.put("async", "false");
> This change was made by the following checkin, but no rational was given:
> Revision: 553094
> Author: rajdavies
> Date: 11:33:48 PM, July 3, 2007
> Message:
> set async=false for network connectors
> ----
> Modified : /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
> Addendum
> =========
> We've included a unit test that demonstrates the deadlock 100% of the time on our systems.
 Since this is a timing issue, you may need to run the unit test several times to create the
deadlock.  Also note that three specific configurations must exist to create the deadlock:
> 1) The bridge must have conduit subscriptions disabled; this is so that there can be
an existing subscription across the bridge to which messages are being dispatched while at
the same time another subscription is being added.
> 2) The bridge must be configured to dispatch synchronously; this is so that message subscriptions
are are dispatched by the same thread that accesses the queue.
> 3) The message producers must be transactionalized; this is so that the message dispatches
require a response by the dispatch thread (i.e., BrokerService).
> If any of these conditions is not present, deadlock (at least through this recreation)
does not occur.	
> 	
> 	
> 	
> 	
> 	
> 	
> 	
> 	
> 	
> 	
> 	
> 	
> 	
> 	
> 	
> 	
> 	
> Through further testing 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Mime
View raw message