Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0242110014 for ; Tue, 16 Dec 2014 17:19:05 +0000 (UTC) Received: (qmail 5853 invoked by uid 500); 16 Dec 2014 17:19:04 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 5797 invoked by uid 500); 16 Dec 2014 17:19:04 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 5785 invoked by uid 99); 16 Dec 2014 17:19:04 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 16 Dec 2014 17:19:04 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 2560FA2CCCF; Tue, 16 Dec 2014 17:19:04 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Tue, 16 Dec 2014 17:19:04 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/5] activemq-6 git commit: Only Set TM on Bridge in QoS modes that require it Repository: activemq-6 Updated Branches: refs/heads/master 64cc435ea -> b1d6c0b44 Only Set TM on Bridge in QoS modes that require it The bridge currently tries to assign a TM even when the quality of service level is set to Duplicates OK. This QoS does not use or require a TM. This patch stops the bridge from attempting to assign a TM for this QoS and also checks that a TM is set of the other QoS. If TM is not set for a QoS that requires one, a error is logged and RunTime exception thrown. Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/f28c9be8 Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/f28c9be8 Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/f28c9be8 Branch: refs/heads/master Commit: f28c9be8afd2b86288389c51b8e5184fea5d3cb9 Parents: 739c0f3 Author: Martyn Taylor Authored: Mon Dec 15 16:27:27 2014 +0000 Committer: Martyn Taylor Committed: Tue Dec 16 16:33:58 2014 +0000 ---------------------------------------------------------------------- .../jms/bridge/ActiveMQJMSBridgeLogger.java | 4 ++ .../activemq/jms/bridge/impl/JMSBridgeImpl.java | 50 ++++++++------ .../jms/bridge/impl/JMSBridgeImplTest.java | 72 ++++++++++++++++++++ 3 files changed, 107 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f28c9be8/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/ActiveMQJMSBridgeLogger.java ---------------------------------------------------------------------- diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/ActiveMQJMSBridgeLogger.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/ActiveMQJMSBridgeLogger.java index 3eeb9e6..41b6167 100644 --- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/ActiveMQJMSBridgeLogger.java +++ b/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/ActiveMQJMSBridgeLogger.java @@ -101,4 +101,8 @@ public interface ActiveMQJMSBridgeLogger extends BasicLogger @LogMessage(level = Logger.Level.ERROR) @Message(id = 344001, value = "Failed to start source connection" , format = Message.Format.MESSAGE_FORMAT) void jmsBridgeSrcConnectError(@Cause Exception e); + + @LogMessage(level = Logger.Level.ERROR) + @Message(id = 344002, value = "Failed to start JMS Bridge. QoS Mode: {0} requires a Transaction Manager, none found" , format = Message.Format.MESSAGE_FORMAT) + void jmsBridgeTransactionManagerMissing(QualityOfServiceMode qosMode); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f28c9be8/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/impl/JMSBridgeImpl.java ---------------------------------------------------------------------- diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/impl/JMSBridgeImpl.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/impl/JMSBridgeImpl.java index cefc928..0301e84 100644 --- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/impl/JMSBridgeImpl.java +++ b/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/impl/JMSBridgeImpl.java @@ -66,7 +66,6 @@ import org.apache.activemq.jms.client.ActiveMQMessage; import org.apache.activemq.jms.server.ActiveMQJMSServerBundle; import org.apache.activemq.service.extensions.ServiceUtils; import org.apache.activemq.service.extensions.xa.recovery.ActiveMQRegistry; -import org.apache.activemq.service.extensions.xa.recovery.ActiveMQRegistryImpl; import org.apache.activemq.service.extensions.xa.recovery.XARecoveryConfig; import org.apache.activemq.utils.ClassloadingUtil; import org.apache.activemq.utils.DefaultSensitiveStringCodec; @@ -402,29 +401,46 @@ public final class JMSBridgeImpl implements JMSBridge checkParams(); - if (tm == null) - { - tm = ServiceUtils.getTransactionManager(); - } - // There may already be a JTA transaction associated to the thread boolean ok; - Transaction toResume = null; - try + // Check to see if the QoSMode requires a TM + if (qualityOfServiceMode.equals(QualityOfServiceMode.AT_MOST_ONCE) || + qualityOfServiceMode.equals(QualityOfServiceMode.ONCE_AND_ONLY_ONCE)) { - toResume = tm.suspend(); + if (tm == null) + { + tm = ServiceUtils.getTransactionManager(); + } - ok = setupJMSObjects(); - } - finally - { - if (toResume != null) + if (tm == null) { - tm.resume(toResume); + ActiveMQJMSBridgeLogger.LOGGER.jmsBridgeTransactionManagerMissing(qualityOfServiceMode); + throw new RuntimeException(); + } + + // There may already be a JTA transaction associated to the thread + + Transaction toResume = null; + try + { + toResume = tm.suspend(); + + ok = setupJMSObjects(); + } + finally + { + if (toResume != null) + { + tm.resume(toResume); + } } } + else + { + ok = setupJMSObjects(); + } if (ok) { @@ -2241,10 +2257,6 @@ public final class JMSBridgeImpl implements JMSBridge { registry = sl.iterator().next(); } - else - { - registry = ActiveMQRegistryImpl.getInstance(); - } } catch (Throwable e) { http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f28c9be8/tests/timing-tests/src/test/java/org/apache/activemq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java ---------------------------------------------------------------------- diff --git a/tests/timing-tests/src/test/java/org/apache/activemq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java b/tests/timing-tests/src/test/java/org/apache/activemq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java index 1ec6dbc..c28251a 100644 --- a/tests/timing-tests/src/test/java/org/apache/activemq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java +++ b/tests/timing-tests/src/test/java/org/apache/activemq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java @@ -36,6 +36,7 @@ import javax.transaction.SystemException; import javax.transaction.Transaction; import javax.transaction.TransactionManager; import java.lang.management.ManagementFactory; +import java.lang.reflect.Field; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -65,7 +66,9 @@ import org.apache.activemq.tests.util.UnitTestCase; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; /** * @author Jeff Mesnil @@ -84,6 +87,9 @@ public class JMSBridgeImplTest extends UnitTestCase private JMSServerManager jmsServer; + @Rule + public ExpectedException thrown = ExpectedException.none(); + // Static -------------------------------------------------------- protected static TransactionManager newTransactionManager() @@ -633,6 +639,72 @@ public class JMSBridgeImplTest extends UnitTestCase super.tearDown(); } + @Test + public void testTransactionManagerNotSetForDuplicatesOK() throws Exception + { + + ConnectionFactoryFactory sourceCFF = JMSBridgeImplTest.newConnectionFactoryFactory(JMSBridgeImplTest.createConnectionFactory()); + ConnectionFactoryFactory targetCFF = JMSBridgeImplTest.newConnectionFactoryFactory(JMSBridgeImplTest.createConnectionFactory()); + DestinationFactory sourceDF = JMSBridgeImplTest.newDestinationFactory(ActiveMQJMSClient.createQueue(JMSBridgeImplTest.SOURCE)); + DestinationFactory targetDF = JMSBridgeImplTest.newDestinationFactory(ActiveMQJMSClient.createQueue(JMSBridgeImplTest.TARGET)); + + JMSBridgeImpl bridge = new JMSBridgeImpl(); + Assert.assertNotNull(bridge); + + bridge.setSourceConnectionFactoryFactory(sourceCFF); + bridge.setSourceDestinationFactory(sourceDF); + bridge.setTargetConnectionFactoryFactory(targetCFF); + bridge.setTargetDestinationFactory(targetDF); + bridge.setFailureRetryInterval(10); + bridge.setMaxRetries(1); + bridge.setMaxBatchTime(-1); + bridge.setMaxBatchSize(10); + bridge.setQualityOfServiceMode(QualityOfServiceMode.DUPLICATES_OK); + + Assert.assertFalse(bridge.isStarted()); + bridge.start(); + + Field field = JMSBridgeImpl.class.getDeclaredField("tm"); + field.setAccessible(true); + assertNull(field.get(bridge)); + + bridge.stop(); + Assert.assertFalse(bridge.isStarted()); + } + + @Test + public void testThrowErrorWhenTMNotSetForOnceOnly() throws Exception + { + thrown.expect(RuntimeException.class); + + ConnectionFactoryFactory sourceCFF = JMSBridgeImplTest.newConnectionFactoryFactory(JMSBridgeImplTest.createConnectionFactory()); + ConnectionFactoryFactory targetCFF = JMSBridgeImplTest.newConnectionFactoryFactory(JMSBridgeImplTest.createConnectionFactory()); + DestinationFactory sourceDF = JMSBridgeImplTest.newDestinationFactory(ActiveMQJMSClient.createQueue(JMSBridgeImplTest.SOURCE)); + DestinationFactory targetDF = JMSBridgeImplTest.newDestinationFactory(ActiveMQJMSClient.createQueue(JMSBridgeImplTest.TARGET)); + + JMSBridgeImpl bridge = new JMSBridgeImpl(); + Assert.assertNotNull(bridge); + + bridge.setSourceConnectionFactoryFactory(sourceCFF); + bridge.setSourceDestinationFactory(sourceDF); + bridge.setTargetConnectionFactoryFactory(targetCFF); + bridge.setTargetDestinationFactory(targetDF); + bridge.setFailureRetryInterval(10); + bridge.setMaxRetries(1); + bridge.setMaxBatchTime(-1); + bridge.setMaxBatchSize(10); + bridge.setQualityOfServiceMode(QualityOfServiceMode.ONCE_AND_ONLY_ONCE); + + Assert.assertFalse(bridge.isStarted()); + bridge.start(); + + Field field = JMSBridgeImpl.class.getDeclaredField("tm"); + field.setAccessible(true); + assertNotNull(field.get(bridge)); + + bridge.stop(); + Assert.assertFalse(bridge.isStarted()); + } // Private ------------------------------------------------------- // Inner classes -------------------------------------------------