activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Gary Tully <gary.tu...@gmail.com>
Subject Re: Broker get stuck with high amount of persistent messages for a given destination
Date Thu, 22 Oct 2009 10:33:53 GMT
I wonder if there is also clean up thread (ActiveMQ Cleanup Timer) in the
mix, not sure about 4.2 but on trunk that is currently a fixed periodic task
rather than a fixed delay task and it could end up hogging the jdbc
connection.

2009/10/16 Manuel Teira <mteira@tid.es>

> Hello.
>
> We are embedding an activemq 4.2 broker in our application, running on a
> Sun JVM 5, and using an Oracle 10g database as persistent messaging store
> (without journal, since we are using a JDBC master-slave cluster).
> In a pair of occasions, we were unable to get the broker started in our
> production environment. Analyzing the situation, it seems that the cause was
> the great amount of persistent messages for a given queue (over 75.000),
> what was blocking the attempt to create consumers on that destination.
>
> Trying to reproduce the problem with a more up to date activemq version, we
> set up a 5.2.0 standalone broker to resemble our scenario, and a client that
> tries to create a set of consumers on the loaded queue. We exported the
> table with messages from our production environment and filled the test
> scenario database with it. I'm attaching both the activemq configuration and
> the client used for testing.
>
> What we found was:
>
> The broker started normally. Since no attempt to consume messages from the
> "loaded" destination was performed.
> Once the client started, we observed that:
> - We tried to start 10 consumers on the queue.  Only one of those consumers
> got the session.createConsumer(), others were blocked (note that all of them
> are sharing the same connection)
> - The only started consumer was unable to get any message, it was blocked
> in the getMessage() attempt.
> - The broker seems to be trying to load all the messages from the database,
> showing the following  stack dump (just the involved threads):
>
> "ActiveMQ Transport: tcp:///127.0.0.1:33554" daemon prio=4 tid=0x00ac2ef8
> nid=0x3c waiting on condition [0xaae7e000..0xaae7fbf0]
>       at sun.misc.Unsafe.park(Native Method)
>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:118)
>       at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:681)
>       at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:711)
>       at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1041)
>       at
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:184)
>       at
> java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:256)
>       at
> org.apache.activemq.broker.region.Queue.addSubscription(Queue.java:217)
>       at
> org.apache.activemq.broker.region.AbstractRegion.addConsumer(AbstractRegion.java:275)
>       - locked <0xe35bbb40> (a java.lang.Object)
>       at
> org.apache.activemq.broker.region.RegionBroker.addConsumer(RegionBroker.java:372)
>       at
> org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:86)
>       at
> org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:86)
>       at
> org.apache.activemq.advisory.AdvisoryBroker.addConsumer(AdvisoryBroker.java:83)
>       at
> org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:86)
>       at
> org.apache.activemq.broker.MutableBrokerFilter.addConsumer(MutableBrokerFilter.java:93)
>       at
> org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:541)
>       at
> org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:345)
>       at
> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
>       at
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
>       at
> org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
>       at
> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
>       at
> org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
>       - locked <0xb8a0d5d0> (a
> org.apache.activemq.transport.InactivityMonitor$1)
>       at
> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
>       at
> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
>       at
> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
>       at java.lang.Thread.run(Thread.java:595)
>
> "QueueThread:queue://TaskManagerQueue" daemon prio=10 tid=0x005e1bd8
> nid=0x42 runnable [0xaac7e000..0xaac7faf0]
>       at oracle.jdbc.driver.T2CStatement.t2cDefineFetch(Native Method)
>       at
> oracle.jdbc.driver.T2CPreparedStatement.doDefineFetch(T2CPreparedStatement.java:827)
>       at
> oracle.jdbc.driver.T2CPreparedStatement.executeForRows(T2CPreparedStatement.java:768)
>       at
> oracle.jdbc.driver.OracleStatement.executeMaybeDescribe(OracleStatement.java:1062)
>       at
> oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1132)
>       at
> oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3285)
>       at
> oracle.jdbc.driver.OraclePreparedStatement.executeQuery(OraclePreparedStatement.java:3329)
>       - locked <0xe37a09a0> (a oracle.jdbc.driver.T2CPreparedStatement)
>       - locked <0xe379c398> (a oracle.jdbc.driver.T2CConnection)
>       at
> org.apache.commons.dbcp.DelegatingPreparedStatement.executeQuery(DelegatingPreparedStatement.java:91)
>       at
> org.apache.commons.dbcp.DelegatingPreparedStatement.executeQuery(DelegatingPreparedStatement.java:91)
>       at
> org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.doRecoverNextMessages(DefaultJDBCAdapter.java:709)
>       at
> org.apache.activemq.store.jdbc.JDBCMessageStore.recoverNextMessages(JDBCMessageStore.java:230)
>       at
> org.apache.activemq.store.ProxyMessageStore.recoverNextMessages(ProxyMessageStore.java:83)
>       at
> org.apache.activemq.broker.region.cursors.QueueStorePrefetch.doFillBatch(QueueStorePrefetch.java:75)
>       at
> org.apache.activemq.broker.region.cursors.AbstractStoreCursor.fillBatch(AbstractStoreCursor.java:227)
>       - locked <0xb8a12f30> (a
> org.apache.activemq.broker.region.cursors.QueueStorePrefetch)
>       at
> org.apache.activemq.broker.region.cursors.AbstractStoreCursor.reset(AbstractStoreCursor.java:100)
>       at
> org.apache.activemq.broker.region.cursors.StoreQueueCursor.reset(StoreQueueCursor.java:157)
>       - locked <0xb8a13648> (a
> org.apache.activemq.broker.region.cursors.StoreQueueCursor)
>       at org.apache.activemq.broker.region.Queue.doPageIn(Queue.java:1179)
>       - locked <0xb8a13648> (a
> org.apache.activemq.broker.region.cursors.StoreQueueCursor)
>       at
> org.apache.activemq.broker.region.Queue.pageInMessages(Queue.java:1308)
>       at org.apache.activemq.broker.region.Queue.iterate(Queue.java:1011)
>       - locked <0xb8a13748> (a org.apache.activemq.broker.region.Queue$2)
>       at
> org.apache.activemq.thread.DeterministicTaskRunner.runTask(DeterministicTaskRunner.java:84)
>       at
> org.apache.activemq.thread.DeterministicTaskRunner$1.run(DeterministicTaskRunner.java:41)
>       at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
>       at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
>       at java.lang.Thread.run(Thread.java:595)
>
>
> We waited for 9 or 10 hours without apparent changes: Only one consumer
> got, the other threads waiting, and no message consumed.
> In the production environment, the only way to recover the situation we
> found, was to move message from ACTIVEMQ_MSGS to another table, start the
> broker with ACTIVEMQ_MSGS empty, and create an application to deserialize
> messages from the table and inject them to the queue using the JMS API.
>
> We consider that over 10 hours of service unavailability is a serious
> problem. The problem seems to be related with the broker trying to get all
> the messages from the table at once whenever a consumer for a given
> destination is created. Is this a known problem? Is there any way to improve
> the situation?
>
> Best regards.
>
>
> --
> Manuel.
>
>
> package es.tid.planb.test;
>
> import java.util.concurrent.CountDownLatch;
> import java.io.*;
> import javax.jms.Connection;
> import javax.jms.ConnectionFactory;
> import javax.jms.Destination;
> import javax.jms.JMSException;
> import javax.jms.MessageProducer;
> import javax.jms.MessageConsumer;
> import javax.jms.Session;
> import javax.jms.Queue;
> import javax.jms.TemporaryQueue;
> import javax.jms.TextMessage;
> import javax.jms.Message;
> import javax.naming.Context;
> import javax.naming.InitialContext;
> import javax.naming.NamingException;
> import java.util.Date;
> import java.util.Properties;
>
> public class MultithreadConsumer
> {
>    volatile static long consumedCount = 0;
>    static int recvTimeout = 2000;
>
>    public static void main(String args[]) throws Exception
>    {
>        if (args.length < 5) {
>            System.err.println("Usage: MultithreadConsumer "
>                               + "uri queue threads messages synchro");
>            System.exit(255);
>        }
>
>        Properties jndiProperties = new Properties();
>        jndiProperties.setProperty("java.naming.factory.initial",
>
> "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
>        jndiProperties.setProperty("java.naming.provider.url", args[0]);
>        jndiProperties.setProperty("queue." + args[1], args[1]);
>
>        Context jndiContext = new InitialContext(jndiProperties);
>        ConnectionFactory cfactory =
>            (ConnectionFactory) jndiContext.lookup("ConnectionFactory");
>        final Destination queue = (Destination) jndiContext.lookup(args[1]);
>        final Connection conn = cfactory.createConnection();
>        final int consumerCount = Integer.parseInt(args[2]);
>        final int messages = Integer.parseInt(args[3]);
>        final boolean synchronizeConsumers =
>            "true".equals(args[4]) ? true : false;
>        final CountDownLatch consumersPrepared = new
> CountDownLatch(consumerCount);
>        final CountDownLatch consumersGo = new CountDownLatch(1);
>
>
>        System.err.println("Consumers: " + consumerCount
>                           + ", Messages: " + messages
>                           + ", Synchronized: " + synchronizeConsumers);
>
>        conn.start();
>        Thread consumers[] = new Thread[consumerCount];
>        for (int i = 0; i < consumerCount; i++) {
>            consumers[i] = new Thread(new Runnable() {
>                public void run() {
>                    Session session = null;
>                    Message msg = null;
>                    try {
>                        session = conn.createSession(true, 0);
>                        MessageConsumer consumer =
>                            session.createConsumer(queue);
>                        System.out.println("Consumer ready");
>                        if (synchronizeConsumers) {
>                            consumersPrepared.countDown();
>                            consumersGo.await();
>                        }
>                        while (consumedCount < messages) {
>                            msg = consumer.receive(recvTimeout);
>                            if (msg != null) {
>                                System.out.print("m");
>                                session.commit();
>                                ++consumedCount;
>                            } else {
>                                System.out.print("!");
>                            }
>                        }
>                        System.out.println("Consumer finishing");
>                    } catch (Exception e) {
>                        System.err.println("Exception in consumer");
>                        e.printStackTrace(System.err);
>                    }
>                    if (session != null) {
>                        try { session.close(); } catch (Exception e) {};
>                    }
>                }
>                }, "Consumer#" + i);
>        }
>        for (int i = 0; i < consumers.length; ++i) {
>            consumers[i].start();
>        }
>
>        if (synchronizeConsumers) {
>            System.out.println("Waiting for the consumers to get ready");
>            consumersPrepared.await();
>            System.out.println("All the consumers ready");
>            consumersGo.countDown();
>        }
>
>        Date before = new Date();
>
>        for (int i = 0; i < consumerCount; ++i) {
>            while (true) {
>                try {
>                    consumers[i].join();
>                    break;
>                } catch (InterruptedException ie) {}
>            }
>        }
>        Date after = new Date();
>        long elapsed = after.getTime() - before.getTime();
>        System.err.println("Consumers finished. Messages: "
>                           + consumedCount + ", " + elapsed + " ms
> elapsed");
>        System.err.println((consumedCount * 1000.0 / elapsed) + "
> messages/s");
>        conn.close();
>    }
> }
>
>
>


-- 
http://blog.garytully.com

Open Source Integration
http://fusesource.com

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message