activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r1005794 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/broker/ activemq-core/src/main/java/org/apache/activemq/transaction/ activemq-spring/ activemq-spring/src/test/java/org/apache/activemq/spring/ activemq-spring/s...
Date Fri, 08 Oct 2010 12:08:32 GMT
Author: dejanb
Date: Fri Oct  8 12:08:32 2010
New Revision: 1005794

URL: http://svn.apache.org/viewvc?rev=1005794&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2950 - additional fix to support parallel transactions

Added:
    activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/spring/ParallelXATransactionTest.java
    activemq/trunk/activemq-spring/src/test/resources/spring/xa.xml
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java
    activemq/trunk/activemq-spring/pom.xml

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java?rev=1005794&r1=1005793&r2=1005794&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
Fri Oct  8 12:08:32 2010
@@ -58,7 +58,7 @@ public class TransactionBroker extends B
 
     // The prepared XA transactions.
     private TransactionStore transactionStore;
-    private Map<TransactionId, Transaction> xaTransactions = new LinkedHashMap<TransactionId,
Transaction>();
+    private Map<TransactionId, XATransaction> xaTransactions = new LinkedHashMap<TransactionId,
XATransaction>();
     private ActiveMQMessageAudit audit;
 
     public TransactionBroker(Broker next, TransactionStore transactionStore) {
@@ -125,7 +125,7 @@ public class TransactionBroker extends B
     public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception
{
         List<TransactionId> txs = new ArrayList<TransactionId>();
         synchronized (xaTransactions) {
-            for (Iterator<Transaction> iter = xaTransactions.values().iterator(); iter.hasNext();)
{
+            for (Iterator<XATransaction> iter = xaTransactions.values().iterator();
iter.hasNext();) {
                 Transaction tx = iter.next();
                 if (tx.isPrepared()) {
                     if (LOG.isDebugEnabled()) {
@@ -146,13 +146,13 @@ public class TransactionBroker extends B
     public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception
{
         // the transaction may have already been started.
         if (xid.isXATransaction()) {
-            Transaction transaction = null;
+            XATransaction transaction = null;
             synchronized (xaTransactions) {
                 transaction = xaTransactions.get(xid);
                 if (transaction != null) {
                     return;
                 }
-                transaction = new XATransaction(transactionStore, (XATransactionId)xid, this);
+                transaction = new XATransaction(transactionStore, (XATransactionId)xid, this,
context.getConnectionId());
                 xaTransactions.put(xid, transaction);
             }
         } else {
@@ -252,9 +252,10 @@ public class TransactionBroker extends B
             iter.remove();
         }
 
-        for (Transaction tx : xaTransactions.values()) {
+
+        for (XATransaction tx : xaTransactions.values()) {
            try {
-             if (!tx.isPrepared()) {
+             if (tx.getConnectionId().equals(info.getConnectionId()) && !tx.isPrepared())
{
                 tx.rollback();
              }
            } catch (Exception e) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java?rev=1005794&r1=1005793&r2=1005794&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java
Fri Oct  8 12:08:32 2010
@@ -20,6 +20,7 @@ import java.io.IOException;
 import javax.transaction.xa.XAException;
 import javax.transaction.xa.XAResource;
 import org.apache.activemq.broker.TransactionBroker;
+import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.XATransactionId;
 import org.apache.activemq.store.TransactionStore;
@@ -36,11 +37,13 @@ public class XATransaction extends Trans
     private final TransactionStore transactionStore;
     private final XATransactionId xid;
     private final TransactionBroker broker;
+    private final ConnectionId connectionId;
 
-    public XATransaction(TransactionStore transactionStore, XATransactionId xid, TransactionBroker
broker) {
+    public XATransaction(TransactionStore transactionStore, XATransactionId xid, TransactionBroker
broker, ConnectionId connectionId) {
         this.transactionStore = transactionStore;
         this.xid = xid;
         this.broker = broker;
+        this.connectionId = connectionId;
         if (LOG.isDebugEnabled()) {
             LOG.debug("XA Transaction new/begin : " + xid);
         }
@@ -199,6 +202,10 @@ public class XATransaction extends Trans
         broker.removeTransaction(xid);
     }
 
+    public ConnectionId getConnectionId() {
+        return connectionId;
+    }
+
     @Override
     public TransactionId getTransactionId() {
         return xid;

Modified: activemq/trunk/activemq-spring/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/pom.xml?rev=1005794&r1=1005793&r2=1005794&view=diff
==============================================================================
--- activemq/trunk/activemq-spring/pom.xml (original)
+++ activemq/trunk/activemq-spring/pom.xml Fri Oct  8 12:08:32 2010
@@ -115,6 +115,29 @@
     	<scope>test</scope>
     </dependency>
     <dependency>
+         <groupId>org.jencks</groupId>
+         <artifactId>jencks</artifactId>
+         <version>2.2</version>
+         <scope>test</scope>
+    </dependency>
+    <dependency>
+         <groupId>org.slf4j</groupId>
+         <artifactId>slf4j-api</artifactId>
+         <version>1.4.3</version>
+         <scope>test</scope>
+    </dependency>
+    <dependency>
+         <groupId>org.slf4j</groupId>
+         <artifactId>slf4j-log4j12</artifactId>
+         <version>1.4.3</version>
+         <scope>test</scope>
+    </dependency>
+    <dependency>
+         <groupId>${project.groupId}</groupId>
+         <artifactId>activemq-ra</artifactId>
+         <scope>test</scope>
+    </dependency>
+    <dependency>
         <groupId>org.springframework.osgi</groupId>
         <artifactId>spring-osgi-core</artifactId>
         <exclusions>

Added: activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/spring/ParallelXATransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/spring/ParallelXATransactionTest.java?rev=1005794&view=auto
==============================================================================
--- activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/spring/ParallelXATransactionTest.java
(added)
+++ activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/spring/ParallelXATransactionTest.java
Fri Oct  8 12:08:32 2010
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.spring;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.MessageCreator;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+import org.springframework.test.context.transaction.TransactionConfiguration;
+import org.springframework.transaction.PlatformTransactionManager;
+import org.springframework.transaction.TransactionException;
+import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.support.TransactionCallbackWithoutResult;
+import org.springframework.transaction.support.TransactionTemplate;
+
+import javax.annotation.Resource;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+import java.util.Arrays;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+
+@ContextConfiguration(locations = {"classpath:spring/xa.xml"})
+@TransactionConfiguration(transactionManager = "transactionManager", defaultRollback = false)
+public class ParallelXATransactionTest {
+
+    private static final Log LOG = LogFactory.getLog(ParallelXATransactionTest.class);
+
+    @Resource(name = "transactionManager")
+    PlatformTransactionManager txManager = null;
+
+    @Resource(name = "transactionManager2")
+    PlatformTransactionManager txManager2 = null;
+
+
+    @Resource(name = "jmsTemplate")
+    JmsTemplate jmsTemplate = null;
+
+    @Resource(name = "jmsTemplate2")
+    JmsTemplate jmsTemplate2 = null;
+
+
+    public static final int NB_MSG = 100;
+    public static final String BODY = Arrays.toString(new int[1024]);
+    private static final String[] QUEUES = {"TEST.queue1", "TEST.queue2", "TEST.queue3",
"TEST.queue4", "TEST.queue5"};
+    private static final String AUDIT = "TEST.audit";
+    public static final int SLEEP = 500;
+
+    @Test
+    @DirtiesContext
+    public void testParalellXaTx() throws Exception {
+
+
+        class ProducerThread extends Thread {
+
+            PlatformTransactionManager txManager;
+            JmsTemplate jmsTemplate;
+            Exception lastException;
+
+
+            public ProducerThread(JmsTemplate jmsTemplate, PlatformTransactionManager txManager)
{
+               this.jmsTemplate = jmsTemplate;
+               this.txManager = txManager;
+            }
+
+            public void run() {
+                int i = 0;
+                while (i++ < 10) {
+
+                    try {
+                        Thread.sleep((long) (Math.random() * SLEEP));
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                    TransactionTemplate tt = new TransactionTemplate(this.txManager);
+
+
+                    try {
+                        tt.execute(new TransactionCallbackWithoutResult() {
+                            @Override
+                            protected void doInTransactionWithoutResult(TransactionStatus
status) {
+                                try {
+
+                                    for (final String queue : QUEUES) {
+                                        jmsTemplate.send(queue + "," + AUDIT, new MessageCreator()
{
+                                            public Message createMessage(Session session)
throws JMSException {
+                                                return session.createTextMessage("P1: " +
queue + " - " + BODY);
+                                            }
+                                        });
+                                        Thread.sleep((long) (Math.random() * SLEEP));
+                                        LOG.info("P1: Send msg to " + queue + "," + AUDIT);
+                                    }
+
+                                } catch (Exception e) {
+                                    Assert.fail("Exception occurred " + e);
+                                }
+
+
+                            }
+                        });
+                    } catch (TransactionException e) {
+                        lastException = e;
+                        break;
+                    }
+
+                }
+            }
+
+            public Exception getLastException() {
+                return lastException;
+            }
+        }
+
+
+        ProducerThread t1 = new ProducerThread(jmsTemplate, txManager);
+        ProducerThread t2 = new ProducerThread(jmsTemplate2, txManager2);
+
+        t1.start();
+        t2.start();
+
+        t1.join();
+        t2.join();
+
+        if (t1.getLastException() != null) {
+            Assert.fail("Exception occurred " + t1.getLastException());
+        }
+
+        if (t2.getLastException() != null) {
+            Assert.fail("Exception occurred " + t2.getLastException());
+        }
+
+    }
+
+}

Added: activemq/trunk/activemq-spring/src/test/resources/spring/xa.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/src/test/resources/spring/xa.xml?rev=1005794&view=auto
==============================================================================
--- activemq/trunk/activemq-spring/src/test/resources/spring/xa.xml (added)
+++ activemq/trunk/activemq-spring/src/test/resources/spring/xa.xml Fri Oct  8 12:08:32 2010
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:amq="http://activemq.apache.org/schema/core"
+       xsi:schemaLocation="
+	   http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+	   http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
+	   http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.0.xsd
+	   http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+    <!-- broker -->
+
+    <amq:broker brokerName="test" useJmx="false" persistent="false">
+        <amq:transportConnectors>
+            <amq:transportConnector name="transport" uri="nio://0.0.0.0:61616"/>
+        </amq:transportConnectors>
+    </amq:broker>
+
+    <!-- simple tx -->
+
+    <bean id="jmsTemplate2" class="org.springframework.jms.core.JmsTemplate">
+        <property name="connectionFactory" ref="connectionFactory2"/>
+    </bean>
+
+
+    <bean id="transactionManager2" class="org.springframework.jms.connection.JmsTransactionManager">
+        <property name="connectionFactory" ref="connectionFactory2"/>
+    </bean>
+
+
+    <bean id="connectionFactory2" class="org.apache.activemq.ActiveMQConnectionFactory">
+        <property name="brokerURL" value="tcp://localhost:61616"/>
+        <property name="userName" value="smx"/>
+        <property name="password" value="smx"/>
+    </bean>
+
+    <!-- xa tx -->
+
+    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
+        <property name="connectionFactory" ref="connectionFactory"/>
+    </bean>
+
+    <bean id="transactionManager" class="org.jencks.factory.TransactionManagerFactoryBean">
+        <property name="defaultTransactionTimeoutSeconds" value="300"/>
+    </bean>
+
+    <bean id="connectionFactory" class="org.jencks.factory.ConnectionFactoryFactoryBean">
+        <property name="connectionManager" ref="jmsConnectionManager"/>
+        <property name="managedConnectionFactory" ref="jmsManagedConnectionFactory"/>
+    </bean>
+
+    <bean id="jmsConnectionManager" class="org.jencks.factory.ConnectionManagerFactoryBean">
+        <property name="transaction" value="xa"/>
+        <property name="transactionManager" ref="transactionManager"/>
+        <property name="poolMaxSize" value="20"/>
+        <property name="connectionTracker">
+            <bean class="org.jencks.factory.ConnectionTrackerFactoryBean">
+                <property name="geronimoTransactionManager" ref="transactionManager"/>
+            </bean>
+        </property>
+    </bean>
+
+    <bean id="jmsManagedConnectionFactory" class="org.apache.activemq.ra.ActiveMQManagedConnectionFactory">
+        <property name="resourceAdapter">
+            <bean class="org.apache.activemq.ra.ActiveMQResourceAdapter">
+                <property name="serverUrl" value="tcp://localhost:61616"/>
+                <property name="maximumRedeliveries" value="6"/>
+                <property name="allPrefetchValues" value="1"/>
+            </bean>
+        </property>
+    </bean>
+
+</beans>    
\ No newline at end of file



Mime
View raw message