activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Christian Posta <christian.po...@gmail.com>
Subject Re: [ActiveMQ 5.5.1] - Communication got stuck between Producer and Consumer
Date Wed, 25 Sep 2013 13:12:28 GMT
in-line i hope to at least explain what's possibly going on in each thread,
maybe that will be enough since you know your topology and
producer/consumer configs and use cases better that we do:


On Mon, Sep 23, 2013 at 7:17 PM, Jeff.Huang <super.reader.jeff@gmail.com>wrote:

> I have a product built upon ActiveMQ 5.5.1. System architecture looks like
> this:
>
>   ----------------------------------------
> ---------------------
> | Central Server ---> Central Broker | -------------- | Broker A1 <--- A1 |
>   ----------------------------------------
> ---------------------
>                                                         \------------- |
> Broker A2 <--- A2 |
>
> ---------------------
>                                                         \------------- |
> Broker A3 <--- A3 |
>
> ---------------------
>
> Central Broker talks to Central Server using vm://localhost, so as Ai to
> Broker Ai. And Broker Ai talks to Central Broker using tcp://<central
> server>:61616, duplex.
>
> Most of time, the cluster works fine. But sometimes, messages can't go
> through. That means, if A1 sends a message out, Central Server can't see
> it;
> if Central Server sends a message out, none of the A1, A2, or A3 can see
> it.
> We don't have Ai talk to Aj, so we can't tell whether they can see each
> other.
>
> Using netstat, we could confirm that the connections were established.
> Restarting Ai won't fix the problem. We have to restart the Central Broker.
>
> We saw this situation quite sometime, but since it is not reproducable, we
> don't know what is the root cause. But recently, we noticed that it
> happened
> with heavy work load, with and without producer control.
>
> By looking into the stack trace, I found this:
>
> "Producer Thread" prio=10 tid=0x00007fd97c016800 nid=0x47eb waiting on
> condition [0x00007fd9777f6000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x000000037eeecdd0> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>         at
>
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
>         at
> java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:374)
>         at
>
> org.apache.activemq.transport.FutureResponse.getResult(FutureResponse.java:40)
>         at
>
> org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:87)
>         at
>
> org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1284)
>         at
>
> org.apache.activemq.TransactionContext.syncSendPacketWithInterruptionHandling(TransactionContext.java:707)
>         at
> org.apache.activemq.TransactionContext.commit(TransactionContext.java:299)
>         at
> org.apache.activemq.ActiveMQSession.commit(ActiveMQSession.java:560)
>         at jeff.huang.Producer.commitBatch(Producer.java:480)
>

This looks like one of your producers? Trying to commit a tx. This is
supposed to be a synchronous operation on the client, and the client will
only unblock when it has received an ack from the broker. The broker will
not ack until the tx has been successfully committed. Are there exceptions
in your log? or have you run into producer flow control? Is this producer
sending to a topic or queue?


>
> This stack trace log led me to AMQ-1927:
>
> https://issues.apache.org/jira/browse/AMQ-1927
>
> But I can't tell this is exactly the same issue.
>
> There are other parts of the stack trace that I find interesting. Since it
> is a production system, I am sorry I can't simply copy and paste the whole
> stack trace here. But I really need some help to explain something.
>
> 1.
>
> "ActiveMQ Transport: tcp:///10.130.156.167:38407" daemon prio=10
> tid=0x00007fda08592000 nid=0x4f3c waiting for monitor entry
> [0x00007fd96d5d3000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>         at
> org.apache.activemq.broker.region.Topic.doMessageSend(Topic.java:423)
>         - waiting to lock <0x0000000402c800d0> (a
> org.apache.activemq.broker.region.Topic)
>         at org.apache.activemq.broker.region.Topic.send(Topic.java:404)
>         at
>
> org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:365)
>         at
> org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:523)
>         at
>
> org.apache.activemq.advisory.AdvisoryBroker.fireAdvisory(AdvisoryBroker.java:515)
>         at
>
> org.apache.activemq.advisory.AdvisoryBroker.fireAdvisory(AdvisoryBroker.java:446)
>         at
>
> org.apache.activemq.advisory.AdvisoryBroker.addConsumer(AdvisoryBroker.java:106)
>         at
> org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:89)
>         at
> org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:89)
>         at
>
> org.apache.activemq.broker.MutableBrokerFilter.addConsumer(MutableBrokerFilter.java:95)
>         at
>
> org.apache.activemq.broker.util.LoggingBrokerPlugin.addConsumer(LoggingBrokerPlugin.java:187)
>         at
>
> org.apache.activemq.broker.MutableBrokerFilter.addConsumer(MutableBrokerFilter.java:95)
>         at
>
> org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:546)
>         at
> org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:349)
>         at
>
> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:306)
>         at
>
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
>         at
>
> org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
>         at
>
> org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
>         at
> org.apache.activemq.transport.vm.VMTransport.dispatch(VMTransport.java:121)
>         at
> org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:112)
>         at
> org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
>         - locked <0x000000028576f460> (a java.lang.Object)
>         at
>
> org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
>         at
>
> org.apache.activemq.network.DemandForwardingBridgeSupport.addSubscription(DemandForwardingBridgeSupport.java:672)
>         at
>
> org.apache.activemq.network.DemandForwardingBridgeSupport.addConsumerInfo(DemandForwardingBridgeSupport.java:1025)
>         at
>
> org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteCommand(DemandForwardingBridgeSupport.java:521)
>         at
>
> org.apache.activemq.network.DemandForwardingBridgeSupport$2.onCommand(DemandForwardingBridgeSupport.java:165)
>         at
>
> org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
>         at
>
> org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
>         at
>
> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
>         at
>
> org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:227)
>         - locked <0x0000000285687e40> (a
> org.apache.activemq.transport.InactivityMonitor$1)
>         at
>
> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
>         at
> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:220)
>         at
> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:202)
>         at java.lang.Thread.run(Thread.java:722)
>
> From the above stack, I would like to guess
>

The broker has fired an advisory because a new consumer has been added. But
you're contended around the lock to send messages because some other thread
has the lock (dead lock possibly)?



>
> 1) TcpTransport polls out one message from 10.130.156.167, which is A1 in
> the topology diagram.
> 2) DemandForwardingBridgeSupport finds out it is an Advisory message, so it
> calls addConsumerInfo() to handle it.
> 3) It finds the way to the VMTransport and tries to dispath it.
> 4) But later on, it reaches LoggingBrokerPlugin.addConsumer(). We do
> configured LoggingBrokerPlugin in our system like this:
>
>         <plugins>
>             <loggingBrokerPlugin logAll="false" logConnectionEvents="true"
>                                  logMessageEvents="true" />
>         </plugins>
>
> 5) After a dozen of hop, it finally reaches Topic.send() and wait for the
> mutex lock to send out a message.
>
> I am not familiar with Advisory messages. But if this is a message that A1
> advises Central Broker to add some information, which should the Central
> Broker send it out again?
>
> 2.
>
> "ActiveMQ Transport: tcp:///10.130.156.173:48405" daemon prio=10
> tid=0x00007fd9d800a000 nid=0x4283 waiting for monitor entry
> [0x00007fdb3ccca000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>         at
> org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:39)
>         - waiting to lock <0x0000000443b38e40> (a java.lang.Object)
>         at
>
> org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
>         at
>
> org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1270)
>         at
>
> org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:815)
>         at
>
> org.apache.activemq.broker.TransportConnection.dispatchSync(TransportConnection.java:775)
>         at
>
> org.apache.activemq.broker.region.TopicSubscription.dispatch(TopicSubscription.java:525)
>         at
>
> org.apache.activemq.broker.region.TopicSubscription.add(TopicSubscription.java:99)
>         at
>
> org.apache.activemq.broker.region.policy.SimpleDispatchPolicy.dispatch(SimpleDispatchPolicy.java:48)
>         at org.apache.activemq.broker.region.Topic.dispatch(Topic.java:634)
>         at
> org.apache.activemq.broker.region.Topic.doMessageSend(Topic.java:468)
>         - locked <0x0000000402c800d0> (a
> org.apache.activemq.broker.region.Topic)
>         at org.apache.activemq.broker.region.Topic.send(Topic.java:404)
>         at
>
> org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:365)
>         at
> org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:523)
>         at
>
> org.apache.activemq.advisory.AdvisoryBroker.fireAdvisory(AdvisoryBroker.java:515)
>         at
>
> org.apache.activemq.advisory.AdvisoryBroker.fireAdvisory(AdvisoryBroker.java:446)
>         at
>
> org.apache.activemq.advisory.AdvisoryBroker.fireAdvisory(AdvisoryBroker.java:441)
>         at
>
> org.apache.activemq.advisory.AdvisoryBroker.addDestinationInfo(AdvisoryBroker.java:166)
>         at
>
> org.apache.activemq.broker.BrokerFilter.addDestinationInfo(BrokerFilter.java:217)
>         at
>
> org.apache.activemq.broker.BrokerFilter.addDestinationInfo(BrokerFilter.java:217)
>         at
>
> org.apache.activemq.broker.MutableBrokerFilter.addDestinationInfo(MutableBrokerFilter.java:223)
>         at
>
> org.apache.activemq.broker.util.LoggingBrokerPlugin.addDestinationInfo(LoggingBrokerPlugin.java:465)
>         at
>
> org.apache.activemq.broker.MutableBrokerFilter.addDestinationInfo(MutableBrokerFilter.java:223)
>         at
>
> org.apache.activemq.broker.TransportConnection.processAddDestination(TransportConnection.java:480)
>         at
> org.apache.activemq.command.DestinationInfo.visit(DestinationInfo.java:122)
>         at
>
> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:306)
>         at
>
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
>         at
>
> org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
>         at
>
> org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
>         at
> org.apache.activemq.transport.vm.VMTransport.dispatch(VMTransport.java:121)
>         at
> org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:112)
>         at
> org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
>         - locked <0x0000000454dcaeb8> (a java.lang.Object)
>         at
>
> org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
>         at
>
> org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteConsumerAdvisory(DemandForwardingBridgeSupport.java:644)
>         at
>
> org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteCommand(DemandForwardingBridgeSupport.java:497)
>         at
>
> org.apache.activemq.network.DemandForwardingBridgeSupport$2.onCommand(DemandForwardingBridgeSupport.java:165)
>         at
>
> org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
>         at
>
> org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
>         at
>
> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
>         at
>
> org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:227)
>         - locked <0x0000000454dcc760> (a
> org.apache.activemq.transport.InactivityMonitor$1)
>         at
>
> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
>         at
> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:220)
>         at
> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:202)
>         at java.lang.Thread.run(Thread.java:722)
>

The network bridge saw an "add destination" advisory, so added it, and is
now trying to fire another advisory. This is the thread that's blocking the
previous thread as it's having trouble sending.Maybe another thread is
using this same transport and is in the middle of a write?


>
> This is another piece of stack that related to part 1. Part 2 was the place
> that locked part 1 for lock id 0x0000000402c800d0.
>
> I can see that part 2 is basically the same as part 1 except two things:
>
> 1) It tries to "addDestinationInfo" instead of "addConsumerInfo".
> 2) It successfully acquired the lock but wait for another one:
> 0x0000000443b38e40.
>
> 3.
>
> "BrokerService[localhost] Task-18298" daemon prio=10 tid=0x00007fda0858e000
> nid=0x4fce waiting on condition [0x00007fd95dbdb000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00000004845e8888> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>         at
>
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
>         at
> java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
>         at
> org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:94)
>         at
> org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
>         - locked <0x0000000443b38e40> (a java.lang.Object)
>         at
>
> org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
>         at
>
> org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1270)
>         at
>
> org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:815)
>         at
>
> org.apache.activemq.broker.TransportConnection.iterate(TransportConnection.java:851)
>         at
>
> org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:127)
>         at
> org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:48)
>         at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>         at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>         at java.lang.Thread.run(Thread.java:722)
>

This thread is trying to put a message onto its vm-queue as you've noted.



>
> By searching the lock id, I found part 3. It is not difficult to locate the
> source code here:
>
>             if (peer.started) {
>                 if (peer.async) {
>                     peer.getMessageQueue().put(command);
>                     peer.wakeup();
>                 } else {
>                     transportListener = peer.transportListener;
>                 }
>
> And it was blocked at peer.getMessageQueue().put(command). Obviously, it is
> waiting for the notFull condition, which means that the BlockingQueue was
> full. And here is the place that the blocking queue was created:
>
>     private LinkedBlockingQueue getMessageQueue() {
>         synchronized (lazyInitMutext) {
>             if (messageQueue == null) {
>                 messageQueue = new
> LinkedBlockingQueue(this.asyncQueueDepth);
>             }
>             return messageQueue;
>         }
>     }
>
> It is a queue of 2000 by default.
>
> I can't find other direct related parts in my stack trace, although there
> are others like this:
>
> "ActiveMQ Session Task-75724" prio=10 tid=0x00007fda086ec000 nid=0x5007
> waiting for monitor entry [0x00007fd95afaf000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>         at
> org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:39)
>         - waiting to lock <0x00000004439b2d10> (a java.lang.Object)
>         at
>
> org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
>         at
>
> org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1265)
>         at
>
> org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1259)
>         at
>
> org.apache.activemq.ActiveMQSession.asyncSendPacket(ActiveMQSession.java:1863)
>         at
> org.apache.activemq.ActiveMQSession.sendAck(ActiveMQSession.java:2029)
>         at
> org.apache.activemq.ActiveMQSession.sendAck(ActiveMQSession.java:2024)
>         at
>
> org.apache.activemq.ActiveMQMessageConsumer.afterMessageIsConsumed(ActiveMQMessageConsumer.java:871)
>         - locked <0x0000000439262a88> (a java.util.LinkedList)
>         at
>
> org.apache.activemq.ActiveMQMessageConsumer.dispatch(ActiveMQMessageConsumer.java:1232)
>         - locked <0x00000004392571c8> (a java.lang.Object)
>         at
>
> org.apache.activemq.ActiveMQSessionExecutor.dispatch(ActiveMQSessionExecutor.java:134)
>         at
>
> org.apache.activemq.ActiveMQSessionExecutor.iterate(ActiveMQSessionExecutor.java:205)
>         at
>
> org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:127)
>         at
> org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:48)
>         at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>         at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>         at java.lang.Thread.run(Thread.java:722)
>
> With the above information, I would like to guess a possible reason:
>
> Is it possible that my system somehow filled up VMTransport.messageQueue,
> then blocked all Advisory messages. With all the advisory messages blocked,
> now new consumers could be added into the system, which in turn caused the
> messageQueue blocked forever?
>
> If this is true, I got another question: why did the system need NEW
> consumer in the first place? When the system was running, consumers were
> supposed at their positions. Those messages were supposed to be consumed
> eventually.
>
> Is it possible that the problem was caused by dynamic Queues of dynamic
> Topics? With dynamic Queues of dynamic Topics, the system has to wait for a
> consumer to bind. However, for some reason, the Queue of the Topic were
> flooded before the advisory message arrived at. Then the Queue or Topic
> will
> be blocked forever, which in turn blocked VMTransport, and then blocked the
> whole cluster.
>
> I am not an ActiveMQ expert and I am not familiar with the implementation.
> Those guess could be far away from being correct. But please give me some
> advice and help me to find the correct way to solve the problem.
>
> Thank you very much.
>
> Jeff
>
>
>
>
>
>
>
>
>
>
>
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/ActiveMQ-5-5-1-Communication-got-stuck-between-Producer-and-Consumer-tp4671756.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>



-- 
*Christian Posta*
http://www.christianposta.com/blog
twitter: @christianposta

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message