activemq-users mailing list archives

From Jeff Huang <super.reader.j...@gmail.com>
Subject [ActiveMQ 5.5.1] - Communication got stuck between Producer and Consumer
Date Tue, 24 Sep 2013 17:47:09 GMT
I have a product built upon ActiveMQ 5.5.1. System architecture looks like
this:

 ------------------------------------                  -------------------
| Central Server ---> Central Broker | -------------- | Broker A1 <--- A1 |
 ------------------------------------                  -------------------
                                       \------------- | Broker A2 <--- A2 |
                                                       -------------------
                                       \------------- | Broker A3 <--- A3 |
                                                       -------------------

Central Broker talks to Central Server using vm://localhost, and each Ai
talks to its Broker Ai the same way. Each Broker Ai talks to Central Broker
using tcp://<central server>:61616, in duplex mode.

Most of the time, the cluster works fine. But sometimes messages can't get
through: if A1 sends a message, Central Server doesn't see it; if Central
Server sends a message, none of A1, A2, or A3 sees it. We don't have Ai
talk to Aj, so we can't tell whether they can see each other.

Using netstat, we could confirm that the connections were established.
Restarting Ai won't fix the problem. We have to restart the Central Broker.

We have seen this situation quite a few times, but since it is not
reproducible, we don't know the root cause. Recently, though, we noticed
that it happens under heavy workload, both with and without producer flow
control.

By looking into the stack trace, I found this:

"Producer Thread" prio=10 tid=0x00007fd97c016800 nid=0x47eb waiting on condition [0x00007fd9777f6000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x000000037eeecdd0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
        at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:374)
        at org.apache.activemq.transport.FutureResponse.getResult(FutureResponse.java:40)
        at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:87)
        at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1284)
        at org.apache.activemq.TransactionContext.syncSendPacketWithInterruptionHandling(TransactionContext.java:707)
        at org.apache.activemq.TransactionContext.commit(TransactionContext.java:299)
        at org.apache.activemq.ActiveMQSession.commit(ActiveMQSession.java:560)
        at jeff.huang.Producer.commitBatch(Producer.java:480)

This stack trace log led me to AMQ-1927:

https://issues.apache.org/jira/browse/AMQ-1927

But I can't tell whether this is exactly the same issue.

There are other parts of the stack trace that I find interesting. Since it
is a production system, I am sorry I can't simply copy and paste the whole
stack trace here. But I really need some help to explain something.

1.

"ActiveMQ Transport: tcp:///10.130.156.167:38407" daemon prio=10 tid=0x00007fda08592000 nid=0x4f3c waiting for monitor entry [0x00007fd96d5d3000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.activemq.broker.region.Topic.doMessageSend(Topic.java:423)
        - waiting to lock <0x0000000402c800d0> (a org.apache.activemq.broker.region.Topic)
        at org.apache.activemq.broker.region.Topic.send(Topic.java:404)
        at org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:365)
        at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:523)
        at org.apache.activemq.advisory.AdvisoryBroker.fireAdvisory(AdvisoryBroker.java:515)
        at org.apache.activemq.advisory.AdvisoryBroker.fireAdvisory(AdvisoryBroker.java:446)
        at org.apache.activemq.advisory.AdvisoryBroker.addConsumer(AdvisoryBroker.java:106)
        at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:89)
        at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:89)
        at org.apache.activemq.broker.MutableBrokerFilter.addConsumer(MutableBrokerFilter.java:95)
        at org.apache.activemq.broker.util.LoggingBrokerPlugin.addConsumer(LoggingBrokerPlugin.java:187)
        at org.apache.activemq.broker.MutableBrokerFilter.addConsumer(MutableBrokerFilter.java:95)
        at org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:546)
        at org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:349)
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:306)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
        at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
        at org.apache.activemq.transport.vm.VMTransport.dispatch(VMTransport.java:121)
        at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:112)
        at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
        - locked <0x000000028576f460> (a java.lang.Object)
        at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
        at org.apache.activemq.network.DemandForwardingBridgeSupport.addSubscription(DemandForwardingBridgeSupport.java:672)
        at org.apache.activemq.network.DemandForwardingBridgeSupport.addConsumerInfo(DemandForwardingBridgeSupport.java:1025)
        at org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteCommand(DemandForwardingBridgeSupport.java:521)
        at org.apache.activemq.network.DemandForwardingBridgeSupport$2.onCommand(DemandForwardingBridgeSupport.java:165)
        at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
        at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
        at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:227)
        - locked <0x0000000285687e40> (a org.apache.activemq.transport.InactivityMonitor$1)
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:220)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:202)
        at java.lang.Thread.run(Thread.java:722)

From the above stack, my guess is:

1) TcpTransport reads a message from 10.130.156.167, which is A1 in the
topology diagram.
2) DemandForwardingBridgeSupport finds that it is an Advisory message, so
it calls addConsumerInfo() to handle it.
3) It finds its way to the VMTransport and tries to dispatch it.
4) Later on, it reaches LoggingBrokerPlugin.addConsumer(). We have
configured LoggingBrokerPlugin in our system like this:

        <plugins>
            <loggingBrokerPlugin logAll="false" logConnectionEvents="true"
                                 logMessageEvents="true" />
        </plugins>

5) After a dozen hops, it finally reaches Topic.send() and waits for the
monitor lock on the Topic before it can send out a message.

I am not familiar with Advisory messages. But if this is a message in which
A1 advises Central Broker to add some information, why should the Central
Broker send it out again?

2.

"ActiveMQ Transport: tcp:///10.130.156.173:48405" daemon prio=10 tid=0x00007fd9d800a000 nid=0x4283 waiting for monitor entry [0x00007fdb3ccca000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:39)
        - waiting to lock <0x0000000443b38e40> (a java.lang.Object)
        at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
        at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1270)
        at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:815)
        at org.apache.activemq.broker.TransportConnection.dispatchSync(TransportConnection.java:775)
        at org.apache.activemq.broker.region.TopicSubscription.dispatch(TopicSubscription.java:525)
        at org.apache.activemq.broker.region.TopicSubscription.add(TopicSubscription.java:99)
        at org.apache.activemq.broker.region.policy.SimpleDispatchPolicy.dispatch(SimpleDispatchPolicy.java:48)
        at org.apache.activemq.broker.region.Topic.dispatch(Topic.java:634)
        at org.apache.activemq.broker.region.Topic.doMessageSend(Topic.java:468)
        - locked <0x0000000402c800d0> (a org.apache.activemq.broker.region.Topic)
        at org.apache.activemq.broker.region.Topic.send(Topic.java:404)
        at org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:365)
        at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:523)
        at org.apache.activemq.advisory.AdvisoryBroker.fireAdvisory(AdvisoryBroker.java:515)
        at org.apache.activemq.advisory.AdvisoryBroker.fireAdvisory(AdvisoryBroker.java:446)
        at org.apache.activemq.advisory.AdvisoryBroker.fireAdvisory(AdvisoryBroker.java:441)
        at org.apache.activemq.advisory.AdvisoryBroker.addDestinationInfo(AdvisoryBroker.java:166)
        at org.apache.activemq.broker.BrokerFilter.addDestinationInfo(BrokerFilter.java:217)
        at org.apache.activemq.broker.BrokerFilter.addDestinationInfo(BrokerFilter.java:217)
        at org.apache.activemq.broker.MutableBrokerFilter.addDestinationInfo(MutableBrokerFilter.java:223)
        at org.apache.activemq.broker.util.LoggingBrokerPlugin.addDestinationInfo(LoggingBrokerPlugin.java:465)
        at org.apache.activemq.broker.MutableBrokerFilter.addDestinationInfo(MutableBrokerFilter.java:223)
        at org.apache.activemq.broker.TransportConnection.processAddDestination(TransportConnection.java:480)
        at org.apache.activemq.command.DestinationInfo.visit(DestinationInfo.java:122)
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:306)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
        at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
        at org.apache.activemq.transport.vm.VMTransport.dispatch(VMTransport.java:121)
        at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:112)
        at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
        - locked <0x0000000454dcaeb8> (a java.lang.Object)
        at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
        at org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteConsumerAdvisory(DemandForwardingBridgeSupport.java:644)
        at org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteCommand(DemandForwardingBridgeSupport.java:497)
        at org.apache.activemq.network.DemandForwardingBridgeSupport$2.onCommand(DemandForwardingBridgeSupport.java:165)
        at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
        at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
        at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:227)
        - locked <0x0000000454dcc760> (a org.apache.activemq.transport.InactivityMonitor$1)
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:220)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:202)
        at java.lang.Thread.run(Thread.java:722)

This is another piece of the stack trace, related to part 1. The thread in
part 2 holds the lock that part 1 is waiting for, 0x0000000402c800d0.

Part 2 is basically the same as part 1 except for two things:

1) It tries to "addDestinationInfo" instead of "addConsumerInfo".
2) It successfully acquired that lock but is waiting for another one:
0x0000000443b38e40.

3.

"BrokerService[localhost] Task-18298" daemon prio=10 tid=0x00007fda0858e000 nid=0x4fce waiting on condition [0x00007fd95dbdb000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000004845e8888> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
        at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
        at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:94)
        at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
        - locked <0x0000000443b38e40> (a java.lang.Object)
        at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
        at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1270)
        at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:815)
        at org.apache.activemq.broker.TransportConnection.iterate(TransportConnection.java:851)
        at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:127)
        at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:48)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:722)

By searching the lock id, I found part 3. It is not difficult to locate the
source code here:

            if (peer.started) {
                if (peer.async) {
                    peer.getMessageQueue().put(command);
                    peer.wakeup();
                } else {
                    transportListener = peer.transportListener;
                }

And it was blocked at peer.getMessageQueue().put(command). Obviously, it is
waiting for the notFull condition, which means that the BlockingQueue was
full. And here is the place that the blocking queue was created:

    private LinkedBlockingQueue getMessageQueue() {
        synchronized (lazyInitMutext) {
            if (messageQueue == null) {
                messageQueue = new LinkedBlockingQueue(this.asyncQueueDepth);
            }
            return messageQueue;
        }
    }

Its capacity comes from asyncQueueDepth, which is 2000 by default.
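
To illustrate the blocking behaviour, here is a minimal standalone sketch.
It is not ActiveMQ code: the class and method names (BoundedQueueDemo,
acceptsWhenFull) are made up for this example. It fills a bounded
LinkedBlockingQueue and then tries to add one more element. offer() with a
timeout is used instead of put() so that the demo returns instead of
parking forever.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedQueueDemo {
    /**
     * Fills a queue of the given capacity, then reports whether one more
     * element is accepted within the timeout when nobody drains the queue.
     */
    public static boolean acceptsWhenFull(int capacity, long timeoutMillis) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.add("command-" + i); // succeeds while there is still room
        }
        try {
            // put() would park here until a consumer removes an element.
            // offer() waits for space in the same way but gives up after the timeout.
            return queue.offer("one-too-many", timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Same capacity as the default asyncQueueDepth mentioned above.
        System.out.println(acceptsWhenFull(2000, 200)); // prints "false"
    }
}
```

With put() in place of offer(), the calling thread would stay in WAITING
(parking) state, which is what the thread in part 3 shows.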

I can't find other directly related parts in my stack trace, although there
are others like this:

"ActiveMQ Session Task-75724" prio=10 tid=0x00007fda086ec000 nid=0x5007 waiting for monitor entry [0x00007fd95afaf000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:39)
        - waiting to lock <0x00000004439b2d10> (a java.lang.Object)
        at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
        at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1265)
        at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1259)
        at org.apache.activemq.ActiveMQSession.asyncSendPacket(ActiveMQSession.java:1863)
        at org.apache.activemq.ActiveMQSession.sendAck(ActiveMQSession.java:2029)
        at org.apache.activemq.ActiveMQSession.sendAck(ActiveMQSession.java:2024)
        at org.apache.activemq.ActiveMQMessageConsumer.afterMessageIsConsumed(ActiveMQMessageConsumer.java:871)
        - locked <0x0000000439262a88> (a java.util.LinkedList)
        at org.apache.activemq.ActiveMQMessageConsumer.dispatch(ActiveMQMessageConsumer.java:1232)
        - locked <0x00000004392571c8> (a java.lang.Object)
        at org.apache.activemq.ActiveMQSessionExecutor.dispatch(ActiveMQSessionExecutor.java:134)
        at org.apache.activemq.ActiveMQSessionExecutor.iterate(ActiveMQSessionExecutor.java:205)
        at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:127)
        at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:48)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:722)
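
The manual search for lock ids across parts 1 to 3 could be automated with
something like the sketch below. It is only an illustration: the class name
LockChain and the method waitsFor are hypothetical, and it assumes the
HotSpot dump format shown above (a quoted thread name on the header line,
then "- locked <id>" and "- waiting to lock <id>" lines). The sample input
is cut down to the header and lock lines of the three threads.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LockChain {
    // A thread header line starts with the quoted thread name.
    private static final Pattern THREAD = Pattern.compile("^\"([^\"]+)\"");
    private static final Pattern HELD = Pattern.compile("- locked <(0x[0-9a-f]+)>");
    private static final Pattern WANTED = Pattern.compile("- waiting to lock <(0x[0-9a-f]+)>");

    /** Returns one "waiter -> holder (lock id)" entry per blocked thread whose lock holder is in the dump. */
    public static List<String> waitsFor(String dump) {
        Map<String, String> holderOf = new LinkedHashMap<>(); // lock id -> thread holding it
        Map<String, String> wantedBy = new LinkedHashMap<>(); // thread -> lock id it waits for
        String current = null;
        for (String line : dump.split("\n")) {
            Matcher t = THREAD.matcher(line);
            if (t.find()) {
                current = t.group(1);
                continue;
            }
            if (current == null) {
                continue; // text before the first thread header
            }
            Matcher h = HELD.matcher(line);
            if (h.find()) {
                holderOf.put(h.group(1), current);
            }
            Matcher w = WANTED.matcher(line);
            if (w.find()) {
                wantedBy.put(current, w.group(1));
            }
        }
        List<String> edges = new ArrayList<>();
        for (Map.Entry<String, String> e : wantedBy.entrySet()) {
            String holder = holderOf.get(e.getValue());
            if (holder != null) {
                edges.add(e.getKey() + " -> " + holder + " (" + e.getValue() + ")");
            }
        }
        return edges;
    }

    public static void main(String[] args) {
        String dump = String.join("\n",
            "\"ActiveMQ Transport: tcp:///10.130.156.167:38407\" daemon prio=10",
            "        - waiting to lock <0x0000000402c800d0> (a org.apache.activemq.broker.region.Topic)",
            "        - locked <0x000000028576f460> (a java.lang.Object)",
            "\"ActiveMQ Transport: tcp:///10.130.156.173:48405\" daemon prio=10",
            "        - waiting to lock <0x0000000443b38e40> (a java.lang.Object)",
            "        - locked <0x0000000402c800d0> (a org.apache.activemq.broker.region.Topic)",
            "\"BrokerService[localhost] Task-18298\" daemon prio=10",
            "        - locked <0x0000000443b38e40> (a java.lang.Object)");
        waitsFor(dump).forEach(System.out::println);
    }
}
```

On this input it prints two edges: the 10.130.156.167 thread waits for the
10.130.156.173 thread, which in turn waits for Task-18298. It only follows
monitor locks, so it does not show what Task-18298 itself is parked on.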

With the above information, here is my guess at a possible cause:

Is it possible that my system somehow filled up VMTransport.messageQueue,
which then blocked all Advisory messages? With all the advisory messages
blocked, no new consumers could be added to the system, which in turn left
the messageQueue blocked forever?
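
The chain seen in parts 1 to 3 can be modelled in a few lines. This is a
simplified sketch under stated assumptions, not ActiveMQ code: the class
StallChainDemo, the two plain lock objects and the thread names are all
hypothetical stand-ins (transportMutex for the MutexTransport lock,
topicLock for the Topic monitor, and a queue of capacity 1 for the full
messageQueue). It shows only that a single put() on a full queue, made
while holding a lock, is enough to stall the threads queued behind it. It
says nothing about why the queue is not being drained.

```java
import java.util.concurrent.LinkedBlockingQueue;

public class StallChainDemo {
    /** Polls until the thread reaches the expected state or five seconds pass. */
    private static void awaitState(Thread t, Thread.State expected) {
        long deadline = System.currentTimeMillis() + 5000;
        while (t.getState() != expected && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Builds the chain, records the states as "part3,part2,part1", then releases it. */
    public static String run() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.add("pending");                 // the bounded queue is already full
        Object transportMutex = new Object(); // stand-in for the MutexTransport lock
        Object topicLock = new Object();      // stand-in for the Topic monitor

        Thread part3 = new Thread(() -> {
            synchronized (transportMutex) {
                try {
                    queue.put("dispatch");    // parks until a slot frees up
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "part3");
        Thread part2 = new Thread(() -> {
            synchronized (topicLock) {
                synchronized (transportMutex) { /* reached only after part3 lets go */ }
            }
        }, "part2");
        Thread part1 = new Thread(() -> {
            synchronized (topicLock) { /* reached only after part2 lets go */ }
        }, "part1");

        Thread[] all = {part3, part2, part1};
        for (Thread t : all) {
            t.setDaemon(true);
        }
        part3.start();
        awaitState(part3, Thread.State.WAITING);
        part2.start();
        awaitState(part2, Thread.State.BLOCKED);
        part1.start();
        awaitState(part1, Thread.State.BLOCKED);
        String states = part3.getState() + "," + part2.getState() + "," + part1.getState();

        queue.poll();                         // draining one element releases the whole chain
        for (Thread t : all) {
            try {
                t.join(5000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return states;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The recorded states should be WAITING,BLOCKED,BLOCKED, the same pattern as
parts 3, 2 and 1 above. In the model, removing a single element from the
queue is enough to let all three threads finish.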

If this is true, I have another question: why did the system need a NEW
consumer in the first place? While the system was running, the consumers
were supposed to be in place already, and those messages were supposed to
be consumed eventually.

Is it possible that the problem was caused by dynamic Queues or dynamic
Topics? With dynamic Queues or dynamic Topics, the system has to wait for a
consumer to bind. However, for some reason, the Queue or the Topic was
flooded before the advisory message arrived. The Queue or Topic would then
be blocked forever, which in turn blocked VMTransport, and then the whole
cluster.

I am not an ActiveMQ expert and I am not familiar with the implementation.
These guesses could be far from correct. But please give me some advice and
help me find the right way to solve the problem.

Thank you very much.

Jeff
