activemq-users mailing list archives

From Norbert Pfistner <npfist...@picturesafe.de>
Subject Re: Q Consumers stop receiving messages
Date Mon, 04 May 2009 08:37:07 GMT
Hi,

we're experiencing the same problem.

We also use ActiveMQ 5.1 with the JDBC persistence adapter, and we use
a queuePrefetch=1 on the consumer side.

Our Broker waits at the same Unsafe.park as shown in jconsole:
Name: QueueThread:queue://NEW_FILE
State: WAITING
Total blocked: 120  Total waited: 15.149

Stack trace:
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:118)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1841)
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:359)
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:470)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:674)
java.lang.Thread.run(Thread.java:595)
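
For comparison, the stack above is also what an idle ThreadPoolExecutor worker looks like: a thread with no task to run parks in LinkedBlockingQueue.take(). The sketch below uses only the JDK (no ActiveMQ classes; IdleWorkerDemo is a made-up name) to reproduce that state. It suggests the trace on its own may show a broker thread with nothing queued for it rather than a deadlock, but that reading is an assumption and is not confirmed for ActiveMQ 5.1.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class IdleWorkerDemo {
    /** Runs one task on a single-thread pool, then returns the worker's state once it is idle. */
    static Thread.State idleWorkerState() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicReference<Thread> worker = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        // The task only records which thread ran it.
        pool.execute(() -> {
            worker.set(Thread.currentThread());
            done.countDown();
        });
        done.await();

        // After the task returns, the worker goes back to getTask() and blocks in
        // LinkedBlockingQueue.take(). Poll (for at most 5 s) until that has happened.
        Thread t = worker.get();
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (t.getState() != Thread.State.WAITING && System.nanoTime() < deadline) {
            Thread.sleep(10);
        }
        Thread.State state = t.getState();
        pool.shutdownNow();
        return state;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Idle worker state: " + idleWorkerState());
    }
}
```

If the worker is idle for this reason, the more useful question is why nothing is being handed to it, which the QueueSize observation below also points at.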


Also, jconsole reports QueueSize=6, but browsing the queue via jconsole 
shows no entries at all. Looking directly at the DB, the 6 messages 
exist in the activemq msg table.




shridharv wrote:
> Hi,
> Was this issue ever figured out?
> We have the same problem: a consumer stops receiving further
> messages because it has not committed after receiving around 1000 messages.  We
> cannot commit until we have a complete set of messages, and hence the consumer is
> waiting indefinitely.
> 
> Any suggestions welcome.
> AMQ 5.2 on Windows.
> -----------------------------------------------
> 
> 
> IBeaumont wrote:
>> I'm sending a mixture of persistent and non persistent messages.
>>
>> I have seen a negative queue count before so will look at the trunk.
>>
>> Can't see anything unusual in JConsole, but I have just downloaded the
>> source and in ActiveMQMessageConsumer.receiveNoWait there are these
>> lines...
>>         if (info.getPrefetchSize() == 0) {
>>             md = dequeue(-1); // We let the broker let us know when we
>>             // timeout.
>>         } else {
>>             md = dequeue(0);
>>         }
>>
>> So that explains why my consumers seem to hang when things go wrong:
>> they wait forever because I have a prefetchSize of zero.
>>
>> I've now changed my code so that instead of using receiveNoWait, I'll use
>> "receive" with a short timeout.  Hopefully then things will time out and
>> messages will continue to be processed.  I'll then need to find out if
>> any messages get skipped.  Although that doesn't help find the cause.
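
The workaround described above (a bounded receive in place of a call that can block forever) can be sketched without a broker. This is a minimal, hypothetical model: TimedSource is an invented stand-in for the single JMS call MessageConsumer.receive(long timeout), which returns null when the timeout expires, and a LinkedBlockingQueue plays the role of the queue. It only illustrates the loop shape and says nothing about why the broker stopped dispatching.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a consumer: receive returns null if nothing arrives in time. */
interface TimedSource<T> {
    T receive(long timeoutMillis) throws InterruptedException;
}

class TimedReceiveLoop {
    /**
     * Receives with a bounded wait and stops after maxEmptyPolls consecutive
     * timeouts, so the caller regains control even if no message ever arrives.
     */
    static <T> List<T> drain(TimedSource<T> source, long timeoutMillis, int maxEmptyPolls)
            throws InterruptedException {
        List<T> received = new ArrayList<>();
        int emptyPolls = 0;
        while (emptyPolls < maxEmptyPolls) {
            T message = source.receive(timeoutMillis);
            if (message == null) {
                emptyPolls++;      // timed out: count it and try again
                continue;
            }
            emptyPolls = 0;        // got a message: reset the timeout counter
            received.add(message);
        }
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(Arrays.asList("a", "b", "c"));
        // poll(timeout) mirrors the "return null on timeout" contract assumed above.
        TimedSource<String> source = t -> queue.poll(t, TimeUnit.MILLISECONDS);
        System.out.println(drain(source, 50, 2));
    }
}
```

In a real consumer the timeout branch is where logging or a reconnect attempt would go; whether a timed receive behaves differently from receiveNoWait with a prefetch of zero would still need checking against the ActiveMQ version in use.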
>>
>>
>>
>>
>>
>> Gary Tully wrote:
>>> Your use case sounds typical and reasonable but just to understand,
>>> are you sending persistent and non persistent messages to the same
>>> queue?
>>>
>>> Can you check in Jconsole, in the ActiveMQ BrokerView - Queues and
>>> Subscriptions:
>>> Do the queue metrics (size, dispatched) look as expected?
>>> A negative queue size is indicative of an issue that was resolved on
>>> trunk recently. If you see that, you could try a snapshot.
>>>
>>> Re: prefetch==0. OK, yes, you need that if you want to depend on
>>> redelivered(). That is a known issue: prefetched messages that are
>>> not consumed are deemed redelivered when they are dispatched to the
>>> next consumer.
>>> ActiveMQ redelivered means delivered to an AMQ consumer, not delivered
>>> to the end user. See:
>>> https://issues.apache.org/activemq/browse/AMQ-1952
>>>
>>> 2009/1/13 IBeaumont <IBeaumont@categoric.com>:
>>>> Hi Gary,
>>>> Thanks for the feedback.
>>>> As you can see from my stack traces earlier in jconsole, the consumers are
>>>> stuck pulling messages from the queues; they are in:
>>>> org.apache.activemq.MessageDispatchChannel.dequeue
>>>>
>>>>
>>>> The reason I've ended up with the settings I have is that so far they seem to
>>>> give me the desired results (apart from this last problem).  Maybe it would
>>>> be easier to explain the behaviour I need and someone might be able to advise
>>>> on the best settings...
>>>>
>>>> So most messages sent are persistent, and I need to process every message;
>>>> I can't afford to drop any.  I may have slow "consumers", and I might be
>>>> putting messages on to the queues at a far greater rate than they can get
>>>> processed (I want any available hard disk space to be used for message
>>>> storage).  There are a number of consumers and producers, and a number of
>>>> different queues.
>>>>
>>>>
>>>> The pre-fetch size was set to zero to overcome an issue I was having with
>>>> redelivery count.  Our application uses the JMS redelivered flag to see if it has
>>>> already tried processing the message.  Using a pre-fetch size bigger than 0
>>>> was causing me some issues with this.  Out of interest, do you know what
>>>> happens to messages that are pre-fetched when the consumer then dies?  Do
>>>> they get flagged as redelivered when a new consumer picks them up?
>>>>
>>>>
>>>>
>>>>
>>>> Gary Tully wrote:
>>>>> A good place to start is with JConsole, to have a look at the queues
>>>>> and consumers to see how many messages are currently in the queues.
>>>>>
>>>>>> The broker is configured with producerFlowControl="false", using TCP,
>>>>>> a pre-fetch size of 0 and I've also set sendFailIfNoSpace="true".
>>>>>>
>>>>> The sendFailIfNoSpace is disabled if producerFlowControl==false, so
>>>>> your producers will hang if memory for the queues is exhausted.
>>>>> sendFailIfNoSpace is considered a flowControl mechanism that does not
>>>>> block so it needs producerFlowControl to be enabled (the default) and
>>>>> no producer window to be set (also the default).
>>>>>
>>>>> Just a thought, but is it possible that the consumers are blocking
>>>>> when trying to "stick messages" back on a queue?
>>>>>
>>>>> Out of interest, why are you using prefetch==0?
>>>>>
>>>>> 2009/1/13 IBeaumont <IBeaumont@categoric.com>:
>>>>>> I've got a fairly complex app that takes msgs, processes them and sticks
>>>>>> them on the same or different queues.
>>>>>>
>>>>>> The queues are pre-loaded with persistent messages before the application
>>>>>> starts (50000).  Once it starts processing, things work fine for a while
>>>>>> and then the consumers stop receiving any messages.  I have 4 consumers for
>>>>>> each queue, and there are 3 different queues.
>>>>>>
>>>>>> Looking at jconsole, all consumers are waiting here:
>>>>>>
>>>>>> State: WAITING on java.lang.Object@1801d4a
>>>>>> Total blocked: 20  Total waited: 158
>>>>>>
>>>>>> Stack trace:
>>>>>> java.lang.Object.wait(Native Method)
>>>>>> java.lang.Object.wait(Object.java:485)
>>>>>> org.apache.activemq.MessageDispatchChannel.dequeue(MessageDispatchChannel.java:75)
>>>>>> org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:412)
>>>>>> org.apache.activemq.ActiveMQMessageConsumer.receiveNoWait(ActiveMQMessageConsumer.java:560)
>>>>>> com.xalert.server.queuing.SessionManager.getAlert(SessionManager.java:236)
>>>>>>
>>>>>>
>>>>>> If I look at the broker (and I'm not really sure what to look at here), the
>>>>>> thread for my queue that should be dispatching messages looks like this:
>>>>>> Name: QueueThread:queue://csPIQ
>>>>>> State: WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@b0a518
>>>>>> Total blocked: 2,365  Total waited: 6,717
>>>>>>
>>>>>> Stack trace:
>>>>>> sun.misc.Unsafe.park(Native Method)
>>>>>> java.util.concurrent.locks.LockSupport.park(Unknown Source)
>>>>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown Source)
>>>>>> java.util.concurrent.LinkedBlockingQueue.take(Unknown Source)
>>>>>> java.util.concurrent.ThreadPoolExecutor.getTask(Unknown Source)
>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>>>>>> java.lang.Thread.run(Unknown Source)
>>>>>>
>>>>>> The broker is configured with producerFlowControl="false", using TCP,
>>>>>> a pre-fetch size of 0 and I've also set sendFailIfNoSpace="true".
>>>>>>
>>>>>> Any ideas on what my problem is, or how/where I should look in ActiveMQ to
>>>>>> find the cause, or whether the problem is with the consumer?
>>>>>>
>>>>>> TIA
>>>>>> Ian
>>>>>>
>>>>>> --
>>>>>> View this message in context:
>>>>>> http://www.nabble.com/Q-Consumers-stop-receiving-messages-tp21438163p21438163.html
>>>>>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> http://blog.garytully.com
>>>>>
>>>>> Open Source SOA
>>>>> http://FUSESource.com
>>>>>
>>>>>
>>>> --
>>>> View this message in context:
>>>> http://www.nabble.com/Q-Consumers-stop-receiving-messages-tp21438163p21440240.html
>>>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>>>
>>>>
>>>
>>>
>>> -- 
>>> http://blog.garytully.com
>>>
>>> Open Source SOA
>>> http://FUSESource.com
>>>
>>>
>>
> 

-- 

Dipl.-Ing. Norbert Pfistner
Softwareentwicklung

picturesafe GmbH
Simon-von-Utrecht-Straße 31-37
D-20359 Hamburg
http://www.picturesafe.de

fon: +49 40 374127 901
fax: +49 40 374127 999
npfistner@picturesafe.de

Sitz der Gesellschaft: Hannover
Geschäftsführer: Herbert Wirth
HR: Amtsgericht Hannover HR B 53 366
