activemq-users mailing list archives

From Alexandre Normand <alexandre.norm...@opower.com>
Subject Intermittent problem: only a few message consumers receiving/processing messages
Date Mon, 25 Mar 2013 17:22:44 GMT
Hi, 
We've now hit this issue three times: only a few of our message consumers end up processing
messages. Since our usage pattern is a low number of messages with long processing times,
we effectively have contention because no more than a few message consumers are processing
messages. Our maximum number of listener threads is 12, and each time this happened we
took thread dumps showing that only a few consumers (in one case, a single one) were
actually doing work. The same thread dumps showed a few other listener threads waiting
for messages. I'm assuming that because not all of the listeners were busy, no additional
listener threads were created, which made matters even worse.
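
A tally of idle versus busy listener threads like the one described above can be scripted
rather than read off a dump by eye. What follows is only a minimal Python sketch under stated
assumptions: the function name classify_listeners, the abridged sample dump with shortened
thread IDs, and the rule "a listener is idle when ActiveMQMessageConsumer.receive is on its
stack" are all illustrative choices, not part of ActiveMQ or Spring.

```python
import re
from collections import Counter

# Header of each thread entry, e.g. "name" daemon prio=10 tid=...
# \s+ tolerates headers that a mail client wrapped across lines.
THREAD_HEADER = re.compile(r'"([^"]+)"(?:\s+daemon)?\s+prio=\d+')


def classify_listeners(dump, name_pattern=r"JobStarterContainer-\d+$"):
    """Return {thread_name: "idle" | "busy"} for listener threads in a dump.

    Assumption: a listener thread is idle when it is blocked inside
    ActiveMQMessageConsumer.receive, and busy otherwise.
    """
    headers = list(THREAD_HEADER.finditer(dump))
    result = {}
    for i, header in enumerate(headers):
        name = header.group(1)
        if not re.search(name_pattern, name):
            continue  # not one of the listener container threads
        end = headers[i + 1].start() if i + 1 < len(headers) else len(dump)
        # Drop whitespace so frames split by line wrapping still match.
        stack = re.sub(r"\s+", "", dump[header.end():end])
        idle = "ActiveMQMessageConsumer.receive(" in stack
        result[name] = "idle" if idle else "busy"
    return result


# Abridged, hypothetical sample: thread IDs shortened, most frames omitted.
SAMPLE = '''
"lowPriorityJobStarterContainer-4" prio=10 tid=0x1 nid=0x2 in Object.wait() [0x3]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:592)
"lowPriorityJobStarterContainer-3" prio=10 tid=0x4 nid=0x5 in Object.wait() [0x6]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.UNIXProcess.waitFor(UNIXProcess.java:165)
        at opower.batch.agent.JobStarter.startJob(JobStarter.java:55)
"Timer-0" daemon prio=10 tid=0x7 nid=0x8 in Object.wait() [0x9]
'''

states = classify_listeners(SAMPLE)
print(states)
print(Counter(states.values()))
```

Note that both sample threads report WAITING (on object monitor), so the thread state alone
does not separate idle from busy listeners; the sketch looks at the stack frames instead.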


The problem seems to be on the client side (ActiveMQ/Spring), since the server does eventually
dispatch messages to the remaining "working" listeners. One important thing to note is that
we have prefetching disabled, although the DefaultMessageListenerContainer's cache level is
still at the default value of CACHE_CONSUMER. Also, restarting the Jetty instance that is
the client "fixes" the issue.

Does anyone have any ideas as to what this could be? 

Here's an extract of the thread dump I referred to (in this one, only one thread, lowPriorityJobStarterContainer-3,
was actually doing any work). The relevant thread group names are *JobStarterContainer:

2013-03-15 01:13:46
Full thread dump Java HotSpot(TM) 64-Bit Server VM (16.3-b01 mixed mode):

...

"process reaper" daemon prio=10 tid=0x00007fca84002800 nid=0x4519 runnable [0x00007fcaf0248000]
   java.lang.Thread.State: RUNNABLE
        at java.lang.UNIXProcess.waitForProcessExit(Native Method)
        at java.lang.UNIXProcess.access$900(UNIXProcess.java:20)
        at java.lang.UNIXProcess$1$1.run(UNIXProcess.java:132)

"ActiveMQ Session Task-7001" prio=10 tid=0x00007fca88003800 nid=0x22a0 waiting on condition [0x00007fcaf1b61000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0x00007fcb6d36c740> (a java.util.concurrent.SynchronousQueue$TransferStack)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
        at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:424)
        at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:323)
        at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:874)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:945)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
        at java.lang.Thread.run(Thread.java:619)

"ActiveMQ Connection Executor: tcp://broker-a.va.opower.it:61616" prio=10 tid=0x00007fca88007800 nid=0xc37 waiting on condition [0x00007fcaf1d63000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0x00007fcb6d36cab8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
        at java.lang.Thread.run(Thread.java:619)

"ActiveMQ InactivityMonitor WriteCheckTimer" daemon prio=10 tid=0x00007fca88001800 nid=0xc35 in Object.wait() [0x00007fca53af9000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at java.util.TimerThread.mainLoop(Timer.java:509)
        - locked <0x00007fcb6de2d518> (a java.util.TaskQueue)
        at java.util.TimerThread.run(Timer.java:462)

"ActiveMQ InactivityMonitor ReadCheckTimer" daemon prio=10 tid=0x00007fca88005800 nid=0xc34 in Object.wait() [0x00007fca53bfa000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at java.util.TimerThread.mainLoop(Timer.java:509)
        - locked <0x00007fcb6de2b5f8> (a java.util.TaskQueue)
        at java.util.TimerThread.run(Timer.java:462)

"ActiveMQ Transport: tcp://broker-a.va.opower.it/10.20.64.131:61616@52598" prio=10 tid=0x00007fcac8009000 nid=0xc2a runnable [0x00007fcaf0e54000]
   java.lang.Thread.State: RUNNABLE
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.read(SocketInputStream.java:129)
        at org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50)
        at org.apache.activemq.transport.tcp.TcpTransport$2.fill(TcpTransport.java:604)
        at org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:58)
        at org.apache.activemq.transport.tcp.TcpTransport$2.read(TcpTransport.java:589)
        at java.io.DataInputStream.readInt(DataInputStream.java:370)
        at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:275)
        at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:221)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:213)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196)
        at java.lang.Thread.run(Thread.java:619)

"ActiveMQConnection[ID:bwpr001.va.opower.it-43015-1362931933661-1:1] Scheduler" daemon prio=10 tid=0x00007fcac4001800 nid=0x79fb in Object.wait() [0x00007fcaf1c62000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at java.lang.Object.wait(Object.java:485)
        at java.util.TimerThread.mainLoop(Timer.java:483)
        - locked <0x00007fcb6ddec2e8> (a java.util.TaskQueue)
        at java.util.TimerThread.run(Timer.java:462)

"defaultPriorityJobStarterContainer-5" prio=10 tid=0x00007fca7c001800 nid=0x22c5 in Object.wait() [0x00007fcaf0b51000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at java.lang.Object.wait(Object.java:485)
        at org.apache.activemq.SimplePriorityMessageDispatchChannel.dequeue(SimplePriorityMessageDispatchChannel.java:87)
        - locked <0x00007fcb6d5b0628> (a java.lang.Object)
        at org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:470)
        at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:592)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:431)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:311)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:264)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1071)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1063)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:960)
        at java.lang.Thread.run(Thread.java:619)

"defaultPriorityJobStarterContainer-4" prio=10 tid=0x00007fcac8007800 nid=0x23b in Object.wait() [0x00007fcaf0d53000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at java.lang.Object.wait(Object.java:485)
        at org.apache.activemq.SimplePriorityMessageDispatchChannel.dequeue(SimplePriorityMessageDispatchChannel.java:87)
        - locked <0x00007fcb6d5b2628> (a java.lang.Object)
        at org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:470)
        at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:592)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:431)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:311)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:264)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1071)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1063)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:960)
        at java.lang.Thread.run(Thread.java:619)

"lowPriorityJobStarterContainer-4" prio=10 tid=0x00007fcaac005800 nid=0x6859 in Object.wait() [0x00007fcaf175d000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at java.lang.Object.wait(Object.java:485)
        at org.apache.activemq.SimplePriorityMessageDispatchChannel.dequeue(SimplePriorityMessageDispatchChannel.java:87)
        - locked <0x00007fcb6d510d90> (a java.lang.Object)
        at org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:470)
        at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:592)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:431)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:311)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:264)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1071)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1063)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:960)
        at java.lang.Thread.run(Thread.java:619)

"lowPriorityJobStarterContainer-3" prio=10 tid=0x00007fcaac001000 nid=0x65f2 in Object.wait() [0x00007fcaf0f54000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at java.lang.Object.wait(Object.java:485)
        at java.lang.UNIXProcess.waitFor(UNIXProcess.java:165)
        - locked <0x00007fcb6de715d0> (a java.lang.UNIXProcess)
        at org.apache.commons.exec.DefaultExecutor.executeInternal(DefaultExecutor.java:347)
        at org.apache.commons.exec.DefaultExecutor.execute(DefaultExecutor.java:160)
        at org.apache.commons.exec.DefaultExecutor.execute(DefaultExecutor.java:147)
        at opower.batch.agent.JobStarter.executeCommandLineInDirectory(JobStarter.java:108)
        at opower.batch.agent.JobStarter.startJob(JobStarter.java:55)
        at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at org.springframework.util.MethodInvoker.invoke(MethodInvoker.java:273)
        at org.springframework.jms.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:463)
        at org.springframework.jms.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:355)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:537)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:497)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:468)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:326)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:264)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1071)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1063)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:960)
        at java.lang.Thread.run(Thread.java:619)

"DestroyJavaVM" prio=10 tid=0x00007fcbb0007800 nid=0x3ce3 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"pool-1-thread-1 - Acceptor0 Ajp13SocketConnector@0.0.0.0:8080" prio=10 tid=0x00007fcbb0921000 nid=0x3d1d runnable [0x00007fcaf185e000]
   java.lang.Thread.State: RUNNABLE
        at java.net.PlainSocketImpl.socketAccept(Native Method)
        at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:390)
        - locked <0x00007fcb6d3889c8> (a java.net.SocksSocketImpl)
        at java.net.ServerSocket.implAccept(ServerSocket.java:453)
        at java.net.ServerSocket.accept(ServerSocket.java:421)
        at org.mortbay.jetty.bio.SocketConnector.accept(SocketConnector.java:99)
        at org.mortbay.jetty.AbstractConnector$Acceptor.run(AbstractConnector.java:707)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:619)

"highPriorityJobStarterContainer-1" prio=10 tid=0x00007fcbb022b000 nid=0x3d1b in Object.wait() [0x00007fcaf1a60000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at java.lang.Object.wait(Object.java:485)
        at org.apache.activemq.SimplePriorityMessageDispatchChannel.dequeue(SimplePriorityMessageDispatchChannel.java:87)
        - locked <0x00007fcb6d38a7e0> (a java.lang.Object)
        at org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:470)
        at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:592)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:431)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:311)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:264)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1071)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1063)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:960)
        at java.lang.Thread.run(Thread.java:619)

"lowPriorityJobStarterContainer-1" prio=10 tid=0x00007fcbb0175800 nid=0x3d0f in Object.wait() [0x00007fcaf2066000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at java.lang.Object.wait(Object.java:485)
        at org.apache.activemq.SimplePriorityMessageDispatchChannel.dequeue(SimplePriorityMessageDispatchChannel.java:87)
        - locked <0x00007fcb6d38cef8> (a java.lang.Object)
        at org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:470)
        at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:592)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:431)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:311)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:264)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1071)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1063)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:960)
        at java.lang.Thread.run(Thread.java:619)

"FileWatchdog" daemon prio=10 tid=0x00007fcbb0455000 nid=0x3d0a waiting on condition [0x00007fcb4c1b5000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at org.apache.log4j.helpers.FileWatchdog.run(FileWatchdog.java:104)

"Timer-0" daemon prio=10 tid=0x00007fcbb0aa5800 nid=0x3d09 in Object.wait() [0x00007fcb4c2b6000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at java.util.TimerThread.mainLoop(Timer.java:509)
        - locked <0x00007fcb6d002268> (a java.util.TaskQueue)
        at java.util.TimerThread.run(Timer.java:462)

"Low Memory Detector" daemon prio=10 tid=0x00007fcbb00d2000 nid=0x3cfc runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"CompilerThread1" daemon prio=10 tid=0x00007fcbb00d0000 nid=0x3cfb waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"CompilerThread0" daemon prio=10 tid=0x00007fcbb00cd000 nid=0x3cfa waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" daemon prio=10 tid=0x00007fcbb00cb000 nid=0x3cf9 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Finalizer" daemon prio=10 tid=0x00007fcbb00ab800 nid=0x3cf8 in Object.wait() [0x00007fcb4cffe000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:118)
        - locked <0x00007fcb6d016348> (a java.lang.ref.ReferenceQueue$Lock)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:134)
        at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:159)

"Reference Handler" daemon prio=10 tid=0x00007fcbb00a9800 nid=0x3cf7 in Object.wait() [0x00007fcbb4110000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at java.lang.Object.wait(Object.java:485)
        at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:116)
        - locked <0x00007fcb6d0040d0> (a java.lang.ref.Reference$Lock)

"VM Thread" prio=10 tid=0x00007fcbb00a5800 nid=0x3cf6 runnable

"GC task thread#0 (ParallelGC)" prio=10 tid=0x00007fcbb001a800 nid=0x3ce4 runnable
"GC task thread#1 (ParallelGC)" prio=10 tid=0x00007fcbb001c000 nid=0x3ce5 runnable
"GC task thread#2 (ParallelGC)" prio=10 tid=0x00007fcbb001e000 nid=0x3ce6 runnable
"GC task thread#3 (ParallelGC)" prio=10 tid=0x00007fcbb0020000 nid=0x3ce7 runnable
"GC task thread#4 (ParallelGC)" prio=10 tid=0x00007fcbb0021800 nid=0x3ce8 runnable
"GC task thread#5 (ParallelGC)" prio=10 tid=0x00007fcbb0023800 nid=0x3ce9 runnable
"GC task thread#6 (ParallelGC)" prio=10 tid=0x00007fcbb0025800 nid=0x3cea runnable
"GC task thread#7 (ParallelGC)" prio=10 tid=0x00007fcbb0027000 nid=0x3ceb runnable
"GC task thread#8 (ParallelGC)" prio=10 tid=0x00007fcbb0029000 nid=0x3cec runnable
"GC task thread#9 (ParallelGC)" prio=10 tid=0x00007fcbb002b000 nid=0x3ced runnable
"GC task thread#10 (ParallelGC)" prio=10 tid=0x00007fcbb002c800 nid=0x3cee runnable
"GC task thread#11 (ParallelGC)" prio=10 tid=0x00007fcbb002e800 nid=0x3cef runnable
"GC task thread#12 (ParallelGC)" prio=10 tid=0x00007fcbb0030000 nid=0x3cf0 runnable
"GC task thread#13 (ParallelGC)" prio=10 tid=0x00007fcbb0032000 nid=0x3cf1 runnable
"GC task thread#14 (ParallelGC)" prio=10 tid=0x00007fcbb0034000 nid=0x3cf2 runnable
"GC task thread#15 (ParallelGC)" prio=10 tid=0x00007fcbb0035800 nid=0x3cf3 runnable
"GC task thread#16 (ParallelGC)" prio=10 tid=0x00007fcbb0037800 nid=0x3cf4 runnable
"GC task thread#17 (ParallelGC)" prio=10 tid=0x00007fcbb0039800 nid=0x3cf5 runnable

"VM Periodic Task Thread" prio=10 tid=0x00007fcbb00d5000 nid=0x3cfd waiting on condition

JNI global references: 700


Thanks, 

-- 
Alexandre Normand

