Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 32482 invoked from network); 8 Oct 2010 12:09:00 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 8 Oct 2010 12:09:00 -0000 Received: (qmail 39807 invoked by uid 500); 8 Oct 2010 12:09:00 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 39746 invoked by uid 500); 8 Oct 2010 12:08:58 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 39739 invoked by uid 99); 8 Oct 2010 12:08:58 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 08 Oct 2010 12:08:58 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 08 Oct 2010 12:08:54 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 881C62388906; Fri, 8 Oct 2010 12:08:32 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1005794 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/broker/ activemq-core/src/main/java/org/apache/activemq/transaction/ activemq-spring/ activemq-spring/src/test/java/org/apache/activemq/spring/ activemq-spring/s... 
Date: Fri, 08 Oct 2010 12:08:32 -0000 To: commits@activemq.apache.org From: dejanb@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20101008120832.881C62388906@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: dejanb Date: Fri Oct 8 12:08:32 2010 New Revision: 1005794 URL: http://svn.apache.org/viewvc?rev=1005794&view=rev Log: https://issues.apache.org/activemq/browse/AMQ-2950 - additional fix to support parallel transactions Added: activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/spring/ParallelXATransactionTest.java activemq/trunk/activemq-spring/src/test/resources/spring/xa.xml Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java activemq/trunk/activemq-spring/pom.xml Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java?rev=1005794&r1=1005793&r2=1005794&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java Fri Oct 8 12:08:32 2010 @@ -58,7 +58,7 @@ public class TransactionBroker extends B // The prepared XA transactions. 
private TransactionStore transactionStore; - private Map<TransactionId, Transaction> xaTransactions = new LinkedHashMap<TransactionId, Transaction>(); + private Map<TransactionId, XATransaction> xaTransactions = new LinkedHashMap<TransactionId, XATransaction>(); private ActiveMQMessageAudit audit; public TransactionBroker(Broker next, TransactionStore transactionStore) { @@ -125,7 +125,7 @@ public class TransactionBroker extends B public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception { List<TransactionId> txs = new ArrayList<TransactionId>(); synchronized (xaTransactions) { - for (Iterator<Transaction> iter = xaTransactions.values().iterator(); iter.hasNext();) { + for (Iterator<XATransaction> iter = xaTransactions.values().iterator(); iter.hasNext();) { Transaction tx = iter.next(); if (tx.isPrepared()) { if (LOG.isDebugEnabled()) { @@ -146,13 +146,13 @@ public class TransactionBroker extends B public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { // the transaction may have already been started. if (xid.isXATransaction()) { - Transaction transaction = null; + XATransaction transaction = null; synchronized (xaTransactions) { transaction = xaTransactions.get(xid); if (transaction != null) { return; } - transaction = new XATransaction(transactionStore, (XATransactionId)xid, this); + transaction = new XATransaction(transactionStore, (XATransactionId)xid, this, context.getConnectionId()); xaTransactions.put(xid, transaction); } } else { @@ -252,9 +252,10 @@ public class TransactionBroker extends B iter.remove(); } - for (Transaction tx : xaTransactions.values()) { + + for (XATransaction tx : xaTransactions.values()) { try { - if (!tx.isPrepared()) { + if (tx.getConnectionId().equals(info.getConnectionId()) && !tx.isPrepared()) { tx.rollback(); } } catch (Exception e) { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java?rev=1005794&r1=1005793&r2=1005794&view=diff 
============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java Fri Oct 8 12:08:32 2010 @@ -20,6 +20,7 @@ import java.io.IOException; import javax.transaction.xa.XAException; import javax.transaction.xa.XAResource; import org.apache.activemq.broker.TransactionBroker; +import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.XATransactionId; import org.apache.activemq.store.TransactionStore; @@ -36,11 +37,13 @@ public class XATransaction extends Trans private final TransactionStore transactionStore; private final XATransactionId xid; private final TransactionBroker broker; + private final ConnectionId connectionId; - public XATransaction(TransactionStore transactionStore, XATransactionId xid, TransactionBroker broker) { + public XATransaction(TransactionStore transactionStore, XATransactionId xid, TransactionBroker broker, ConnectionId connectionId) { this.transactionStore = transactionStore; this.xid = xid; this.broker = broker; + this.connectionId = connectionId; if (LOG.isDebugEnabled()) { LOG.debug("XA Transaction new/begin : " + xid); } @@ -199,6 +202,10 @@ public class XATransaction extends Trans broker.removeTransaction(xid); } + public ConnectionId getConnectionId() { + return connectionId; + } + @Override public TransactionId getTransactionId() { return xid; Modified: activemq/trunk/activemq-spring/pom.xml URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/pom.xml?rev=1005794&r1=1005793&r2=1005794&view=diff ============================================================================== --- activemq/trunk/activemq-spring/pom.xml (original) +++ activemq/trunk/activemq-spring/pom.xml Fri Oct 8 12:08:32 2010 @@ -115,6 +115,29 @@ test + org.jencks + jencks 
+ 2.2 + test + + + org.slf4j + slf4j-api + 1.4.3 + test + + + org.slf4j + slf4j-log4j12 + 1.4.3 + test + + + ${project.groupId} + activemq-ra + test + + org.springframework.osgi spring-osgi-core Added: activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/spring/ParallelXATransactionTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/spring/ParallelXATransactionTest.java?rev=1005794&view=auto ============================================================================== --- activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/spring/ParallelXATransactionTest.java (added) +++ activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/spring/ParallelXATransactionTest.java Fri Oct 8 12:08:32 2010 @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.spring; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.jms.core.JmsTemplate; +import org.springframework.jms.core.MessageCreator; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; +import org.springframework.test.context.transaction.TransactionConfiguration; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.TransactionException; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionCallbackWithoutResult; +import org.springframework.transaction.support.TransactionTemplate; + +import javax.annotation.Resource; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Session; +import java.util.Arrays; + +@RunWith(SpringJUnit4ClassRunner.class) + +@ContextConfiguration(locations = {"classpath:spring/xa.xml"}) +@TransactionConfiguration(transactionManager = "transactionManager", defaultRollback = false) +public class ParallelXATransactionTest { + + private static final Log LOG = LogFactory.getLog(ParallelXATransactionTest.class); + + @Resource(name = "transactionManager") + PlatformTransactionManager txManager = null; + + @Resource(name = "transactionManager2") + PlatformTransactionManager txManager2 = null; + + + @Resource(name = "jmsTemplate") + JmsTemplate jmsTemplate = null; + + @Resource(name = "jmsTemplate2") + JmsTemplate jmsTemplate2 = null; + + + public static final int NB_MSG = 100; + public static final String BODY = Arrays.toString(new int[1024]); + private static final String[] QUEUES = {"TEST.queue1", "TEST.queue2", "TEST.queue3", "TEST.queue4", "TEST.queue5"}; + private static final 
String AUDIT = "TEST.audit"; + public static final int SLEEP = 500; + + @Test + @DirtiesContext + public void testParalellXaTx() throws Exception { + + + class ProducerThread extends Thread { + + PlatformTransactionManager txManager; + JmsTemplate jmsTemplate; + Exception lastException; + + + public ProducerThread(JmsTemplate jmsTemplate, PlatformTransactionManager txManager) { + this.jmsTemplate = jmsTemplate; + this.txManager = txManager; + } + + public void run() { + int i = 0; + while (i++ < 10) { + + try { + Thread.sleep((long) (Math.random() * SLEEP)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + TransactionTemplate tt = new TransactionTemplate(this.txManager); + + + try { + tt.execute(new TransactionCallbackWithoutResult() { + @Override + protected void doInTransactionWithoutResult(TransactionStatus status) { + try { + + for (final String queue : QUEUES) { + jmsTemplate.send(queue + "," + AUDIT, new MessageCreator() { + public Message createMessage(Session session) throws JMSException { + return session.createTextMessage("P1: " + queue + " - " + BODY); + } + }); + Thread.sleep((long) (Math.random() * SLEEP)); + LOG.info("P1: Send msg to " + queue + "," + AUDIT); + } + + } catch (Exception e) { + Assert.fail("Exception occurred " + e); + } + + + } + }); + } catch (TransactionException e) { + lastException = e; + break; + } + + } + } + + public Exception getLastException() { + return lastException; + } + } + + + ProducerThread t1 = new ProducerThread(jmsTemplate, txManager); + ProducerThread t2 = new ProducerThread(jmsTemplate2, txManager2); + + t1.start(); + t2.start(); + + t1.join(); + t2.join(); + + if (t1.getLastException() != null) { + Assert.fail("Exception occurred " + t1.getLastException()); + } + + if (t2.getLastException() != null) { + Assert.fail("Exception occurred " + t2.getLastException()); + } + + } + +} Added: activemq/trunk/activemq-spring/src/test/resources/spring/xa.xml URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/src/test/resources/spring/xa.xml?rev=1005794&view=auto ============================================================================== --- activemq/trunk/activemq-spring/src/test/resources/spring/xa.xml (added) +++ activemq/trunk/activemq-spring/src/test/resources/spring/xa.xml Fri Oct 8 12:08:32 2010 @@ -0,0 +1,90 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file