Return-Path: Delivered-To: apmail-geronimo-activemq-users-archive@www.apache.org Received: (qmail 89086 invoked from network); 21 Aug 2006 11:40:08 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 21 Aug 2006 11:40:08 -0000 Received: (qmail 51965 invoked by uid 500); 21 Aug 2006 11:40:08 -0000 Delivered-To: apmail-geronimo-activemq-users-archive@geronimo.apache.org Received: (qmail 51950 invoked by uid 500); 21 Aug 2006 11:40:08 -0000 Mailing-List: contact activemq-users-help@geronimo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: activemq-users@geronimo.apache.org Delivered-To: mailing list activemq-users@geronimo.apache.org Received: (qmail 51941 invoked by uid 99); 21 Aug 2006 11:40:08 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 21 Aug 2006 04:40:08 -0700 X-ASF-Spam-Status: No, hits=0.5 required=10.0 tests=DNS_FROM_RFC_ABUSE,SPF_PASS X-Spam-Check-By: apache.org Received-SPF: pass (asf.osuosl.org: domain of james.strachan@gmail.com designates 64.233.182.190 as permitted sender) Received: from [64.233.182.190] (HELO nf-out-0910.google.com) (64.233.182.190) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 21 Aug 2006 04:40:06 -0700 Received: by nf-out-0910.google.com with SMTP id n29so2057365nfc for ; Mon, 21 Aug 2006 04:39:45 -0700 (PDT) DomainKey-Signature: a=rsa-sha1; q=dns; c=nofws; s=beta; d=gmail.com; h=received:message-id:date:from:to:subject:in-reply-to:mime-version:content-type:content-transfer-encoding:content-disposition:references; b=egG+x06HhTAa7/omJraursXuaabNEGD2VWWecUtGMOPaja+VXLMIpN0M1QUIgc+t6otIAr6ISL2r+x9vGTcHWGPPmc+2Hn3wSrEZDKZc38d63rhXngW5OqL7grQH//+Dn2hHi6Oz/BQNinxm1sUUXYdRKNsij44CKArwqiadbcs= Received: by 10.48.48.15 with SMTP id v15mr7614952nfv; Mon, 21 Aug 2006 04:39:45 -0700 (PDT) Received: by 10.78.173.20 with HTTP; Mon, 21 Aug 2006 04:39:45 -0700 (PDT) Message-ID: Date: Mon, 21 Aug 2006 12:39:45 +0100 From: "James Strachan" To: activemq-users@geronimo.apache.org Subject: Re: XA connection and XA session in separate threads In-Reply-To: <44E9984A.10407@stc.donpac.ru> MIME-Version: 1.0 Content-Type: text/plain; charset=ISO-8859-1; format=flowed Content-Transfer-Encoding: 7bit Content-Disposition: inline References: <44E9984A.10407@stc.donpac.ru> X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N On 8/21/06, Eugene Prokopiev wrote: > Hi, > > I need to use XA connection and XA session created from it in > separate threads. Example context looks like: > > > > init-method="start" destroy-method="stop"> > > > > tcp://localhost:5000 > > > > > class="org.springframework.transaction.jta.JotmFactoryBean"/> > class="org.springframework.transaction.jta.JtaTransactionManager"> > > > > class="org.apache.activemq.ActiveMQXAConnectionFactory"> > > > > class="org.springframework.transaction.interceptor.TransactionProxyFactoryBean"> > > > > PROPAGATION_REQUIRED > > > > > > > > > > > > > > > > > > MessageReceiverSimple.java is: > > public class MessageReceiverSimple { > > private Log log = LogFactory.getLog(getClass()); > > private JmsTemplate jmsTemplate; > > public void setJmsTemplate(JmsTemplate jmsTemplate) { > this.jmsTemplate = jmsTemplate; > } > > public void receive() { > Thread readerThread = new Thread(new Runnable(){ > public void run() { > while(!Thread.currentThread().isInterrupted()) { > Message message = jmsTemplate.receive(); > log.debug(message); > } > } > }); > readerThread.start(); > } > > } > > In this example code plain JMS API can be used instead of JmsTemplate > but it is not important in this case. Result will be the same. > > On running this example I got: > > javax.jms.JMSException: Session's XAResource has not been enlisted in a > distributed transaction. > at > org.apache.activemq.ActiveMQXASession.doStartTransaction(ActiveMQXASession.java:109) > at > org.apache.activemq.ActiveMQMessageConsumer.acknowledge(ActiveMQMessageConsumer.java:711) > at > org.apache.activemq.ActiveMQMessageConsumer.dispose(ActiveMQMessageConsumer.java:572) > at > org.apache.activemq.ActiveMQMessageConsumer.close(ActiveMQMessageConsumer.java:515) > at > org.springframework.jms.support.JmsUtils.closeMessageConsumer(JmsUtils.java:105) > at org.springframework.jms.core.JmsTemplate.doReceive(JmsTemplate.java:714) > at org.springframework.jms.core.JmsTemplate.doReceive(JmsTemplate.java:677) > at org.springframework.jms.core.JmsTemplate$9.doInJms(JmsTemplate.java:635) > at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:432) > at org.springframework.jms.core.JmsTemplate.receive(JmsTemplate.java:632) > at org.springframework.jms.core.JmsTemplate.receive(JmsTemplate.java:619) > at simple.MessageReceiverSimple$1.run(MessageReceiverSimple.java:22) > at java.lang.Thread.run(Thread.java:595) > > So, I write simple ActiveMQXAConnectionFactory wrapper for enlisting > Session's XAResource in distributed transaction: > > public class ActiveMQXAConnectionFactory implements ConnectionFactory, > XAConnectionFactory { > > private XAConnectionFactory connectionFactory; > private JtaTransactionManager transactionManager; > > public void setConnectionFactory(XAConnectionFactory connectionFactory) { > this.connectionFactory = connectionFactory; > } > > public void setTransactionManager(JtaTransactionManager > transactionManager) { > this.transactionManager = transactionManager; > } > > public Connection createConnection() throws JMSException { > return createXAConnection(); > } > > public Connection createConnection(String userName, String password) > throws JMSException { > return createXAConnection(userName, password); > } > > public XAConnection createXAConnection() throws JMSException { > XAConnection connection = connectionFactory.createXAConnection(); > return new ActiveMQXAConnection(connection, transactionManager); > } > > public XAConnection createXAConnection(String userName, String > password) throws JMSException { > XAConnection connection = > connectionFactory.createXAConnection(userName, password); > return new ActiveMQXAConnection(connection, transactionManager); > } > > public class ActiveMQXAConnection implements XAConnection { > > private XAConnection connection; > private JtaTransactionManager transactionManager; > > public ActiveMQXAConnection(XAConnection connection, > JtaTransactionManager transactionManager) { > this.connection = connection; > this.transactionManager = transactionManager; > } > > public Session createSession(boolean transacted, int acknowledgeMode) > throws JMSException { > return createXASession(); > } > > public XASession createXASession() throws JMSException { > XASession session = connection.createXASession(); > try { > transactionManager.getUserTransaction().begin(); > > transactionManager.getTransactionManager().getTransaction().enlistResource(session.getXAResource()); > } catch (Exception e) { > e.printStackTrace(); > } > return session; > } > > public void close() throws JMSException { > connection.close(); > } > > public ConnectionConsumer createConnectionConsumer(Destination arg0, > String arg1, ServerSessionPool arg2, int arg3) throws JMSException { > return connection.createConnectionConsumer(arg0, arg1, arg2, arg3); > } > > public ConnectionConsumer createDurableConnectionConsumer(Topic arg0, > String arg1, String arg2, ServerSessionPool arg3, int arg4) throws > JMSException { > return connection.createDurableConnectionConsumer(arg0, arg1, arg2, > arg3, arg4); > } > > public String getClientID() throws JMSException { > return connection.getClientID(); > } > > public ExceptionListener getExceptionListener() throws JMSException { > return connection.getExceptionListener(); > } > > public ConnectionMetaData getMetaData() throws JMSException { > return connection.getMetaData(); > } > > public void setClientID(String arg0) throws JMSException { > connection.setClientID(arg0); > } > > public void setExceptionListener(ExceptionListener arg0) throws > JMSException { > connection.setExceptionListener(arg0); > } > > public void start() throws JMSException { > connection.start(); > } > > public void stop() throws JMSException { > connection.stop(); > } > > } > > } > > On using it instead of org.apache.activemq.ActiveMQXAConnectionFactory > it I got: > > INFO BrokerService - ActiveMQ null JMS Message Broker (localhost) is > starting > INFO BrokerService - For help or more information please see: > http://incubator.apache.org/activemq/ > INFO TransportServerThreadSupport - Listening for connections at: > tcp://prokopiev.stc.donpac.ru:5000 > INFO TransportConnector - Connector tcp://prokopiev.stc.donpac.ru:5000 > Started > INFO BrokerService - ActiveMQ JMS Message Broker (localhost, > ID:prokopiev.stc.donpac.ru-40533-1156158196797-0:0) started > INFO jotm - JOTM started with a local transaction factory which is not > bound. > INFO jotm - CAROL initialization > INFO ConfigurationRepository - No protocols were defined for property > 'carol.protocols', trying with default protocol = 'jrmp'. > INFO jta - JOTM 2.0.10 > INFO JtaTransactionManager - Using JTA UserTransaction: > org.objectweb.jotm.Current@39443f > INFO JtaTransactionManager - Using JTA TransactionManager: > org.objectweb.jotm.Current@39443f > INFO ManagementContext - JMX consoles can connect to > service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi > INFO DefaultAopProxyFactory - CGLIB2 available: proxyTargetClass > feature enabled > DEBUG MessageReceiverSimple - ActiveMQObjectMessage {commandId = 5, > responseRequired = true, messageId = > ID:prokopiev.stc.donpac.ru-40541-1156158200439-0:0:1:1:1, > originalDestination = null, originalTransactionId = null, producerId = > ID:prokopiev.stc.donpac.ru-40541-1156158200439-0:0:1:1, destination = > queue://messages.input, transactionId = null, expiration = 0, timestamp > = 1156158200912, arrival = 0, correlationId = null, replyTo = null, > persistent = true, type = null, priority = 4, groupID = null, > groupSequence = 0, targetConsumerId = null, compressed = false, userID = > null, content = org.apache.activeio.packet.ByteSequence@a45536, > marshalledProperties = null, dataStructure = null, redeliveryCounter = > 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody > = true} > INFO jotm - set rollback only (tx=bb14:38:0:011d45be9b8fb301e8...44b402:) > > So, my questions are: > > 1. My ActiveMQXAConnection.createXASession() implementation looks like > dirty hack and can't work propertly because transaction started but not > commited anywhere. What is the rigth place to start and commit/rollback > transaction? Normally the JCA container does this. If you are using Spring then the Spring Transaction Manager or Message Listener container shoudl do this - not the connection factory > 2. Is it possible to include similar ActiveMQXAConnectionFactory > implementation into ActiveMQ? It will be very useful for using with > Spring DefaultMessageListenerContainer for example. I'd rather fix Spring's container to work with any JMS provider properly than adding a dirty hack to ActiveMQ Enlistment is the responsibility of the container - be it Jencks, MDB container or Spring. > Now JTA transactions > can't work with ActiveMQ/Spring/DefaultMessageListenerContainer. You'd best ask the Spring guys - I'm not sure if the Spring container supports JTA -- James ------- http://radio.weblogs.com/0112098/