camel-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Leen Toelen <toe...@gmail.com>
Subject Re: Reconnect topic subscriber after connection failure
Date Thu, 25 Mar 2010 11:22:37 GMT
Hi,

I am performing some tests, and noticed that when I create the route in xml
(with the subscriptionDurable=true parameter), the subscribers come online
after restart of activeMQ. When I use the fluent api the subscriber goes
offline.


This code goes offline when ActiveMQ restarts
public void configure() throws Exception {
ActiveMQConnectionFactory connectionFactory =
lookup(ActiveMQConnectionFactory.class);
JmsTransactionManager jmsTransactionManager =
lookup(JmsTransactionManager.class);

JmsEndpoint topic = (JmsEndpoint)
endpoint("activemq:topic:MyTopic?ClientId=Id&DurableSubscriptionName=Name&subscriptionDurable=true");
topic.setRecoveryInterval(5000);// 5 seconds
topic.setMaxConcurrentConsumers(1);
topic.setConnectionFactory(connectionFactory);
topic.setTransactionManager(jmsTransactionManager);

JmsEndpoint toStore = (JmsEndpoint) endpoint("activemq:queue:ToStore");
toStore .setConnectionFactory(connectionFactory);
toStore .setTransactionManager(jmsTransactionManager);

errorHandler(loggingErrorHandler("com.meucci.camel.assemblytopic")
.level(LoggingLevel.WARN));

from(topic).transacted("PROPAGATION_REQUIRED").to(toStore );
}

While this survives a restart

<route id="AssemblyTopicToAssemblyStoreQueue">
<from
uri="activemq:topic:Topic?clientId=ID&amp;durableSubscriptionName=Name&amp;subscriptionDurable=true"
/>
<transacted ref="PROPAGATION_REQUIRED" />
<to uri="activemq:queue:ToStore"></to>
</route>

Leen

On Thu, Mar 25, 2010 at 11:41 AM, Charles Moulliard <cmoulliard@gmail.com>wrote:

> I think that you must define the subscription as durable like that :
>
>
> from(activemq:queue:example1?durableSubscriptionName=TopicReader1&subscriptionDurable=true").to("....");
>
>
> Charles Moulliard
> Senior Enterprise Architect
> Apache Camel Committer
>
> *****************************
> blog : http://cmoulliard.blogspot.com
> twitter : http://twitter.com/cmoulliard
> Linkedlin : http://www.linkedin.com/in/charlesmoulliard
>
> Apache Camel Group :
> http://www.linkedin.com/groups?home=&gid=2447439&trk=anet_ug_hm
>
>
> On Thu, Mar 25, 2010 at 11:29 AM, Leen Toelen <toelen@gmail.com> wrote:
>
> > Hi Charles,
> >
> > the behavior might be correct but not what I expected :-)
> > No really, is there a way to configure spring/camel to restart the topic
> > subscriber when the connection is refreshed? I would like to  have
> messages
> > forwarded all the time, even after restarts of the ActiveMQ broker or
> > network problems.
> >
> > Regards,
> > Leen
> >
> > On Thu, Mar 25, 2010 at 11:23 AM, Charles Moulliard <
> cmoulliard@gmail.com
> > >wrote:
> >
> > > Leen,
> > >
> > > The behavior observed is correct because messages could not forwarded
> > till
> > > that a jms client (= camel jms endpoint) consume the messages from the
> > > topic.
> > >
> > > Kind regards,
> > >
> > > Charles Moulliard
> > > Senior Enterprise Architect
> > > Apache Camel Committer
> > >
> > > *****************************
> > > blog : http://cmoulliard.blogspot.com
> > > twitter : http://twitter.com/cmoulliard
> > > Linkedlin : http://www.linkedin.com/in/charlesmoulliard
> > >
> > > Apache Camel Group :
> > > http://www.linkedin.com/groups?home=&gid=2447439&trk=anet_ug_hm
> > >
> > >
> > > On Thu, Mar 25, 2010 at 11:19 AM, Leen Toelen <toelen@gmail.com>
> wrote:
> > >
> > > > Hi,
> > > >
> > > > I have the following topic subscriber
> > > >
> > > > public void configure() throws Exception {
> > > > ActiveMQConnectionFactory connectionFactory =
> > > > lookup(ActiveMQConnectionFactory.class);
> > > > JmsTransactionManager jmsTransactionManager =
> > > > lookup(JmsTransactionManager.class);
> > > >
> > > > JmsEndpoint topic = (JmsEndpoint) endpoint("activemq:topic:MyTopic");
> > > > topic.setClientId("SomeClientID");
> > > > topic.setDurableSubscriptionName("SomeName");
> > > > topic.setRecoveryInterval(5000);// 5 seconds
> > > > topic.setMaxConcurrentConsumers(1);
> > > > topic.setConnectionFactory(connectionFactory);
> > > > topic.setTransactionManager(jmsTransactionManager);
> > > >
> > > > JmsEndpoint toStore = (JmsEndpoint)
> endpoint("activemq:queue:ToStore");
> > > > toStore.setConnectionFactory(connectionFactory);
> > > > toStore.setTransactionManager(jmsTransactionManager);
> > > >
> > > > from(topic).
> > > >  transacted("PROPAGATION_REQUIRED")
> > > >  .to(toStore);
> > > > }
> > > >
> > > > And this is a part of the application context
> > > >
> > > > <!-- setup JMS connection factory -->
> > > > <bean id="jmsConnectionFactory"
> > > > class="org.apache.activemq.ActiveMQConnectionFactory">
> > > > <property name="brokerURL" value="tcp://localhost:61616" />
> > > > </bean>
> > > >
> > > > <!-- setup spring jms TX manager -->
> > > > <bean id="jmsTransactionManager"
> > > > class="org.springframework.jms.connection.JmsTransactionManager">
> > > > <property name="connectionFactory" ref="jmsConnectionFactory" />
> > > > </bean>
> > > >
> > > > <beans:bean id="activemq"
> > > > class="org.apache.camel.component.activemq.ActiveMQComponent">
> > > > <property name="connectionFactory" ref="jmsConnectionFactory" />
> > > > <property name="transacted" value="true" />
> > > > <property name="transactionManager" ref="jmsTransactionManager" />
> > > > </beans:bean>
> > > >
> > > > This works fine, and topic messages are being forwarded to the queue.
> > But
> > > > when I restart the ActiveMQ broker (which is on the same machine but
> > > > running
> > > > as a different process), topic messages are no longer being forwarded
> > to
> > > > the
> > > > queue until I restart the camel process.
> > > >
> > > > What is the recommended way to configure activemq endpoints in camel
> > > using
> > > > the java dsl?
> > > >
> > > > Regards,
> > > > Leen
> > > >
> > >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message