activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Manuel Teira <mte...@tid.es>
Subject Re: Broker get stuck with high amount of persistent messages for a given destination
Date Tue, 03 Nov 2009 07:45:33 GMT
Hello again.
Let me insist on this problem, since it's hitting our production 
environment every other day. Anytime that, for whatever reason, one of 
the system queues goes over ~60.000 pending persistent messages, we are 
not able to restart it properly, suffering the problems I've tried to 
detail in the former message.
Could you be so kind to take a look on it? If you need any further 
information, please, make me know.

Best regards.


Manuel Teira escribió:
> Hello.
>
> We are embedding an activemq 4.2 broker in our application, running on a 
> Sun JVM 5, and using an Oracle 10g database as persistent messaging 
> store (without journal, since we are using a JDBC master-slave cluster).
> In a pair of occasions, we were unable to get the broker started in our 
> production environment. Analyzing the situation, it seems that the cause 
> was the great amount of persistent messages for a given queue (over 
> 75.000), what was blocking the attempt to create consumers on that 
> destination.
>
> Trying to reproduce the problem with a more up to date activemq version, 
> we set up a 5.2.0 standalone broker to resemble our scenario, and a 
> client that tries to create a set of consumers on the loaded queue. We 
> exported the table with messages from our production environment and 
> filled the test scenario database with it. I'm attaching both the 
> activemq configuration and the client used for testing.
>
> What we found was:
>
> The broker started normally. Since no attempt to consume messages from 
> the "loaded" destination was performed.
> Once the client started, we observed that:
>  - We tried to start 10 consumers on the queue.  Only one of those 
> consumers got the session.createConsumer(), others were blocked (note 
> that all of them are sharing the same connection)
>  - The only started consumer was unable to get any message, it was 
> blocked in the getMessage() attempt.
>  - The broker seems to be trying to load all the messages from the 
> database, showing the following  stack dump (just the involved threads):
>
> "ActiveMQ Transport: tcp:///127.0.0.1:33554" daemon prio=4 
> tid=0x00ac2ef8 nid=0x3c waiting on condition [0xaae7e000..0xaae7fbf0]
>         at sun.misc.Unsafe.park(Native Method)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:118)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:681)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:711)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1041)
>         at 
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:184)
>         at 
> java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:256)
>         at 
> org.apache.activemq.broker.region.Queue.addSubscription(Queue.java:217)
>         at 
> org.apache.activemq.broker.region.AbstractRegion.addConsumer(AbstractRegion.java:275)
>         - locked <0xe35bbb40> (a java.lang.Object)
>         at 
> org.apache.activemq.broker.region.RegionBroker.addConsumer(RegionBroker.java:372)
>         at 
> org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:86)
>         at 
> org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:86)
>         at 
> org.apache.activemq.advisory.AdvisoryBroker.addConsumer(AdvisoryBroker.java:83)
>         at 
> org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:86)
>         at 
> org.apache.activemq.broker.MutableBrokerFilter.addConsumer(MutableBrokerFilter.java:93)
>         at 
> org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:541)
>         at 
> org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:345)
>         at 
> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
>         at 
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
>         at 
> org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
>         at 
> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
>         at 
> org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
>         - locked <0xb8a0d5d0> (a 
> org.apache.activemq.transport.InactivityMonitor$1)
>         at 
> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
>         at 
> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
>         at 
> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
>         at java.lang.Thread.run(Thread.java:595)
>
> "QueueThread:queue://TaskManagerQueue" daemon prio=10 tid=0x005e1bd8 
> nid=0x42 runnable [0xaac7e000..0xaac7faf0]
>         at oracle.jdbc.driver.T2CStatement.t2cDefineFetch(Native Method)
>         at 
> oracle.jdbc.driver.T2CPreparedStatement.doDefineFetch(T2CPreparedStatement.java:827)
>         at 
> oracle.jdbc.driver.T2CPreparedStatement.executeForRows(T2CPreparedStatement.java:768)
>         at 
> oracle.jdbc.driver.OracleStatement.executeMaybeDescribe(OracleStatement.java:1062)
>         at 
> oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1132)
>         at 
> oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3285)
>         at 
> oracle.jdbc.driver.OraclePreparedStatement.executeQuery(OraclePreparedStatement.java:3329)
>         - locked <0xe37a09a0> (a oracle.jdbc.driver.T2CPreparedStatement)
>         - locked <0xe379c398> (a oracle.jdbc.driver.T2CConnection)
>         at 
> org.apache.commons.dbcp.DelegatingPreparedStatement.executeQuery(DelegatingPreparedStatement.java:91)
>         at 
> org.apache.commons.dbcp.DelegatingPreparedStatement.executeQuery(DelegatingPreparedStatement.java:91)
>         at 
> org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.doRecoverNextMessages(DefaultJDBCAdapter.java:709)
>         at 
> org.apache.activemq.store.jdbc.JDBCMessageStore.recoverNextMessages(JDBCMessageStore.java:230)
>         at 
> org.apache.activemq.store.ProxyMessageStore.recoverNextMessages(ProxyMessageStore.java:83)
>         at 
> org.apache.activemq.broker.region.cursors.QueueStorePrefetch.doFillBatch(QueueStorePrefetch.java:75)
>         at 
> org.apache.activemq.broker.region.cursors.AbstractStoreCursor.fillBatch(AbstractStoreCursor.java:227)
>         - locked <0xb8a12f30> (a 
> org.apache.activemq.broker.region.cursors.QueueStorePrefetch)
>         at 
> org.apache.activemq.broker.region.cursors.AbstractStoreCursor.reset(AbstractStoreCursor.java:100)
>         at 
> org.apache.activemq.broker.region.cursors.StoreQueueCursor.reset(StoreQueueCursor.java:157)
>         - locked <0xb8a13648> (a 
> org.apache.activemq.broker.region.cursors.StoreQueueCursor)
>         at org.apache.activemq.broker.region.Queue.doPageIn(Queue.java:1179)
>         - locked <0xb8a13648> (a 
> org.apache.activemq.broker.region.cursors.StoreQueueCursor)
>         at 
> org.apache.activemq.broker.region.Queue.pageInMessages(Queue.java:1308)
>         at org.apache.activemq.broker.region.Queue.iterate(Queue.java:1011)
>         - locked <0xb8a13748> (a org.apache.activemq.broker.region.Queue$2)
>         at 
> org.apache.activemq.thread.DeterministicTaskRunner.runTask(DeterministicTaskRunner.java:84)
>         at 
> org.apache.activemq.thread.DeterministicTaskRunner$1.run(DeterministicTaskRunner.java:41)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
>         at java.lang.Thread.run(Thread.java:595)
>  
>
> We waited for 9 or 10 hours without apparent changes: Only one consumer 
> got, the other threads waiting, and no message consumed.
> In the production environment, the only way to recover the situation we 
> found, was to move message from ACTIVEMQ_MSGS to another table, start 
> the broker with ACTIVEMQ_MSGS empty, and create an application to 
> deserialize messages from the table and inject them to the queue using 
> the JMS API.
>
> We consider that over 10 hours of service unavailability is a serious 
> problem. The problem seems to be related with the broker trying to get 
> all the messages from the table at once whenever a consumer for a given 
> destination is created. Is this a known problem? Is there any way to 
> improve the situation?
>
> Best regards.
>
>
> --
> Manuel.
>
>   


Mime
View raw message