activemq-users mailing list archives

From Gary Tully <gary.tu...@gmail.com>
Subject Re: Producer blocks when trying to send async JMS messages to failed broker
Date Tue, 17 Apr 2012 11:33:42 GMT
Yes, if the failure to connect or reconnect bubbles up to the
application, the application will need to initiate a new connection.

Fire and forget in JMS terms is still mediated by the broker, and
finding a broker is currently a synchronous operation. A JMS
connection that ignores the fact that it does not have a broker at the
other end is a little disrespectful of the JMS contracts :-)

You have two options, both of which involve mediating access to the
target broker.

You could write a wrapper that maintains a JMS connection in a
separate thread (so a blocking connect does not stall the caller) and
only delegates when the connection is active, discarding messages
otherwise. This sort of fire and forget wrapper would be a nice
enhancement. It could be a DiscardingProducerWrapper that uses the
org.apache.activemq.transport.TransportListener#transportResumed
callback to determine the state of the connection.
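
A minimal sketch of the gating logic such a wrapper might use, in
plain Java with no ActiveMQ dependency. The class name DiscardingSender
and the Consumer delegate are hypothetical. The two callback methods
are named after TransportListener's transportResumed and
transportInterupted (sic); a real wrapper would presumably implement
that interface and register itself on the ActiveMQ connection.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/**
 * Hypothetical fire-and-forget gate: forwards a message only while the
 * transport is believed to be up, and silently drops it otherwise.
 */
class DiscardingSender<T> {
    // Assumption: the delegate performs the real JMS send.
    private final Consumer<T> delegate;
    // Starts false: nothing is sent until the transport reports it is up.
    private final AtomicBoolean connected = new AtomicBoolean(false);
    private final AtomicLong discarded = new AtomicLong();

    DiscardingSender(Consumer<T> delegate) {
        this.delegate = delegate;
    }

    // Same name as the TransportListener callback for a (re)connect.
    public void transportResumed() {
        connected.set(true);
    }

    // Same name (and spelling) as the TransportListener callback for a lost link.
    public void transportInterupted() {
        connected.set(false);
    }

    /** Returns true if the message was handed to the delegate, false if dropped. */
    public boolean send(T message) {
        if (!connected.get()) {
            discarded.incrementAndGet();
            return false;
        }
        try {
            delegate.accept(message);
            return true;
        } catch (RuntimeException e) {
            // A failed send is treated like a dropped message.
            discarded.incrementAndGet();
            return false;
        }
    }

    public long discardedCount() {
        return discarded.get();
    }
}
```

The check and the send are not atomic, so a send that starts just
before the transport drops can still block inside the delegate. That
is why the connection itself would still need to live on its own
thread, as described above.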

You could introduce an embedded broker in the client that could
forward messages to the target broker via a network connector. It
could expire pending messages when the network connector is down.
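
To illustrate the expiry behaviour only, here is a small model in
plain Java. It is not an embedded broker and uses no ActiveMQ API; the
class ExpiringBuffer, its methods, and the injected clock are all
hypothetical. It shows the idea that messages parked while the
forwarding link is down are dropped once they are older than their
time to live.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.LongSupplier;

/**
 * Hypothetical model of store-and-forward with expiry: messages wait in
 * a local buffer and only those still within their TTL are forwarded.
 */
class ExpiringBuffer<T> {
    private static final class Entry<T> {
        final T message;
        final long expiresAtMillis;

        Entry(T message, long expiresAtMillis) {
            this.message = message;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Deque<Entry<T>> pending = new ArrayDeque<>();
    private final long ttlMillis;
    // Assumption: the clock is injected so the behaviour can be tested.
    private final LongSupplier clock;

    ExpiringBuffer(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    /** Park a message locally; never blocks on the remote broker. */
    synchronized void offer(T message) {
        pending.addLast(new Entry<>(message, clock.getAsLong() + ttlMillis));
    }

    /** Called when the link is back: empties the buffer, returning unexpired messages in order. */
    synchronized List<T> drainLive() {
        long now = clock.getAsLong();
        List<T> live = new ArrayList<>();
        Entry<T> entry;
        while ((entry = pending.pollFirst()) != null) {
            if (entry.expiresAtMillis > now) {
                live.add(entry.message);
            }
            // Expired entries are simply dropped.
        }
        return live;
    }
}
```

With an embedded broker the equivalent effect would come from the
message timeToLive (5000 ms in the endpoint URI quoted below) rather
than from hand-written code like this.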


On 17 April 2012 11:41, Brook, James <jbrook@upcbroadband.com> wrote:
> Thanks Gary. I am still not completely sure I get it. I am using failover because I *do*
> want my client to reconnect when the broker comes back up. However, when it's down I don't
> want to queue messages or block anything - they can be thrown away. My messages really are
> 'fire and forget'. These lose their meaning if not delivered to their ultimate destination
> within 2 seconds or so.
>
> My expectation was that I would be able to asynchronously send messages and never block.
> I have tried using non-persistent messages, and telling the transport not to track messages.
>
> If I do what you suggest won't the failover transport wait 10ms, try to reconnect once
> and then give up forever? Wouldn't that mean someone has to take manual action to reestablish
> the connection?
>
> Sorry if I am missing the point.
>
> On Apr 17, 2012, at 12:32 PM, Gary Tully wrote:
>
>> configure the failover maxReconnectAttempts and the failure to connect
>> will bubble up to the application where you can ignore it.
>> The assumption of failover is that you want to 'transparently'
>> maintain the connection and block pending reconnect
>>
>> failover:(...)?maxReconnectAttempts=1
>>
>> http://activemq.apache.org/failover-transport-reference.html
>>
>> On 16 April 2012 19:52, Brook, James <jbrook@upcbroadband.com> wrote:
>>> I am trying to make sure my application is resilient. It sends JMS messages to
>>> a remote ActiveMQ broker. Unfortunately when that broker goes down the client starts to block
>>> even though I am using the Camel asyncSendBody method. See below for the stack trace.
>>>
>>> The ActiveMQ version is 5.3.0 although I have the same problem with the most
>>> recent stable release.
>>>
>>> This is the argument I give to the asyncSendBody method:
>>>
>>>    jms:queue:myQueue?jmsMessageType=Text&timeToLive=5000
>>>
>>> My connection factory uses this URL:
>>>
>>>    failover:(tcp://my.ip:61613,tcp://my:ip:61613)?useExponentialBackOff=true&randomize=false&timeout=1000
>>>
>>> As the broker starts to fail I see lots of the following errors in the log:
>>>
>>>    org.apache.camel.processor.UnitOfWorkProcessor  - Caught unhandled exception while processing ExchangeId: ID-james-upcit-ds-upc-biz-64404-1334601186430-0-38
>>>    IOException: Failover timeout of 1000 ms reached
>>>
>>>    org.apache.activemq.transport.failover.FailoverTransport  - Failover timed out after 29998ms
>>>
>>> Then after attempting to send quite a few more messages the blocking starts.
>>> I can't go live with my application in this state and I really don't want to put another queue
>>> in place to decouple the JMS producer. Is there something I am doing wrong? If the broker
>>> is down I am quite happy for these messages to be lost - they lose their meaning if not delivered
>>> immediately.
>>>
>>> Thanks
>>> James
>>>
>>> at java.lang.Object.wait(Native Method)
>>>        at org.apache.activemq.transport.failover.FailoverTransport.oneway(FailoverTransport.java:433)
>>>        at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
>>>        at org.apache.activemq.transport.ResponseCorrelator.asyncRequest(ResponseCorrelator.java:74)
>>>        at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:79)
>>>        at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1244)
>>>        at org.apache.activemq.ActiveMQConnection.ensureConnectionInfoSent(ActiveMQConnection.java:1350)
>>>        at org.apache.activemq.ActiveMQConnection.createSession(ActiveMQConnection.java:300)
>>>        at org.springframework.jms.support.JmsAccessor.createSession(JmsAccessor.java:196)
>>>        at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:457)
>>>        at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:185)
>>>        at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:359)
>>>        at org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:313)
>>>        at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:111)
>>>        at org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150)
>>>        at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117)
>>>        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)
>>>        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:86)
>>>        at org.apache.camel.processor.UnitOfWorkProducer.process(UnitOfWorkProducer.java:63)
>>>        at org.apache.camel.impl.ProducerCache$2.doInProducer(ProducerCache.java:352)
>>>        at org.apache.camel.impl.ProducerCache$2.doInProducer(ProducerCache.java:324)
>>>        at org.apache.camel.impl.ProducerCache.doInProducer(ProducerCache.java:223)
>>>        at org.apache.camel.impl.ProducerCache.sendExchange(ProducerCache.java:324)
>>>        at org.apache.camel.impl.ProducerCache.send(ProducerCache.java:169)
>>>        at org.apache.camel.impl.DefaultProducerTemplate.send(DefaultProducerTemplate.java:111)
>>>        at org.apache.camel.impl.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:124)
>>>        at org.apache.camel.impl.DefaultProducerTemplate$14.call(DefaultProducerTemplate.java:596)
>>>        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>>        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>>        at java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy.rejectedExecution(ThreadPoolExecutor.java:1746)
>>>        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:767)
>>>        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:658)
>>>        at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:92)
>>>        at org.apache.camel.impl.DefaultProducerTemplate.asyncSendBody(DefaultProducerTemplate.java:601)
>>>        at org.apache.camel.impl.DefaultProducerTemplate.asyncSendBody(DefaultProducerTemplate.java:458)
>>
>>
>>
>> --
>> http://fusesource.com
>> http://blog.garytully.com
>



-- 
http://fusesource.com
http://blog.garytully.com
