activemq-users mailing list archives

From Eugene Prokopiev <prokop...@stc.donpac.ru>
Subject XA connection and XA session in separate threads
Date Mon, 21 Aug 2006 11:26:02 GMT
Hi,

I need to use an XA connection and an XA session created from it in
separate threads. An example Spring context looks like this:

<beans>

    <bean id="broker" class="org.apache.activemq.broker.BrokerService"
          init-method="start" destroy-method="stop">
        <property name="persistent" value="false"/>
        <property name="transportConnectorURIs">
            <list>
                <value>tcp://localhost:5000</value>
            </list>
        </property>
    </bean>

    <bean id="jotm" class="org.springframework.transaction.jta.JotmFactoryBean"/>
    <bean id="jotmTransactionManager"
          class="org.springframework.transaction.jta.JtaTransactionManager">
        <property name="userTransaction" ref="jotm"/>
    </bean>

    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:5000"/>
    </bean>

    <bean id="messageReceiver"
          class="org.springframework.transaction.interceptor.TransactionProxyFactoryBean">
        <property name="transactionManager" ref="jotmTransactionManager"/>
        <property name="transactionAttributes">
            <props>
                <prop key="*">PROPAGATION_REQUIRED</prop>
            </props>
        </property>
        <property name="target">
            <bean class="simple.MessageReceiverSimple">
                <property name="jmsTemplate">
                    <bean class="org.springframework.jms.core.JmsTemplate">
                        <property name="connectionFactory" ref="connectionFactory"/>
                        <property name="defaultDestinationName" value="messages.input"/>
                    </bean>
                </property>
            </bean>
        </property>
        <property name="proxyTargetClass" value="true"/>
    </bean>

</beans>

MessageReceiverSimple.java is:

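package simple;

import javax.jms.Message;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.jms.core.JmsTemplate;

public class MessageReceiverSimple {

	private Log log = LogFactory.getLog(getClass());

	private JmsTemplate jmsTemplate;

	public void setJmsTemplate(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}

	// Called through the transactional proxy; the actual receiving
	// happens in a separate thread started here.
	public void receive() {
		Thread readerThread = new Thread(new Runnable() {
			public void run() {
				// Receive and log messages until the thread is interrupted.
				while (!Thread.currentThread().isInterrupted()) {
					Message message = jmsTemplate.receive();
					log.debug(message);
				}
			}
		});
		readerThread.start();
	}

}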

The plain JMS API could be used here instead of JmsTemplate; that is
not important in this case, and the result would be the same.

When I run this example, I get:

javax.jms.JMSException: Session's XAResource has not been enlisted in a distributed transaction.
	at org.apache.activemq.ActiveMQXASession.doStartTransaction(ActiveMQXASession.java:109)
	at org.apache.activemq.ActiveMQMessageConsumer.acknowledge(ActiveMQMessageConsumer.java:711)
	at org.apache.activemq.ActiveMQMessageConsumer.dispose(ActiveMQMessageConsumer.java:572)
	at org.apache.activemq.ActiveMQMessageConsumer.close(ActiveMQMessageConsumer.java:515)
	at org.springframework.jms.support.JmsUtils.closeMessageConsumer(JmsUtils.java:105)
	at org.springframework.jms.core.JmsTemplate.doReceive(JmsTemplate.java:714)
	at org.springframework.jms.core.JmsTemplate.doReceive(JmsTemplate.java:677)
	at org.springframework.jms.core.JmsTemplate$9.doInJms(JmsTemplate.java:635)
	at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:432)
	at org.springframework.jms.core.JmsTemplate.receive(JmsTemplate.java:632)
	at org.springframework.jms.core.JmsTemplate.receive(JmsTemplate.java:619)
	at simple.MessageReceiverSimple$1.run(MessageReceiverSimple.java:22)
	at java.lang.Thread.run(Thread.java:595)
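
One factor that may contribute to this exception: JTA associates a
transaction with the thread that began it, so a transaction opened by the
proxy around receive() would not be visible inside readerThread. The
sketch below only models that per-thread association with a plain
ThreadLocal. ThreadBoundTx and ThreadBoundTxDemo are hypothetical names
made up for illustration; they are not JOTM, Spring or ActiveMQ API, and
the sketch does not cover the separate question of who enlists the
session's XAResource.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a thread-bound transaction context.
// JTA transaction managers keep a comparable per-thread association.
class ThreadBoundTx {
    private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    static void begin(String txId) { CURRENT.set(txId); }
    static String current() { return CURRENT.get(); }
    static void end() { CURRENT.remove(); }
}

class ThreadBoundTxDemo {
    // Begins a "transaction" on the calling thread, then asks a freshly
    // started thread which transaction it sees.
    // Returns { seenByCaller, seenByWorker }.
    static String[] run() throws InterruptedException {
        ThreadBoundTx.begin("tx-1");
        try {
            AtomicReference<String> seenByWorker = new AtomicReference<>("unset");
            Thread worker = new Thread(() -> seenByWorker.set(ThreadBoundTx.current()));
            worker.start();
            worker.join();
            return new String[] { ThreadBoundTx.current(), seenByWorker.get() };
        } finally {
            ThreadBoundTx.end();
        }
    }
}
```

In this model the calling thread sees its transaction while the worker
thread sees none, which is the situation readerThread would be in under
the stated assumption.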

So I wrote a simple ActiveMQXAConnectionFactory wrapper that enlists the
session's XAResource in a distributed transaction:

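import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionFactory;
import javax.jms.ConnectionMetaData;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.XAConnection;
import javax.jms.XAConnectionFactory;
import javax.jms.XASession;

import org.springframework.transaction.jta.JtaTransactionManager;

// Wrapper that delegates to a real XAConnectionFactory and tries to enlist
// each new session's XAResource in a JTA transaction.
public class ActiveMQXAConnectionFactory implements ConnectionFactory, XAConnectionFactory {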

	private XAConnectionFactory connectionFactory;
	private JtaTransactionManager transactionManager;
	
	public void setConnectionFactory(XAConnectionFactory connectionFactory) {
		this.connectionFactory = connectionFactory;
	}

	public void setTransactionManager(JtaTransactionManager transactionManager) {
		this.transactionManager = transactionManager;
	}

	public Connection createConnection() throws JMSException {
		return createXAConnection();
	}

	public Connection createConnection(String userName, String password) throws JMSException {
		return createXAConnection(userName, password);
	}

	public XAConnection createXAConnection() throws JMSException {
		XAConnection connection = connectionFactory.createXAConnection();
		return new ActiveMQXAConnection(connection, transactionManager);
	}

	public XAConnection createXAConnection(String userName, String password) throws JMSException {
		XAConnection connection = connectionFactory.createXAConnection(userName, password);
		return new ActiveMQXAConnection(connection, transactionManager);
	}
	
	public class ActiveMQXAConnection implements XAConnection {

		private XAConnection connection;
		private JtaTransactionManager transactionManager;
		
		public ActiveMQXAConnection(XAConnection connection, JtaTransactionManager transactionManager) {
			this.connection = connection;
			this.transactionManager = transactionManager;
		}
		
		public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
			return createXASession();
		}

		public XASession createXASession() throws JMSException {
			XASession session = connection.createXASession();
			try {
				transactionManager.getUserTransaction().begin();
				transactionManager.getTransactionManager().getTransaction().enlistResource(session.getXAResource());
			} catch (Exception e) {
				e.printStackTrace();
			}
			return session;
		}

		public void close() throws JMSException {
			connection.close();			
		}

		public ConnectionConsumer createConnectionConsumer(Destination arg0, String arg1, ServerSessionPool arg2, int arg3) throws JMSException {
			return connection.createConnectionConsumer(arg0, arg1, arg2, arg3);
		}

		public ConnectionConsumer createDurableConnectionConsumer(Topic arg0, String arg1, String arg2, ServerSessionPool arg3, int arg4) throws JMSException {
			return connection.createDurableConnectionConsumer(arg0, arg1, arg2, arg3, arg4);
		}

		public String getClientID() throws JMSException {
			return connection.getClientID();
		}

		public ExceptionListener getExceptionListener() throws JMSException {
			return connection.getExceptionListener();
		}

		public ConnectionMetaData getMetaData() throws JMSException {
			return connection.getMetaData();
		}

		public void setClientID(String arg0) throws JMSException {
			connection.setClientID(arg0);
		}

		public void setExceptionListener(ExceptionListener arg0) throws JMSException {
			connection.setExceptionListener(arg0);
		}

		public void start() throws JMSException {
			connection.start();
		}

		public void stop() throws JMSException {
			connection.stop();
		}
		
	}

}

Using it instead of org.apache.activemq.ActiveMQXAConnectionFactory,
I got:

INFO  BrokerService - ActiveMQ null JMS Message Broker (localhost) is starting
INFO  BrokerService - For help or more information please see: http://incubator.apache.org/activemq/
INFO  TransportServerThreadSupport - Listening for connections at: tcp://prokopiev.stc.donpac.ru:5000
INFO  TransportConnector - Connector tcp://prokopiev.stc.donpac.ru:5000 Started
INFO  BrokerService - ActiveMQ JMS Message Broker (localhost, ID:prokopiev.stc.donpac.ru-40533-1156158196797-0:0) started
INFO  jotm - JOTM started with a local transaction factory which is not bound.
INFO  jotm - CAROL initialization
INFO  ConfigurationRepository - No protocols were defined for property 'carol.protocols', trying with default protocol = 'jrmp'.
INFO  jta - JOTM 2.0.10
INFO  JtaTransactionManager - Using JTA UserTransaction: org.objectweb.jotm.Current@39443f
INFO  JtaTransactionManager - Using JTA TransactionManager: org.objectweb.jotm.Current@39443f
INFO  ManagementContext - JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
INFO  DefaultAopProxyFactory - CGLIB2 available: proxyTargetClass feature enabled
DEBUG MessageReceiverSimple - ActiveMQObjectMessage {commandId = 5, responseRequired = true,
    messageId = ID:prokopiev.stc.donpac.ru-40541-1156158200439-0:0:1:1:1,
    originalDestination = null, originalTransactionId = null,
    producerId = ID:prokopiev.stc.donpac.ru-40541-1156158200439-0:0:1:1,
    destination = queue://messages.input, transactionId = null, expiration = 0,
    timestamp = 1156158200912, arrival = 0, correlationId = null, replyTo = null,
    persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0,
    targetConsumerId = null, compressed = false, userID = null,
    content = org.apache.activeio.packet.ByteSequence@a45536, marshalledProperties = null,
    dataStructure = null, redeliveryCounter = 0, size = 0, properties = null,
    readOnlyProperties = true, readOnlyBody = true}
INFO  jotm - set rollback only (tx=bb14:38:0:011d45be9b8fb301e8...44b402:)

So, my questions are:

1. My ActiveMQXAConnection.createXASession() implementation looks like
a dirty hack and can't work properly, because the transaction is started
but never committed anywhere. What is the right place to start and
commit or roll back the transaction?

2. Is it possible to include a similar ActiveMQXAConnectionFactory
implementation in ActiveMQ? It would be very useful with Spring's
DefaultMessageListenerContainer, for example. Currently, JTA transactions
don't work with the ActiveMQ/Spring/DefaultMessageListenerContainer
combination.
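
For question 1, one common pattern is to demarcate the transaction per
received message, on the thread that consumes it: start the XA branch,
do the work, end the branch, then commit or roll back. The sketch below
shows only the order of XA calls a transaction manager would typically
issue on a resource, using the JDK's javax.transaction.xa types.
RecordingXAResource, SimpleXid and PerMessageTx are hypothetical names
invented for illustration; this is not ActiveMQ, JOTM or Spring code.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

// Hypothetical XAResource that only records the calls made on it.
class RecordingXAResource implements XAResource {
    final List<String> calls = new ArrayList<>();

    public void start(Xid xid, int flags) { calls.add("start"); }
    public void end(Xid xid, int flags) {
        calls.add(flags == TMSUCCESS ? "end(success)" : "end(fail)");
    }
    public int prepare(Xid xid) { calls.add("prepare"); return XA_OK; }
    public void commit(Xid xid, boolean onePhase) { calls.add("commit"); }
    public void rollback(Xid xid) { calls.add("rollback"); }
    public void forget(Xid xid) { }
    public Xid[] recover(int flag) { return new Xid[0]; }
    public boolean isSameRM(XAResource other) { return other == this; }
    public int getTransactionTimeout() { return 0; }
    public boolean setTransactionTimeout(int seconds) { return false; }
}

// Hypothetical minimal Xid; a real transaction manager generates these.
class SimpleXid implements Xid {
    private final byte[] id;
    SimpleXid(String id) { this.id = id.getBytes(StandardCharsets.UTF_8); }
    public int getFormatId() { return 1; }
    public byte[] getGlobalTransactionId() { return id.clone(); }
    public byte[] getBranchQualifier() { return new byte[0]; }
}

class PerMessageTx {
    interface Work { void run() throws Exception; }

    // Runs one unit of work (for example, receiving and handling a single
    // message) inside its own XA branch on the calling thread.
    // Returns true if the branch was committed, false if it was rolled back.
    static boolean runInTx(XAResource res, Xid xid, Work work) throws XAException {
        res.start(xid, XAResource.TMNOFLAGS);      // what enlisting triggers
        try {
            work.run();
        } catch (Exception e) {
            res.end(xid, XAResource.TMFAIL);       // mark the branch as failed
            res.rollback(xid);
            return false;
        }
        res.end(xid, XAResource.TMSUCCESS);        // what delisting triggers
        if (res.prepare(xid) == XAResource.XA_OK) { // two-phase commit
            res.commit(xid, false);
        }
        return true;
    }
}
```

Under these assumptions, the equivalent in the receiver would be to run
the begin/receive/commit cycle inside the reader thread's loop, once per
message, rather than beginning a transaction when the session is created.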

I know about the Jencks project, but in some cases it can be more
heavyweight than a Spring message-driven POJO or any other custom
consumer implementation that must use JTA.

--
Thanks,
Eugene Prokopiev


