activemq-dev mailing list archives

From Manuel Teira <mte...@tid.es>
Subject Broker get stuck with high amount of persistent messages for a given destination
Date Fri, 16 Oct 2009 10:29:50 GMT
Hello.

We are embedding an ActiveMQ 4.2 broker in our application, running on a 
Sun JVM 5, and using an Oracle 10g database as the persistent message 
store (without journal, since we are using a JDBC master-slave cluster).
On a couple of occasions, we were unable to get the broker started in our 
production environment. Analyzing the situation, it seems that the cause 
was the large number of persistent messages for a given queue (over 
75,000), which was blocking the attempt to create consumers on that 
destination.

Trying to reproduce the problem with a more up-to-date ActiveMQ version, 
we set up a 5.2.0 standalone broker to resemble our scenario, and a 
client that tries to create a set of consumers on the loaded queue. We 
exported the message table from our production environment and loaded 
it into the test database. I'm attaching both the ActiveMQ 
configuration and the client used for testing.

What we found was:

The broker started normally, since no attempt was made to consume 
messages from the "loaded" destination.
Once the client started, we observed that:
 - We tried to start 10 consumers on the queue. Only one of them 
returned from session.createConsumer(); the others were blocked (note 
that all of them share the same connection).
 - The only consumer that started was unable to get any message; it was 
blocked in its getMessage() attempt.
 - The broker seems to be trying to load all the messages from the 
database, showing the following stack dump (just the involved threads):

"ActiveMQ Transport: tcp:///127.0.0.1:33554" daemon prio=4 tid=0x00ac2ef8 nid=0x3c waiting on condition [0xaae7e000..0xaae7fbf0]
        at sun.misc.Unsafe.park(Native Method)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:118)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:681)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:711)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1041)
        at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:184)
        at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:256)
        at org.apache.activemq.broker.region.Queue.addSubscription(Queue.java:217)
        at org.apache.activemq.broker.region.AbstractRegion.addConsumer(AbstractRegion.java:275)
        - locked <0xe35bbb40> (a java.lang.Object)
        at org.apache.activemq.broker.region.RegionBroker.addConsumer(RegionBroker.java:372)
        at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:86)
        at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:86)
        at org.apache.activemq.advisory.AdvisoryBroker.addConsumer(AdvisoryBroker.java:83)
        at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:86)
        at org.apache.activemq.broker.MutableBrokerFilter.addConsumer(MutableBrokerFilter.java:93)
        at org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:541)
        at org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:345)
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
        at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
        at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
        - locked <0xb8a0d5d0> (a org.apache.activemq.transport.InactivityMonitor$1)
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
        at java.lang.Thread.run(Thread.java:595)

"QueueThread:queue://TaskManagerQueue" daemon prio=10 tid=0x005e1bd8 nid=0x42 runnable [0xaac7e000..0xaac7faf0]
        at oracle.jdbc.driver.T2CStatement.t2cDefineFetch(Native Method)
        at oracle.jdbc.driver.T2CPreparedStatement.doDefineFetch(T2CPreparedStatement.java:827)
        at oracle.jdbc.driver.T2CPreparedStatement.executeForRows(T2CPreparedStatement.java:768)
        at oracle.jdbc.driver.OracleStatement.executeMaybeDescribe(OracleStatement.java:1062)
        at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1132)
        at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3285)
        at oracle.jdbc.driver.OraclePreparedStatement.executeQuery(OraclePreparedStatement.java:3329)
        - locked <0xe37a09a0> (a oracle.jdbc.driver.T2CPreparedStatement)
        - locked <0xe379c398> (a oracle.jdbc.driver.T2CConnection)
        at org.apache.commons.dbcp.DelegatingPreparedStatement.executeQuery(DelegatingPreparedStatement.java:91)
        at org.apache.commons.dbcp.DelegatingPreparedStatement.executeQuery(DelegatingPreparedStatement.java:91)
        at org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.doRecoverNextMessages(DefaultJDBCAdapter.java:709)
        at org.apache.activemq.store.jdbc.JDBCMessageStore.recoverNextMessages(JDBCMessageStore.java:230)
        at org.apache.activemq.store.ProxyMessageStore.recoverNextMessages(ProxyMessageStore.java:83)
        at org.apache.activemq.broker.region.cursors.QueueStorePrefetch.doFillBatch(QueueStorePrefetch.java:75)
        at org.apache.activemq.broker.region.cursors.AbstractStoreCursor.fillBatch(AbstractStoreCursor.java:227)
        - locked <0xb8a12f30> (a org.apache.activemq.broker.region.cursors.QueueStorePrefetch)
        at org.apache.activemq.broker.region.cursors.AbstractStoreCursor.reset(AbstractStoreCursor.java:100)
        at org.apache.activemq.broker.region.cursors.StoreQueueCursor.reset(StoreQueueCursor.java:157)
        - locked <0xb8a13648> (a org.apache.activemq.broker.region.cursors.StoreQueueCursor)
        at org.apache.activemq.broker.region.Queue.doPageIn(Queue.java:1179)
        - locked <0xb8a13648> (a org.apache.activemq.broker.region.cursors.StoreQueueCursor)
        at org.apache.activemq.broker.region.Queue.pageInMessages(Queue.java:1308)
        at org.apache.activemq.broker.region.Queue.iterate(Queue.java:1011)
        - locked <0xb8a13748> (a org.apache.activemq.broker.region.Queue$2)
        at org.apache.activemq.thread.DeterministicTaskRunner.runTask(DeterministicTaskRunner.java:84)
        at org.apache.activemq.thread.DeterministicTaskRunner$1.run(DeterministicTaskRunner.java:41)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
        at java.lang.Thread.run(Thread.java:595)
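
The two stacks above are consistent with one thread holding a lock 
during a slow database read while another waits for that same lock in 
Queue.addSubscription. The following is a minimal sketch of that 
pattern only, using plain java.util.concurrent and no ActiveMQ classes. 
It assumes that the QueueThread owns the ReentrantLock the transport 
thread is parked on, which the dump does not show directly; the class 
name, the dispatchLock name, and the timings are all hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Stdlib-only model of the suspected contention; not ActiveMQ code.
final class LockContentionSketch {

    /**
     * One "pager" thread holds the lock for pagingMillis (standing in for
     * the slow JDBC query); `consumers` threads then need the same lock to
     * register. Returns how many had registered after waitMillis.
     */
    static int registeredWithin(int consumers, long pagingMillis, long waitMillis) {
        final ReentrantLock dispatchLock = new ReentrantLock(); // hypothetical name
        final CountDownLatch lockHeld = new CountDownLatch(1);
        final AtomicInteger registered = new AtomicInteger();

        Thread pager = new Thread(() -> {
            dispatchLock.lock();
            try {
                lockHeld.countDown();       // signal that the lock is now taken
                sleepQuietly(pagingMillis); // simulated page-in from the database
            } finally {
                dispatchLock.unlock();
            }
        });
        pager.setDaemon(true);
        pager.start();
        awaitQuietly(lockHeld); // make sure the pager owns the lock first

        for (int i = 0; i < consumers; i++) {
            Thread consumer = new Thread(() -> {
                dispatchLock.lock(); // parks here while the pager holds the lock
                try {
                    registered.incrementAndGet();
                } finally {
                    dispatchLock.unlock();
                }
            });
            consumer.setDaemon(true);
            consumer.start();
        }

        sleepQuietly(waitMillis);
        return registered.get();
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("slow page-in: "
                + registeredWithin(10, 3000, 300) + " of 10 registered");
        System.out.println("fast page-in: "
                + registeredWithin(10, 50, 1000) + " of 10 registered");
    }
}
```

With a long simulated page-in, none of the consumer threads register 
before the wait expires; with a short one, all of them do. If the 
assumption holds, the time consumers spend blocked would be bounded by 
how long the page-in query takes.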
 

We waited for 9 or 10 hours without any apparent change: only one 
consumer had been created, the other threads were still waiting, and no 
message had been consumed.
In the production environment, the only way we found to recover was to 
move the messages from ACTIVEMQ_MSGS to another table, start the broker 
with ACTIVEMQ_MSGS empty, and write an application that deserializes 
the messages from that table and injects them into the queue using the 
JMS API.

We consider over 10 hours of service unavailability a serious problem. 
It seems to be related to the broker trying to get all the messages 
from the table at once whenever a consumer for a given destination is 
created. Is this a known problem? Is there any way to improve the 
situation?

Best regards.


--
Manuel.

