Return-Path: X-Original-To: apmail-activemq-users-archive@www.apache.org Delivered-To: apmail-activemq-users-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id EA7128BA6 for ; Tue, 16 Aug 2011 16:47:40 +0000 (UTC) Received: (qmail 69737 invoked by uid 500); 16 Aug 2011 16:47:40 -0000 Delivered-To: apmail-activemq-users-archive@activemq.apache.org Received: (qmail 69604 invoked by uid 500); 16 Aug 2011 16:47:39 -0000 Mailing-List: contact users-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: users@activemq.apache.org Delivered-To: mailing list users@activemq.apache.org Received: (qmail 69595 invoked by uid 99); 16 Aug 2011 16:47:39 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 16 Aug 2011 16:47:39 +0000 X-ASF-Spam-Status: No, hits=1.5 required=5.0 tests=FREEMAIL_FROM,HTML_MESSAGE,RCVD_IN_DNSWL_LOW,SPF_PASS,T_TO_NO_BRKTS_FREEMAIL X-Spam-Check-By: apache.org Received-SPF: pass (athena.apache.org: domain of herve.barrault@gmail.com designates 209.85.210.175 as permitted sender) Received: from [209.85.210.175] (HELO mail-iy0-f175.google.com) (209.85.210.175) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 16 Aug 2011 16:47:33 +0000 Received: by iyn15 with SMTP id 15so151751iyn.34 for ; Tue, 16 Aug 2011 09:47:12 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=gamma; h=mime-version:in-reply-to:references:date:message-id:subject:from:to :content-type; bh=4nrfMYGkTp0aIJwV4pU+6YpKdu+8CRSu4ukWVYYKoic=; b=RCL/pTCmlI7KiIcqaBjzMKuoNvK2R9U1PNhwHxE6/oTWEluhEUnGym6C6HwLtHfYp6 3JzVQTECAGnd1FLJ316QToQ2MYnVbWxv5zXvhLRI9aLiIQccip3olvK+c8Ho+E317SyD T/wv9zCf5pcRj1sewakrLC5O52NMAK/JSf3oA= MIME-Version: 1.0 Received: by 10.42.39.210 with SMTP id i18mr5688887ice.53.1313513232036; Tue, 16 Aug 2011 09:47:12 -0700 (PDT) Received: by 10.42.241.5 with HTTP; Tue, 16 Aug 2011 09:47:11 -0700 (PDT) In-Reply-To: References: Date: Tue, 16 Aug 2011 18:47:11 +0200 Message-ID: Subject: Re: ActiveMQ performances and JDBC persistence From: =?ISO-8859-1?Q?Herv=E9_BARRAULT?= To: users@activemq.apache.org Content-Type: multipart/alternative; boundary=90e6ba6e855e9a739604aaa22172 --90e6ba6e855e9a739604aaa22172 Content-Type: text/plain; charset=ISO-8859-1 Content-Transfer-Encoding: quoted-printable Hi, I have the following configuration : 2 producers send message in a queue using camel producer template. I have one consumer which finally send 2 messages in 2 different queues. (using camel route) and for those queues i have for each 1 consumer. Globally, It works with 3 queues. I am ActiveMQ 5.4.0 using vm transport and jdbc persistence. (flow control is disabled in this case). When trying some performance tests : When the 2 producers are working it seems ok (bad performances but no contention). When the producers stop (i have a lot of enqueued messages), I see thread contention for VMTransport threads. I see 5 live threads (these threads are changing). Stacks at 04:36:09 PM (uptime 1:33:37) VMTransport [RUNNABLE, IN_NATIVE] CPU time: 0:01 java.net.SocketInputStream.socketRead0(FileDescriptor, byte[], int, int, int) java.net.SocketInputStream.read(byte[], int, int) org.postgresql.core.VisibleBufferedInputStream.readMore(int) org.postgresql.core.VisibleBufferedInputStream.ensureBytes(int) org.postgresql.core.VisibleBufferedInputStream.read() org.postgresql.core.PGStream.ReceiveChar() org.postgresql.core.v3.QueryExecutorImpl.processResults(ResultHandler, int) org.postgresql.core.v3.QueryExecutorImpl.execute(Query, ParameterList, ResultHandler, int, int, int) org.postgresql.jdbc2.AbstractJdbc2Connection.executeTransactionCommand(Quer= y) org.postgresql.jdbc2.AbstractJdbc2Connection.commit() org.apache.commons.dbcp.DelegatingConnection.commit() org.apache.commons.dbcp.PoolingDataSource$PoolGuardConnectionWrapper.commit= () org.apache.activemq.store.jdbc.TransactionContext.commit() org.apache.activemq.store.jdbc.JDBCPersistenceAdapter.commitTransaction(Con= nectionContext) org.apache.activemq.store.memory.MemoryTransactionStore$Tx.commit() org.apache.activemq.store.memory.MemoryTransactionStore.commit(TransactionI= d, boolean, Runnable, Runnable) org.apache.activemq.transaction.LocalTransaction.commit(boolean) org.apache.activemq.broker.TransactionBroker.commitTransaction(ConnectionCo= ntext, TransactionId, boolean) org.apache.activemq.broker.MutableBrokerFilter.commitTransaction(Connection= Context, TransactionId, boolean) org.apache.activemq.broker.TransportConnection.processCommitTransactionOneP= hase(TransactionInfo) org.apache.activemq.command.TransactionInfo.visit(CommandVisitor) org.apache.activemq.broker.TransportConnection.service(Command) org.apache.activemq.broker.TransportConnection$1.onCommand(Object) org.apache.activemq.transport.ResponseCorrelator.onCommand(Object) org.apache.activemq.transport.TransportFilter.onCommand(Object) org.apache.activemq.transport.vm.VMTransport.iterate() org.apache.activemq.thread.PooledTaskRunner.runTask() org.apache.activemq.thread.PooledTaskRunner$1.run() java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Runnable) java.util.concurrent.ThreadPoolExecutor$Worker.run() java.lang.Thread.run() VMTransport [BLOCKED] CPU time: 0:18 org.apache.activemq.store.memory.MemoryTransactionStore.commit(TransactionI= d, boolean, Runnable, Runnable) org.apache.activemq.transaction.LocalTransaction.commit(boolean) org.apache.activemq.broker.TransactionBroker.commitTransaction(ConnectionCo= ntext, TransactionId, boolean) org.apache.activemq.broker.MutableBrokerFilter.commitTransaction(Connection= Context, TransactionId, boolean) org.apache.activemq.broker.TransportConnection.processCommitTransactionOneP= hase(TransactionInfo) org.apache.activemq.command.TransactionInfo.visit(CommandVisitor) org.apache.activemq.broker.TransportConnection.service(Command) org.apache.activemq.broker.TransportConnection$1.onCommand(Object) org.apache.activemq.transport.ResponseCorrelator.onCommand(Object) org.apache.activemq.transport.TransportFilter.onCommand(Object) org.apache.activemq.transport.vm.VMTransport.iterate() org.apache.activemq.thread.PooledTaskRunner.runTask() org.apache.activemq.thread.PooledTaskRunner$1.run() java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Runnable) java.util.concurrent.ThreadPoolExecutor$Worker.run() java.lang.Thread.run() VMTransport [BLOCKED] CPU time: 0:01 org.apache.activemq.store.memory.MemoryTransactionStore.commit(TransactionI= d, boolean, Runnable, Runnable) org.apache.activemq.transaction.LocalTransaction.commit(boolean) org.apache.activemq.broker.TransactionBroker.commitTransaction(ConnectionCo= ntext, TransactionId, boolean) org.apache.activemq.broker.MutableBrokerFilter.commitTransaction(Connection= Context, TransactionId, boolean) org.apache.activemq.broker.TransportConnection.processCommitTransactionOneP= hase(TransactionInfo) org.apache.activemq.command.TransactionInfo.visit(CommandVisitor) org.apache.activemq.broker.TransportConnection.service(Command) org.apache.activemq.broker.TransportConnection$1.onCommand(Object) org.apache.activemq.transport.ResponseCorrelator.onCommand(Object) org.apache.activemq.transport.TransportFilter.onCommand(Object) org.apache.activemq.transport.vm.VMTransport.iterate() org.apache.activemq.thread.PooledTaskRunner.runTask() org.apache.activemq.thread.PooledTaskRunner$1.run() java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Runnable) java.util.concurrent.ThreadPoolExecutor$Worker.run() java.lang.Thread.run() VMTransport [WAITING] CPU time: 0:09 sun.misc.Unsafe.park(boolean, long) java.util.concurrent.locks.LockSupport.parkNanos(Object, long) java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(Synchronou= sQueue$TransferStack$SNode, boolean, long) java.util.concurrent.SynchronousQueue$TransferStack.transfer(Object, boolean, long) java.util.concurrent.SynchronousQueue.poll(long, TimeUnit) java.util.concurrent.ThreadPoolExecutor.getTask() java.util.concurrent.ThreadPoolExecutor$Worker.run() java.lang.Thread.run() VMTransport [WAITING] CPU time: 0:01 sun.misc.Unsafe.park(boolean, long) java.util.concurrent.locks.LockSupport.parkNanos(Object, long) java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(Synchronou= sQueue$TransferStack$SNode, boolean, long) java.util.concurrent.SynchronousQueue$TransferStack.transfer(Object, boolean, long) java.util.concurrent.SynchronousQueue.poll(long, TimeUnit) java.util.concurrent.ThreadPoolExecutor.getTask() java.util.concurrent.ThreadPoolExecutor$Worker.run() java.lang.Thread.run() I have some difficulties to understand why i have 2 BLOCKED threads and 1 Running in this case. Thanks for help. 2011/8/10 Herv=E9 BARRAULT > Hi, i have done another experiment : > I have tried something to see the influence of messages on ActiveMQ > behavior. > > I try to send 20 messages per seconds during 20 seconds into the broker a= nd > use a processor to dequeue the messages. > If there is no other messages, it is ok > If i create a false queue with 16k messages it is always working > If i create a false queue with 24k messages it takes about 25 seconds to = do > all the job. > If i create a false queue with 32k messages it takes about 55 seconds to = do > all the job. > > Is there something to do to avoid that a queue has an influence on other > queues? > Regards > > > > 2011/8/1 Herv=E9 BARRAULT > >> Hi, >> I have looked to the purge mechanism to try to understand why it takes s= o >> much time to clean a queue. >> >> I have noticed something : >> >> #### >> CLASS org.apache.activemq.store.jdbc.Statements >> public String getFindMessageSequenceIdStatement() { >> if (findMessageSequenceIdStatement =3D=3D null) { >> findMessageSequenceIdStatement =3D "SELECT ID, PRIORITY FROM " + >> getFullMessageTableName() >> + " WHERE MSGID_PROD=3D? AND MSGID_SEQ=3D? AND CONTAINER=3D?= "; >> } >> return findMessageSequenceIdStatement; >> } >> >> public String getRemoveMessageStatement() { >> if (removeMessageStatement =3D=3D null) { >> removeMessageStatement =3D "DELETE FROM " + >> getFullMessageTableName() + " WHERE ID=3D?"; >> } >> return removeMessageStatement; >> } >> ### >> >> ### >> CLASS org.apache.activemq.store.jdbc.JDBCMessageStore >> >> private long getStoreSequenceIdForMessageId(MessageId messageId) throws >> IOException { >> long result =3D -1; >> TransactionContext c =3D persistenceAdapter.getTransactionContex= t(); >> try { >> result =3D adapter.getStoreSequenceId(c, destination, >> messageId)[0]; >> } catch (SQLException e) { >> JDBCPersistenceAdapter.log("JDBC Failure: ", e); >> throw IOExceptionSupport.create("Failed to get store >> sequenceId for messageId: " + messageId +", on: " + destination + ". Rea= son: >> " + e, e); >> } finally { >> c.close(); >> } >> return result; >> } >> >> public void removeMessage(ConnectionContext context, MessageAck ack) >> throws IOException { >> >> long seq =3D getStoreSequenceIdForMessageId(ack.getLastMessageId= ()); >> >> // Get a connection and remove the message from the DB >> TransactionContext c =3D >> persistenceAdapter.getTransactionContext(context); >> try { >> adapter.doRemoveMessage(c, seq); >> } catch (SQLException e) { >> JDBCPersistenceAdapter.log("JDBC Failure: ", e); >> throw IOExceptionSupport.create("Failed to broker message: "= + >> ack.getLastMessageId() + " in container: " + e, e); >> } finally { >> c.close(); >> } >> } >> ### >> >> Could it be better to use only one request to remove a row, which is : >> "DELETE FROM " + getFullMessageTableName() + " WHERE MSGID_PROD=3D? AND >> MSGID_SEQ=3D? AND CONTAINER=3D?" ? >> or is there a case where it is not working ? >> >> Could it be better to create an index based on MSGID_PROD, MSGID_SEQ and= CONTAINER >> (this index could be UNIQUE ?) ? >> >> I have checked that it takes 1min30 to dequeue my 16000 messages using t= he >> purge method with these two modifications. I noticed that the page size = is >> 200 messages and it waits every 200 row to read from the database to be = able >> to purge it, i never noticed this limit before. >> >> So it increases the rate from about 40 msgs/sec to about 170 msgs/sec to >> purge a queue (does it can also apply to other components reading the qu= eue >> ?). >> >> I don't have enough JMS broker and database mechanism knowledges to say = if >> it is a good or a bad idea. >> >> Anyone can help me for this ? >> >> Thanks for answers. >> Herv=E9 >> >> >> 2011/7/29 Herv=E9 BARRAULT >> >>> Hi, >>> I am using ActiveMQ 5.4.0 with persistence (using an oracle 11g R2 >>> server) and i am doing some performance tests. >>> >>> I'm sending 16000 messages through web services (using one port) and it >>> takes about 1 min to manage all messages and fill the JMS queue. >>> >>> When i use a consumer to dequeue this queue (without adding new message= s) >>> and fill another one, it takes about 6 min and 30 seconds. I was expect= ing >>> that i have some code which slow down the consumption. >>> >>> But i have tried to use the purge method and it takes the same time. >>> >>> Is there a way to increase the consumption rate ? >>> >>> Thanks for answers. >>> >> >> > --90e6ba6e855e9a739604aaa22172--