From: Jeff Huang
To: users@activemq.apache.org
Date: Tue, 24 Sep 2013 13:47:09 -0400
Subject: [ActiveMQ 5.5.1] - Communication got stuck between Producer and Consumer

I have a product built on ActiveMQ 5.5.1. The system architecture looks like this:

    --------------------------------------            ---------------------
    | Central Server ---> Central Broker | ----+----- | Broker A1 <--- A1 |
    --------------------------------------     |      ---------------------
                                               +----- | Broker A2 <--- A2 |
                                               |      ---------------------
                                               +----- | Broker A3 <--- A3 |
                                                      ---------------------

The Central Broker talks to the Central Server over vm://localhost, and likewise each Ai talks to its Broker Ai over vm://localhost. Each Broker Ai talks to the Central Broker over tcp://:61616, duplex.

Most of the time, the cluster works fine. But sometimes messages can't get through: if A1 sends a message, the Central Server can't see it; if the Central Server sends a message, none of A1, A2, or A3 can see it. We don't have Ai talk to Aj, so we can't tell whether they can see each other.

Using netstat, we could confirm that the connections were established. Restarting Ai doesn't fix the problem; we have to restart the Central Broker.

We have seen this situation quite a few times, but since it is not reproducible, we don't know what the root cause is. Recently, however, we noticed that it happens under heavy workload, both with and without producer control.
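Because the hang can't be reproduced on demand, one option might be to capture lock information automatically when it happens instead of relying on a manual thread dump. Below is a minimal sketch, assuming only the JDK's java.lang.management.ThreadMXBean; the class DeadlockProbe and its methods are hypothetical names, and startDemoDeadlock() exists only to exercise the probe. Note that findDeadlockedThreads() only reports cycles of threads waiting to acquire object monitors or ownable synchronizers, so a chain that ends in a thread parked on a full queue's condition (rather than on a lock) will probably not be reported and still needs a full thread dump.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

// Hypothetical helper, not part of ActiveMQ: reports lock cycles the JVM itself detects.
final class DeadlockProbe {
    private static final ThreadMXBean MX = ManagementFactory.getThreadMXBean();

    /** Polls until the JVM reports a deadlock or the timeout expires; returns an empty array if none is seen. */
    static ThreadInfo[] awaitDeadlocks(long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (true) {
            long[] ids = MX.findDeadlockedThreads(); // null when no cycle exists
            if (ids != null) {
                return MX.getThreadInfo(ids,
                        MX.isObjectMonitorUsageSupported(),  // include held monitors if available
                        MX.isSynchronizerUsageSupported());  // include held j.u.c. locks if available
            }
            if (System.currentTimeMillis() >= deadline) {
                return new ThreadInfo[0];
            }
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return new ThreadInfo[0];
            }
        }
    }

    /** Prints one line per deadlocked thread, similar in spirit to jstack's "waiting to lock" lines. */
    static void report(ThreadInfo[] infos) {
        for (ThreadInfo ti : infos) {
            System.out.println("\"" + ti.getThreadName() + "\" " + ti.getThreadState()
                    + " waiting for " + ti.getLockName()
                    + " held by \"" + ti.getLockOwnerName() + "\"");
        }
    }

    /** Demo only: two daemon threads take the same two monitors in opposite order and deadlock. */
    static void startDemoDeadlock() {
        Object a = new Object();
        Object b = new Object();
        CountDownLatch bothHoldFirst = new CountDownLatch(2);
        for (int i = 0; i < 2; i++) {
            Object first = (i == 0) ? a : b;
            Object second = (i == 0) ? b : a;
            Thread t = new Thread(() -> {
                synchronized (first) {
                    bothHoldFirst.countDown();
                    try {
                        bothHoldFirst.await(); // ensure each thread holds its first monitor
                    } catch (InterruptedException e) {
                        return;
                    }
                    synchronized (second) { /* never reached */ }
                }
            }, "demo-" + i);
            t.setDaemon(true); // lets the JVM exit even though these threads stay stuck
            t.start();
        }
    }
}
```

Calling startDemoDeadlock() followed by report(awaitDeadlocks(5000)) should print one line per thread in the cycle; in a real deployment the probe would more likely run periodically from a watchdog thread and log what it finds.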
By looking into the stack traces, I found this:

    "Producer Thread" prio=10 tid=0x00007fd97c016800 nid=0x47eb waiting on condition [0x00007fd9777f6000]
       java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0x000000037eeecdd0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
        at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:374)
        at org.apache.activemq.transport.FutureResponse.getResult(FutureResponse.java:40)
        at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:87)
        at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1284)
        at org.apache.activemq.TransactionContext.syncSendPacketWithInterruptionHandling(TransactionContext.java:707)
        at org.apache.activemq.TransactionContext.commit(TransactionContext.java:299)
        at org.apache.activemq.ActiveMQSession.commit(ActiveMQSession.java:560)
        at jeff.huang.Producer.commitBatch(Producer.java:480)

This stack trace led me to AMQ-1927: https://issues.apache.org/jira/browse/AMQ-1927, but I can't tell whether it is exactly the same issue.

There are other parts of the stack trace that I find interesting. Since this is a production system, I am sorry that I can't simply copy and paste the whole stack trace here, but I really need some help explaining a few things.

1.

    "ActiveMQ Transport: tcp:///10.130.156.167:38407" daemon prio=10 tid=0x00007fda08592000 nid=0x4f3c waiting for monitor entry [0x00007fd96d5d3000]
       java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.activemq.broker.region.Topic.doMessageSend(Topic.java:423)
        - waiting to lock <0x0000000402c800d0> (a org.apache.activemq.broker.region.Topic)
        at org.apache.activemq.broker.region.Topic.send(Topic.java:404)
        at org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:365)
        at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:523)
        at org.apache.activemq.advisory.AdvisoryBroker.fireAdvisory(AdvisoryBroker.java:515)
        at org.apache.activemq.advisory.AdvisoryBroker.fireAdvisory(AdvisoryBroker.java:446)
        at org.apache.activemq.advisory.AdvisoryBroker.addConsumer(AdvisoryBroker.java:106)
        at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:89)
        at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:89)
        at org.apache.activemq.broker.MutableBrokerFilter.addConsumer(MutableBrokerFilter.java:95)
        at org.apache.activemq.broker.util.LoggingBrokerPlugin.addConsumer(LoggingBrokerPlugin.java:187)
        at org.apache.activemq.broker.MutableBrokerFilter.addConsumer(MutableBrokerFilter.java:95)
        at org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:546)
        at org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:349)
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:306)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
        at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
        at org.apache.activemq.transport.vm.VMTransport.dispatch(VMTransport.java:121)
        at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:112)
        at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
        - locked <0x000000028576f460> (a java.lang.Object)
        at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
        at org.apache.activemq.network.DemandForwardingBridgeSupport.addSubscription(DemandForwardingBridgeSupport.java:672)
        at org.apache.activemq.network.DemandForwardingBridgeSupport.addConsumerInfo(DemandForwardingBridgeSupport.java:1025)
        at org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteCommand(DemandForwardingBridgeSupport.java:521)
        at org.apache.activemq.network.DemandForwardingBridgeSupport$2.onCommand(DemandForwardingBridgeSupport.java:165)
        at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
        at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
        at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:227)
        - locked <0x0000000285687e40> (a org.apache.activemq.transport.InactivityMonitor$1)
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:220)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:202)
        at java.lang.Thread.run(Thread.java:722)

From the above stack, my guess is:

1) TcpTransport reads one message from 10.130.156.167, which is A1 in the topology diagram.
2) DemandForwardingBridgeSupport finds that it is an advisory message, so it calls addConsumerInfo() to handle it.
3) It makes its way to the VMTransport and tries to dispatch it.
4) Later on, it reaches LoggingBrokerPlugin.addConsumer(). We do have LoggingBrokerPlugin configured in our system like this:
5) After a dozen hops, it finally reaches Topic.send() and waits for the lock on the Topic in order to send out a message.
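Steps 3 to 5 suggest that the in-VM hop runs on the caller's thread: the same "ActiveMQ Transport: tcp" thread goes through VMTransport.dispatch() into broker code and ends up waiting for the Topic's lock while still holding the MutexTransport lock it took earlier. A minimal stdlib sketch of that shape follows, assuming only what the stack shows; SyncTransport, run(), and the thread names are hypothetical stand-ins, not ActiveMQ classes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

final class SyncDispatchDemo {
    /** Hypothetical synchronous in-VM transport guarded by a mutex (not ActiveMQ's classes). */
    static final class SyncTransport {
        private final Object mutex = new Object(); // plays the MutexTransport lock
        private final Consumer<String> listener;

        SyncTransport(Consumer<String> listener) { this.listener = listener; }

        void oneway(String command) {
            synchronized (mutex) {
                listener.accept(command); // receiver's code runs on the caller's thread, mutex still held
            }
        }
    }

    /** Polls a thread's state until it matches or the timeout expires. */
    static Thread.State waitForState(Thread t, Thread.State wanted, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (t.getState() != wanted && System.currentTimeMillis() < deadline) {
            try { Thread.sleep(10); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; }
        }
        return t.getState();
    }

    /** Returns "<reader state while the topic is held> <name of the thread that ran the handler>". */
    static String run() {
        Object topic = new Object(); // plays the Topic monitor
        String[] handledOn = new String[1];
        SyncTransport transport = new SyncTransport(cmd -> {
            synchronized (topic) { handledOn[0] = Thread.currentThread().getName(); }
        });
        CountDownLatch topicHeld = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread other = new Thread(() -> {
            synchronized (topic) { // another thread is busy inside the "topic"
                topicHeld.countDown();
                try { release.await(); } catch (InterruptedException ignored) { }
            }
        }, "other-dispatch");
        Thread reader = new Thread(() -> transport.oneway("ConsumerInfo"), "tcp-reader");
        try {
            other.start();
            topicHeld.await();
            reader.start();
            Thread.State observed = waitForState(reader, Thread.State.BLOCKED, 5000);
            release.countDown(); // let both threads finish
            reader.join();
            other.join();
            return observed + " " + handledOn[0];
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

In this sketch, run() reports the reader as BLOCKED and shows that the handler executed on "tcp-reader" itself, which is the same pattern as the stack in part 1: a network reader thread stuck in broker code while holding a transport lock.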
I am not familiar with advisory messages. But if this is a message in which A1 advises the Central Broker to add some information, why should the Central Broker send it out again?

2.

    "ActiveMQ Transport: tcp:///10.130.156.173:48405" daemon prio=10 tid=0x00007fd9d800a000 nid=0x4283 waiting for monitor entry [0x00007fdb3ccca000]
       java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:39)
        - waiting to lock <0x0000000443b38e40> (a java.lang.Object)
        at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
        at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1270)
        at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:815)
        at org.apache.activemq.broker.TransportConnection.dispatchSync(TransportConnection.java:775)
        at org.apache.activemq.broker.region.TopicSubscription.dispatch(TopicSubscription.java:525)
        at org.apache.activemq.broker.region.TopicSubscription.add(TopicSubscription.java:99)
        at org.apache.activemq.broker.region.policy.SimpleDispatchPolicy.dispatch(SimpleDispatchPolicy.java:48)
        at org.apache.activemq.broker.region.Topic.dispatch(Topic.java:634)
        at org.apache.activemq.broker.region.Topic.doMessageSend(Topic.java:468)
        - locked <0x0000000402c800d0> (a org.apache.activemq.broker.region.Topic)
        at org.apache.activemq.broker.region.Topic.send(Topic.java:404)
        at org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:365)
        at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:523)
        at org.apache.activemq.advisory.AdvisoryBroker.fireAdvisory(AdvisoryBroker.java:515)
        at org.apache.activemq.advisory.AdvisoryBroker.fireAdvisory(AdvisoryBroker.java:446)
        at org.apache.activemq.advisory.AdvisoryBroker.fireAdvisory(AdvisoryBroker.java:441)
        at org.apache.activemq.advisory.AdvisoryBroker.addDestinationInfo(AdvisoryBroker.java:166)
        at org.apache.activemq.broker.BrokerFilter.addDestinationInfo(BrokerFilter.java:217)
        at org.apache.activemq.broker.BrokerFilter.addDestinationInfo(BrokerFilter.java:217)
        at org.apache.activemq.broker.MutableBrokerFilter.addDestinationInfo(MutableBrokerFilter.java:223)
        at org.apache.activemq.broker.util.LoggingBrokerPlugin.addDestinationInfo(LoggingBrokerPlugin.java:465)
        at org.apache.activemq.broker.MutableBrokerFilter.addDestinationInfo(MutableBrokerFilter.java:223)
        at org.apache.activemq.broker.TransportConnection.processAddDestination(TransportConnection.java:480)
        at org.apache.activemq.command.DestinationInfo.visit(DestinationInfo.java:122)
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:306)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
        at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
        at org.apache.activemq.transport.vm.VMTransport.dispatch(VMTransport.java:121)
        at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:112)
        at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
        - locked <0x0000000454dcaeb8> (a java.lang.Object)
        at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
        at org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteConsumerAdvisory(DemandForwardingBridgeSupport.java:644)
        at org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteCommand(DemandForwardingBridgeSupport.java:497)
        at org.apache.activemq.network.DemandForwardingBridgeSupport$2.onCommand(DemandForwardingBridgeSupport.java:165)
        at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
        at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
        at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:227)
        - locked <0x0000000454dcc760> (a org.apache.activemq.transport.InactivityMonitor$1)
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:220)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:202)
        at java.lang.Thread.run(Thread.java:722)

This is another stack that is related to part 1: part 2 holds lock 0x0000000402c800d0, which part 1 is waiting for. Part 2 is basically the same as part 1 except for two things:

1) It calls addDestinationInfo instead of addConsumerInfo.
2) It has successfully acquired that lock, but it is waiting for another one: 0x0000000443b38e40.

3.

    "BrokerService[localhost] Task-18298" daemon prio=10 tid=0x00007fda0858e000 nid=0x4fce waiting on condition [0x00007fd95dbdb000]
       java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0x00000004845e8888> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
        at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
        at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:94)
        at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
        - locked <0x0000000443b38e40> (a java.lang.Object)
        at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
        at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1270)
        at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:815)
        at org.apache.activemq.broker.TransportConnection.iterate(TransportConnection.java:851)
        at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:127)
        at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:48)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:722)

By searching for that lock ID, I found part 3, which holds it. It is not difficult to locate the source code here:

    if (peer.started) {
        if (peer.async) {
            peer.getMessageQueue().put(command);
            peer.wakeup();
        } else {
            transportListener = peer.transportListener;
        }
    ...

It is blocked at peer.getMessageQueue().put(command). Evidently it is waiting on the notFull condition, which means that the blocking queue is full. Here is where the blocking queue is created:

    private LinkedBlockingQueue getMessageQueue() {
        synchronized (lazyInitMutext) {
            if (messageQueue == null) {
                messageQueue = new LinkedBlockingQueue(this.asyncQueueDepth);
            }
            return messageQueue;
        }
    }

By default, its capacity (asyncQueueDepth) is 2000.
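Read together, parts 2 and 3 look like a thread that parks on a full bounded queue while still holding a mutex, so every other sender that needs the same mutex stays BLOCKED behind it. A minimal stdlib sketch of that shape follows, assuming only what the stacks show: the capacity of 4 is an illustrative choice so the queue fills at once (the text reports 2000 as the default), and the class, the thread names, and the timed offer() probe are hypothetical, not ActiveMQ code.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class FullQueueUnderMutexDemo {
    // Illustrative capacity so the queue fills immediately; the email reports 2000 as the default.
    static final int CAPACITY = 4;

    /** Polls a thread's state until it matches or the timeout expires. */
    static Thread.State waitForState(Thread t, Thread.State wanted, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (t.getState() != wanted && System.currentTimeMillis() < deadline) {
            try { Thread.sleep(10); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; }
        }
        return t.getState();
    }

    static String run() {
        LinkedBlockingQueue<Object> messageQueue = new LinkedBlockingQueue<>(CAPACITY);
        Object mutex = new Object(); // plays the MutexTransport lock (0x0000000443b38e40 in part 3)
        for (int i = 0; i < CAPACITY; i++) {
            messageQueue.offer("msg-" + i); // fill the queue; nobody drains it yet
        }
        // Like part 3: holds the mutex, then parks in put() because the queue is full.
        Thread dispatcher = new Thread(() -> {
            synchronized (mutex) {
                try { messageQueue.put("one-more"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            }
        }, "dispatcher");
        // Like part 2: needs the same mutex to send, so it is BLOCKED behind the dispatcher.
        Thread sender = new Thread(() -> { synchronized (mutex) { /* would send here */ } }, "sender");
        dispatcher.setDaemon(true);
        sender.setDaemon(true);
        try {
            dispatcher.start();
            Thread.State d = waitForState(dispatcher, Thread.State.WAITING, 5000);
            sender.start();
            Thread.State s = waitForState(sender, Thread.State.BLOCKED, 5000);
            // A timed offer() reports "full" instead of parking forever.
            boolean accepted = messageQueue.offer("probe", 100, TimeUnit.MILLISECONDS);
            messageQueue.take(); // freeing one slot lets the parked put() finish and release the mutex
            dispatcher.join(5000);
            sender.join(5000);
            boolean finished = !dispatcher.isAlive() && !sender.isAlive();
            return d + " " + s + " offerAccepted=" + accepted + " finished=" + finished;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

In this sketch only taking an element off the queue releases the whole chain. If whatever drains the real messageQueue were itself waiting on one of these locks, nothing would ever take from it, which would be consistent with the permanent hang described above.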
I can't find any other directly related parts in my stack trace, although there are others like this:

    "ActiveMQ Session Task-75724" prio=10 tid=0x00007fda086ec000 nid=0x5007 waiting for monitor entry [0x00007fd95afaf000]
       java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:39)
        - waiting to lock <0x00000004439b2d10> (a java.lang.Object)
        at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
        at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1265)
        at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1259)
        at org.apache.activemq.ActiveMQSession.asyncSendPacket(ActiveMQSession.java:1863)
        at org.apache.activemq.ActiveMQSession.sendAck(ActiveMQSession.java:2029)
        at org.apache.activemq.ActiveMQSession.sendAck(ActiveMQSession.java:2024)
        at org.apache.activemq.ActiveMQMessageConsumer.afterMessageIsConsumed(ActiveMQMessageConsumer.java:871)
        - locked <0x0000000439262a88> (a java.util.LinkedList)
        at org.apache.activemq.ActiveMQMessageConsumer.dispatch(ActiveMQMessageConsumer.java:1232)
        - locked <0x00000004392571c8> (a java.lang.Object)
        at org.apache.activemq.ActiveMQSessionExecutor.dispatch(ActiveMQSessionExecutor.java:134)
        at org.apache.activemq.ActiveMQSessionExecutor.iterate(ActiveMQSessionExecutor.java:205)
        at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:127)
        at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:48)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:722)

With the above information, I would like to suggest a possible cause. Is it possible that my system somehow filled up VMTransport.messageQueue and thereby blocked all advisory messages?
And that, with all the advisory messages blocked, no new consumers could be added to the system, which in turn kept the messageQueue blocked forever?

If this is true, I have another question: why did the system need a NEW consumer in the first place? While the system was running, the consumers were supposed to be in place already, and the messages were supposed to be consumed eventually.

Is it possible that the problem was caused by dynamic Queues or dynamic Topics? With dynamic Queues or Topics, the system has to wait for a consumer to bind. However, for some reason, the Queue or Topic was flooded before the advisory message arrived. The Queue or Topic would then be blocked forever, which in turn would block the VMTransport and then the whole cluster.

I am not an ActiveMQ expert and I am not familiar with the implementation, so these guesses could be far from correct. But please give me some advice and help me find the right way to solve the problem.

Thank you very much.

Jeff