activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [08/15] activemq-6 git commit: Refactored the testsuite a bit
Date Fri, 06 Mar 2015 22:30:41 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/jms/xa/XATest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/jms/xa/XATest.java b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/jms/xa/XATest.java
new file mode 100644
index 0000000..4847e34
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/jms/xa/XATest.java
@@ -0,0 +1,2142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.extras.jms.xa;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XASession;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import com.arjuna.ats.arjuna.coordinator.TransactionReaper;
+import com.arjuna.ats.arjuna.coordinator.TxControl;
+import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple;
+
+import org.apache.activemq.api.jms.ActiveMQJMSClient;
+import org.apache.activemq.core.client.impl.ClientSessionInternal;
+import org.apache.activemq.tests.extras.ExtrasTestLogger;
+import org.apache.activemq.tests.util.JMSTestBase;
+import org.jboss.tm.TxUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ *
+ * A XATestBase
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ *
+ */
+public class XATest extends JMSTestBase
+{
+   // Constants -----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   protected TransactionManager tm;
+
+   protected Transaction suspendedTx;
+
+   protected XAConnectionFactory xacf;
+   
+   protected Queue queue1;
+
+   // Constructors --------------------------------------------------
+
+   // TestCase overrides -------------------------------------------
+
+   @Override
+   @Before
+   public void setUp() throws Exception
+   {
+      super.setUp();
+
+      xacf = ActiveMQJMSClient.createConnectionFactory("tcp://localhost:61616", "test");
+      
+      queue1 = createQueue("queue1");
+      TxControl.enable();
+
+      tm = new TransactionManagerImple();
+
+      Assert.assertTrue(tm instanceof TransactionManagerImple);
+
+      suspendedTx = tm.suspend();
+   }
+
+   @Override
+   @After
+   public void tearDown() throws Exception
+   {
+      if (TxUtils.isUncommitted(tm))
+      {
+         // roll it back
+         try
+         {
+            tm.rollback();
+         }
+         catch (Throwable ignore)
+         {
+            // The connection will probably be closed so this may well throw an exception
+         }
+      }
+      if (tm.getTransaction() != null)
+      {
+         Transaction tx = tm.suspend();
+         if (tx != null)
+         {
+            ExtrasTestLogger.LOGGER.warn("Transaction still associated with thread " + tx +
+                                            " at status " +
+                                            TxUtils.getStatusAsString(tx.getStatus()));
+         }
+      }
+
+      if (suspendedTx != null)
+      {
+         tm.resume(suspendedTx);
+      }
+
+      TxControl.disable(true);
+
+      TransactionReaper.terminate(false);
+
+      super.tearDown();
+
+   }
+
+   // Public --------------------------------------------------------
+
+   @Test
+   public void test2PCSendCommit1PCOptimization() throws Exception
+   {
+      // Since both resources have same RM, TM will probably use 1PC optimization
+
+      XAConnection conn = null;
+      Connection conn2 = null;
+
+      try
+      {
+         conn = xacf.createXAConnection();
+
+         tm.begin();
+
+         XASession sess = conn.createXASession();
+         XAResource res = sess.getXAResource();
+
+         XAResource res2 = new DummyXAResource();
+
+         Transaction tx = tm.getTransaction();
+         tx.enlistResource(res);
+         tx.enlistResource(res2);
+
+         MessageProducer prod = sess.createProducer(queue1);
+         prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         Message m = sess.createTextMessage("XATest1");
+         prod.send(m);
+         m = sess.createTextMessage("XATest2");
+         prod.send(m);
+
+         tx.delistResource(res, XAResource.TMSUCCESS);
+         tx.delistResource(res2, XAResource.TMSUCCESS);
+
+         tm.commit();
+
+         conn2 = cf.createConnection();
+         conn2.start();
+         Session sessReceiver = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer cons = sessReceiver.createConsumer(queue1);
+         TextMessage m2 = (TextMessage)cons.receive(1000);
+         Assert.assertNotNull(m2);
+         Assert.assertEquals("XATest1", m2.getText());
+         m2 = (TextMessage)cons.receive(1000);
+         Assert.assertNotNull(m2);
+         Assert.assertEquals("XATest2", m2.getText());
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }
+   }
+
+   @Test
+   public void test2PCSendCommit() throws Exception
+   {
+      XAConnection conn = null;
+      Connection conn2 = null;
+
+      try
+      {
+         conn = xacf.createXAConnection();
+
+         tm.begin();
+
+         XASession sess = conn.createXASession();
+
+         XAResource res = sess.getXAResource();
+         XAResource res2 = new DummyXAResource();
+
+         // To prevent 1PC optimization being used
+         // res.setForceNotSameRM(true);
+
+         Transaction tx = tm.getTransaction();
+
+         tx.enlistResource(res);
+
+         tx.enlistResource(res2);
+
+         MessageProducer prod = sess.createProducer(queue1);
+         prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         Message m = sess.createTextMessage("XATest1");
+         prod.send(m);
+         m = sess.createTextMessage("XATest2");
+         prod.send(m);
+
+         tx.delistResource(res, XAResource.TMSUCCESS);
+         tx.delistResource(res2, XAResource.TMSUCCESS);
+
+         tm.commit();
+
+         conn2 = cf.createConnection();
+         conn2.start();
+         Session sessReceiver = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer cons = sessReceiver.createConsumer(queue1);
+         TextMessage m2 = (TextMessage)cons.receive(5000);
+         Assert.assertNotNull(m2);
+         Assert.assertEquals("XATest1", m2.getText());
+         m2 = (TextMessage)cons.receive(5000);
+         Assert.assertNotNull(m2);
+         Assert.assertEquals("XATest2", m2.getText());
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }
+   }
+
+   @Test
+   public void test2PCSendFailOnPrepare() throws Exception
+   {
+      XAConnection conn = null;
+      Connection conn2 = null;
+      try
+      {
+         conn = xacf.createXAConnection();
+
+         tm.begin();
+
+         XASession sess = conn.createXASession();
+         XAResource res = sess.getXAResource();
+
+         // prevent 1Pc optimisation
+         // res.setForceNotSameRM(true);
+
+         XAResource res2 = new DummyXAResource(true);
+         XAResource res3 = new DummyXAResource();
+         XAResource res4 = new DummyXAResource();
+
+         Transaction tx = tm.getTransaction();
+         tx.enlistResource(res);
+         tx.enlistResource(res2);
+         tx.enlistResource(res3);
+         tx.enlistResource(res4);
+
+         MessageProducer prod = sess.createProducer(null);
+         prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         Message m = sess.createTextMessage("XATest1");
+         prod.send(queue1, m);
+         m = sess.createTextMessage("XATest2");
+         prod.send(queue1, m);
+
+         tx.delistResource(res, XAResource.TMSUCCESS);
+         tx.delistResource(res2, XAResource.TMSUCCESS);
+         tx.delistResource(res3, XAResource.TMSUCCESS);
+         tx.delistResource(res4, XAResource.TMSUCCESS);
+
+         try
+         {
+            tm.commit();
+
+            Assert.fail("should not get here");
+         }
+         catch (Exception e)
+         {
+            // We should expect this
+         }
+
+         conn2 = cf.createConnection();
+         conn2.start();
+         Session sessReceiver = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer cons = sessReceiver.createConsumer(queue1);
+         Message m2 = cons.receive(100);
+         Assert.assertNull(m2);
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }
+   }
+
+   @Test
+   public void test2PCSendRollback() throws Exception
+   {
+      XAConnection conn = null;
+      Connection conn2 = null;
+      try
+      {
+         conn = xacf.createXAConnection();
+
+         tm.begin();
+
+         XASession sess = conn.createXASession();
+         XAResource res = sess.getXAResource();
+
+         // prevent 1Pc optimisation
+         // res.setForceNotSameRM(true);
+
+         XAResource res2 = new DummyXAResource();
+
+         Transaction tx = tm.getTransaction();
+         tx.enlistResource(res);
+         tx.enlistResource(res2);
+
+         MessageProducer prod = sess.createProducer(null);
+         prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         Message m = sess.createTextMessage("XATest1");
+         prod.send(queue1, m);
+         m = sess.createTextMessage("XATest2");
+         prod.send(queue1, m);
+
+         tx.delistResource(res, XAResource.TMSUCCESS);
+         tx.delistResource(res2, XAResource.TMSUCCESS);
+
+         tm.rollback();
+
+         conn2 = cf.createConnection();
+         conn2.start();
+         Session sessReceiver = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer cons = sessReceiver.createConsumer(queue1);
+         Message m2 = cons.receive(100);
+         Assert.assertNull(m2);
+
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }
+   }
+
+   @Test
+   public void test2PCReceiveCommit1PCOptimization() throws Exception
+   {
+      // Since both resources have some RM, TM will probably use 1PC optimization
+
+      XAConnection conn = null;
+      Connection conn2 = null;
+
+      try
+      {
+         conn2 = cf.createConnection();
+         conn2.start();
+         Session sessProducer = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer prod = sessProducer.createProducer(queue1);
+         Message m = sessProducer.createTextMessage("XATest1");
+         prod.send(m);
+         m = sessProducer.createTextMessage("XATest2");
+         prod.send(m);
+
+         conn = xacf.createXAConnection();
+         conn.start();
+
+         tm.begin();
+
+         XASession sess = conn.createXASession();
+         XAResource res = sess.getXAResource();
+
+         XAResource res2 = new DummyXAResource();
+
+         Transaction tx = tm.getTransaction();
+         tx.enlistResource(res);
+         tx.enlistResource(res2);
+
+         MessageConsumer cons = sess.createConsumer(queue1);
+
+         TextMessage m2 = (TextMessage)cons.receive(5000);
+
+         Assert.assertNotNull(m2);
+         Assert.assertEquals("XATest1", m2.getText());
+
+         m2 = (TextMessage)cons.receive(5000);
+
+         Assert.assertNotNull(m2);
+         Assert.assertEquals("XATest2", m2.getText());
+
+         tx.delistResource(res, XAResource.TMSUCCESS);
+         tx.delistResource(res2, XAResource.TMSUCCESS);
+
+         tm.commit();
+
+         // New tx
+         tm.begin();
+         tx = tm.getTransaction();
+         tx.enlistResource(res);
+         tx.enlistResource(res2);
+
+         Message m3 = cons.receive(100);
+
+         Assert.assertNull(m3);
+
+         tx.delistResource(res, XAResource.TMSUCCESS);
+         tx.delistResource(res2, XAResource.TMSUCCESS);
+
+         tm.commit();
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }
+
+   }
+
+   @Test
+   public void test2PCReceiveCommit() throws Exception
+   {
+      XAConnection conn = null;
+      Connection conn2 = null;
+
+      try
+      {
+         conn2 = cf.createConnection();
+         conn2.start();
+         Session sessProducer = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer prod = sessProducer.createProducer(queue1);
+         Message m = sessProducer.createTextMessage("XATest1");
+         prod.send(m);
+         m = sessProducer.createTextMessage("XATest2");
+         prod.send(m);
+
+         conn = xacf.createXAConnection();
+         conn.start();
+
+         tm.begin();
+
+         XASession sess = conn.createXASession();
+         XAResource res = sess.getXAResource();
+         // res.setForceNotSameRM(true);
+
+         XAResource res2 = new DummyXAResource();
+
+         Transaction tx = tm.getTransaction();
+         tx.enlistResource(res);
+         tx.enlistResource(res2);
+
+         MessageConsumer cons = sess.createConsumer(queue1);
+
+         TextMessage m2 = (TextMessage)cons.receive(5000);
+
+         Assert.assertNotNull(m2);
+         Assert.assertEquals("XATest1", m2.getText());
+
+         m2 = (TextMessage)cons.receive(5000);
+
+         Assert.assertNotNull(m2);
+         Assert.assertEquals("XATest2", m2.getText());
+
+         tx.delistResource(res, XAResource.TMSUCCESS);
+         tx.delistResource(res2, XAResource.TMSUCCESS);
+
+         tm.commit();
+
+         // New tx
+         tm.begin();
+         tx = tm.getTransaction();
+         tx.enlistResource(res);
+         tx.enlistResource(res2);
+
+         Message m3 = cons.receive(100);
+
+         Assert.assertNull(m3);
+
+         tx.delistResource(res, XAResource.TMSUCCESS);
+         tx.delistResource(res2, XAResource.TMSUCCESS);
+
+         tm.commit();
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }
+
+   }
+
+   @Test
+   public void test2PCReceiveRollback1PCOptimization() throws Exception
+   {
+      // Since both resources have some RM, TM will probably use 1PC optimization
+
+      XAConnection conn = null;
+      Connection conn2 = null;
+
+      try
+      {
+         conn2 = cf.createConnection();
+         Session sessProducer = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer prod = sessProducer.createProducer(queue1);
+         Message m = sessProducer.createTextMessage("XATest1");
+         prod.send(m);
+
+         m = sessProducer.createTextMessage("XATest2");
+         prod.send(m);
+
+         conn = xacf.createXAConnection();
+         conn.start();
+
+         tm.begin();
+
+         XASession sess = conn.createXASession();
+         XAResource res = sess.getXAResource();
+
+         XAResource res2 = new DummyXAResource();
+
+         Transaction tx = tm.getTransaction();
+         tx.enlistResource(res);
+         tx.enlistResource(res2);
+
+         MessageConsumer cons = sess.createConsumer(queue1);
+
+         TextMessage m2 = (TextMessage)cons.receive(5000);
+         Assert.assertNotNull(m2);
+         Assert.assertEquals("XATest1", m2.getText());
+         m2 = (TextMessage)cons.receive(5000);
+         Assert.assertNotNull(m2);
+         Assert.assertEquals("XATest2", m2.getText());
+
+         tx.delistResource(res, XAResource.TMSUCCESS);
+         tx.delistResource(res2, XAResource.TMSUCCESS);
+
+         tm.rollback();
+
+         // Message should be redelivered
+
+         // New tx
+         tm.begin();
+         tx = tm.getTransaction();
+         tx.enlistResource(res);
+         tx.enlistResource(res2);
+
+         TextMessage m3 = (TextMessage)cons.receive(5000);
+         Assert.assertNotNull(m3);
+         Assert.assertEquals("XATest1", m3.getText());
+         m3 = (TextMessage)cons.receive(5000);
+         Assert.assertNotNull(m3);
+         Assert.assertEquals("XATest2", m3.getText());
+
+         Assert.assertTrue(m3.getJMSRedelivered());
+
+         tx.delistResource(res, XAResource.TMSUCCESS);
+         tx.delistResource(res2, XAResource.TMSUCCESS);
+
+         tm.commit();
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }
+   }
+
+   @Test
+   public void test2PCReceiveRollback() throws Exception
+   {
+      XAConnection conn = null;
+      Connection conn2 = null;
+
+      try
+      {
+         conn2 = cf.createConnection();
+         Session sessProducer = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer prod = sessProducer.createProducer(queue1);
+         Message m = sessProducer.createTextMessage("XATest1");
+         prod.send(m);
+
+         m = sessProducer.createTextMessage("XATest2");
+         prod.send(m);
+
+         conn = xacf.createXAConnection();
+         conn.start();
+
+         tm.begin();
+
+         XASession sess = conn.createXASession();
+         XAResource res = sess.getXAResource();
+         // res.setForceNotSameRM(true);
+
+         XAResource res2 = new DummyXAResource();
+
+         Transaction tx = tm.getTransaction();
+         tx.enlistResource(res);
+         tx.enlistResource(res2);
+
+         MessageConsumer cons = sess.createConsumer(queue1);
+
+         TextMessage m2 = (TextMessage)cons.receive(5000);
+         Assert.assertNotNull(m2);
+         Assert.assertEquals("XATest1", m2.getText());
+         m2 = (TextMessage)cons.receive(5000);
+         Assert.assertNotNull(m2);
+         Assert.assertEquals("XATest2", m2.getText());
+
+         tx.delistResource(res, XAResource.TMSUCCESS);
+         tx.delistResource(res2, XAResource.TMSUCCESS);
+
+         tm.rollback();
+
+         // Message should be redelivered
+
+         // New tx
+         tm.begin();
+         tx = tm.getTransaction();
+         tx.enlistResource(res);
+         tx.enlistResource(res2);
+
+         TextMessage m3 = (TextMessage)cons.receive(5000);
+         Assert.assertNotNull(m3);
+         Assert.assertEquals("XATest1", m3.getText());
+         m3 = (TextMessage)cons.receive(5000);
+         Assert.assertNotNull(m3);
+         Assert.assertEquals("XATest2", m3.getText());
+
+         Assert.assertTrue(m3.getJMSRedelivered());
+
+         tx.delistResource(res, XAResource.TMSUCCESS);
+         tx.delistResource(res2, XAResource.TMSUCCESS);
+
+         tm.commit();
+
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }
+
+   }
+
+   @Test
+   public void test1PCSendCommit() throws Exception
+   {
+      XAConnection conn = null;
+      Connection conn2 = null;
+
+      try
+      {
+         conn = xacf.createXAConnection();
+
+         tm.begin();
+
+         XASession sess = conn.createXASession();
+         XAResource res = sess.getXAResource();
+
+         Transaction tx = tm.getTransaction();
+         tx.enlistResource(res);
+
+         MessageProducer prod = sess.createProducer(null);
+         prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         Message m = sess.createTextMessage("XATest1");
+         prod.send(queue1, m);
+         m = sess.createTextMessage("XATest2");
+         prod.send(queue1, m);
+
+         tx.delistResource(res, XAResource.TMSUCCESS);
+
+         tm.commit();
+
+         conn2 = cf.createConnection();
+         conn2.start();
+         Session sessReceiver = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer cons = sessReceiver.createConsumer(queue1);
+         TextMessage m2 = (TextMessage)cons.receive(5000);
+         Assert.assertNotNull(m2);
+         Assert.assertEquals("XATest1", m2.getText());
+         m2 = (TextMessage)cons.receive(5000);
+         Assert.assertNotNull(m2);
+         Assert.assertEquals("XATest2", m2.getText());
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }
+
+   }
+
+   @Test
+   public void test1PCReceiveCommit() throws Exception
+   {
+      XAConnection conn = null;
+      Connection conn2 = null;
+
+      try
+      {
+         conn2 = cf.createConnection();
+         conn2.start();
+         Session sessProducer = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer prod = sessProducer.createProducer(queue1);
+         Message m = sessProducer.createTextMessage("XATest1");
+         prod.send(m);
+         m = sessProducer.createTextMessage("XATest2");
+         prod.send(m);
+
+         conn = xacf.createXAConnection();
+         conn.start();
+
+         tm.begin();
+
+         XASession sess = conn.createXASession();
+         XAResource res = sess.getXAResource();
+
+         Transaction tx = tm.getTransaction();
+         tx.enlistResource(res);
+
+         MessageConsumer cons = sess.createConsumer(queue1);
+
+         TextMessage m2 = (TextMessage)cons.receive(5000);
+
+         Assert.assertNotNull(m2);
+         Assert.assertEquals("XATest1", m2.getText());
+         m2 = (TextMessage)cons.receive(5000);
+
+         Assert.assertNotNull(m2);
+         Assert.assertEquals("XATest2", m2.getText());
+
+         tx.delistResource(res, XAResource.TMSUCCESS);
+
+         tm.commit();
+
+         // New tx
+         tm.begin();
+         tx = tm.getTransaction();
+         tx.enlistResource(res);
+
+         Message m3 = cons.receive(100);
+
+         Assert.assertNull(m3);
+
+         tx.delistResource(res, XAResource.TMSUCCESS);
+
+         tm.commit();
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }
+
+   }
+
+   @Test
+   public void test1PCReceiveRollback() throws Exception
+   {
+      XAConnection conn = null;
+      Connection conn2 = null;
+
+      try
+      {
+         conn2 = cf.createConnection();
+         Session sessProducer = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer prod = sessProducer.createProducer(queue1);
+         Message m = sessProducer.createTextMessage("XATest1");
+         prod.send(m);
+         m = sessProducer.createTextMessage("XATest2");
+         prod.send(m);
+
+         conn = xacf.createXAConnection();
+         conn.start();
+
+         tm.begin();
+
+         XASession sess = conn.createXASession();
+         XAResource res = sess.getXAResource();
+
+         Transaction tx = tm.getTransaction();
+         tx.enlistResource(res);
+
+         MessageConsumer cons = sess.createConsumer(queue1);
+
+         TextMessage m2 = (TextMessage)cons.receive(5000);
+
+         Assert.assertNotNull(m2);
+         Assert.assertEquals("XATest1", m2.getText());
+
+         m2 = (TextMessage)cons.receive(5000);
+
+         Assert.assertNotNull(m2);
+         Assert.assertEquals("XATest2", m2.getText());
+
+         tx.delistResource(res, XAResource.TMSUCCESS);
+
+         tm.rollback();
+
+         // Message should be redelivered
+
+         // New tx
+         tm.begin();
+         tx = tm.getTransaction();
+         tx.enlistResource(res);
+
+         TextMessage m3 = (TextMessage)cons.receive(5000);
+
+         Assert.assertNotNull(m3);
+         Assert.assertEquals("XATest1", m3.getText());
+
+         m3 = (TextMessage)cons.receive(5000);
+
+         Assert.assertNotNull(m3);
+         Assert.assertEquals("XATest2", m3.getText());
+
+         Assert.assertTrue(m3.getJMSRedelivered());
+
+         tx.delistResource(res, XAResource.TMSUCCESS);
+
+         tm.commit();
+
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }
+
+   }
+
+   @Test
+   public void testMultipleSessionsOneTxCommitAcknowledge1PCOptimization() throws Exception
+   {
+      XAConnection conn = null;
+      Connection conn2 = null;
+
+      // Since both resources have some RM, TM will probably use 1PC optimization
+
+      try
+      {
+         // First send 2 messages
+         conn2 = cf.createConnection();
+         Session sessProducer = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer prod = sessProducer.createProducer(queue1);
+         Message m = sessProducer.createTextMessage("jellyfish1");
+         prod.send(m);
+         m = sessProducer.createTextMessage("jellyfish2");
+         prod.send(m);
+
+         conn = xacf.createXAConnection();
+         conn.start();
+
+         tm.begin();
+
+         // Create 2 sessions and enlist them
+         XASession sess1 = conn.createXASession();
+         XAResource res1 = sess1.getXAResource();
+         XASession sess2 = conn.createXASession();
+         XAResource res2 = sess2.getXAResource();
+
+         Transaction tx = tm.getTransaction();
+         tx.enlistResource(res1);
+         tx.enlistResource(res2);
+
+         // Receive the messages, one on each consumer
+         MessageConsumer cons1 = sess1.createConsumer(queue1);
+         TextMessage r1 = (TextMessage)cons1.receive(5000);
+
+         Assert.assertNotNull(r1);
+         Assert.assertEquals("jellyfish1", r1.getText());
+
+         cons1.close();
+
+         MessageConsumer cons2 = sess2.createConsumer(queue1);
+         TextMessage r2 = (TextMessage)cons2.receive(5000);
+
+         Assert.assertNotNull(r2);
+         Assert.assertEquals("jellyfish2", r2.getText());
+
+         tx.delistResource(res1, XAResource.TMSUCCESS);
+         tx.delistResource(res2, XAResource.TMSUCCESS);
+
+         // commit
+         tm.commit();
+
+         Session sess = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer cons = sess.createConsumer(queue1);
+         conn2.start();
+
+         TextMessage r3 = (TextMessage)cons.receive(100);
+         Assert.assertNull(r3);
+
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }
+
+   }
+
+   @Test
+   public void testMultipleSessionsOneTxCommitAcknowledge() throws Exception
+   {
+      XAConnection conn = null;
+      Connection conn2 = null;
+
+      try
+      {
+         // First send 2 messages
+         conn2 = cf.createConnection();
+         Session sessProducer = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer prod = sessProducer.createProducer(queue1);
+         Message m = sessProducer.createTextMessage("jellyfish1");
+         prod.send(m);
+         m = sessProducer.createTextMessage("jellyfish2");
+         prod.send(m);
+
+         conn = xacf.createXAConnection();
+         conn.start();
+
+         tm.begin();
+
+         // Create 2 sessions and enlist them
+         XASession sess1 = conn.createXASession();
+         ClientSessionInternal res1 = (ClientSessionInternal)sess1.getXAResource();
+         XASession sess2 = conn.createXASession();
+         ClientSessionInternal res2 = (ClientSessionInternal)sess2.getXAResource();
+         res1.setForceNotSameRM(true);
+         res2.setForceNotSameRM(true);
+
+         Transaction tx = tm.getTransaction();
+         tx.enlistResource(res1);
+         tx.enlistResource(res2);
+
+         // Receive the messages, one on each consumer
+         MessageConsumer cons1 = sess1.createConsumer(queue1);
+         TextMessage r1 = (TextMessage)cons1.receive(5000);
+
+         Assert.assertNotNull(r1);
+         Assert.assertEquals("jellyfish1", r1.getText());
+
+         cons1.close();
+
+         MessageConsumer cons2 = sess2.createConsumer(queue1);
+         TextMessage r2 = (TextMessage)cons2.receive(5000);
+
+         Assert.assertNotNull(r2);
+         Assert.assertEquals("jellyfish2", r2.getText());
+
+         tx.delistResource(res1, XAResource.TMSUCCESS);
+         tx.delistResource(res2, XAResource.TMSUCCESS);
+
+         // commit
+         tm.commit();
+
+         Session sess = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer cons = sess.createConsumer(queue1);
+         conn2.start();
+
+         TextMessage r3 = (TextMessage)cons.receive(100);
+         Assert.assertNull(r3);
+
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }
+
+   }
+
+   @Test
+   public void testMultipleSessionsOneTxRollbackAcknowledge1PCOptimization() throws Exception
+   {
+      XAConnection conn = null;
+      Connection conn2 = null;
+
+      // Since both resources have some RM, TM will probably use 1PC optimization
+
+      try
+      {
+         // First send 2 messages
+         conn2 = cf.createConnection();
+         Session sessProducer = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer prod = sessProducer.createProducer(queue1);
+         Message m = sessProducer.createTextMessage("jellyfish1");
+         prod.send(m);
+         m = sessProducer.createTextMessage("jellyfish2");
+         prod.send(m);
+         m = sessProducer.createTextMessage("jellyfish3");
+         prod.send(m);
+         m = sessProducer.createTextMessage("jellyfish4");
+         prod.send(m);
+
+         conn = xacf.createXAConnection();
+         conn.start();
+
+         tm.begin();
+
+         // Create 2 sessions and enlist them
+         XASession sess1 = conn.createXASession();
+         ClientSessionInternal res1 = (ClientSessionInternal)sess1.getXAResource();
+         XASession sess2 = conn.createXASession();
+         ClientSessionInternal res2 = (ClientSessionInternal)sess2.getXAResource();
+
+         Transaction tx = tm.getTransaction();
+         tx.enlistResource(res1);
+         tx.enlistResource(res2);
+
+         // Receive the messages, two on each consumer
+         MessageConsumer cons1 = sess1.createConsumer(queue1);
+         TextMessage r1 = (TextMessage)cons1.receive(5000);
+
+         Assert.assertNotNull(r1);
+         Assert.assertEquals("jellyfish1", r1.getText());
+
+         r1 = (TextMessage)cons1.receive(5000);
+
+         Assert.assertNotNull(r1);
+         Assert.assertEquals("jellyfish2", r1.getText());
+
+         cons1.close();
+
+         MessageConsumer cons2 = sess2.createConsumer(queue1);
+         TextMessage r2 = (TextMessage)cons2.receive(5000);
+
+         Assert.assertNotNull(r2);
+         Assert.assertEquals("jellyfish3", r2.getText());
+
+         r2 = (TextMessage)cons2.receive(5000);
+
+         Assert.assertNotNull(r2);
+         Assert.assertEquals("jellyfish4", r2.getText());
+
+         cons2.close();
+
+         // rollback
+
+         tx.delistResource(res1, XAResource.TMSUCCESS);
+         tx.delistResource(res2, XAResource.TMSUCCESS);
+
+         tm.rollback();
+
+         // Rollback causes cancel which is asynch
+         Thread.sleep(1000);
+
+         // We cannot assume anything about the order in which the transaction manager rollsback
+         // the sessions - this is implementation dependent
+
+         Session sess = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer cons = sess.createConsumer(queue1);
+         conn2.start();
+
+         TextMessage r = (TextMessage)cons.receive(5000);
+         Assert.assertNotNull(r);
+
+         boolean session1First = false;
+
+         if (r.getText().equals("jellyfish1"))
+         {
+            session1First = true;
+         }
+         else if (r.getText().equals("jellyfish3"))
+         {
+            session1First = false;
+         }
+         else
+         {
+            Assert.fail("Unexpected message");
+         }
+
+         if (session1First)
+         {
+            r = (TextMessage)cons.receive(5000);
+
+            Assert.assertNotNull(r);
+
+            Assert.assertEquals("jellyfish2", r.getText());
+
+            r = (TextMessage)cons.receive(5000);
+
+            Assert.assertNotNull(r);
+
+            Assert.assertEquals("jellyfish3", r.getText());
+
+            r = (TextMessage)cons.receive(5000);
+
+            Assert.assertNotNull(r);
+
+            Assert.assertEquals("jellyfish4", r.getText());
+
+         }
+         else
+         {
+            r = (TextMessage)cons.receive(5000);
+
+            Assert.assertNotNull(r);
+
+            Assert.assertEquals("jellyfish4", r.getText());
+
+            r = (TextMessage)cons.receive(5000);
+
+            Assert.assertNotNull(r);
+
+            Assert.assertEquals("jellyfish1", r.getText());
+
+            r = (TextMessage)cons.receive(5000);
+
+            Assert.assertNotNull(r);
+
+            Assert.assertEquals("jellyfish2", r.getText());
+         }
+
+         r = (TextMessage)cons.receive(100);
+
+         Assert.assertNull(r);
+
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }
+
+   }
+
+   @Test
+   public void testMultipleSessionsOneTxRollbackAcknowledge() throws Exception
+   {
+      XAConnection conn = null;
+      Connection conn2 = null;
+
+      try
+      {
+         // First send 2 messages
+         conn2 = cf.createConnection();
+         Session sessProducer = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer prod = sessProducer.createProducer(queue1);
+         Message m = sessProducer.createTextMessage("jellyfish1");
+         prod.send(m);
+         m = sessProducer.createTextMessage("jellyfish2");
+         prod.send(m);
+         m = sessProducer.createTextMessage("jellyfish3");
+         prod.send(m);
+         m = sessProducer.createTextMessage("jellyfish4");
+         prod.send(m);
+
+         conn = xacf.createXAConnection();
+         conn.start();
+
+         tm.begin();
+
+         // Create 2 sessions and enlist them
+         XASession sess1 = conn.createXASession();
+         ClientSessionInternal res1 = (ClientSessionInternal)sess1.getXAResource();
+         XASession sess2 = conn.createXASession();
+         ClientSessionInternal res2 = (ClientSessionInternal)sess2.getXAResource();
+         res1.setForceNotSameRM(true);
+         res2.setForceNotSameRM(true);
+
+         Transaction tx = tm.getTransaction();
+         tx.enlistResource(res1);
+         tx.enlistResource(res2);
+
+         // Receive the messages, two on each consumer
+         MessageConsumer cons1 = sess1.createConsumer(queue1);
+         TextMessage r1 = (TextMessage)cons1.receive(5000);
+
+         Assert.assertNotNull(r1);
+         Assert.assertEquals("jellyfish1", r1.getText());
+
+         r1 = (TextMessage)cons1.receive(5000);
+
+         Assert.assertNotNull(r1);
+         Assert.assertEquals("jellyfish2", r1.getText());
+
+         cons1.close();
+
+         // Cancel is asynch
+         Thread.sleep(500);
+
+         MessageConsumer cons2 = sess2.createConsumer(queue1);
+         TextMessage r2 = (TextMessage)cons2.receive(5000);
+
+         Assert.assertNotNull(r2);
+         Assert.assertEquals("jellyfish3", r2.getText());
+
+         r2 = (TextMessage)cons2.receive(5000);
+
+         Assert.assertNotNull(r2);
+         Assert.assertEquals("jellyfish4", r2.getText());
+
+         // rollback
+
+         cons2.close();
+
+         tx.delistResource(res1, XAResource.TMSUCCESS);
+         tx.delistResource(res2, XAResource.TMSUCCESS);
+
+         tm.rollback();
+
+         // Rollback causes cancel which is asynch
+         Thread.sleep(1000);
+
+         // We cannot assume anything about the order in which the transaction manager rollsback
+         // the sessions - this is implementation dependent
+
+         Session sess = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer cons = sess.createConsumer(queue1);
+         conn2.start();
+
+         TextMessage r = (TextMessage)cons.receive(5000);
+         Assert.assertNotNull(r);
+
+         boolean session1First = false;
+
+         if (r.getText().equals("jellyfish1"))
+         {
+            session1First = true;
+         }
+         else if (r.getText().equals("jellyfish3"))
+         {
+            session1First = false;
+         }
+         else
+         {
+            Assert.fail("Unexpected message");
+         }
+
+         if (session1First)
+         {
+            r = (TextMessage)cons.receive(5000);
+
+            Assert.assertNotNull(r);
+
+            Assert.assertEquals("jellyfish2", r.getText());
+
+            r = (TextMessage)cons.receive(5000);
+
+            Assert.assertNotNull(r);
+
+            Assert.assertEquals("jellyfish3", r.getText());
+
+            r = (TextMessage)cons.receive(5000);
+
+            Assert.assertNotNull(r);
+
+            Assert.assertEquals("jellyfish4", r.getText());
+
+         }
+         else
+         {
+            r = (TextMessage)cons.receive(5000);
+
+            Assert.assertNotNull(r);
+
+            Assert.assertEquals("jellyfish4", r.getText());
+
+            r = (TextMessage)cons.receive(5000);
+
+            Assert.assertNotNull(r);
+
+            Assert.assertEquals("jellyfish1", r.getText());
+
+            r = (TextMessage)cons.receive(5000);
+
+            Assert.assertNotNull(r);
+
+            Assert.assertEquals("jellyfish2", r.getText());
+         }
+
+         r = (TextMessage)cons.receive(100);
+
+         Assert.assertNull(r);
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }
+   }
+
+   @Test
+   public void testMultipleSessionsOneTxRollbackAcknowledgeForceFailureInCommit() throws Exception
+   {
+      XAConnection conn = null;
+      Connection conn2 = null;
+
+      try
+      {
+         // First send 4 messages
+         conn2 = cf.createConnection();
+         Session sessProducer = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer prod = sessProducer.createProducer(queue1);
+
+         Message m = sessProducer.createTextMessage("jellyfish1");
+         prod.send(m);
+         m = sessProducer.createTextMessage("jellyfish2");
+         prod.send(m);
+         m = sessProducer.createTextMessage("jellyfish3");
+         prod.send(m);
+         m = sessProducer.createTextMessage("jellyfish4");
+         prod.send(m);
+
+         conn = xacf.createXAConnection();
+         conn.start();
+
+         tm.begin();
+
+         XASession sess1 = conn.createXASession();
+         XAResource res1 = sess1.getXAResource();
+         DummyXAResource res2 = new DummyXAResource(true);
+
+         Transaction tx = tm.getTransaction();
+         tx.enlistResource(res1);
+         tx.enlistResource(res2);
+
+         MessageConsumer cons1 = sess1.createConsumer(queue1);
+         TextMessage r1 = (TextMessage)cons1.receive(5000);
+
+         Assert.assertNotNull(r1);
+         Assert.assertEquals("jellyfish1", r1.getText());
+
+         r1 = (TextMessage)cons1.receive(5000);
+
+         Assert.assertNotNull(r1);
+         Assert.assertEquals("jellyfish2", r1.getText());
+
+         r1 = (TextMessage)cons1.receive(5000);
+
+         Assert.assertNotNull(r1);
+         Assert.assertEquals("jellyfish3", r1.getText());
+
+         r1 = (TextMessage)cons1.receive(5000);
+
+         Assert.assertNotNull(r1);
+         Assert.assertEquals("jellyfish4", r1.getText());
+
+         r1 = (TextMessage)cons1.receive(100);
+
+         Assert.assertNull(r1);
+
+         cons1.close();
+
+         // try and commit - and we're going to make the dummyxaresource throw an exception on commit,
+         // which should cause rollback to be called on the other resource
+
+         tx.delistResource(res1, XAResource.TMSUCCESS);
+         tx.delistResource(res2, XAResource.TMSUCCESS);
+
+         // rollback will cause an attempt to deliver messages locally to the original consumers.
+         // the original consumer has closed, so it will cancelled to the server
+         // the server cancel is asynch, so we need to sleep for a bit to make sure it completes
+         ExtrasTestLogger.LOGGER.trace("Forcing failure");
+         try
+         {
+            tm.commit();
+            Assert.fail("should not get here");
+         }
+         catch (Exception e)
+         {
+            // We should expect this
+         }
+
+         Thread.sleep(1000);
+
+         Session sess = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer cons = sess.createConsumer(queue1);
+         conn2.start();
+
+         TextMessage r = (TextMessage)cons.receive(5000);
+
+         Assert.assertNotNull(r);
+
+         Assert.assertEquals("jellyfish1", r.getText());
+
+         r = (TextMessage)cons.receive(5000);
+
+         Assert.assertNotNull(r);
+
+         Assert.assertEquals("jellyfish2", r.getText());
+
+         r = (TextMessage)cons.receive(5000);
+
+         Assert.assertNotNull(r);
+
+         Assert.assertEquals("jellyfish3", r.getText());
+
+         r = (TextMessage)cons.receive(5000);
+
+         Assert.assertNotNull(r);
+
+         Assert.assertEquals("jellyfish4", r.getText());
+
+         r = (TextMessage)cons.receive(100);
+
+         Assert.assertNull(r);
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }
+
+   }
+
+   @Test
+   public void testMultipleSessionsOneTxCommitSend1PCOptimization() throws Exception
+   {
+      // Since both resources have some RM, TM will probably use 1PC optimization
+
+      XAConnection conn = null;
+
+      Connection conn2 = null;
+
+      try
+      {
+         conn = xacf.createXAConnection();
+         conn.start();
+
+         tm.begin();
+
+         // Create 2 sessions and enlist them
+         XASession sess1 = conn.createXASession();
+         XAResource res1 = sess1.getXAResource();
+         XASession sess2 = conn.createXASession();
+         XAResource res2 = sess2.getXAResource();
+
+         Transaction tx = tm.getTransaction();
+         tx.enlistResource(res1);
+         tx.enlistResource(res2);
+
+         // Send 2 messages - one from each session
+
+         MessageProducer prod1 = sess1.createProducer(queue1);
+         MessageProducer prod2 = sess2.createProducer(queue1);
+
+         prod1.send(sess1.createTextMessage("echidna1"));
+         prod2.send(sess2.createTextMessage("echidna2"));
+
+         tx.delistResource(res1, XAResource.TMSUCCESS);
+         tx.delistResource(res2, XAResource.TMSUCCESS);
+
+         // commit
+         tm.commit();
+
+         // Messages should be in queue
+
+         conn2 = cf.createConnection();
+         Session sess = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer cons = sess.createConsumer(queue1);
+         conn2.start();
+
+         TextMessage r1 = (TextMessage)cons.receive(5000);
+         Assert.assertNotNull(r1);
+         Assert.assertEquals("echidna1", r1.getText());
+
+         TextMessage r2 = (TextMessage)cons.receive(5000);
+         Assert.assertNotNull(r2);
+         Assert.assertEquals("echidna2", r2.getText());
+
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }
+   }
+
+   @Test
+   public void testMultipleSessionsOneTxCommitSend() throws Exception
+   {
+      // Since both resources have some RM, TM will probably use 1PC optimization
+
+      XAConnection conn = null;
+
+      Connection conn2 = null;
+
+      try
+      {
+
+         conn = xacf.createXAConnection();
+         conn.start();
+
+         tm.begin();
+
+         // Create 2 sessions and enlist them
+         XASession sess1 = conn.createXASession();
+         ClientSessionInternal res1 = (ClientSessionInternal)sess1.getXAResource();
+         XASession sess2 = conn.createXASession();
+         ClientSessionInternal res2 = (ClientSessionInternal)sess2.getXAResource();
+         res1.setForceNotSameRM(true);
+         res2.setForceNotSameRM(true);
+
+         Transaction tx = tm.getTransaction();
+         tx.enlistResource(res1);
+         tx.enlistResource(res2);
+
+         // Send 2 messages - one from each session
+
+         MessageProducer prod1 = sess1.createProducer(queue1);
+         MessageProducer prod2 = sess2.createProducer(queue1);
+
+         prod1.send(sess1.createTextMessage("echidna1"));
+         prod2.send(sess2.createTextMessage("echidna2"));
+
+         tx.delistResource(res1, XAResource.TMSUCCESS);
+         tx.delistResource(res2, XAResource.TMSUCCESS);
+
+         // commit
+         tm.commit();
+
+         // Messages should be in queue
+
+         conn2 = cf.createConnection();
+         Session sess = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer cons = sess.createConsumer(queue1);
+         conn2.start();
+
+         TextMessage r1 = (TextMessage)cons.receive(5000);
+         Assert.assertNotNull(r1);
+         Assert.assertEquals("echidna1", r1.getText());
+
+         TextMessage r2 = (TextMessage)cons.receive(5000);
+         Assert.assertNotNull(r2);
+         Assert.assertEquals("echidna2", r2.getText());
+
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+
+      }
+
+   }
+
+   @Test
+   public void testMultipleSessionsOneTxRollbackSend1PCOptimization() throws Exception
+   {
+      // Since both resources have some RM, TM will probably use 1PC optimization
+
+      XAConnection conn = null;
+
+      Connection conn2 = null;
+
+      try
+      {
+         conn = xacf.createXAConnection();
+         conn.start();
+
+         tm.begin();
+
+         // Create 2 sessions and enlist them
+         XASession sess1 = conn.createXASession();
+         XAResource res1 = sess1.getXAResource();
+         XASession sess2 = conn.createXASession();
+         XAResource res2 = sess2.getXAResource();
+
+         Transaction tx = tm.getTransaction();
+         tx.enlistResource(res1);
+         tx.enlistResource(res2);
+
+         // Send 2 messages - one from each session
+
+         MessageProducer prod1 = sess1.createProducer(queue1);
+         MessageProducer prod2 = sess2.createProducer(queue1);
+
+         prod1.send(sess1.createTextMessage("echidna1"));
+         prod2.send(sess2.createTextMessage("echidna2"));
+
+         tx.delistResource(res1, XAResource.TMSUCCESS);
+         tx.delistResource(res2, XAResource.TMSUCCESS);
+
+         // rollback
+         tm.rollback();
+
+         // Messages should not be in queue
+
+         conn2 = cf.createConnection();
+         Session sess = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer cons = sess.createConsumer(queue1);
+         conn2.start();
+
+         TextMessage r1 = (TextMessage)cons.receive(100);
+         Assert.assertNull(r1);
+
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }
+   }
+
+   @Test
+   public void testMultipleSessionsOneTxRollbackSend() throws Exception
+   {
+      XAConnection conn = null;
+
+      Connection conn2 = null;
+
+      try
+      {
+
+         conn = xacf.createXAConnection();
+         conn.start();
+
+         tm.begin();
+
+         // Create 2 sessions and enlist them
+         XASession sess1 = conn.createXASession();
+         ClientSessionInternal res1 = (ClientSessionInternal)sess1.getXAResource();
+         XASession sess2 = conn.createXASession();
+         ClientSessionInternal res2 = (ClientSessionInternal)sess2.getXAResource();
+         res1.setForceNotSameRM(true);
+         res2.setForceNotSameRM(true);
+
+         Transaction tx = tm.getTransaction();
+         tx.enlistResource(res1);
+         tx.enlistResource(res2);
+
+         // Send 2 messages - one from each session
+
+         MessageProducer prod1 = sess1.createProducer(queue1);
+         MessageProducer prod2 = sess2.createProducer(queue1);
+
+         prod1.send(sess1.createTextMessage("echidna1"));
+         prod2.send(sess2.createTextMessage("echidna2"));
+
+         tx.delistResource(res1, XAResource.TMSUCCESS);
+         tx.delistResource(res2, XAResource.TMSUCCESS);
+
+         // rollback
+         tm.rollback();
+
+         // Messages should not be in queue
+
+         conn2 = cf.createConnection();
+         Session sess = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer cons = sess.createConsumer(queue1);
+         conn2.start();
+
+         TextMessage r1 = (TextMessage)cons.receive(100);
+         Assert.assertNull(r1);
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }
+   }
+
+   @Test
+   public void testOneSessionTwoTransactionsCommitAcknowledge() throws Exception
+   {
+      XAConnection conn = null;
+
+      Connection conn2 = null;
+
+      try
+      {
+         // First send 2 messages
+         conn2 = cf.createConnection();
+         Session sessProducer = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer prod = sessProducer.createProducer(queue1);
+         Message m = sessProducer.createTextMessage("jellyfish1");
+         prod.send(m);
+         m = sessProducer.createTextMessage("jellyfish2");
+         prod.send(m);
+
+         conn = xacf.createXAConnection();
+
+         // Create a session
+         XASession sess1 = conn.createXASession();
+         XAResource res1 = sess1.getXAResource();
+
+         conn.start();
+         MessageConsumer cons1 = sess1.createConsumer(queue1);
+
+         tm.begin();
+
+         Transaction tx1 = tm.getTransaction();
+         tx1.enlistResource(res1);
+
+         // Receive one message in one tx
+
+         TextMessage r1 = (TextMessage)cons1.receive(5000);
+         Assert.assertNotNull(r1);
+         Assert.assertEquals("jellyfish1", r1.getText());
+
+         tx1.delistResource(res1, XAResource.TMSUCCESS);
+
+         // suspend the tx
+         Transaction suspended = tm.suspend();
+
+         tm.begin();
+
+         Transaction tx2 = tm.getTransaction();
+         tx2.enlistResource(res1);
+
+         // Receive 2nd message in a different tx
+         TextMessage r2 = (TextMessage)cons1.receive(5000);
+         Assert.assertNotNull(r2);
+         Assert.assertEquals("jellyfish2", r2.getText());
+
+         tx2.delistResource(res1, XAResource.TMSUCCESS);
+
+         // commit this transaction
+         tm.commit();
+
+         // verify that no messages are available
+         conn2.close();
+         conn2 = cf.createConnection();
+         Session sess = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         conn2.start();
+         MessageConsumer cons = sess.createConsumer(queue1);
+         TextMessage r3 = (TextMessage)cons.receive(100);
+         Assert.assertNull(r3);
+
+         // now resume the first tx and then commit it
+         tm.resume(suspended);
+
+         tm.commit();
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }
+   }
+
+   @Test
+   public void testOneSessionTwoTransactionsRollbackAcknowledge() throws Exception
+   {
+      XAConnection conn = null;
+
+      Connection conn2 = null;
+
+      try
+      {
+         // First send 2 messages
+         conn2 = cf.createConnection();
+         Session sessProducer = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer prod = sessProducer.createProducer(queue1);
+         Message m = sessProducer.createTextMessage("jellyfish1");
+         prod.send(m);
+         m = sessProducer.createTextMessage("jellyfish2");
+         prod.send(m);
+
+         conn = xacf.createXAConnection();
+
+         // Create a session
+         XASession sess1 = conn.createXASession();
+         XAResource res1 = sess1.getXAResource();
+
+         conn.start();
+         MessageConsumer cons1 = sess1.createConsumer(queue1);
+
+         tm.begin();
+
+         Transaction tx1 = tm.getTransaction();
+         tx1.enlistResource(res1);
+
+         // Receive one message in one tx
+
+         TextMessage r1 = (TextMessage)cons1.receive(5000);
+         Assert.assertNotNull(r1);
+         Assert.assertEquals("jellyfish1", r1.getText());
+
+         tx1.delistResource(res1, XAResource.TMSUCCESS);
+
+         // suspend the tx
+         Transaction suspended = tm.suspend();
+
+         tm.begin();
+
+         Transaction tx2 = tm.getTransaction();
+         tx2.enlistResource(res1);
+
+         // Receive 2nd message in a different tx
+         TextMessage r2 = (TextMessage)cons1.receive(5000);
+         Assert.assertNotNull(r2);
+         Assert.assertEquals("jellyfish2", r2.getText());
+
+         cons1.close();
+
+         tx2.delistResource(res1, XAResource.TMSUCCESS);
+
+         // rollback this transaction
+         tm.rollback();
+
+         // verify that second message is available
+         conn2.close();
+         conn2 = cf.createConnection();
+         Session sess = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         conn2.start();
+         MessageConsumer cons = sess.createConsumer(queue1);
+
+         TextMessage r3 = (TextMessage)cons.receive(5000);
+
+         Assert.assertNotNull(r3);
+         Assert.assertEquals("jellyfish2", r3.getText());
+         r3 = (TextMessage)cons.receive(100);
+         Assert.assertNull(r3);
+
+         // rollback the other tx
+         tm.resume(suspended);
+         tm.rollback();
+
+         // Verify the first message is now available
+         r3 = (TextMessage)cons.receive(5000);
+         Assert.assertNotNull(r3);
+         Assert.assertEquals("jellyfish1", r3.getText());
+         r3 = (TextMessage)cons.receive(100);
+         Assert.assertNull(r3);
+
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+
+      }
+
+   }
+
+   @Test
+   public void testOneSessionTwoTransactionsCommitSend() throws Exception
+   {
+      XAConnection conn = null;
+
+      Connection conn2 = null;
+
+      try
+      {
+         conn = xacf.createXAConnection();
+
+         // Create a session
+         XASession sess1 = conn.createXASession();
+         XAResource res1 = sess1.getXAResource();
+
+         MessageProducer prod1 = sess1.createProducer(queue1);
+
+         tm.begin();
+
+         Transaction tx1 = tm.getTransaction();
+         tx1.enlistResource(res1);
+
+         // Send a message
+         prod1.send(sess1.createTextMessage("kangaroo1"));
+
+         tx1.delistResource(res1, XAResource.TMSUCCESS);
+
+         // suspend the tx
+         Transaction suspended = tm.suspend();
+
+         tm.begin();
+
+         // Send another message in another tx using the same session
+         Transaction tx2 = tm.getTransaction();
+         tx2.enlistResource(res1);
+
+         // Send a message
+         prod1.send(sess1.createTextMessage("kangaroo2"));
+
+         tx2.delistResource(res1, XAResource.TMSUCCESS);
+
+         // commit this transaction
+         tm.commit();
+
+         // verify only kangaroo2 message is sent
+         conn2 = cf.createConnection();
+         Session sess = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         conn2.start();
+         MessageConsumer cons = sess.createConsumer(queue1);
+         TextMessage r1 = (TextMessage)cons.receive(5000);
+         Assert.assertNotNull(r1);
+         Assert.assertEquals("kangaroo2", r1.getText());
+         TextMessage r2 = (TextMessage)cons.receive(100);
+         Assert.assertNull(r2);
+
+         // now resume the first tx and then commit it
+         tm.resume(suspended);
+
+         tm.commit();
+
+         // verify that the first text message is received
+         TextMessage r3 = (TextMessage)cons.receive(5000);
+         Assert.assertNotNull(r3);
+         Assert.assertEquals("kangaroo1", r3.getText());
+
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+
+      }
+
+   }
+
+   @Test
+   public void testIsSamRM() throws Exception
+   {
+      XAConnection conn = null;
+
+      conn = xacf.createXAConnection();
+
+      // Create a session
+      XASession sess1 = conn.createXASession();
+      XAResource res1 = sess1.getXAResource();
+
+      // Create a session
+      XASession sess2 = conn.createXASession();
+      XAResource res2 = sess2.getXAResource();
+
+      Assert.assertTrue(res1.isSameRM(res2));
+   }
+
+   static class DummyXAResource implements XAResource
+   {
+      boolean failOnPrepare;
+
+      DummyXAResource()
+      {
+      }
+
+      DummyXAResource(final boolean failOnPrepare)
+      {
+         this.failOnPrepare = failOnPrepare;
+      }
+
+      public void commit(final Xid arg0, final boolean arg1) throws XAException
+      {
+      }
+
+      public void end(final Xid arg0, final int arg1) throws XAException
+      {
+      }
+
+      public void forget(final Xid arg0) throws XAException
+      {
+      }
+
+      public int getTransactionTimeout() throws XAException
+      {
+         return 0;
+      }
+
+      public boolean isSameRM(final XAResource arg0) throws XAException
+      {
+         return false;
+      }
+
+      public int prepare(final Xid arg0) throws XAException
+      {
+         if (failOnPrepare)
+         {
+            throw new XAException(XAException.XAER_RMFAIL);
+         }
+         return XAResource.XA_OK;
+      }
+
+      public Xid[] recover(final int arg0) throws XAException
+      {
+         return null;
+      }
+
+      public void rollback(final Xid arg0) throws XAException
+      {
+      }
+
+      public boolean setTransactionTimeout(final int arg0) throws XAException
+      {
+         return false;
+      }
+
+      public void start(final Xid arg0, final int arg1) throws XAException
+      {
+
+      }
+
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/integration-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml
index 6eb00bf..3c32bfe 100644
--- a/tests/integration-tests/pom.xml
+++ b/tests/integration-tests/pom.xml
@@ -181,18 +181,6 @@
          <version>5.0.0.GA</version>
       </dependency>
 
-      <!-- Needed for JMS Bridge Tests -->
-      <dependency>
-         <groupId>org.jboss.jbossts.jts</groupId>
-         <artifactId>jbossjts-jacorb</artifactId>
-         <version>4.17.13.Final</version>
-      </dependency>
-      <dependency>
-         <groupId>org.jboss</groupId>
-         <artifactId>jboss-transaction-spi</artifactId>
-         <version>7.1.0.Final</version>
-      </dependency>
-
       <!--Vertx provided dependencies-->
       <dependency>
          <groupId>io.vertx</groupId>

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/bridge/BridgeTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/bridge/BridgeTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/bridge/BridgeTestBase.java
deleted file mode 100644
index d5dfa9c..0000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/bridge/BridgeTestBase.java
+++ /dev/null
@@ -1,613 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.tests.integration.jms.bridge;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.XAConnectionFactory;
-import javax.transaction.TransactionManager;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Set;
-
-import com.arjuna.ats.arjuna.coordinator.TransactionReaper;
-import com.arjuna.ats.arjuna.coordinator.TxControl;
-import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple;
-import org.apache.activemq.api.core.TransportConfiguration;
-import org.apache.activemq.api.core.management.ResourceNames;
-import org.apache.activemq.api.jms.ActiveMQJMSClient;
-import org.apache.activemq.api.jms.JMSFactoryType;
-import org.apache.activemq.api.jms.management.JMSQueueControl;
-import org.apache.activemq.api.jms.management.TopicControl;
-import org.apache.activemq.core.config.Configuration;
-import org.apache.activemq.core.registry.JndiBindingRegistry;
-import org.apache.activemq.core.remoting.impl.invm.TransportConstants;
-import org.apache.activemq.core.server.ActiveMQServer;
-import org.apache.activemq.core.server.ActiveMQServers;
-import org.apache.activemq.core.server.management.ManagementService;
-import org.apache.activemq.jms.bridge.ConnectionFactoryFactory;
-import org.apache.activemq.jms.bridge.DestinationFactory;
-import org.apache.activemq.jms.bridge.QualityOfServiceMode;
-import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
-import org.apache.activemq.jms.client.ActiveMQJMSConnectionFactory;
-import org.apache.activemq.jms.client.ActiveMQMessage;
-import org.apache.activemq.jms.client.ActiveMQXAConnectionFactory;
-import org.apache.activemq.jms.server.JMSServerManager;
-import org.apache.activemq.jms.server.impl.JMSServerManagerImpl;
-import org.apache.activemq.tests.integration.IntegrationTestLogger;
-import org.apache.activemq.tests.unit.util.InVMNamingContext;
-import org.apache.activemq.tests.util.UnitTestCase;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-
-/**
- * A BridgeTestBase
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- */
-public abstract class BridgeTestBase extends UnitTestCase
-{
-   private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
-
-   protected ConnectionFactoryFactory cff0, cff1;
-
-   protected ConnectionFactoryFactory cff0xa, cff1xa;
-
-   protected ConnectionFactory cf0, cf1;
-
-   protected XAConnectionFactory cf0xa, cf1xa;
-
-   protected DestinationFactory sourceQueueFactory;
-   protected DestinationFactory targetQueueFactory;
-   protected DestinationFactory localTargetQueueFactory;
-   protected DestinationFactory sourceTopicFactory;
-
-   protected Queue sourceQueue, targetQueue, localTargetQueue;
-
-   protected Topic sourceTopic;
-
-   protected ActiveMQServer server0;
-
-   protected JMSServerManager jmsServer0;
-
-   protected ActiveMQServer server1;
-
-   protected JMSServerManager jmsServer1;
-
-   private InVMNamingContext context0;
-
-   protected InVMNamingContext context1;
-
-   protected HashMap<String, Object> params1;
-
-   protected ConnectionFactoryFactory cff0LowProducerWindow;
-
-   @Override
-   @Before
-   public void setUp() throws Exception
-   {
-      super.setUp();
-
-      // Start the servers
-      Configuration conf0 = createBasicConfig()
-         .setJournalDirectory(getJournalDir(0, false))
-         .setBindingsDirectory(getBindingsDir(0, false))
-         .addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY));
-
-      server0 = addServer(ActiveMQServers.newActiveMQServer(conf0, false));
-
-      context0 = new InVMNamingContext();
-      jmsServer0 = new JMSServerManagerImpl(server0);
-      jmsServer0.setRegistry(new JndiBindingRegistry(context0));
-      jmsServer0.start();
-
-      params1 = new HashMap<String, Object>();
-      params1.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
-
-      Configuration conf1 = createBasicConfig()
-         .setJournalDirectory(getJournalDir(1, false))
-         .setBindingsDirectory(getBindingsDir(1, false))
-         .addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, params1));
-
-      server1 = addServer(ActiveMQServers.newActiveMQServer(conf1, false));
-
-      context1 = new InVMNamingContext();
-
-      jmsServer1 = new JMSServerManagerImpl(server1);
-      jmsServer1.setRegistry(new JndiBindingRegistry(context1));
-      jmsServer1.start();
-
-      createQueue("sourceQueue", 0);
-
-      jmsServer0.createTopic(false, "sourceTopic", "/topic/sourceTopic");
-
-      createQueue("localTargetQueue", 0);
-
-      createQueue("targetQueue", 1);
-
-      setUpAdministeredObjects();
-      TxControl.enable();
-      // We need a local transaction and recovery manager
-      // We must start this after the remote servers have been created or it won't
-      // have deleted the database and the recovery manager may attempt to recover transactions
-
-   }
-
-   protected void createQueue(final String queueName, final int index) throws Exception
-   {
-      JMSServerManager server = jmsServer0;
-      if (index == 1)
-      {
-         server = jmsServer1;
-      }
-      assertTrue("queue '/queue/" + queueName + "' created",
-                 server.createQueue(false, queueName, null, true, "/queue/" + queueName));
-   }
-
-   @Override
-   @After
-   public void tearDown() throws Exception
-   {
-      checkEmpty(sourceQueue, 0);
-      checkEmpty(localTargetQueue, 0);
-      checkEmpty(targetQueue, 1);
-
-      // Check no subscriptions left lying around
-
-      checkNoSubscriptions(sourceTopic, 0);
-      if (cff0 instanceof ActiveMQConnectionFactory)
-      {
-         ((ActiveMQConnectionFactory) cff0).close();
-      }
-      if (cff1 instanceof ActiveMQConnectionFactory)
-      {
-         ((ActiveMQConnectionFactory) cff1).close();
-      }
-      stopComponent(jmsServer0);
-      stopComponent(jmsServer1);
-      cff0 = cff1 = null;
-      cff0xa = cff1xa = null;
-
-      cf0 = cf1 = null;
-
-      cf0xa = cf1xa = null;
-
-      sourceQueueFactory = targetQueueFactory = localTargetQueueFactory = sourceTopicFactory = null;
-
-      sourceQueue = targetQueue = localTargetQueue = null;
-
-      sourceTopic = null;
-
-      server0 = null;
-
-      jmsServer0 = null;
-
-      server1 = null;
-
-      jmsServer1 = null;
-      if (context0 != null)
-         context0.close();
-      context0 = null;
-      if (context1 != null)
-         context1.close();
-      context1 = null;
-
-      // Shutting down Arjuna threads
-      TxControl.disable(true);
-
-      TransactionReaper.terminate(false);
-      super.tearDown();
-   }
-
-
-   protected void setUpAdministeredObjects() throws Exception
-   {
-      cff0LowProducerWindow = new ConnectionFactoryFactory()
-      {
-         public ConnectionFactory createConnectionFactory() throws Exception
-         {
-            ActiveMQConnectionFactory cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,
-                                                                                              new TransportConfiguration(
-                                                                                                 INVM_CONNECTOR_FACTORY));
-
-            // Note! We disable automatic reconnection on the session factory. The bridge needs to do the reconnection
-            cf.setReconnectAttempts(0);
-            cf.setBlockOnNonDurableSend(true);
-            cf.setBlockOnDurableSend(true);
-            cf.setCacheLargeMessagesClient(true);
-            cf.setProducerWindowSize(100);
-
-            return cf;
-         }
-
-      };
-
-
-      cff0 = new ConnectionFactoryFactory()
-      {
-         public ConnectionFactory createConnectionFactory() throws Exception
-         {
-            ActiveMQConnectionFactory cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,
-                                                                                              new TransportConfiguration(
-                                                                                                 INVM_CONNECTOR_FACTORY));
-
-            // Note! We disable automatic reconnection on the session factory. The bridge needs to do the reconnection
-            cf.setReconnectAttempts(0);
-            cf.setBlockOnNonDurableSend(true);
-            cf.setBlockOnDurableSend(true);
-            cf.setCacheLargeMessagesClient(true);
-
-            return cf;
-         }
-
-      };
-
-      cff0xa = new ConnectionFactoryFactory()
-      {
-         public Object createConnectionFactory() throws Exception
-         {
-            ActiveMQXAConnectionFactory cf = (ActiveMQXAConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.XA_CF,
-                                                                                                                            new TransportConfiguration(
-                                                                                                                               INVM_CONNECTOR_FACTORY));
-
-            // Note! We disable automatic reconnection on the session factory. The bridge needs to do the reconnection
-            cf.setReconnectAttempts(0);
-            cf.setBlockOnNonDurableSend(true);
-            cf.setBlockOnDurableSend(true);
-            cf.setCacheLargeMessagesClient(true);
-
-            return cf;
-         }
-
-      };
-
-      cf0 = (ConnectionFactory) cff0.createConnectionFactory();
-      cf0xa = (XAConnectionFactory) cff0xa.createConnectionFactory();
-
-      cff1 = new ConnectionFactoryFactory()
-      {
-
-         public ConnectionFactory createConnectionFactory() throws Exception
-         {
-            ActiveMQJMSConnectionFactory cf = (ActiveMQJMSConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,
-                                                                                                                              new TransportConfiguration(
-                                                                                                                                 INVM_CONNECTOR_FACTORY,
-                                                                                                                                 params1));
-
-            // Note! We disable automatic reconnection on the session factory. The bridge needs to do the reconnection
-            cf.setReconnectAttempts(0);
-            cf.setBlockOnNonDurableSend(true);
-            cf.setBlockOnDurableSend(true);
-            cf.setCacheLargeMessagesClient(true);
-
-            return cf;
-         }
-      };
-
-      cff1xa = new ConnectionFactoryFactory()
-      {
-
-         public XAConnectionFactory createConnectionFactory() throws Exception
-         {
-            ActiveMQXAConnectionFactory cf = (ActiveMQXAConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.XA_CF,
-                                                                                                                            new TransportConfiguration(
-                                                                                                                               INVM_CONNECTOR_FACTORY,
-                                                                                                                               params1));
-
-            // Note! We disable automatic reconnection on the session factory. The bridge needs to do the reconnection
-            cf.setReconnectAttempts(0);
-            cf.setBlockOnNonDurableSend(true);
-            cf.setBlockOnDurableSend(true);
-            cf.setCacheLargeMessagesClient(true);
-
-            return cf;
-         }
-      };
-
-      cf1 = (ConnectionFactory) cff1.createConnectionFactory();
-      cf1xa = (XAConnectionFactory) cff1xa.createConnectionFactory();
-
-      sourceQueueFactory = new DestinationFactory()
-      {
-         public Destination createDestination() throws Exception
-         {
-            return (Destination) context0.lookup("/queue/sourceQueue");
-         }
-      };
-
-      sourceQueue = (Queue) sourceQueueFactory.createDestination();
-
-      targetQueueFactory = new DestinationFactory()
-      {
-         public Destination createDestination() throws Exception
-         {
-            return (Destination) context1.lookup("/queue/targetQueue");
-         }
-      };
-
-      targetQueue = (Queue) targetQueueFactory.createDestination();
-
-      sourceTopicFactory = new DestinationFactory()
-      {
-         public Destination createDestination() throws Exception
-         {
-            return (Destination) context0.lookup("/topic/sourceTopic");
-         }
-      };
-
-      sourceTopic = (Topic) sourceTopicFactory.createDestination();
-
-      localTargetQueueFactory = new DestinationFactory()
-      {
-         public Destination createDestination() throws Exception
-         {
-            return (Destination) context0.lookup("/queue/localTargetQueue");
-         }
-      };
-
-      localTargetQueue = (Queue) localTargetQueueFactory.createDestination();
-   }
-
-   protected void sendMessages(final ConnectionFactory cf,
-                               final Destination dest,
-                               final int start,
-                               final int numMessages,
-                               final boolean persistent,
-                               final boolean largeMessage) throws Exception
-   {
-      Connection conn = null;
-
-      try
-      {
-         conn = cf.createConnection();
-
-         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-         MessageProducer prod = sess.createProducer(dest);
-
-         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
-         for (int i = start; i < start + numMessages; i++)
-         {
-            if (largeMessage)
-            {
-               BytesMessage msg = sess.createBytesMessage();
-               ((ActiveMQMessage) msg).setInputStream(UnitTestCase.createFakeLargeStream(1024L * 1024L));
-               msg.setStringProperty("msg", "message" + i);
-               prod.send(msg);
-            }
-            else
-            {
-               TextMessage tm = sess.createTextMessage("message" + i);
-               prod.send(tm);
-            }
-
-         }
-      }
-      finally
-      {
-         if (conn != null)
-         {
-            conn.close();
-         }
-      }
-   }
-
-   protected void checkMessagesReceived(final ConnectionFactory cf,
-                                        final Destination dest,
-                                        final QualityOfServiceMode qosMode,
-                                        final int numMessages,
-                                        final boolean longWaitForFirst,
-                                        final boolean largeMessage) throws Exception
-   {
-      Connection conn = null;
-
-      try
-      {
-         conn = cf.createConnection();
-
-         conn.start();
-
-         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-         MessageConsumer cons = sess.createConsumer(dest);
-
-         // Consume the messages
-
-         Set<String> msgs = new HashSet<String>();
-
-         int count = 0;
-
-         // We always wait longer for the first one - it may take some time to arrive especially if we are
-         // waiting for recovery to kick in
-         while (true)
-         {
-            Message tm = cons.receive(count == 0 ? (longWaitForFirst ? 60000 : 10000) : 5000);
-
-            if (tm == null)
-            {
-               break;
-            }
-
-            // log.info("Got message " + tm.getText());
-
-            if (largeMessage)
-            {
-               BytesMessage bmsg = (BytesMessage) tm;
-               msgs.add(tm.getStringProperty("msg"));
-               byte[] buffRead = new byte[1024];
-               for (int i = 0; i < 1024; i++)
-               {
-                  Assert.assertEquals(1024, bmsg.readBytes(buffRead));
-               }
-            }
-            else
-            {
-               msgs.add(((TextMessage) tm).getText());
-            }
-
-            count++;
-
-         }
-
-         if (qosMode == QualityOfServiceMode.ONCE_AND_ONLY_ONCE || qosMode == QualityOfServiceMode.DUPLICATES_OK)
-         {
-            // All the messages should be received
-
-            for (int i = 0; i < numMessages; i++)
-            {
-               Assert.assertTrue("quality=" + qosMode + ", #=" + i + ", message=" + msgs, msgs.contains("message" + i));
-            }
-
-            // Should be no more
-            if (qosMode == QualityOfServiceMode.ONCE_AND_ONLY_ONCE)
-            {
-               Assert.assertEquals(numMessages, msgs.size());
-            }
-         }
-         else if (qosMode == QualityOfServiceMode.AT_MOST_ONCE)
-         {
-            // No *guarantee* that any messages will be received
-            // but you still might get some depending on how/where the crash occurred
-         }
-
-         BridgeTestBase.log.trace("Check complete");
-
-      }
-      finally
-      {
-         if (conn != null)
-         {
-            conn.close();
-         }
-      }
-   }
-
-   protected void checkAllMessageReceivedInOrder(final ConnectionFactory cf,
-                                                 final Destination dest,
-                                                 final int start,
-                                                 final int numMessages,
-                                                 final boolean largeMessage) throws Exception
-   {
-      Connection conn = null;
-      try
-      {
-         conn = cf.createConnection();
-
-         conn.start();
-
-         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-         MessageConsumer cons = sess.createConsumer(dest);
-
-         // Consume the messages
-
-         for (int i = 0; i < numMessages; i++)
-         {
-            Message tm = cons.receive(30000);
-
-            Assert.assertNotNull(tm);
-
-            if (largeMessage)
-            {
-               BytesMessage bmsg = (BytesMessage) tm;
-               Assert.assertEquals("message" + (i + start), tm.getStringProperty("msg"));
-               byte[] buffRead = new byte[1024];
-               for (int j = 0; j < 1024; j++)
-               {
-                  Assert.assertEquals(1024, bmsg.readBytes(buffRead));
-               }
-            }
-            else
-            {
-               Assert.assertEquals("message" + (i + start), ((TextMessage) tm).getText());
-            }
-         }
-      }
-      finally
-      {
-         if (conn != null)
-         {
-            conn.close();
-         }
-      }
-   }
-
-   public boolean checkEmpty(final Queue queue, final int index) throws Exception
-   {
-      ManagementService managementService = server0.getManagementService();
-      if (index == 1)
-      {
-         managementService = server1.getManagementService();
-      }
-      JMSQueueControl queueControl = (JMSQueueControl) managementService.getResource(ResourceNames.JMS_QUEUE + queue.getQueueName());
-
-      //server may be closed
-      if (queueControl != null)
-      {
-         queueControl.flushExecutor();
-         Long messageCount = queueControl.getMessageCount();
-
-         if (messageCount > 0)
-         {
-            queueControl.removeMessages(null);
-         }
-      }
-      return true;
-   }
-
-   protected void checkNoSubscriptions(final Topic topic, final int index) throws Exception
-   {
-      ManagementService managementService = server0.getManagementService();
-      if (index == 1)
-      {
-         managementService = server1.getManagementService();
-      }
-      TopicControl topicControl = (TopicControl) managementService.getResource(ResourceNames.JMS_TOPIC + topic.getTopicName());
-      Assert.assertEquals(0, topicControl.getSubscriptionCount());
-
-   }
-
-   protected void removeAllMessages(final String queueName, final int index) throws Exception
-   {
-      ManagementService managementService = server0.getManagementService();
-      if (index == 1)
-      {
-         managementService = server1.getManagementService();
-      }
-      JMSQueueControl queueControl = (JMSQueueControl) managementService.getResource(ResourceNames.JMS_QUEUE + queueName);
-      queueControl.removeMessages(null);
-   }
-
-   protected TransactionManager newTransactionManager()
-   {
-      return new TransactionManagerImple();
-   }
-
-   // Inner classes -------------------------------------------------------------------
-}


Mime
View raw message