camel-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Krzysztof Burghardt <krzysz...@burghardt.pl>
Subject Problem with transactional message forwarding between two ActiveMQ brokers
Date Fri, 05 Mar 2010 15:46:20 GMT
Dear Users,

I have configured Camel (v2.0) to forward messages from one broker to
another. Now I'm looking for a solution to messages disappearing from
the queue (consumed by Camel) while the destination broker is down. I
prepared a config with a transaction manager (below), but it does not
work as I expected:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
       http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-2.5.xsd
       http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd
       http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd
    ">

    <!-- Connection factory for broker 1, reachable at tcp://localhost:61616. -->
    <bean id="jmsConnectionFactory1"
          class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
    </bean>

    <!-- Connection factory for broker 2, reachable at tcp://localhost:61717. -->
    <bean id="jmsConnectionFactory2"
          class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61717"/>
    </bean>

    <!-- JMS transaction manager for broker 1; it is wired to jmsConnectionFactory1 only. -->
    <bean id="jmsTransactionManager1"
          class="org.springframework.jms.connection.JmsTransactionManager">
        <property name="connectionFactory" ref="jmsConnectionFactory1"/>
    </bean>

    <!--
      Camel JMS configuration for broker 1: uses jmsConnectionFactory1 and
      jmsTransactionManager1, enables transacted mode (including InOut) and
      runs a single concurrent consumer.
    -->
    <bean id="jmsConfig1"
          class="org.apache.camel.component.jms.JmsConfiguration">
        <property name="connectionFactory" ref="jmsConnectionFactory1"/>
        <property name="transactionManager" ref="jmsTransactionManager1"/>
        <property name="transacted" value="true"/>
        <property name="transactedInOut" value="true"/>
        <property name="concurrentConsumers" value="1"/>
    </bean>

    <!-- JMS transaction manager for broker 2; it is wired to jmsConnectionFactory2 only. -->
    <bean id="jmsTransactionManager2"
          class="org.springframework.jms.connection.JmsTransactionManager">
        <property name="connectionFactory" ref="jmsConnectionFactory2"/>
    </bean>

    <!--
      Camel JMS configuration for broker 2: uses jmsConnectionFactory2 and
      jmsTransactionManager2, enables transacted mode (including InOut) and
      runs a single concurrent consumer.
    -->
    <bean id="jmsConfig2"
          class="org.apache.camel.component.jms.JmsConfiguration">
        <property name="connectionFactory" ref="jmsConnectionFactory2"/>
        <property name="transactionManager" ref="jmsTransactionManager2"/>
        <property name="transacted" value="true"/>
        <property name="transactedInOut" value="true"/>
        <property name="concurrentConsumers" value="1"/>
    </bean>

    <!-- Camel JMS component behind the "activemq1:" endpoint URIs; configured by jmsConfig1. -->
    <bean id="activemq1"
          class="org.apache.camel.component.jms.JmsComponent">
        <property name="configuration" ref="jmsConfig1"/>
    </bean>

    <!-- Camel JMS component behind the "activemq2:" endpoint URIs; configured by jmsConfig2. -->
    <bean id="activemq2"
          class="org.apache.camel.component.jms.JmsComponent">
        <property name="configuration" ref="jmsConfig2"/>
    </bean>

    <!--
      Camel transaction policy for broker 1: a SpringTransactionPolicy built
      from a TransactionTemplate that uses jmsTransactionManager1. No
      propagation behaviour is set on the template, so its default applies.
    -->
    <bean id="PROPAGATION_REQUIRED_POLICY_1"
          class="org.apache.camel.spring.spi.SpringTransactionPolicy">
        <constructor-arg>
            <bean class="org.springframework.transaction.support.TransactionTemplate">
                <property name="transactionManager" ref="jmsTransactionManager1"/>
            </bean>
        </constructor-arg>
    </bean>

    <!--
      Camel transaction policy for broker 2: a SpringTransactionPolicy built
      from a TransactionTemplate that uses jmsTransactionManager2. No
      propagation behaviour is set on the template, so its default applies.
    -->
    <bean id="PROPAGATION_REQUIRED_POLICY_2"
          class="org.apache.camel.spring.spi.SpringTransactionPolicy">
        <constructor-arg>
            <bean class="org.springframework.transaction.support.TransactionTemplate">
                <property name="transactionManager" ref="jmsTransactionManager2"/>
            </bean>
        </constructor-arg>
    </bean>

    <!--
      Camel context with two forwarding routes between the two brokers:
        camel1-route1: activemq1 queue in1 to activemq2 queue out1
        camel1-route2: activemq2 queue in2 to activemq1 queue out2
      Each route consumes with transacted=true and issues an explicit
      rollback when the send inside doTry raises any java.lang.Exception.
    -->
    <camelContext id="camel1" xmlns="http://camel.apache.org/schema/spring">
        <!-- Forwards messages from broker 1 (queue in1) to broker 2 (queue out1). -->
        <route id="camel1-route1">
            <from uri="activemq1:queue:in1?transacted=true"/>
            <!-- NOTE(review): transacted has no ref although two transaction
                 policies are defined in this file, and an explicit policy ref
                 follows on the next line; confirm which policy Camel applies. -->
            <transacted/>
            <policy ref="PROPAGATION_REQUIRED_POLICY_1"/>
            <doTry>
                <!-- NOTE(review): this producer uses the activemq2 component, which
                     is configured with a different transaction manager than the
                     consumer; presumably the receive and the send are not one
                     atomic transaction. TODO confirm. -->
                <to uri="activemq2:queue:out1?transacted=true"/>
                <doCatch>
                    <!-- Any exception from the send marks the exchange for rollback. -->
                    <exception>java.lang.Exception</exception>
                    <rollback/>
                </doCatch>
            </doTry>
        </route>
        <!-- Forwards messages from broker 2 (queue in2) to broker 1 (queue out2). -->
        <route id="camel1-route2">
            <from uri="activemq2:queue:in2?transacted=true"/>
            <!-- NOTE(review): same ref-less transacted followed by an explicit
                 policy ref as in camel1-route1; confirm which policy applies. -->
            <transacted/>
            <policy ref="PROPAGATION_REQUIRED_POLICY_2"/>
            <doTry>
                <!-- NOTE(review): producer is on activemq1 while the consumer is on
                     activemq2, each with its own transaction manager. TODO confirm
                     the intended transactional scope. -->
                <to uri="activemq1:queue:out2?transacted=true"/>
                <doCatch>
                    <!-- Any exception from the send marks the exchange for rollback. -->
                    <exception>java.lang.Exception</exception>
                    <rollback/>
                </doCatch>
            </doTry>
        </route>
    </camelContext>

</beans>

Messages from the input queue are consumed even when the destination
queue is unavailable (broker down). In the Camel log I found errors
like this (one for each input message):

ERROR | org.apache.camel.RollbackExchangeException: Intended rollback
on the exchange: Exchange[Message: Enter some text here for the
message body...]
 WARN | Execution of JMS message listener failed
org.apache.camel.spring.spi.TransactedRuntimeCamelException:
org.apache.camel.RollbackExchangeException: Intended rollback on the
exchange: Exchange[Message: Enter some text here for the message
body...]
        at org.apache.camel.spring.spi.TransactionErrorHandler.wrapTransactedRuntimeException(TransactionErrorHandler.java:171)
        at org.apache.camel.spring.spi.TransactionErrorHandler$1.doInTransactionWithoutResult(TransactionErrorHandler.java:121)

Any suggestions are welcome.

Best regards,
-- 
Krzysztof Burghardt <krzysztof@burghardt.pl>
http://www.burghardt.pl/

Mime
View raw message