activemq-dev mailing list archives

From "Stirling Chow (JIRA)" <j...@apache.org>
Subject [jira] Created: (AMQ-3127) Network bridge causes deadlock on queue/topic when message dispatch and consumer registration overlap.
Date Mon, 10 Jan 2011 21:47:45 GMT
Network bridge causes deadlock on queue/topic when message dispatch and consumer registration overlap.
------------------------------------------------------------------------------------------------------

                 Key: AMQ-3127
                 URL: https://issues.apache.org/jira/browse/AMQ-3127
             Project: ActiveMQ
          Issue Type: Bug
          Components: Broker
    Affects Versions: 5.4.2
            Reporter: Stirling Chow
            Priority: Critical


Symptom
=======
We have an AMQ 5.3.1 production environment with 7 brokers networked over HTTP using the DiscoveryNetworkConnector
and SimpleDiscoveryAgent.  The brokers share a number of topics and queues.  Periodically,
we have a catastrophic (cause still unknown) network outage that only affects the outbound
bridges from one of the 7 brokers.  The affected broker detects the outage, stops the existing
6 outbound bridges, and starts 6 new outbound bridges.  Frequently, we find that the outbound
bridges appear to be recreated properly, but messages produced by the affected broker to *some*
of its shared queues/topics are no longer dispatched to the remote brokers.

We have verified that the cause of this issue exists in AMQ 5.4.2.

Cause
=====
Analysis of the affected broker's threads revealed a deadlock between one of the BrokerService
threads, which was dispatching a message across an outbound bridge, and a transport thread
(e.g., VMTransport or HTTP Reader) that was receiving a new subscription from the outbound
bridge:

Daemon Thread [BrokerService[broker1] Task] (Suspended)	
	owns: Object  (id=104)	
	owns: Object  (id=105)	
	owns: Object  (id=106)	
	owns: Queue$3  (id=107)	
	waiting for: Object  (id=108)	
		owned by: Daemon Thread [VMTransport] (Running)	
	MutexTransport.oneway(Object) line: 40	
	ResponseCorrelator.oneway(Object) line: 60	
	DemandForwardingBridge(DemandForwardingBridgeSupport).serviceLocalCommand(Command) line: 738
	DemandForwardingBridgeSupport$2.onCommand(Object) line: 161	
	ResponseCorrelator.onCommand(Object) line: 116	
	MutexTransport(TransportFilter).onCommand(Object) line: 69	
	VMTransport.dispatch(VMTransport, TransportListener, Object) line: 122	
	VMTransport.oneway(Object) line: 113	
	MutexTransport.oneway(Object) line: 40	
	ResponseCorrelator.oneway(Object) line: 60	
	ManagedTransportConnection(TransportConnection).dispatch(Command) line: 1249	
	ManagedTransportConnection(TransportConnection).processDispatch(Command) line: 810	
	ManagedTransportConnection(TransportConnection).dispatchSync(Command) line: 770	
	QueueSubscription(PrefetchSubscription).dispatch(MessageReference) line: 649	
	QueueSubscription(PrefetchSubscription).dispatchPending() line: 599	
	QueueSubscription(PrefetchSubscription).add(MessageReference) line: 156	
	Queue.doActualDispatch(List<QueueMessageReference>) line: 1798	
	Queue.doDispatch(List<QueueMessageReference>) line: 1745	
	Queue.pageInMessages(boolean) line: 1898	
	Queue.iterate() line: 1425	
	PooledTaskRunner.runTask() line: 122	
	PooledTaskRunner$1.run() line: 43	
	ThreadPoolExecutor$Worker.runTask(Runnable) line: 886	
	ThreadPoolExecutor$Worker.run() line: 908	
	Thread.run() line: 662	

Daemon Thread [VMTransport] (Suspended)	
	owns: Object  (id=499)	
	owns: RegionBroker$1  (id=205)	
		waited by: Daemon Thread [VMTransport] (Running)	
		waited by: Daemon Thread [ActiveMQ Broker[broker1] Scheduler] (Running)	
	owns: Object  (id=108)	
		waited by: Daemon Thread [BrokerService[broker1] Task] (Suspended)	
	owns: URI  (id=500)	
	Unsafe.park(boolean, long) line: not available [native method]	
	LockSupport.park(Object) line: 158	
	ReentrantReadWriteLock$NonfairSync(AbstractQueuedSynchronizer).parkAndCheckInterrupt() line: 811
	ReentrantReadWriteLock$NonfairSync(AbstractQueuedSynchronizer).acquireQueued(AbstractQueuedSynchronizer$Node, int) line: 842
	ReentrantReadWriteLock$NonfairSync(AbstractQueuedSynchronizer).acquire(int) line: 1178	
	ReentrantReadWriteLock$WriteLock.lock() line: 807	
	Queue.addSubscription(ConnectionContext, Subscription) line: 360	
	ManagedQueueRegion(AbstractRegion).addConsumer(ConnectionContext, ConsumerInfo) line: 290
	ManagedRegionBroker(RegionBroker).addConsumer(ConnectionContext, ConsumerInfo) line: 444
	ManagedRegionBroker.addConsumer(ConnectionContext, ConsumerInfo) line: 240
	AdvisoryBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89
	AdvisoryBroker.addConsumer(ConnectionContext, ConsumerInfo) line: 91
	CompositeDestinationBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89
	TransactionBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89
	BrokerService$3(MutableBrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 95
	ManagedTransportConnection(TransportConnection).processAddConsumer(ConsumerInfo) line: 550
	ConsumerInfo.visit(CommandVisitor) line: 349

Specifically, a message had been produced to one of the shared queues and was being dispatched
to a remote consumer by the BrokerService thread.  In so doing, BrokerService had acquired
the write lock of pagedInPendingDispatchLock in Queue.java:

    private void doDispatch(List<QueueMessageReference> list) throws Exception {
        boolean doWakeUp = false;

        pagedInPendingDispatchLock.writeLock().lock();
	
BrokerService had sent the message to the remote broker and was then acknowledging it to the local
broker in DemandForwardingBridgeSupport.java:

    protected void serviceLocalCommand(Command command) {
    ...
                        if (!message.isResponseRequired()) {
                            
                            // If the message was originally sent using async
                            // send, we will preserve that QOS
                            // by bridging it using an async send (small chance
                            // of message loss).
                            try {
                                remoteBroker.oneway(message);
                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));

Since localBroker was a synchronous VMTransport, BrokerService then had to acquire the write
mutex in MutexTransport.java:

    public void oneway(Object command) throws IOException {
        synchronized (writeMutex) {
            next.oneway(command);
        }
    }
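To make the synchronous/asynchronous distinction concrete, here is a minimal Java sketch under simplifying assumptions; it is not ActiveMQ code. The `MutexTransport` class below is a hypothetical stand-in that wraps a `Consumer<Object>`, and a single-thread executor plays the part of an asynchronous transport's listener thread. With a synchronous `next`, the receiving side's handler runs on the sender's thread while `writeMutex` is still held; with an asynchronous `next`, the handler runs on another thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public final class MutexTransportSketch {

    /** Hypothetical stand-in for MutexTransport: every oneway() runs under writeMutex. */
    static final class MutexTransport {
        final Object writeMutex = new Object();
        private final Consumer<Object> next;

        MutexTransport(Consumer<Object> next) {
            this.next = next;
        }

        void oneway(Object command) {
            synchronized (writeMutex) {
                next.accept(command);
            }
        }
    }

    /** Returns true if the receiving handler executed while holding the sender's writeMutex. */
    static boolean handlerRanUnderWriteMutex(boolean async) throws Exception {
        ExecutorService listenerThread = Executors.newSingleThreadExecutor();
        try {
            AtomicReference<MutexTransport> transport = new AtomicReference<>();
            AtomicReference<Future<?>> handedOff = new AtomicReference<>();
            AtomicBoolean underMutex = new AtomicBoolean();

            // What the receiving side does with an incoming command.
            Runnable handler = () -> underMutex.set(Thread.holdsLock(transport.get().writeMutex));

            Consumer<Object> next = async
                    ? command -> handedOff.set(listenerThread.submit(handler)) // enqueue, return at once
                    : command -> handler.run();                                 // run inline on the caller
            transport.set(new MutexTransport(next));

            transport.get().oneway("ConsumerInfo");
            Future<?> pending = handedOff.get();
            if (pending != null) {
                pending.get(5, TimeUnit.SECONDS); // wait for the async handler to finish
            }
            return underMutex.get();
        } finally {
            listenerThread.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("sync:  " + handlerRanUnderWriteMutex(false)); // true
        System.out.println("async: " + handlerRanUnderWriteMutex(true));  // false
    }
}
```

The synchronous case matches the shape of the first stack trace, where VMTransport.dispatch runs inside MutexTransport.oneway on the same thread: any lock the receiving side takes is acquired while writeMutex is held.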

So the dispatching thread (BrokerService) had acquired Queue.pagedInPendingDispatchLock and was
trying to acquire MutexTransport.writeMutex.

At the same time, a new remote consumer was being registered through the same outbound bridge
through which the aforementioned dispatch was occurring.  The bridge's remote transport listener
thread (in this example, VMTransport) was adding the subscription through DemandForwardingBridgeSupport.java:

    protected void addSubscription(DemandSubscription sub) throws IOException {
        if (sub != null) {
            localBroker.oneway(sub.getLocalInfo());
        }
    }

Again, localBroker is synchronous, so the VMTransport thread acquired MutexTransport.writeMutex.
Registration of consumers to a queue is synchronized with the dispatching of messages, as
shown in Queue.java:

    public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
        super.addSubscription(context, sub);
        // synchronize with dispatch method so that no new messages are sent
        // while setting up a subscription. avoid out of order messages,
        // duplicates, etc.
        pagedInPendingDispatchLock.writeLock().lock();

So the remote transport listening thread (VMTransport) had acquired MutexTransport.writeMutex
and was trying to acquire Queue.pagedInPendingDispatchLock, thus creating a deadlock with
BrokerService.
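The lock-order inversion can be reproduced outside ActiveMQ with a minimal Java sketch, assuming that only these two locks matter. A `ReentrantReadWriteLock` stands in for Queue.pagedInPendingDispatchLock and a plain monitor stands in for MutexTransport.writeMutex; the class name and thread names are illustrative. A latch forces the unlucky interleaving that the broker only hits occasionally, and `ThreadMXBean.findDeadlockedThreads()` confirms the cycle instead of letting the program hang.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public final class LockOrderDeadlockSketch {

    /** Returns true if the JVM reports both threads as deadlocked within the timeout. */
    static boolean reproduce(long timeoutMillis) throws InterruptedException {
        // Stand-ins for Queue.pagedInPendingDispatchLock and MutexTransport.writeMutex.
        ReentrantReadWriteLock pagedInPendingDispatchLock = new ReentrantReadWriteLock();
        Object writeMutex = new Object();
        // Both threads take their first lock before either asks for its second.
        CountDownLatch bothHoldFirstLock = new CountDownLatch(2);

        // Dispatch path: pagedInPendingDispatchLock, then writeMutex (to send the MessageAck).
        Thread dispatcher = new Thread(() -> {
            pagedInPendingDispatchLock.writeLock().lock();
            try {
                bothHoldFirstLock.countDown();
                awaitQuietly(bothHoldFirstLock);
                synchronized (writeMutex) {
                    // localBroker.oneway(ack) would run here
                }
            } finally {
                pagedInPendingDispatchLock.writeLock().unlock();
            }
        }, "BrokerService Task");

        // Subscription path: writeMutex, then pagedInPendingDispatchLock (Queue.addSubscription).
        Thread subscriber = new Thread(() -> {
            synchronized (writeMutex) {
                bothHoldFirstLock.countDown();
                awaitQuietly(bothHoldFirstLock);
                pagedInPendingDispatchLock.writeLock().lock();
                pagedInPendingDispatchLock.writeLock().unlock();
            }
        }, "VMTransport");

        // Daemon threads, so the deadlocked pair does not keep the JVM alive.
        dispatcher.setDaemon(true);
        subscriber.setDaemon(true);
        dispatcher.start();
        subscriber.start();

        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            long[] ids = mx.findDeadlockedThreads(); // null when no deadlock exists
            if (ids != null && contains(ids, dispatcher.getId()) && contains(ids, subscriber.getId())) {
                return true;
            }
            Thread.sleep(50);
        }
        return false;
    }

    private static boolean contains(long[] ids, long id) {
        for (long candidate : ids) {
            if (candidate == id) {
                return true;
            }
        }
        return false;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("deadlock detected: " + reproduce(5_000));
    }
}
```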

Solution
========
Deadlock can be avoided by making the local transport asynchronous, which would allow the
remote transport listener thread to acquire the MutexTransport.writeMutex, but then offload
the acquisition of Queue.pagedInPendingDispatchLock to its peer listening thread.  We've included
a unit test that passes with this change.
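The proposed fix can be modelled the same way. This is a sketch under stated assumptions, not the patch attached to this issue: a single-thread executor stands in for the asynchronous transport's peer listening thread, and the subscribing thread only enqueues the registration while it holds writeMutex. Because pagedInPendingDispatchLock is then acquired on a thread that holds no other lock, the cycle cannot form and both paths complete. The sketch does not address why the network connectors were originally configured with async=false.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public final class AsyncHandOffSketch {

    /** Same interleaving as the deadlock, but registration is handed to a peer thread. */
    static boolean completes(long timeoutMillis) throws Exception {
        ReentrantReadWriteLock pagedInPendingDispatchLock = new ReentrantReadWriteLock();
        Object writeMutex = new Object();
        CountDownLatch bothHoldFirstLock = new CountDownLatch(2);
        ExecutorService callers = Executors.newFixedThreadPool(2);
        // Hypothetical peer listening thread of an asynchronous transport.
        ExecutorService peerListener = Executors.newSingleThreadExecutor();
        try {
            // Dispatch path, unchanged: dispatch lock, then writeMutex.
            Future<?> dispatch = callers.submit(() -> {
                pagedInPendingDispatchLock.writeLock().lock();
                try {
                    bothHoldFirstLock.countDown();
                    bothHoldFirstLock.await();
                    synchronized (writeMutex) {
                        // send the MessageAck
                    }
                } finally {
                    pagedInPendingDispatchLock.writeLock().unlock();
                }
                return null;
            });

            // Subscription path: holds writeMutex only long enough to enqueue the command.
            Future<Future<?>> subscribe = callers.submit(() -> {
                Future<?> registration;
                synchronized (writeMutex) {
                    bothHoldFirstLock.countDown();
                    bothHoldFirstLock.await();
                    registration = peerListener.submit(() -> {
                        pagedInPendingDispatchLock.writeLock().lock(); // taken on the peer thread
                        try {
                            // Queue.addSubscription work would happen here
                        } finally {
                            pagedInPendingDispatchLock.writeLock().unlock();
                        }
                    });
                }
                return registration;
            });

            dispatch.get(timeoutMillis, TimeUnit.MILLISECONDS);
            subscribe.get(timeoutMillis, TimeUnit.MILLISECONDS).get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        } finally {
            callers.shutdownNow();
            peerListener.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("completed: " + completes(5_000));
    }
}
```

The design point is that the thread holding writeMutex never waits for pagedInPendingDispatchLock; it only queues work, so the two locks are never requested in conflicting order by the same thread.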

There is no clear reason why the local transport is synchronous.  This is enforced by BrokerService.java
when it starts the network connectors:

    protected void startAllConnectors() throws Exception {
....
            URI uri = getVmConnectorURI();
            Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
            map.put("network", "true");
            map.put("async", "false");
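To see what those two map.put calls produce, here is a hypothetical, self-contained Java sketch. The real method relies on ActiveMQ's URISupport; `parseQuery` and `networkVmUri` below are illustrative stand-ins that perform no URL decoding and ignore fragments, and `vm://broker1` is an example value rather than what getVmConnectorURI() necessarily returns. Making `async` a parameter shows the one-value difference between the current behavior and the change proposed in this issue.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public final class VmConnectorUriSketch {

    /** Hypothetical helper: splits "k1=v1&k2=v2" into an insertion-ordered map (no decoding). */
    static Map<String, String> parseQuery(String query) {
        Map<String, String> params = new LinkedHashMap<>();
        if (query == null || query.isEmpty()) {
            return params;
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq < 0) {
                params.put(pair, "");
            } else {
                params.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return params;
    }

    /** Mirrors the network/async puts in startAllConnectors(), with async made a parameter. */
    static URI networkVmUri(URI vmConnectorUri, boolean async) {
        Map<String, String> map = parseQuery(vmConnectorUri.getRawQuery());
        map.put("network", "true");
        map.put("async", Boolean.toString(async)); // the current code hard-codes "false"

        String text = vmConnectorUri.toString();
        int q = text.indexOf('?');
        String base = q < 0 ? text : text.substring(0, q);

        StringJoiner query = new StringJoiner("&");
        map.forEach((key, value) -> query.add(key + "=" + value));
        return URI.create(base + "?" + query);
    }

    public static void main(String[] args) {
        URI vm = URI.create("vm://broker1");
        System.out.println(networkVmUri(vm, false)); // vm://broker1?network=true&async=false
        System.out.println(networkVmUri(vm, true));  // vm://broker1?network=true&async=true
    }
}
```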

This change was made by the following checkin, but no rationale was given:

Revision: 553094
Author: rajdavies
Date: 11:33:48 PM, July 3, 2007
Message:
set async=false for network connectors
----
Modified : /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java

Addendum
========
We've included a unit test that demonstrates the deadlock 100% of the time on our systems.
Since this is a timing issue, you may need to run the unit test several times to reproduce the
deadlock.  Also note that two specific conditions must hold to create the deadlock:

1) The bridge must have conduit subscriptions disabled; this is so that there can be an existing
subscription across the bridge to which messages are being dispatched while at the same time
another subscription is being added.
2) The message producers must use transacted sessions; this is so that the message dispatches
require a response by the dispatch thread (i.e., BrokerService).

If either of these conditions is not present, deadlock (at least through this recreation)
does not occur.

Through further testing 

-- 
This message is automatically generated by JIRA.