activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "matteo rulli (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AMQ-5260) Cross talk over duplex network connection can lead to blocking (alternative take)
Date Fri, 26 Sep 2014 14:55:34 GMT

    [ https://issues.apache.org/jira/browse/AMQ-5260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14149239#comment-14149239
] 

matteo rulli commented on AMQ-5260:
-----------------------------------

Thank you for your prompt reply! Now I understand the role of MutexTransport. By the way,
I noticed that the deadlock involves two MutexTransport instances that wrap *VMTransport(s)*.
Apparently if I apply the following hack
{code:java}
--- MutexTransport.java	Thu Jun 05 14:48:36 2014
+++ MutexTransport.edited.java	Fri Sep 26 09:11:41 2014
@@ -19,17 +19,31 @@
 import java.io.IOException;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Thread safe Transport Filter that serializes calls to and from the Transport Stack.
  */
 public class MutexTransport extends TransportFilter {
 
-    private final ReentrantLock writeLock = new ReentrantLock();
+	private static final Logger LOG = LoggerFactory.getLogger(MutexTransport.class);
+    private final static ReentrantLock vmWriteLock = new ReentrantLock();
+    private ReentrantLock writeLock;
     private boolean syncOnCommand;
 
     public MutexTransport(Transport next) {
         super(next);
         this.syncOnCommand = false;
+        
+        writeLock = null;
+        if(next != null && next.toString().startsWith("vm://")){
+        	writeLock = vmWriteLock;
+        	LOG.error("#@mrul# vm transport with mutex: " + next);
+        }else{ 
+        	writeLock = new ReentrantLock();
+        	LOG.error("#@mrul# non-vm transport with mutex: " + next);
+        }
     }
 
     public MutexTransport(Transport next, boolean syncOnCommand) {
{code}

the deadlock disappears. I noticed that the network bridge creates many VMTransports of the
form vm://brokerName#<number> using basically three different thread paths, and all these
local VMTransports trigger many intertwined command exchanges that somehow cause the deadlock:

h1. VMTransports creation PATH 1
{noformat}
java.lang.Thread.getStackTrace(Unknown Source)
org.apache.activemq.transport.vm.VMTransport.<init>(VMTransport.java:73)
org.apache.activemq.transport.vm.VMTransportServer$1.<init>(VMTransportServer.java:77)
org.apache.activemq.transport.vm.VMTransportServer.connect(VMTransportServer.java:77)
org.apache.activemq.transport.vm.VMTransportFactory.doCompositeConnect(VMTransportFactory.java:147)
org.apache.activemq.transport.vm.VMTransportFactory.doConnect(VMTransportFactory.java:54)
org.apache.activemq.transport.TransportFactory.connect(TransportFactory.java:64)
org.apache.activemq.network.NetworkConnector.createLocalTransport(NetworkConnector.java:154)
org.apache.activemq.network.DiscoveryNetworkConnector.onServiceAdd(DiscoveryNetworkConnector.java:136)
org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent.start(SimpleDiscoveryAgent.java:89)
org.apache.activemq.network.DiscoveryNetworkConnector.handleStart(DiscoveryNetworkConnector.java:205)
org.apache.activemq.network.NetworkConnector$1.doStart(NetworkConnector.java:59)
org.apache.activemq.util.ServiceSupport.start(ServiceSupport.java:55)
org.apache.activemq.network.NetworkConnector.start(NetworkConnector.java:159)
org.apache.activemq.broker.BrokerService.startAllConnectors(BrokerService.java:2501)
org.apache.activemq.broker.BrokerService.doStartBroker(BrokerService.java:693)
org.apache.activemq.broker.BrokerService.startBroker(BrokerService.java:659)
org.apache.activemq.broker.BrokerService.start(BrokerService.java:595)
org.apache.activemq.JmsMultipleBrokersTestSupport.startAllBrokers(JmsMultipleBrokersTestSupport.java:277)
{noformat}

h1. VMTransports creation PATH 2
{noformat}
org.apache.activemq.transport.vm.VMTransport.<init>(VMTransport.java:73)
org.apache.activemq.transport.vm.VMTransportServer$1.<init>(VMTransportServer.java:77)
org.apache.activemq.transport.vm.VMTransportServer.connect(VMTransportServer.java:77)
org.apache.activemq.transport.vm.VMTransportFactory.doCompositeConnect(VMTransportFactory.java:147)
org.apache.activemq.transport.vm.VMTransportFactory.doConnect(VMTransportFactory.java:54)
org.apache.activemq.transport.TransportFactory.connect(TransportFactory.java:64)
org.apache.activemq.network.NetworkBridgeFactory.createLocalTransport(NetworkBridgeFactory.java:80)
org.apache.activemq.network.DemandForwardingBridgeSupport.start(DemandForwardingBridgeSupport.java:184)
org.apache.activemq.network.DiscoveryNetworkConnector.onServiceAdd(DiscoveryNetworkConnector.java:152)
org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent.start(SimpleDiscoveryAgent.java:89)
org.apache.activemq.network.DiscoveryNetworkConnector.handleStart(DiscoveryNetworkConnector.java:205)
org.apache.activemq.network.NetworkConnector$1.doStart(NetworkConnector.java:59)
org.apache.activemq.util.ServiceSupport.start(ServiceSupport.java:55)
org.apache.activemq.network.NetworkConnector.start(NetworkConnector.java:159)
org.apache.activemq.broker.BrokerService.startAllConnectors(BrokerService.java:2501)
org.apache.activemq.broker.BrokerService.doStartBroker(BrokerService.java:693)
org.apache.activemq.broker.BrokerService.startBroker(BrokerService.java:659)
org.apache.activemq.broker.BrokerService.start(BrokerService.java:595)
org.apache.activemq.JmsMultipleBrokersTestSupport.startAllBrokers(JmsMultipleBrokersTestSupport.java:277)
{noformat}

h1. VMTransports creation PATH 3
{noformat}
java.lang.Thread.getStackTrace(Unknown Source)
org.apache.activemq.transport.vm.VMTransport.<init>(VMTransport.java:73)
org.apache.activemq.transport.vm.VMTransportServer$1.<init>(VMTransportServer.java:77)
org.apache.activemq.transport.vm.VMTransportServer.connect(VMTransportServer.java:77)
org.apache.activemq.transport.vm.VMTransportFactory.doCompositeConnect(VMTransportFactory.java:147)
org.apache.activemq.transport.vm.VMTransportFactory.doConnect(VMTransportFactory.java:54)
org.apache.activemq.transport.TransportFactory.connect(TransportFactory.java:64)
org.apache.activemq.network.NetworkBridgeFactory.createLocalTransport(NetworkBridgeFactory.java:80)
org.apache.activemq.broker.TransportConnection.processBrokerInfo(TransportConnection.java:1321)
org.apache.activemq.command.BrokerInfo.visit(BrokerInfo.java:126)
org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:294)
org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:148)
org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:61)
org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:270)
org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
org.apache.activemq.transport.nio.NIOTransport.serviceRead(NIOTransport.java:138)
org.apache.activemq.transport.nio.NIOTransport$1.onSelect(NIOTransport.java:69)
org.apache.activemq.transport.nio.SelectorSelection.onSelect(SelectorSelection.java:94)
org.apache.activemq.transport.nio.SelectorWorker$1.run(SelectorWorker.java:119)
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
java.lang.Thread.run(Unknown Source)
{noformat}

h1. VMTransports creation PATH 3, take 2 (cyclic???)
{noformat}
org.apache.activemq.broker.TransportConnection.processBrokerInfo(TransportConnection.java:1338)
java.lang.Thread.getStackTrace(Unknown Source)
org.apache.activemq.transport.vm.VMTransport.<init>(VMTransport.java:73)
org.apache.activemq.command.BrokerInfo.visit(BrokerInfo.java:126)            <------------------------------
!!!
org.apache.activemq.transport.vm.VMTransportServer.connect(VMTransportServer.java:88)
org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:294)
org.apache.activemq.transport.vm.VMTransportFactory.doCompositeConnect(VMTransportFactory.java:147)
org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:148)
org.apache.activemq.transport.vm.VMTransportFactory.doConnect(VMTransportFactory.java:54)
org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:61)
org.apache.activemq.transport.TransportFactory.connect(TransportFactory.java:64)
org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
org.apache.activemq.network.NetworkBridgeFactory.createLocalTransport(NetworkBridgeFactory.java:80)
org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:270)
org.apache.activemq.broker.TransportConnection.processBrokerInfo(TransportConnection.java:1321)
org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
org.apache.activemq.command.BrokerInfo.visit(BrokerInfo.java:126)             <------------------------------
!!!
org.apache.activemq.transport.nio.NIOTransport.serviceRead(NIOTransport.java:138)
org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:294)
org.apache.activemq.transport.nio.NIOTransport$1.onSelect(NIOTransport.java:69)
org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:148)
org.apache.activemq.transport.nio.SelectorSelection.onSelect(SelectorSelection.java:94)
org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:61)
org.apache.activemq.transport.nio.SelectorWorker$1.run(SelectorWorker.java:119)
org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
java.lang.Thread.run(Unknown Source)
{noformat}

So, as a tentative fix I tried to globally lock VMTransport usage under the assumption that
VM transports should be negligible in number with respect to other transports (tcp, nio, etc.)
and thus the static lock in VM transports should not constitute a big performance bottleneck; I
added a new constructor in _*MutexTransport*_ in this way:
{code}
public class MutexTransport extends TransportFilter {
	// ...
	private final static ReentrantLock staticWriteLock = new ReentrantLock();
	private ReentrantLock writeLock = new ReentrantLock();
	// ...
	public MutexTransport(Transport next, boolean syncOnCommand, boolean useStaticLock) {
		this(next, syncOnCommand);
		if(useStaticLock)
			writeLock = staticWriteLock;
	}
{code}
and I modified the _*org.apache.activemq.transport.vm.VMTransportServer.configure(Transport)*_
implementation in this way:
{code}
public static Transport configure(Transport transport) {
    transport = new MutexTransport(transport, false, true); // use static locks for VMTransport
    transport = new ResponseCorrelator(transport);
    return transport;
}
{code}

Apparently this solves the problem (anyway I'm going to perform additional tests to double
check this). In your opinion, is this a reasonable approach or is it just a _quick and dirty_
solution?

> Cross talk over duplex network connection can lead to blocking (alternative take)
> ---------------------------------------------------------------------------------
>
>                 Key: AMQ-5260
>                 URL: https://issues.apache.org/jira/browse/AMQ-5260
>             Project: ActiveMQ
>          Issue Type: Bug
>    Affects Versions: 5.9.0, 5.10.0
>            Reporter: matteo rulli
>         Attachments: AMQ5260AdvancedTest.java, AMQ_5260.patch, AMQ_5260_2.patch, AMQ_5260_3.patch,
deadlock.jpg, debug.jpg
>
>
> Pretty much the same description as AMQ-4328.
> ----
> !deadlock.jpg!
> h2. Stacktraces:
> Stacktrace no.1:
> {noformat}
> Name: ActiveMQ NIO Worker 12
> State: BLOCKED on java.net.URI@1bae2b28 owned by: ActiveMQ Transport: tcp:///10.0.1.219:61616@57789
> Total blocked: 2  Total waited: 67
> Stack trace: 
>  org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteConsumerAdvisory(DemandForwardingBridgeSupport.java:714)
> org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteCommand(DemandForwardingBridgeSupport.java:581)
> org.apache.activemq.network.DemandForwardingBridgeSupport$3.onCommand(DemandForwardingBridgeSupport.java:191)
> org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
> org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
> org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:270)
> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
> org.apache.activemq.transport.nio.NIOTransport.serviceRead(NIOTransport.java:138)
> org.apache.activemq.transport.nio.NIOTransport$1.onSelect(NIOTransport.java:69)
> org.apache.activemq.transport.nio.SelectorSelection.onSelect(SelectorSelection.java:94)
> org.apache.activemq.transport.nio.SelectorWorker$1.run(SelectorWorker.java:119)
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> java.lang.Thread.run(Unknown Source)
> {noformat}
> ----
> stack trace no.2
> {noformat}
> Name: ActiveMQ Transport: tcp:///10.0.1.219:61616@57789
> State: WAITING on java.util.concurrent.locks.ReentrantLock$NonfairSync@3cdbfa3e owned
by: ActiveMQ BrokerService[master2] Task-4
> Total blocked: 19  Total waited: 3
> Stack trace: 
>  sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(Unknown Source)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(Unknown Source)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(Unknown Source)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(Unknown Source)
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(Unknown Source)
> java.util.concurrent.locks.ReentrantLock.lock(Unknown Source)
> org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:66)
> org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
> org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1339)
> org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:858)
> org.apache.activemq.broker.TransportConnection.dispatchSync(TransportConnection.java:818)
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:151)
> org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
> org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
> org.apache.activemq.transport.vm.VMTransport.doDispatch(VMTransport.java:138)
> org.apache.activemq.transport.vm.VMTransport.dispatch(VMTransport.java:127)
>    - locked java.util.concurrent.atomic.AtomicBoolean@689389da
> org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:104)
> org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
> org.apache.activemq.transport.ResponseCorrelator.asyncRequest(ResponseCorrelator.java:81)
> org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:86)
> org.apache.activemq.network.DemandForwardingBridgeSupport.addSubscription(DemandForwardingBridgeSupport.java:856)
> org.apache.activemq.network.DemandForwardingBridgeSupport.addConsumerInfo(DemandForwardingBridgeSupport.java:1128)
> org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteConsumerAdvisory(DemandForwardingBridgeSupport.java:714)
>    - locked java.net.URI@1bae2b28
> org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteCommand(DemandForwardingBridgeSupport.java:581)
> org.apache.activemq.network.DemandForwardingBridgeSupport$3.onCommand(DemandForwardingBridgeSupport.java:191)
> org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
> org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
> org.apache.activemq.transport.failover.FailoverTransport$3.onCommand(FailoverTransport.java:196)
> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
> org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:270)
> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:214)
> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196)
> java.lang.Thread.run(Unknown Source)
> {noformat}
> ----
> stack trace no.3
> {noformat}
> Name: ActiveMQ BrokerService[master2] Task-4
> State: WAITING on java.util.concurrent.locks.ReentrantLock$NonfairSync@717bb9c owned
by: ActiveMQ Transport: tcp:///10.0.1.219:61616@57789
> Total blocked: 19  Total waited: 117
> Stack trace: 
>  sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(Unknown Source)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(Unknown Source)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(Unknown Source)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(Unknown Source)
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(Unknown Source)
> java.util.concurrent.locks.ReentrantLock.lock(Unknown Source)
> org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:66)
> org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
> org.apache.activemq.network.DemandForwardingBridgeSupport.serviceLocalCommand(DemandForwardingBridgeSupport.java:921)
> org.apache.activemq.network.DemandForwardingBridgeSupport$2.onCommand(DemandForwardingBridgeSupport.java:173)
> org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
> org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
> org.apache.activemq.transport.vm.VMTransport.doDispatch(VMTransport.java:138)
> org.apache.activemq.transport.vm.VMTransport.dispatch(VMTransport.java:127)
>    - locked java.util.concurrent.atomic.AtomicBoolean@453bb109
> org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:104)
> org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
> org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
> org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1339)
> org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:858)
> org.apache.activemq.broker.TransportConnection.iterate(TransportConnection.java:904)
> org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:129)
> org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:47)
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> java.lang.Thread.run(Unknown Source)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message