activemq-users mailing list archives

From "James Strachan" <james.strac...@gmail.com>
Subject Re: XA connection and XA session in separate threads
Date Mon, 21 Aug 2006 11:39:45 GMT
On 8/21/06, Eugene Prokopiev <prokopiev@stc.donpac.ru> wrote:
> Hi,
>
> I need to use an XA connection and the XA session created from it in
> separate threads. An example context looks like:
>
> <beans>
>
>         <bean id="broker" class="org.apache.activemq.broker.BrokerService"
> init-method="start" destroy-method="stop">
>                 <property name="persistent" value="false"/>
>                 <property name="transportConnectorURIs">
>                         <list>
>                                 <value>tcp://localhost:5000</value>
>                         </list>
>                 </property>
>         </bean>
>
>         <bean id="jotm"
> class="org.springframework.transaction.jta.JotmFactoryBean"/>
>         <bean id="jotmTransactionManager"
> class="org.springframework.transaction.jta.JtaTransactionManager">
>                 <property name="userTransaction" ref="jotm"/>
>         </bean>
>
>         <bean id="connectionFactory"
> class="org.apache.activemq.ActiveMQXAConnectionFactory">
>                 <property name="brokerURL" value="tcp://localhost:5000" />
>         </bean>
>
>         <bean id="messageReceiver"
> class="org.springframework.transaction.interceptor.TransactionProxyFactoryBean">
>          <property name="transactionManager" ref="jotmTransactionManager"/>
>          <property name="transactionAttributes">
>              <props>
>                  <prop key="*">PROPAGATION_REQUIRED</prop>
>              </props>
>          </property>
>          <property name="target">
>                 <bean class="simple.MessageReceiverSimple">
>                                 <property name="jmsTemplate">
>                                         <bean class="org.springframework.jms.core.JmsTemplate">
>                                                 <property name="connectionFactory" ref="connectionFactory"/>
>                                                 <property name="defaultDestinationName" value="messages.input"/>
>                                         </bean>
>                                 </property>
>                         </bean>
>          </property>
>                 <property name="proxyTargetClass" value="true"/>
>      </bean>
>
> </beans>
>
> MessageReceiverSimple.java is:
>
> public class MessageReceiverSimple {
>
>         private Log log = LogFactory.getLog(getClass());
>
>         private JmsTemplate jmsTemplate;
>
>         public void setJmsTemplate(JmsTemplate jmsTemplate) {
>                 this.jmsTemplate = jmsTemplate;
>         }
>
>         public void receive() {
>                 Thread readerThread = new Thread(new Runnable(){
>                         public void run() {
>                                 while(!Thread.currentThread().isInterrupted()) {
>                                         Message message = jmsTemplate.receive();
>                                         log.debug(message);
>                                 }
>                         }
>                 });
>                 readerThread.start();
>         }
>
> }
>
> In this example the plain JMS API could be used instead of JmsTemplate,
> but that is not important in this case. The result would be the same.
>
> On running this example I got:
>
> javax.jms.JMSException: Session's XAResource has not been enlisted in a distributed transaction.
>         at org.apache.activemq.ActiveMQXASession.doStartTransaction(ActiveMQXASession.java:109)
>         at org.apache.activemq.ActiveMQMessageConsumer.acknowledge(ActiveMQMessageConsumer.java:711)
>         at org.apache.activemq.ActiveMQMessageConsumer.dispose(ActiveMQMessageConsumer.java:572)
>         at org.apache.activemq.ActiveMQMessageConsumer.close(ActiveMQMessageConsumer.java:515)
>         at org.springframework.jms.support.JmsUtils.closeMessageConsumer(JmsUtils.java:105)
>         at org.springframework.jms.core.JmsTemplate.doReceive(JmsTemplate.java:714)
>         at org.springframework.jms.core.JmsTemplate.doReceive(JmsTemplate.java:677)
>         at org.springframework.jms.core.JmsTemplate$9.doInJms(JmsTemplate.java:635)
>         at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:432)
>         at org.springframework.jms.core.JmsTemplate.receive(JmsTemplate.java:632)
>         at org.springframework.jms.core.JmsTemplate.receive(JmsTemplate.java:619)
>         at simple.MessageReceiverSimple$1.run(MessageReceiverSimple.java:22)
>         at java.lang.Thread.run(Thread.java:595)
>
> So I wrote a simple ActiveMQXAConnectionFactory wrapper for enlisting
> the Session's XAResource in a distributed transaction:
>
> public class ActiveMQXAConnectionFactory implements ConnectionFactory,
> XAConnectionFactory {
>
>         private XAConnectionFactory connectionFactory;
>         private JtaTransactionManager transactionManager;
>
>         public void setConnectionFactory(XAConnectionFactory connectionFactory) {
>                 this.connectionFactory = connectionFactory;
>         }
>
>         public void setTransactionManager(JtaTransactionManager
> transactionManager) {
>                 this.transactionManager = transactionManager;
>         }
>
>         public Connection createConnection() throws JMSException {
>                 return createXAConnection();
>         }
>
>         public Connection createConnection(String userName, String password)
> throws JMSException {
>                 return createXAConnection(userName, password);
>         }
>
>         public XAConnection createXAConnection() throws JMSException {
>                 XAConnection connection = connectionFactory.createXAConnection();
>                 return new ActiveMQXAConnection(connection, transactionManager);
>         }
>
>         public XAConnection createXAConnection(String userName, String
> password) throws JMSException {
>                 XAConnection connection =
> connectionFactory.createXAConnection(userName, password);
>                 return new ActiveMQXAConnection(connection, transactionManager);
>         }
>
>         public class ActiveMQXAConnection implements XAConnection {
>
>                 private XAConnection connection;
>                 private JtaTransactionManager transactionManager;
>
>                 public ActiveMQXAConnection(XAConnection connection,
> JtaTransactionManager transactionManager) {
>                         this.connection = connection;
>                         this.transactionManager = transactionManager;
>                 }
>
>                 public Session createSession(boolean transacted, int acknowledgeMode)
> throws JMSException {
>                         return createXASession();
>                 }
>
>                 public XASession createXASession() throws JMSException {
>                         XASession session = connection.createXASession();
>                         try {
>                                 transactionManager.getUserTransaction().begin();
>                                 transactionManager.getTransactionManager().getTransaction().enlistResource(session.getXAResource());
>                         } catch (Exception e) {
>                                 e.printStackTrace();
>                         }
>                         return session;
>                 }
>
>                 public void close() throws JMSException {
>                         connection.close();
>                 }
>
>                 public ConnectionConsumer createConnectionConsumer(Destination arg0, String arg1, ServerSessionPool arg2, int arg3) throws JMSException {
>                         return connection.createConnectionConsumer(arg0, arg1, arg2, arg3);
>                 }
>
>                 public ConnectionConsumer createDurableConnectionConsumer(Topic arg0, String arg1, String arg2, ServerSessionPool arg3, int arg4) throws JMSException {
>                         return connection.createDurableConnectionConsumer(arg0, arg1, arg2, arg3, arg4);
>                 }
>
>                 public String getClientID() throws JMSException {
>                         return connection.getClientID();
>                 }
>
>                 public ExceptionListener getExceptionListener() throws JMSException {
>                         return connection.getExceptionListener();
>                 }
>
>                 public ConnectionMetaData getMetaData() throws JMSException {
>                         return connection.getMetaData();
>                 }
>
>                 public void setClientID(String arg0) throws JMSException {
>                         connection.setClientID(arg0);
>                 }
>
>                 public void setExceptionListener(ExceptionListener arg0) throws
> JMSException {
>                         connection.setExceptionListener(arg0);
>                 }
>
>                 public void start() throws JMSException {
>                         connection.start();
>                 }
>
>                 public void stop() throws JMSException {
>                         connection.stop();
>                 }
>
>         }
>
> }
>
> On using it instead of org.apache.activemq.ActiveMQXAConnectionFactory
> I got:
>
> INFO  BrokerService - ActiveMQ null JMS Message Broker (localhost) is
> starting
> INFO  BrokerService - For help or more information please see:
> http://incubator.apache.org/activemq/
> INFO  TransportServerThreadSupport - Listening for connections at:
> tcp://prokopiev.stc.donpac.ru:5000
> INFO  TransportConnector - Connector tcp://prokopiev.stc.donpac.ru:5000
> Started
> INFO  BrokerService - ActiveMQ JMS Message Broker (localhost,
> ID:prokopiev.stc.donpac.ru-40533-1156158196797-0:0) started
> INFO  jotm - JOTM started with a local transaction factory which is not
> bound.
> INFO  jotm - CAROL initialization
> INFO  ConfigurationRepository - No protocols were defined for property
> 'carol.protocols', trying with default protocol = 'jrmp'.
> INFO  jta - JOTM 2.0.10
> INFO  JtaTransactionManager - Using JTA UserTransaction:
> org.objectweb.jotm.Current@39443f
> INFO  JtaTransactionManager - Using JTA TransactionManager:
> org.objectweb.jotm.Current@39443f
> INFO  ManagementContext - JMX consoles can connect to
> service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
> INFO  DefaultAopProxyFactory - CGLIB2 available: proxyTargetClass
> feature enabled
> DEBUG MessageReceiverSimple - ActiveMQObjectMessage {commandId = 5,
> responseRequired = true, messageId =
> ID:prokopiev.stc.donpac.ru-40541-1156158200439-0:0:1:1:1,
> originalDestination = null, originalTransactionId = null, producerId =
> ID:prokopiev.stc.donpac.ru-40541-1156158200439-0:0:1:1, destination =
> queue://messages.input, transactionId = null, expiration = 0, timestamp
> = 1156158200912, arrival = 0, correlationId = null, replyTo = null,
> persistent = true, type = null, priority = 4, groupID = null,
> groupSequence = 0, targetConsumerId = null, compressed = false, userID =
> null, content = org.apache.activeio.packet.ByteSequence@a45536,
> marshalledProperties = null, dataStructure = null, redeliveryCounter =
> 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody
> = true}
> INFO  jotm - set rollback only (tx=bb14:38:0:011d45be9b8fb301e8...44b402:)
>
> So, my questions are:
>
> 1. My ActiveMQXAConnection.createXASession() implementation looks like
> a dirty hack and can't work properly because the transaction is started but
> never committed anywhere. What is the right place to start and commit/rollback
> the transaction?

Normally the JCA container does this. If you are using Spring then the
Spring Transaction Manager or Message Listener container should do
this, not the connection factory.
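
To illustrate why demarcation belongs around the code that actually does the receive, here is a minimal sketch in plain Java. It assumes the usual JTA model in which a transaction is associated with the thread that began it. The class ThreadBoundTxDemo and its begin/commit/current helpers are hypothetical stand-ins built on a ThreadLocal; they are not the JTA, Spring or ActiveMQ API.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Toy model of a thread-bound transaction context; not the JTA or Spring API. */
final class ThreadBoundTxDemo {

    // Hypothetical stand-in for a transaction manager's per-thread association.
    private static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    static void begin(String txId) { CURRENT_TX.set(txId); }
    static void commit() { CURRENT_TX.remove(); }
    static String current() { return CURRENT_TX.get(); }

    /** Mirrors the posted receive(): the caller has a transaction, the work runs on a new thread. */
    public static String txSeenBySpawnedThread() {
        AtomicReference<String> seen = new AtomicReference<>("unset");
        begin("tx-1");                       // a proxy would begin the transaction on the calling thread
        try {
            Thread reader = new Thread(() -> seen.set(String.valueOf(current())));
            reader.start();
            join(reader);
        } finally {
            commit();                        // and complete it when the method returns
        }
        return seen.get();
    }

    /** Demarcation done on the thread that does the work. */
    public static String txSeenWhenDemarcatedInWorker() {
        AtomicReference<String> seen = new AtomicReference<>("unset");
        Thread reader = new Thread(() -> {
            begin("tx-2");
            try {
                seen.set(String.valueOf(current()));
            } finally {
                commit();
            }
        });
        reader.start();
        join(reader);
        return seen.get();
    }

    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("spawned thread sees: " + txSeenBySpawnedThread());               // null
        System.out.println("worker-demarcated sees: " + txSeenWhenDemarcatedInWorker());     // tx-2
    }
}
```

In this toy model the spawned thread sees no transaction at all, which is roughly the situation the "has not been enlisted" exception is reporting.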


> 2. Is it possible to include a similar ActiveMQXAConnectionFactory
> implementation in ActiveMQ? It would be very useful with
> Spring's DefaultMessageListenerContainer, for example.

I'd rather fix Spring's container to work with any JMS provider
properly than add a dirty hack to ActiveMQ.

Enlistment is the responsibility of the container - be it Jencks, MDB
container or Spring.


>  Now JTA transactions
> can't work with ActiveMQ/Spring/DefaultMessageListenerContainer.

You'd best ask the Spring guys - I'm not sure if the Spring container
supports JTA.
-- 

James
-------
http://radio.weblogs.com/0112098/
