activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Pavel <pag...@gmail.com>
Subject Re: ActiveMQ reconnect issue with consumer.prefetchSize=0
Date Wed, 17 Feb 2010 10:42:28 GMT
Submitted https://issues.apache.org/activemq/browse/AMQ-2612 & attached test
case.

Thanks,
Pavel

On Tue, Feb 16, 2010 at 4:04 PM, Pavel <pagrus@gmail.com> wrote:

> Hi Gary,
>
> prefetch=1 alone would not work for my case;
>
> We build CXF web services with JMS transport on top of ActiveMQ.
>
> And syncronous cyclic "(client)->A->B->A" calls sometimes lead to deadlock:
> "B->A" message gets into prefetch buffer of the very same consumer that
> currently handles "(client)->A" message.
> We have load tests that highlight this issue. And polling consumer was a
> workaround until we figured out the reconnect implications.
>
> I'll try to rework my example into unit test and then file a jira as you
> suggested.
>
> Thanks,
> Pavel
>
>
> On Mon, Feb 15, 2010 at 10:13 PM, Gary Tully <gary.tully@gmail.com> wrote:
>
>> Have not looked into the test case yet, but when prefetch=0, a consumer is
>> polling for messages. The broker will not actively dispatch messages to
>> that
>> consumer. On each call to receive, the  consumer sends a pull command to
>> the
>> broker and then waits for a message dispatch. If failover occurs between
>> the
>> send of the pull and before the dispatch, the receive will remain blocked
>> as
>> the pull command will be lost and not replayed.
>>
>> One solution is to have the state tracker track message pull commands so
>> that they can be replayed. Afaik that is not there at the moment. Can you
>> raise an enhancement request for that?
>>
>> For your use case though, is prefetch=1 an option?
>>
>> On 15 February 2010 17:35, Pavel <pagrus@gmail.com> wrote:
>>
>> > Here is another example, this time activemq + Spring; no CXF at all.
>> >
>> > Any ideas?
>> >
>> > Thanks,
>> > Pavel
>> >
>> >
>> > On Sat, Feb 13, 2010 at 3:04 PM, Pavel <pagrus@gmail.com> wrote:
>> >
>> >> Hi,
>> >>
>> >> I'm hitting an issue with activeMQ reconnect.
>> >>
>> >> We are using CXF and SOAP over JMS with ActiveMQ. Also, at some point
>> we
>> >> started using consumer.prefetchSize=0. This is to prevent deadlocks
>> when
>> >> doing cyclic synchronous webservice calls, like
>> A.foo()->B.bar()->A.baz().
>> >>
>> >> So, attached is a small example, echo-like webservice called over JMS
>> >> every 2 seconds.
>> >> ActiveMQ 5.3 or Fuse 5.3.0.5.
>> >>
>> >> activemq.broker.url=failover:(tcp://localhost:61616)
>> >>
>> jms.destinationQueueName=SAMPLE___SERVICE___QUEUE?consumer.prefetchSize=0
>> >>
>> >>
>> >> All works well, until AMQ restart. Once restart happens, client seems
>> to
>> >> reconnect (I can see consumer in a web console), but starts timing out
>> [1].
>> >> AMQ logs contain several messages like this:
>> >> DEBUG | SAMPLE___SERVICE___QUEUE toPageIn: 1, Inflight: 0,
>> >> pagedInMessages.size 5
>> >>
>> >> at some point this follows by a bunch of
>> >> DEBUG | Message expired Message
>> >> ID:EPBYMINW0436-4688-1266064267343-0:0:22:1:1 dropped=false acked=false
>> >> locked=false
>> >>
>> >>
>> >> After application restart things are back to normal - until the next
>> >> reconnect.
>> >>
>> >> If I simply start consumers with AMQ down, then start AMQ - connect
>> works
>> >> fine and messages start flowing. So it looks like the issue is with
>> >> reconnect only.
>> >>
>> >> Am I hitting some known issue? Are there solutions/workarounds for
>> that?
>> >>
>> >>
>> >> [1] Client stacktrace.
>> >> Caused by: java.lang.RuntimeException: Timeout receiving message with
>> >> correlationId e10ec6062aec42d899e3c36d1171c6530000000000000032
>> >>         at
>> >>
>> org.apache.cxf.transport.jms.JMSConduit.sendExchange(JMSConduit.java:206)
>> >>         at
>> >>
>> org.apache.cxf.transport.jms.JMSOutputStream.doClose(JMSOutputStream.java:56)
>> >>         at
>> >> org.apache.cxf.io.CachedOutputStream.close(CachedOutputStream.java:185)
>> >>         at
>> >>
>> org.apache.cxf.io.CacheAndWriteOutputStream.postClose(CacheAndWriteOutputStream.java:47)
>> >>         at
>> >> org.apache.cxf.io.CachedOutputStream.close(CachedOutputStream.java:188)
>> >>         at
>> >> org.apache.cxf.transport.AbstractConduit.close(AbstractConduit.java:66)
>> >>         at
>> >>
>> org.apache.cxf.interceptor.MessageSenderInterceptor$MessageSenderEndingInterceptor.handleMessage(MessageSenderInterceptor.java:62)
>> >>         at
>> >>
>> org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:236)
>> >>         at
>> org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:472)
>> >>         at
>> org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:302)
>> >>         at
>> org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:254)
>> >>         at
>> >> org.apache.cxf.frontend.ClientProxy.invokeSync(ClientProxy.java:73)
>> >>         at
>> >> org.apache.cxf.jaxws.JaxWsClientProxy.invoke(JaxWsClientProxy.java:123)
>> >>
>> >>
>> >> Thanks,
>> >> Pavel
>> >
>> >
>> >
>> >
>>
>>
>> --
>> http://blog.garytully.com
>>
>> Open Source Integration
>> http://fusesource.com
>>
>
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message