activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/5] activemq-6 git commit: Only Set TM on Bridge in QoS modes that require it
Date Tue, 16 Dec 2014 17:19:04 GMT
Repository: activemq-6
Updated Branches:
  refs/heads/master 64cc435ea -> b1d6c0b44


Only Set TM on Bridge in QoS modes that require it

The bridge currently tries to assign a TM even when the quality of
service level is set to Duplicates OK.  This QoS does not use or require
a TM.  This patch stops the bridge from attempting to assign a TM for
this QoS and also checks that a TM is set for the other QoS modes.  If a
TM is not set for a QoS mode that requires one, an error is logged and a
RuntimeException is thrown.


Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/f28c9be8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/f28c9be8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/f28c9be8

Branch: refs/heads/master
Commit: f28c9be8afd2b86288389c51b8e5184fea5d3cb9
Parents: 739c0f3
Author: Martyn Taylor <mtaylor@redhat.com>
Authored: Mon Dec 15 16:27:27 2014 +0000
Committer: Martyn Taylor <mtaylor@redhat.com>
Committed: Tue Dec 16 16:33:58 2014 +0000

----------------------------------------------------------------------
 .../jms/bridge/ActiveMQJMSBridgeLogger.java     |  4 ++
 .../activemq/jms/bridge/impl/JMSBridgeImpl.java | 50 ++++++++------
 .../jms/bridge/impl/JMSBridgeImplTest.java      | 72 ++++++++++++++++++++
 3 files changed, 107 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f28c9be8/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/ActiveMQJMSBridgeLogger.java
----------------------------------------------------------------------
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/ActiveMQJMSBridgeLogger.java
b/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/ActiveMQJMSBridgeLogger.java
index 3eeb9e6..41b6167 100644
--- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/ActiveMQJMSBridgeLogger.java
+++ b/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/ActiveMQJMSBridgeLogger.java
@@ -101,4 +101,8 @@ public interface ActiveMQJMSBridgeLogger extends BasicLogger
    @LogMessage(level = Logger.Level.ERROR)
    @Message(id = 344001, value = "Failed to start source connection" , format = Message.Format.MESSAGE_FORMAT)
    void jmsBridgeSrcConnectError(@Cause Exception e);
+
+   @LogMessage(level = Logger.Level.ERROR)
+   @Message(id = 344002, value = "Failed to start JMS Bridge.  QoS Mode: {0} requires a Transaction
Manager, none found" , format = Message.Format.MESSAGE_FORMAT)
+   void jmsBridgeTransactionManagerMissing(QualityOfServiceMode qosMode);
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f28c9be8/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/impl/JMSBridgeImpl.java
----------------------------------------------------------------------
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/impl/JMSBridgeImpl.java
b/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/impl/JMSBridgeImpl.java
index cefc928..0301e84 100644
--- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/impl/JMSBridgeImpl.java
+++ b/activemq-jms-server/src/main/java/org/apache/activemq/jms/bridge/impl/JMSBridgeImpl.java
@@ -66,7 +66,6 @@ import org.apache.activemq.jms.client.ActiveMQMessage;
 import org.apache.activemq.jms.server.ActiveMQJMSServerBundle;
 import org.apache.activemq.service.extensions.ServiceUtils;
 import org.apache.activemq.service.extensions.xa.recovery.ActiveMQRegistry;
-import org.apache.activemq.service.extensions.xa.recovery.ActiveMQRegistryImpl;
 import org.apache.activemq.service.extensions.xa.recovery.XARecoveryConfig;
 import org.apache.activemq.utils.ClassloadingUtil;
 import org.apache.activemq.utils.DefaultSensitiveStringCodec;
@@ -402,29 +401,46 @@ public final class JMSBridgeImpl implements JMSBridge
 
       checkParams();
 
-      if (tm == null)
-      {
-         tm = ServiceUtils.getTransactionManager();
-      }
-
       // There may already be a JTA transaction associated to the thread
 
       boolean ok;
 
-      Transaction toResume = null;
-      try
+      // Check to see if the QoSMode requires a TM
+      if (qualityOfServiceMode.equals(QualityOfServiceMode.AT_MOST_ONCE) ||
+         qualityOfServiceMode.equals(QualityOfServiceMode.ONCE_AND_ONLY_ONCE))
       {
-         toResume = tm.suspend();
+         if (tm == null)
+         {
+            tm = ServiceUtils.getTransactionManager();
+         }
 
-         ok = setupJMSObjects();
-      }
-      finally
-      {
-         if (toResume != null)
+         if (tm == null)
          {
-            tm.resume(toResume);
+            ActiveMQJMSBridgeLogger.LOGGER.jmsBridgeTransactionManagerMissing(qualityOfServiceMode);
+            throw new RuntimeException();
+         }
+
+         // There may already be a JTA transaction associated to the thread
+
+         Transaction toResume = null;
+         try
+         {
+            toResume = tm.suspend();
+
+            ok = setupJMSObjects();
+         }
+         finally
+         {
+            if (toResume != null)
+            {
+               tm.resume(toResume);
+            }
          }
       }
+      else
+      {
+         ok = setupJMSObjects();
+      }
 
       if (ok)
       {
@@ -2241,10 +2257,6 @@ public final class JMSBridgeImpl implements JMSBridge
                {
                   registry = sl.iterator().next();
                }
-               else
-               {
-                  registry = ActiveMQRegistryImpl.getInstance();
-               }
             }
             catch (Throwable e)
             {

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f28c9be8/tests/timing-tests/src/test/java/org/apache/activemq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java
----------------------------------------------------------------------
diff --git a/tests/timing-tests/src/test/java/org/apache/activemq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java
b/tests/timing-tests/src/test/java/org/apache/activemq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java
index 1ec6dbc..c28251a 100644
--- a/tests/timing-tests/src/test/java/org/apache/activemq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java
+++ b/tests/timing-tests/src/test/java/org/apache/activemq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java
@@ -36,6 +36,7 @@ import javax.transaction.SystemException;
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
 import java.lang.management.ManagementFactory;
+import java.lang.reflect.Field;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
@@ -65,7 +66,9 @@ import org.apache.activemq.tests.util.UnitTestCase;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 /**
  * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
@@ -84,6 +87,9 @@ public class JMSBridgeImplTest extends UnitTestCase
 
    private JMSServerManager jmsServer;
 
+   @Rule
+   public ExpectedException thrown = ExpectedException.none();
+
    // Static --------------------------------------------------------
 
    protected static TransactionManager newTransactionManager()
@@ -633,6 +639,72 @@ public class JMSBridgeImplTest extends UnitTestCase
       super.tearDown();
    }
 
+   @Test
+   public void testTransactionManagerNotSetForDuplicatesOK() throws Exception
+   {
+
+      ConnectionFactoryFactory sourceCFF = JMSBridgeImplTest.newConnectionFactoryFactory(JMSBridgeImplTest.createConnectionFactory());
+      ConnectionFactoryFactory targetCFF = JMSBridgeImplTest.newConnectionFactoryFactory(JMSBridgeImplTest.createConnectionFactory());
+      DestinationFactory sourceDF = JMSBridgeImplTest.newDestinationFactory(ActiveMQJMSClient.createQueue(JMSBridgeImplTest.SOURCE));
+      DestinationFactory targetDF = JMSBridgeImplTest.newDestinationFactory(ActiveMQJMSClient.createQueue(JMSBridgeImplTest.TARGET));
+
+      JMSBridgeImpl bridge = new JMSBridgeImpl();
+      Assert.assertNotNull(bridge);
+
+      bridge.setSourceConnectionFactoryFactory(sourceCFF);
+      bridge.setSourceDestinationFactory(sourceDF);
+      bridge.setTargetConnectionFactoryFactory(targetCFF);
+      bridge.setTargetDestinationFactory(targetDF);
+      bridge.setFailureRetryInterval(10);
+      bridge.setMaxRetries(1);
+      bridge.setMaxBatchTime(-1);
+      bridge.setMaxBatchSize(10);
+      bridge.setQualityOfServiceMode(QualityOfServiceMode.DUPLICATES_OK);
+
+      Assert.assertFalse(bridge.isStarted());
+      bridge.start();
+
+      Field field = JMSBridgeImpl.class.getDeclaredField("tm");
+      field.setAccessible(true);
+      assertNull(field.get(bridge));
+
+      bridge.stop();
+      Assert.assertFalse(bridge.isStarted());
+   }
+
+   @Test
+   public void testThrowErrorWhenTMNotSetForOnceOnly() throws Exception
+   {
+      thrown.expect(RuntimeException.class);
+
+      ConnectionFactoryFactory sourceCFF = JMSBridgeImplTest.newConnectionFactoryFactory(JMSBridgeImplTest.createConnectionFactory());
+      ConnectionFactoryFactory targetCFF = JMSBridgeImplTest.newConnectionFactoryFactory(JMSBridgeImplTest.createConnectionFactory());
+      DestinationFactory sourceDF = JMSBridgeImplTest.newDestinationFactory(ActiveMQJMSClient.createQueue(JMSBridgeImplTest.SOURCE));
+      DestinationFactory targetDF = JMSBridgeImplTest.newDestinationFactory(ActiveMQJMSClient.createQueue(JMSBridgeImplTest.TARGET));
+
+      JMSBridgeImpl bridge = new JMSBridgeImpl();
+      Assert.assertNotNull(bridge);
+
+      bridge.setSourceConnectionFactoryFactory(sourceCFF);
+      bridge.setSourceDestinationFactory(sourceDF);
+      bridge.setTargetConnectionFactoryFactory(targetCFF);
+      bridge.setTargetDestinationFactory(targetDF);
+      bridge.setFailureRetryInterval(10);
+      bridge.setMaxRetries(1);
+      bridge.setMaxBatchTime(-1);
+      bridge.setMaxBatchSize(10);
+      bridge.setQualityOfServiceMode(QualityOfServiceMode.ONCE_AND_ONLY_ONCE);
+
+      Assert.assertFalse(bridge.isStarted());
+      bridge.start();
+
+      Field field = JMSBridgeImpl.class.getDeclaredField("tm");
+      field.setAccessible(true);
+      assertNotNull(field.get(bridge));
+
+      bridge.stop();
+      Assert.assertFalse(bridge.isStarted());
+   }
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------


Mime
View raw message