activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [33/42] activemq-artemis git commit: ARTEMIS-463 Improvement to the openwire testsuite https://issues.apache.org/jira/browse/ARTEMIS-463
Date Mon, 04 Apr 2016 16:09:42 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/AutoFailTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/AutoFailTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/AutoFailTestSupport.java
new file mode 100644
index 0000000..f47620f
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/AutoFailTestSupport.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import junit.framework.TestCase;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Enforces a test case to run for only an allotted time to prevent them from
+ * hanging and breaking the whole testing.
+ *
+ *
+ */
+
+public abstract class AutoFailTestSupport extends TestCase {
+    public static final int EXIT_SUCCESS = 0;
+    public static final int EXIT_ERROR = 1;
+    private static final Logger LOG = LoggerFactory.getLogger(AutoFailTestSupport.class);
+
+    private long maxTestTime = 5 * 60 * 1000; // 5 mins by default
+    private Thread autoFailThread;
+
+    private boolean verbose = true;
+    private boolean useAutoFail; // Disable auto fail by default
+    private AtomicBoolean isTestSuccess;
+
+    protected void setUp() throws Exception {
+        // Runs the auto fail thread before performing any setup
+        if (isAutoFail()) {
+            startAutoFailThread();
+        }
+        super.setUp();
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+
+        // Stops the auto fail thread only after performing any clean up
+        stopAutoFailThread();
+    }
+
+    /**
+     * Manually start the auto fail thread. To start it automatically, just set
+     * the auto fail to true before calling any setup methods. As a rule, this
+     * method is used only when you are not sure, if the setUp and tearDown
+     * method is propagated correctly.
+     */
+    public void startAutoFailThread() {
+        setAutoFail(true);
+        isTestSuccess = new AtomicBoolean(false);
+        autoFailThread = new Thread(new Runnable() {
+            public void run() {
+                try {
+                    // Wait for test to finish succesfully
+                    Thread.sleep(getMaxTestTime());
+                } catch (InterruptedException e) {
+                    // This usually means the test was successful
+                } finally {
+                    // Check if the test was able to tear down succesfully,
+                    // which usually means, it has finished its run.
+                    if (!isTestSuccess.get()) {
+                        LOG.error("Test case has exceeded the maximum allotted time to run of: " + getMaxTestTime() + " ms.");
+                        dumpAllThreads(getName());
+                        if (System.getProperty("org.apache.activemq.AutoFailTestSupport.disableSystemExit") == null) {
+                            System.exit(EXIT_ERROR);
+                        } else {
+                            LOG.error("No system.exit as it kills surefire - forkedProcessTimeoutInSeconds (surefire.timeout) will kick in eventually see pom.xml surefire plugin config");
+                        }
+                    }
+                }
+            }
+        }, "AutoFailThread");
+
+        if (verbose) {
+            LOG.info("Starting auto fail thread...");
+        }
+
+        LOG.info("Starting auto fail thread...");
+        autoFailThread.start();
+    }
+
+    /**
+     * Manually stops the auto fail thread. As a rule, this method is used only
+     * when you are not sure, if the setUp and tearDown method is propagated
+     * correctly.
+     */
+    public void stopAutoFailThread() {
+        if (isAutoFail() && autoFailThread != null && autoFailThread.isAlive()) {
+            isTestSuccess.set(true);
+
+            if (verbose) {
+                LOG.info("Stopping auto fail thread...");
+            }
+
+            LOG.info("Stopping auto fail thread...");
+            autoFailThread.interrupt();
+        }
+    }
+
+    /**
+     * Sets the auto fail value. As a rule, this should be used only before any
+     * setup methods is called to automatically enable the auto fail thread in
+     * the setup method of the test case.
+     *
+     * @param val
+     */
+    public void setAutoFail(boolean val) {
+        this.useAutoFail = val;
+    }
+
+    public boolean isAutoFail() {
+        return this.useAutoFail;
+    }
+
+    /**
+     * The assigned value will only be reflected when the auto fail thread has
+     * started its run. Value is in milliseconds.
+     *
+     * @param val
+     */
+    public void setMaxTestTime(long val) {
+        this.maxTestTime = val;
+    }
+
+    public long getMaxTestTime() {
+        return this.maxTestTime;
+    }
+
+
+    public static void dumpAllThreads(String prefix) {
+        Map<Thread, StackTraceElement[]> stacks = Thread.getAllStackTraces();
+        for (Entry<Thread, StackTraceElement[]> stackEntry : stacks.entrySet()) {
+            System.err.println(prefix + " " + stackEntry.getKey());
+            for(StackTraceElement element : stackEntry.getValue()) {
+                System.err.println("     " + element);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/CombinationTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/CombinationTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/CombinationTestSupport.java
index d7caafa..dc8f138 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/CombinationTestSupport.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/CombinationTestSupport.java
@@ -116,7 +116,7 @@ public abstract class CombinationTestSupport extends AutoFailTestSupport {
    public static void checkStopped() throws Exception {
       ArtemisBrokerHelper.stopArtemisBroker();
       boolean notStopped = BrokerService.checkStopped();
-      TcpTransportFactory.setBrokerName(null);
+      TcpTransportFactory.clearService();
       if (notStopped) {
          fail("brokers not stopped see exceptions above");
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java
index 5e5b993..b8397e2 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java
@@ -52,22 +52,50 @@ public class ConnectionCleanupTest extends TestCase {
 
       try {
          connection.setClientID("test");
-         // fail("Should have received JMSException");
+         fail("Should have received JMSException");
       }
       catch (JMSException e) {
       }
 
-      connection.cleanup();
+      connection.doCleanup(true);
       connection.setClientID("test");
 
       connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
       try {
          connection.setClientID("test");
-         // fail("Should have received JMSException");
+         fail("Should have received JMSException");
       }
       catch (JMSException e) {
       }
    }
 
+   public void testChangeClientIDDenied() throws JMSException {
+
+      connection.setClientID("test");
+      connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      try {
+         connection.setClientID("test");
+         fail("Should have received JMSException");
+      } catch (JMSException e) {
+      }
+
+      connection.cleanup();
+
+      try {
+         connection.setClientID("test");
+         fail("Should have received JMSException");
+      } catch (JMSException e) {
+      }
+
+      connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      try {
+         connection.setClientID("test");
+         fail("Should have received JMSException");
+      } catch (JMSException e) {
+      }
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java
index fa58ebe..1e6a227 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java
@@ -16,15 +16,23 @@
  */
 package org.apache.activemq;
 
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.server.JournalType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.junit.rules.TemporaryFolder;
 import org.springframework.jms.core.JmsTemplate;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
+import java.io.File;
 
 /**
  * A useful base class which creates and closes an embedded broker
@@ -32,17 +40,27 @@ import javax.jms.Destination;
 public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport {
 
    protected BrokerService broker;
-   // protected String bindAddress = "tcp://localhost:61616";
-   protected String bindAddress = "vm://localhost";
+   protected EmbeddedJMS artemisBroker;
+   protected String bindAddress = "tcp://localhost:61616";
    protected ConnectionFactory connectionFactory;
    protected boolean useTopic;
    protected ActiveMQDestination destination;
    protected JmsTemplate template;
+   protected boolean disableWrapper = false;
+
+   public TemporaryFolder temporaryFolder;
+
+   public String CLUSTER_PASSWORD = "OPENWIRECLUSTER";
 
-   @Override
    protected void setUp() throws Exception {
-      if (broker == null) {
-         broker = createBroker();
+      BrokerService.disableWrapper = disableWrapper;
+      File tmpRoot = new File("./target/tmp");
+      tmpRoot.mkdirs();
+      temporaryFolder = new TemporaryFolder(tmpRoot);
+      temporaryFolder.create();
+
+      if (artemisBroker == null) {
+         artemisBroker = createArtemisBroker();
       }
       startBroker();
 
@@ -58,13 +76,43 @@ public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport {
 
    @Override
    protected void tearDown() throws Exception {
-      if (broker != null) {
+      if (artemisBroker != null) {
          try {
-            broker.stop();
+            artemisBroker.stop();
+            artemisBroker = null;
          }
          catch (Exception e) {
          }
       }
+      temporaryFolder.delete();
+   }
+
+   public String getTmp() {
+      return getTmpFile().getAbsolutePath();
+   }
+
+   public File getTmpFile() {
+      return temporaryFolder.getRoot();
+   }
+
+   protected String getJournalDir(int serverID, boolean backup) {
+      return getTmp() + "/journal_" + serverID + "_" + backup;
+   }
+
+   protected String getBindingsDir(int serverID, boolean backup) {
+      return getTmp() + "/binding_" + serverID + "_" + backup;
+   }
+
+   protected String getPageDir(int serverID, boolean backup) {
+      return getTmp() + "/paging_" + serverID + "_" + backup;
+   }
+
+   protected String getLargeMessagesDir(int serverID, boolean backup) {
+      return getTmp() + "/paging_" + serverID + "_" + backup;
+   }
+
+   protected static String newURI(String localhostAddress, int serverID) {
+      return "tcp://" + localhostAddress + ":" + (61616 + serverID);
    }
 
    /**
@@ -114,20 +162,44 @@ public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport {
       return new ActiveMQConnectionFactory(bindAddress);
    }
 
-   /**
-    * Factory method to create a new broker
-    *
-    * @throws Exception
-    */
+
+   public EmbeddedJMS createArtemisBroker() throws Exception {
+      Configuration config0 = createConfig("localhost", 0);
+      EmbeddedJMS newbroker = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+      return newbroker;
+   }
+
+   protected Configuration createConfig(final String hostAddress, final int serverID) throws Exception {
+      ConfigurationImpl configuration = new ConfigurationImpl().setJMXManagementEnabled(false).
+              setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(1000 * 1024).setJournalType(JournalType.NIO).
+              setJournalDirectory(getJournalDir(serverID, false)).
+              setBindingsDirectory(getBindingsDir(serverID, false)).
+              setPagingDirectory(getPageDir(serverID, false)).
+              setLargeMessagesDirectory(getLargeMessagesDir(serverID, false)).
+              setJournalCompactMinFiles(0).
+              setJournalCompactPercentage(0).
+              setClusterPassword(CLUSTER_PASSWORD);
+
+      configuration.addAddressesSetting("#", new AddressSettings().setAutoCreateJmsQueues(true).setAutoDeleteJmsQueues(true));
+
+      configuration.addAcceptorConfiguration("netty", newURI(hostAddress, serverID));
+      configuration.addConnectorConfiguration("netty-connector", newURI(hostAddress, serverID));
+
+      return configuration;
+   }
+
+   //we keep this because some other tests uses it.
+   //we'll delete this when those tests are dealt with.
    protected BrokerService createBroker() throws Exception {
       BrokerService answer = new BrokerService();
       answer.setPersistent(isPersistent());
+      answer.getManagementContext().setCreateConnector(false);
       answer.addConnector(bindAddress);
       return answer;
    }
 
    protected void startBroker() throws Exception {
-      broker.start();
+      artemisBroker.start();
    }
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerStartupDestinationTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerStartupDestinationTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerStartupDestinationTest.java
index 06fbbbe..1e0555c 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerStartupDestinationTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerStartupDestinationTest.java
@@ -98,6 +98,7 @@ public class ExclusiveConsumerStartupDestinationTest extends EmbeddedBrokerTestS
       }
    }
 
+   //Exclusive consumer not implemented yet.
    public void testFailoverToAnotherExclusiveConsumerCreatedFirst() throws JMSException, InterruptedException {
       Connection conn = createConnection(true);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerTest.java
index 0287a77..5bc647e 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerTest.java
@@ -26,7 +26,9 @@ import javax.jms.Session;
 
 import junit.framework.TestCase;
 
+import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
 
 public class ExclusiveConsumerTest extends TestCase {
 
@@ -43,6 +45,7 @@ public class ExclusiveConsumerTest extends TestCase {
 
    @Override
    protected void tearDown() throws Exception {
+      TcpTransportFactory.clearService();
       super.tearDown();
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
index 6bf47f6..6274890 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
@@ -35,11 +35,18 @@ import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerInvocationHandler;
 import javax.management.ObjectName;
 
 import junit.framework.Test;
 
-import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
+import org.apache.activemq.artemis.api.jms.management.DestinationControl;
+import org.apache.activemq.artemis.api.jms.management.JMSQueueControl;
+import org.apache.activemq.artemis.api.jms.management.JMSServerControl;
+import org.apache.activemq.artemis.api.jms.management.TopicControl;
+import org.apache.activemq.broker.artemiswrapper.ArtemisBrokerWrapper;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.slf4j.Logger;
@@ -855,7 +862,7 @@ public class JMSConsumerTest extends JmsTestSupport {
    }
 
    public void initCombosForTestAckOfExpired() {
-      addCombinationValues("destinationType", new Object[]{Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)});
+      addCombinationValues("destinationType", new Object[]{Byte.valueOf(ActiveMQDestination.QUEUE_TYPE)});
    }
 
    public void testAckOfExpired() throws Exception {
@@ -867,6 +874,7 @@ public class JMSConsumerTest extends JmsTestSupport {
       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
       destination = (ActiveMQDestination) (destinationType == ActiveMQDestination.QUEUE_TYPE ? session.createQueue("test") : session.createTopic("test"));
 
+      createManagedDestinationOnServer(destination);
       MessageConsumer consumer = session.createConsumer(destination);
       connection.setStatsEnabled(true);
 
@@ -900,25 +908,43 @@ public class JMSConsumerTest extends JmsTestSupport {
       }
       assertEquals("consumer has expiredMessages", count, amqConsumer.getConsumerStats().getExpiredMessageCount().getCount());
 
-      DestinationViewMBean view = createView(destination);
+      DestinationControl view = createView(destination);
+
+      assertEquals("Wrong inFlightCount: " + view.getDeliveringCount(), 0, view.getDeliveringCount());
+      assertEquals("Wrong dispatch count: " + view.getMessagesAdded(), 8, view.getMessagesAdded());
+   }
 
-      assertEquals("Wrong inFlightCount: " + view.getInFlightCount(), 0, view.getInFlightCount());
-      assertEquals("Wrong dispatch count: " + view.getDispatchCount(), 8, view.getDispatchCount());
-      assertEquals("Wrong dequeue count: " + view.getDequeueCount(), 8, view.getDequeueCount());
-      assertEquals("Wrong expired count: " + view.getExpiredCount(), 4, view.getExpiredCount());
+   private void createManagedDestinationOnServer(ActiveMQDestination destination) throws Exception {
+      String destName = destination.getPhysicalName();
+      ArtemisBrokerWrapper wrapper = (ArtemisBrokerWrapper) broker.getBroker();
+      MBeanServer beanServer = wrapper.getMbeanServer();
+      ObjectName objName = ObjectNameBuilder.DEFAULT.getJMSServerObjectName();
+      JMSServerControl serverControl = MBeanServerInvocationHandler.newProxyInstance(beanServer, objName, JMSServerControl.class, false);
+      serverControl.createQueue(destName);
    }
 
-   protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
+   protected DestinationControl createView(ActiveMQDestination destination) throws Exception {
 
-      String domain = "org.apache.activemq";
-      ObjectName name;
+      String destName = destination.getPhysicalName();
       if (destination.isQueue()) {
-         name = new ObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=test");
+         return createJMSQueueControl(destName);
       }
       else {
-         name = new ObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=test");
+         return createJMSTopicControl(destName);
       }
-      return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true);
    }
 
+   private JMSQueueControl createJMSQueueControl(String destName) throws Exception {
+      ArtemisBrokerWrapper wrapper = (ArtemisBrokerWrapper) broker.getBroker();
+      MBeanServer beanServer = wrapper.getMbeanServer();
+      ObjectName objName = ObjectNameBuilder.DEFAULT.getJMSQueueObjectName(destName);
+      return MBeanServerInvocationHandler.newProxyInstance(beanServer, objName, JMSQueueControl.class, false);
+   }
+
+   private TopicControl createJMSTopicControl(String destName) throws Exception {
+      ArtemisBrokerWrapper wrapper = (ArtemisBrokerWrapper) broker.getBroker();
+      MBeanServer beanServer = wrapper.getMbeanServer();
+      ObjectName objName = ObjectNameBuilder.DEFAULT.getJMSTopicObjectName(destName);
+      return MBeanServerInvocationHandler.newProxyInstance(beanServer, objName, TopicControl.class, false);
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableQueueWildcardSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableQueueWildcardSendReceiveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableQueueWildcardSendReceiveTest.java
index bf1535a..309fec9 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableQueueWildcardSendReceiveTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableQueueWildcardSendReceiveTest.java
@@ -21,7 +21,7 @@ import javax.jms.DeliveryMode;
 import org.apache.activemq.test.JmsTopicSendReceiveTest;
 
 /**
- *
+ * https://issues.apache.org/jira/browse/ARTEMIS-189
  */
 public class JmsDurableQueueWildcardSendReceiveTest extends JmsTopicSendReceiveTest {
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java
index 12e7827..6a3cd19 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java
@@ -263,6 +263,7 @@ public class JmsQueueBrowserTest extends JmsTestSupport {
       consumer.close();
    }
 
+   //ref: https://issues.apache.org/jira/browse/ARTEMIS-384
    public void testBrowseReceive() throws Exception {
       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
       ActiveMQQueue destination = new ActiveMQQueue("TEST");

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java
new file mode 100755
index 0000000..b7c2e94
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.ArrayList;
+import java.util.Enumeration;
+
+import org.apache.activemq.test.JmsResourceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class JmsQueueTransactionTest extends JmsTransactionTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(JmsQueueTransactionTest.class);
+
+    /**
+     * @see org.apache.activemq.JmsTransactionTestSupport#getJmsResourceProvider()
+     */
+    protected JmsResourceProvider getJmsResourceProvider() {
+        JmsResourceProvider p = new JmsResourceProvider();
+        p.setTopic(false);
+        return p;
+    }
+
+    /**
+     * Tests if the the connection gets reset, the messages will still be
+     * received.
+     *
+     * @throws Exception
+     */
+    public void testReceiveTwoThenCloseConnection() throws Exception {
+        Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
+
+        // lets consume any outstanding messages from previous test runs
+        beginTx();
+        while (consumer.receive(1000) != null) {
+        }
+        commitTx();
+
+        beginTx();
+        producer.send(outbound[0]);
+        producer.send(outbound[1]);
+        commitTx();
+
+        LOG.info("Sent 0: " + outbound[0]);
+        LOG.info("Sent 1: " + outbound[1]);
+
+        ArrayList<Message> messages = new ArrayList<Message>();
+        beginTx();
+        Message message = consumer.receive(2000);
+        assertEquals(outbound[0], message);
+
+        message = consumer.receive(2000);
+        assertNotNull(message);
+        assertEquals(outbound[1], message);
+
+        // Close and reopen connection.
+        reconnect();
+
+        // Consume again.. the previous message should
+        // get redelivered.
+        beginTx();
+        message = consumer.receive(2000);
+        assertNotNull("Should have re-received the first message again!", message);
+        messages.add(message);
+        assertEquals(outbound[0], message);
+
+        message = consumer.receive(5000);
+        assertNotNull("Should have re-received the second message again!", message);
+        messages.add(message);
+        assertEquals(outbound[1], message);
+        commitTx();
+
+        Message inbound[] = new Message[messages.size()];
+        messages.toArray(inbound);
+
+        assertTextMessagesEqual("Rollback did not work", outbound, inbound);
+    }
+
+    /**
+     * Tests sending and receiving messages with two sessions(one for producing
+     * and another for consuming).
+     *
+     * @throws Exception
+     */
+    public void testSendReceiveInSeperateSessionTest() throws Exception {
+        session.close();
+        int batchCount = 10;
+
+        for (int i = 0; i < batchCount; i++) {
+            // Session that sends messages
+            {
+                Session session = resourceProvider.createSession(connection);
+                this.session = session;
+                MessageProducer producer = resourceProvider.createProducer(session, destination);
+                // consumer = resourceProvider.createConsumer(session,
+                // destination);
+                beginTx();
+                producer.send(session.createTextMessage("Test Message: " + i));
+                commitTx();
+                session.close();
+            }
+
+            // Session that consumes messages
+            {
+                Session session = resourceProvider.createSession(connection);
+                this.session = session;
+                MessageConsumer consumer = resourceProvider.createConsumer(session, destination);
+
+                beginTx();
+                TextMessage message = (TextMessage)consumer.receive(1000 * 5);
+                assertNotNull("Received only " + i + " messages in batch ", message);
+                assertEquals("Test Message: " + i, message.getText());
+
+                commitTx();
+                session.close();
+            }
+        }
+    }
+
+    /**
+     * Tests the queue browser. Browses the messages then the consumer tries to
+     * receive them. The messages should still be in the queue even when it was
+     * browsed.
+     *
+     * @throws Exception
+     */
+    public void testReceiveBrowseReceive() throws Exception {
+        Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message"), session.createTextMessage("Third Message")};
+
+        // lets consume any outstanding messages from previous test runs
+        beginTx();
+        while (consumer.receive(1000) != null) {
+        }
+        commitTx();
+
+        beginTx();
+        producer.send(outbound[0]);
+        producer.send(outbound[1]);
+        producer.send(outbound[2]);
+        commitTx();
+
+        // Get the first.
+        beginTx();
+        assertEquals(outbound[0], consumer.receive(1000));
+        consumer.close();
+        commitTx();
+
+        beginTx();
+        QueueBrowser browser = session.createBrowser((Queue)destination);
+        Enumeration enumeration = browser.getEnumeration();
+
+        // browse the second
+        assertTrue("should have received the second message", enumeration.hasMoreElements());
+        assertEquals(outbound[1], (Message)enumeration.nextElement());
+
+        // browse the third.
+        assertTrue("Should have received the third message", enumeration.hasMoreElements());
+        assertEquals(outbound[2], (Message)enumeration.nextElement());
+
+        LOG.info("Check for more...");
+        // There should be no more.
+        boolean tooMany = false;
+        while (enumeration.hasMoreElements()) {
+            LOG.info("Got extra message: " + ((TextMessage)enumeration.nextElement()).getText());
+            tooMany = true;
+        }
+        assertFalse(tooMany);
+        LOG.info("close browser...");
+        browser.close();
+
+        LOG.info("reopen and consume...");
+        // Re-open the consumer.
+        consumer = resourceProvider.createConsumer(session, destination);
+        // Receive the second.
+        assertEquals(outbound[1], consumer.receive(1000));
+        // Receive the third.
+        assertEquals(outbound[2], consumer.receive(1000));
+        consumer.close();
+
+        commitTx();
+    }
+
+    public void testCloseConsumer() throws Exception {
+        Destination dest = session.createQueue(getSubject() + "?consumer.prefetchSize=0");
+        producer = session.createProducer(dest);
+        beginTx();
+        producer.send(session.createTextMessage("message 1"));
+        producer.send(session.createTextMessage("message 2"));
+        commitTx();
+
+        beginTx();
+        consumer = session.createConsumer(dest);
+        Message message1 = consumer.receive(1000);
+        String text1 = ((TextMessage)message1).getText();
+        assertNotNull(message1);
+        assertEquals("message 1", text1);
+
+        consumer.close();
+
+        consumer = session.createConsumer(dest);
+
+        Message message2 = consumer.receive(1000);
+        String text2 = ((TextMessage)message2).getText();
+        assertNotNull(message2);
+        assertEquals("message 2", text2);
+        commitTx();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueWildcardSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueWildcardSendReceiveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueWildcardSendReceiveTest.java
index 296a56e..9a6dcb1 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueWildcardSendReceiveTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueWildcardSendReceiveTest.java
@@ -29,7 +29,7 @@ import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.test.JmsTopicSendReceiveTest;
 
 /**
- *
+ * https://issues.apache.org/jira/browse/ARTEMIS-189
  */
 public class JmsQueueWildcardSendReceiveTest extends JmsTopicSendReceiveTest {
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
index 8a64a85..c57845d 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
@@ -116,7 +116,7 @@ public class JmsRollbackRedeliveryTest {
             Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
             Destination destination = session.createQueue(destinationName);
             MessageConsumer consumer = session.createConsumer(destination);
-            TextMessage msg = (TextMessage) consumer.receive(6000000);
+            TextMessage msg = (TextMessage) consumer.receive(5000);
             if (msg != null) {
                if (rolledback.put(msg.getText(), Boolean.TRUE) != null) {
                   LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsTest.java
index b5dcacc..216ed10 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq;
 
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
+
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
@@ -109,5 +111,6 @@ public class JmsTopicSendReceiveWithTwoConnectionsTest extends JmsSendReceiveTes
       receiveSession.close();
       sendConnection.close();
       receiveConnection.close();
+      TcpTransportFactory.clearService();
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java
new file mode 100755
index 0000000..abaf52d
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java
@@ -0,0 +1,722 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.test.JmsResourceProvider;
+import org.apache.activemq.test.TestSupport;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public abstract class JmsTransactionTestSupport extends TestSupport implements MessageListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsTransactionTestSupport.class);
+    private static final int MESSAGE_COUNT = 5;
+    private static final String MESSAGE_TEXT = "message";
+
+    protected ConnectionFactory connectionFactory;
+    protected Connection connection;
+    protected Session session;
+    protected MessageConsumer consumer;
+    protected MessageProducer producer;
+    protected JmsResourceProvider resourceProvider;
+    protected Destination destination;
+    protected int batchCount = 10;
+    protected int batchSize = 20;
+    protected BrokerService broker;
+
+    // for message listener test
+    private final List<Message> unackMessages = new ArrayList<Message>(MESSAGE_COUNT);
+    private final List<Message> ackMessages = new ArrayList<Message>(MESSAGE_COUNT);
+    private boolean resendPhase;
+
+    public JmsTransactionTestSupport() {
+        super();
+    }
+
+    public JmsTransactionTestSupport(String name) {
+        super(name);
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see junit.framework.TestCase#setUp()
+     */
+    @Override
+    protected void setUp() throws Exception {
+        broker = createBroker();
+        broker.start();
+        broker.waitUntilStarted();
+
+        resourceProvider = getJmsResourceProvider();
+        topic = resourceProvider.isTopic();
+        // We will be using transacted sessions.
+        setSessionTransacted();
+        connectionFactory = newConnectionFactory();
+        reconnect();
+    }
+
+    protected void setSessionTransacted() {
+        resourceProvider.setTransacted(true);
+    }
+
+    protected ConnectionFactory newConnectionFactory() throws Exception {
+        return resourceProvider.createConnectionFactory();
+    }
+
+    protected void beginTx() throws Exception {
+        //no-op for local tx
+    }
+
+    protected void commitTx() throws Exception {
+        session.commit();
+    }
+
+    protected void rollbackTx() throws Exception {
+        session.rollback();
+    }
+
+    /**
+     */
+    protected BrokerService createBroker() throws Exception, URISyntaxException {
+        return BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false"));
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see junit.framework.TestCase#tearDown()
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        LOG.info("Closing down connection");
+
+        try {
+            session.close();
+            session = null;
+            connection.close();
+            connection = null;
+        } catch (Exception e) {
+            LOG.info("Caught exception while closing resources.");
+        }
+
+        try {
+            broker.stop();
+            broker.waitUntilStopped();
+            broker = null;
+        } catch (Exception e) {
+            LOG.info("Caught exception while shutting down the Broker", e);
+        }
+
+        LOG.info("Connection closed.");
+    }
+
+    protected abstract JmsResourceProvider getJmsResourceProvider();
+
+    /**
+     * Sends a batch of messages and validates that the messages are received.
+     *
+     * @throws Exception
+     */
+    public void testSendReceiveTransactedBatches() throws Exception {
+
+        TextMessage message = session.createTextMessage("Batch Message");
+        for (int j = 0; j < batchCount; j++) {
+            LOG.info("Producing bacth " + j + " of " + batchSize + " messages");
+
+            beginTx();
+            for (int i = 0; i < batchSize; i++) {
+                producer.send(message);
+            }
+            messageSent();
+            commitTx();
+            LOG.info("Consuming bacth " + j + " of " + batchSize + " messages");
+
+            beginTx();
+            for (int i = 0; i < batchSize; i++) {
+                message = (TextMessage)consumer.receive(1000 * 5);
+                assertNotNull("Received only " + i + " messages in batch " + j, message);
+                assertEquals("Batch Message", message.getText());
+            }
+
+            commitTx();
+        }
+    }
+
+    protected void messageSent() throws Exception {
+    }
+
+    /**
+     * Sends a batch of messages and validates that the rollbacked message was
+     * not consumed.
+     *
+     * @throws Exception
+     */
+    public void testSendRollback() throws Exception {
+        Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
+
+        // sends a message
+        beginTx();
+        producer.send(outbound[0]);
+        commitTx();
+
+        // sends a message that gets rollbacked
+        beginTx();
+        producer.send(session.createTextMessage("I'm going to get rolled back."));
+        rollbackTx();
+
+        // sends a message
+        beginTx();
+        producer.send(outbound[1]);
+        commitTx();
+
+        // receives the first message
+        beginTx();
+        ArrayList<Message> messages = new ArrayList<Message>();
+        LOG.info("About to consume message 1");
+        Message message = consumer.receive(1000);
+        messages.add(message);
+        LOG.info("Received: " + message);
+
+        // receives the second message
+        LOG.info("About to consume message 2");
+        message = consumer.receive(4000);
+        messages.add(message);
+        LOG.info("Received: " + message);
+
+        // validates that the rollbacked was not consumed
+        commitTx();
+        Message inbound[] = new Message[messages.size()];
+        messages.toArray(inbound);
+        assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
+    }
+
+    /**
+     * spec section 3.6 acking a message with automation acks has no effect.
+     * @throws Exception
+     */
+    public void testAckMessageInTx() throws Exception {
+        Message[] outbound = new Message[] {session.createTextMessage("First Message")};
+
+        // sends a message
+        beginTx();
+        producer.send(outbound[0]);
+        outbound[0].acknowledge();
+        commitTx();
+        outbound[0].acknowledge();
+
+        // receives the first message
+        beginTx();
+        ArrayList<Message> messages = new ArrayList<Message>();
+        LOG.info("About to consume message 1");
+        Message message = consumer.receive(1000);
+        messages.add(message);
+        LOG.info("Received: " + message);
+
+        // validates that the rollbacked was not consumed
+        commitTx();
+        Message inbound[] = new Message[messages.size()];
+        messages.toArray(inbound);
+        assertTextMessagesEqual("Message not delivered.", outbound, inbound);
+    }
+
+    /**
+     * Sends a batch of messages and validates that the message sent before
+     * session close is not consumed.
+     *
+     * This test only works with local transactions, not xa.
+     * @throws Exception
+     */
+    public void testSendSessionClose() throws Exception {
+        Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
+
+        // sends a message
+        beginTx();
+        producer.send(outbound[0]);
+        commitTx();
+
+        // sends a message that gets rollbacked
+        beginTx();
+        producer.send(session.createTextMessage("I'm going to get rolled back."));
+        consumer.close();
+
+        reconnectSession();
+
+        // sends a message
+        producer.send(outbound[1]);
+        commitTx();
+
+        // receives the first message
+        ArrayList<Message> messages = new ArrayList<Message>();
+        LOG.info("About to consume message 1");
+        beginTx();
+        Message message = consumer.receive(1000);
+        messages.add(message);
+        LOG.info("Received: " + message);
+
+        // receives the second message
+        LOG.info("About to consume message 2");
+        message = consumer.receive(4000);
+        messages.add(message);
+        LOG.info("Received: " + message);
+
+        // validates that the rollbacked was not consumed
+        commitTx();
+        Message inbound[] = new Message[messages.size()];
+        messages.toArray(inbound);
+        assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
+    }
+
+    /**
+     * Sends a batch of messages and validates that the message sent before
+     * session close is not consumed.
+     *
+     * @throws Exception
+     */
+    public void testSendSessionAndConnectionClose() throws Exception {
+        Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
+
+        // sends a message
+        beginTx();
+        producer.send(outbound[0]);
+        commitTx();
+
+        // sends a message that gets rollbacked
+        beginTx();
+        producer.send(session.createTextMessage("I'm going to get rolled back."));
+        consumer.close();
+        session.close();
+
+        reconnect();
+
+        // sends a message
+        beginTx();
+        producer.send(outbound[1]);
+        commitTx();
+
+        // receives the first message
+        ArrayList<Message> messages = new ArrayList<Message>();
+        LOG.info("About to consume message 1");
+        beginTx();
+        Message message = consumer.receive(1000);
+        messages.add(message);
+        LOG.info("Received: " + message);
+
+        // receives the second message
+        LOG.info("About to consume message 2");
+        message = consumer.receive(4000);
+        messages.add(message);
+        LOG.info("Received: " + message);
+
+        // validates that the rollbacked was not consumed
+        commitTx();
+        Message inbound[] = new Message[messages.size()];
+        messages.toArray(inbound);
+        assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
+    }
+
+    /**
+     * Sends a batch of messages and validates that the rollbacked message was
+     * redelivered.
+     *
+     * @throws Exception
+     */
+    public void testReceiveRollback() throws Exception {
+        Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
+
+        // lets consume any outstanding messages from prev test runs
+        beginTx();
+            while (consumer.receive(1000) != null) {
+        }
+        commitTx();
+
+        // sent both messages
+        beginTx();
+        producer.send(outbound[0]);
+        producer.send(outbound[1]);
+        commitTx();
+
+        LOG.info("Sent 0: " + outbound[0]);
+        LOG.info("Sent 1: " + outbound[1]);
+
+        ArrayList<Message> messages = new ArrayList<Message>();
+        beginTx();
+        Message message = consumer.receive(1000);
+        messages.add(message);
+        assertEquals(outbound[0], message);
+        commitTx();
+
+        // rollback so we can get that last message again.
+        beginTx();
+        message = consumer.receive(1000);
+        assertNotNull(message);
+        assertEquals(outbound[1], message);
+        rollbackTx();
+
+        // Consume again.. the prev message should
+        // get redelivered.
+        beginTx();
+        message = consumer.receive(5000);
+        assertNotNull("Should have re-received the message again!", message);
+        messages.add(message);
+        commitTx();
+
+        Message inbound[] = new Message[messages.size()];
+        messages.toArray(inbound);
+        assertTextMessagesEqual("Rollback did not work", outbound, inbound);
+    }
+
+    /**
+     * Sends a batch of messages and validates that the rollbacked message was
+     * redelivered.
+     *
+     * @throws Exception
+     */
+    public void testReceiveTwoThenRollback() throws Exception {
+        Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
+
+        // lets consume any outstanding messages from prev test runs
+        beginTx();
+        while (consumer.receive(1000) != null) {
+        }
+        commitTx();
+
+        //
+        beginTx();
+        producer.send(outbound[0]);
+        producer.send(outbound[1]);
+        commitTx();
+
+        LOG.info("Sent 0: " + outbound[0]);
+        LOG.info("Sent 1: " + outbound[1]);
+
+        ArrayList<Message> messages = new ArrayList<Message>();
+        beginTx();
+        Message message = consumer.receive(1000);
+        assertEquals(outbound[0], message);
+
+        message = consumer.receive(1000);
+        assertNotNull(message);
+        assertEquals(outbound[1], message);
+        rollbackTx();
+
+        // Consume again.. the prev message should
+        // get redelivered.
+        beginTx();
+        message = consumer.receive(5000);
+        assertNotNull("Should have re-received the first message again!", message);
+        messages.add(message);
+        assertEquals(outbound[0], message);
+        message = consumer.receive(5000);
+        assertNotNull("Should have re-received the second message again!", message);
+        messages.add(message);
+        assertEquals(outbound[1], message);
+
+        assertNull(consumer.receiveNoWait());
+        commitTx();
+
+        Message inbound[] = new Message[messages.size()];
+        messages.toArray(inbound);
+        assertTextMessagesEqual("Rollback did not work", outbound, inbound);
+    }
+
+    /**
+     * Sends a batch of messages and validates that the rollbacked message was
+     * not consumed.
+     *
+     * @throws Exception
+     */
+    public void testSendReceiveWithPrefetchOne() throws Exception {
+        setPrefetchToOne();
+        Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message"), session.createTextMessage("Third Message"),
+                                            session.createTextMessage("Fourth Message")};
+
+        beginTx();
+        for (int i = 0; i < outbound.length; i++) {
+            // sends a message
+            producer.send(outbound[i]);
+        }
+        commitTx();
+
+        // receives the first message
+        beginTx();
+        for (int i = 0; i < outbound.length; i++) {
+            LOG.info("About to consume message 1");
+            Message message = consumer.receive(1000);
+            assertNotNull(message);
+            LOG.info("Received: " + message);
+        }
+
+        // validates that the rollbacked was not consumed
+        commitTx();
+    }
+
+    /**
+     * Perform the test that validates if the rollbacked message was redelivered
+     * multiple times.
+     *
+     * @throws Exception
+     */
+    public void testReceiveTwoThenRollbackManyTimes() throws Exception {
+        for (int i = 0; i < 5; i++) {
+            testReceiveTwoThenRollback();
+        }
+    }
+
+    /**
+     * Sends a batch of messages and validates that the rollbacked message was
+     * not consumed. This test differs by setting the message prefetch to one.
+     *
+     * @throws Exception
+     */
+    public void testSendRollbackWithPrefetchOfOne() throws Exception {
+        setPrefetchToOne();
+        testSendRollback();
+    }
+
+    /**
+     * Sends a batch of messages and and validates that the rollbacked message
+     * was redelivered. This test differs by setting the message prefetch to
+     * one.
+     *
+     * @throws Exception
+     */
+    public void testReceiveRollbackWithPrefetchOfOne() throws Exception {
+        setPrefetchToOne();
+        testReceiveRollback();
+    }
+
+    /**
+     * Tests if the messages can still be received if the consumer is closed
+     * (session is not closed).
+     *
+     * @throws Exception see http://jira.codehaus.org/browse/AMQ-143
+     */
+    public void testCloseConsumerBeforeCommit() throws Exception {
+        TextMessage[] outbound = new TextMessage[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
+
+        // lets consume any outstanding messages from prev test runs
+        beginTx();
+        while (consumer.receiveNoWait() != null) {
+        }
+
+        commitTx();
+
+        // sends the messages
+        beginTx();
+        producer.send(outbound[0]);
+        producer.send(outbound[1]);
+        commitTx();
+        LOG.info("Sent 0: " + outbound[0]);
+        LOG.info("Sent 1: " + outbound[1]);
+
+        beginTx();
+        TextMessage message = (TextMessage)consumer.receive(1000);
+        assertEquals(outbound[0].getText(), message.getText());
+        // Close the consumer before the commit. This should not cause the
+        // received message
+        // to rollback.
+        consumer.close();
+        commitTx();
+
+        // Create a new consumer
+        consumer = resourceProvider.createConsumer(session, destination);
+        LOG.info("Created consumer: " + consumer);
+
+        beginTx();
+        message = (TextMessage)consumer.receive(1000);
+        assertEquals(outbound[1].getText(), message.getText());
+        commitTx();
+    }
+
+    public void testChangeMutableObjectInObjectMessageThenRollback() throws Exception {
+        ArrayList<String> list = new ArrayList<String>();
+        list.add("First");
+        Message outbound = session.createObjectMessage(list);
+        outbound.setStringProperty("foo", "abc");
+
+        beginTx();
+        producer.send(outbound);
+        commitTx();
+
+        LOG.info("About to consume message 1");
+        beginTx();
+        Message message = consumer.receive(5000);
+
+        List<String> body = assertReceivedObjectMessageWithListBody(message);
+
+        // now lets try mutate it
+        try {
+            message.setStringProperty("foo", "def");
+            fail("Cannot change properties of the object!");
+        } catch (JMSException e) {
+            LOG.info("Caught expected exception: " + e, e);
+        }
+        body.clear();
+        body.add("This should never be seen!");
+        rollbackTx();
+
+        beginTx();
+        message = consumer.receive(5000);
+        List<String> secondBody = assertReceivedObjectMessageWithListBody(message);
+        assertNotSame("Second call should return a different body", secondBody, body);
+        commitTx();
+    }
+
+    @SuppressWarnings("unchecked")
+    protected List<String> assertReceivedObjectMessageWithListBody(Message message) throws JMSException {
+        assertNotNull("Should have received a message!", message);
+        assertEquals("foo header", "abc", message.getStringProperty("foo"));
+
+        assertTrue("Should be an object message but was: " + message, message instanceof ObjectMessage);
+        ObjectMessage objectMessage = (ObjectMessage)message;
+        List<String> body = (List<String>)objectMessage.getObject();
+        LOG.info("Received body: " + body);
+
+        assertEquals("Size of list should be 1", 1, body.size());
+        assertEquals("element 0 of list", "First", body.get(0));
+        return body;
+    }
+
+    /**
+     * Recreates the connection.
+     *
+     * @throws javax.jms.JMSException
+     */
+    protected void reconnect() throws Exception {
+
+        if (connection != null) {
+            // Close the prev connection.
+            connection.close();
+        }
+        session = null;
+        connection = resourceProvider.createConnection(connectionFactory);
+        reconnectSession();
+        connection.start();
+    }
+
+    /**
+     * Recreates the connection.
+     *
+     * @throws javax.jms.JMSException
+     */
+    protected void reconnectSession() throws JMSException {
+        if (session != null) {
+            session.close();
+        }
+
+        session = resourceProvider.createSession(connection);
+        destination = resourceProvider.createDestination(session, getSubject());
+        producer = resourceProvider.createProducer(session, destination);
+        consumer = resourceProvider.createConsumer(session, destination);
+    }
+
+    /**
+     * Sets the prefeftch policy to one.
+     */
+    protected void setPrefetchToOne() {
+        ActiveMQPrefetchPolicy prefetchPolicy = getPrefetchPolicy();
+        prefetchPolicy.setQueuePrefetch(1);
+        prefetchPolicy.setTopicPrefetch(1);
+        prefetchPolicy.setDurableTopicPrefetch(1);
+        prefetchPolicy.setOptimizeDurableTopicPrefetch(1);
+    }
+
+    protected ActiveMQPrefetchPolicy getPrefetchPolicy() {
+        return ((ActiveMQConnection)connection).getPrefetchPolicy();
+    }
+
+    //This test won't work with xa tx so no beginTx() has been added.
+    public void testMessageListener() throws Exception {
+        // send messages
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            producer.send(session.createTextMessage(MESSAGE_TEXT + i));
+        }
+        commitTx();
+        consumer.setMessageListener(this);
+        // wait receive
+        waitReceiveUnack();
+        assertEquals(unackMessages.size(), MESSAGE_COUNT);
+        // resend phase
+        waitReceiveAck();
+        assertEquals(ackMessages.size(), MESSAGE_COUNT);
+        // should no longer re-receive
+        consumer.setMessageListener(null);
+        assertNull(consumer.receive(500));
+        reconnect();
+    }
+
+    @Override
+    public void onMessage(Message message) {
+        if (!resendPhase) {
+            unackMessages.add(message);
+            if (unackMessages.size() == MESSAGE_COUNT) {
+                try {
+                    rollbackTx();
+                    resendPhase = true;
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        } else {
+            ackMessages.add(message);
+            if (ackMessages.size() == MESSAGE_COUNT) {
+                try {
+                    commitTx();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    private void waitReceiveUnack() throws Exception {
+        for (int i = 0; i < 100 && !resendPhase; i++) {
+            Thread.sleep(100);
+        }
+        assertTrue(resendPhase);
+    }
+
+    private void waitReceiveAck() throws Exception {
+        for (int i = 0; i < 100 && ackMessages.size() < MESSAGE_COUNT; i++) {
+            Thread.sleep(100);
+        }
+        assertFalse(ackMessages.size() < MESSAGE_COUNT);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeStreamletTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeStreamletTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeStreamletTest.java
deleted file mode 100644
index 37899e8..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeStreamletTest.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.Destination;
-import javax.jms.Session;
-
-import junit.framework.TestCase;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author rnewson
- */
-public final class LargeStreamletTest extends TestCase {
-
-   private static final Logger LOG = LoggerFactory.getLogger(LargeStreamletTest.class);
-   private static final String BROKER_URL = "vm://localhost?broker.persistent=false";
-   private static final int BUFFER_SIZE = 1 * 1024;
-   private static final int MESSAGE_COUNT = 10 * 1024;
-
-   protected Exception writerException;
-   protected Exception readerException;
-
-   private final AtomicInteger totalRead = new AtomicInteger();
-   private final AtomicInteger totalWritten = new AtomicInteger();
-   private final AtomicBoolean stopThreads = new AtomicBoolean(false);
-
-   public void testStreamlets() throws Exception {
-      final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
-
-      final ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
-      connection.start();
-      try {
-         final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         try {
-            final Destination destination = session.createQueue("wibble");
-            final Thread readerThread = new Thread(new Runnable() {
-
-               @Override
-               public void run() {
-                  totalRead.set(0);
-                  try {
-                     final InputStream inputStream = connection.createInputStream(destination);
-                     try {
-                        int read;
-                        final byte[] buf = new byte[BUFFER_SIZE];
-                        while (!stopThreads.get() && (read = inputStream.read(buf)) != -1) {
-                           totalRead.addAndGet(read);
-                        }
-                     }
-                     finally {
-                        inputStream.close();
-                     }
-                  }
-                  catch (Exception e) {
-                     readerException = e;
-                     e.printStackTrace();
-                  }
-                  finally {
-                     LOG.info(totalRead + " total bytes read.");
-                  }
-               }
-            });
-
-            final Thread writerThread = new Thread(new Runnable() {
-               private final Random random = new Random();
-
-               @Override
-               public void run() {
-                  totalWritten.set(0);
-                  int count = MESSAGE_COUNT;
-                  try {
-                     final OutputStream outputStream = connection.createOutputStream(destination);
-                     try {
-                        final byte[] buf = new byte[BUFFER_SIZE];
-                        random.nextBytes(buf);
-                        while (count > 0 && !stopThreads.get()) {
-                           outputStream.write(buf);
-                           totalWritten.addAndGet(buf.length);
-                           count--;
-                        }
-                     }
-                     finally {
-                        outputStream.close();
-                     }
-                  }
-                  catch (Exception e) {
-                     writerException = e;
-                     e.printStackTrace();
-                  }
-                  finally {
-                     LOG.info(totalWritten + " total bytes written.");
-                  }
-               }
-            });
-
-            readerThread.start();
-            writerThread.start();
-
-            // Wait till reader is has finished receiving all the messages
-            // or he has stopped
-            // receiving messages.
-            Thread.sleep(1000);
-            int lastRead = totalRead.get();
-            while (readerThread.isAlive()) {
-               readerThread.join(1000);
-               // No progress?? then stop waiting..
-               if (lastRead == totalRead.get()) {
-                  break;
-               }
-               lastRead = totalRead.get();
-            }
-
-            stopThreads.set(true);
-
-            assertTrue("Should not have received a reader exception", readerException == null);
-            assertTrue("Should not have received a writer exception", writerException == null);
-
-            assertEquals("Not all messages accounted for", totalWritten.get(), totalRead.get());
-
-         }
-         finally {
-            session.close();
-         }
-      }
-      finally {
-         connection.close();
-      }
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/QueueConsumerPriorityTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/QueueConsumerPriorityTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/QueueConsumerPriorityTest.java
index 0358323..4ae2feb 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/QueueConsumerPriorityTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/QueueConsumerPriorityTest.java
@@ -26,8 +26,11 @@ import javax.jms.Session;
 
 import junit.framework.TestCase;
 
+import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
 
+//https://issues.apache.org/jira/browse/ARTEMIS-196
 public class QueueConsumerPriorityTest extends TestCase {
 
    private static final String VM_BROKER_URL = "vm://localhost?broker.persistent=false&broker.useJmx=true";
@@ -43,6 +46,7 @@ public class QueueConsumerPriorityTest extends TestCase {
 
    @Override
    protected void tearDown() throws Exception {
+      TcpTransportFactory.clearService();
       super.tearDown();
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ReconnectWithSameClientIDTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ReconnectWithSameClientIDTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ReconnectWithSameClientIDTest.java
index c6f60f8..d737f2c 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ReconnectWithSameClientIDTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ReconnectWithSameClientIDTest.java
@@ -96,12 +96,13 @@ public class ReconnectWithSameClientIDTest extends EmbeddedBrokerTestSupport {
 
    @Override
    protected ConnectionFactory createConnectionFactory() throws Exception {
-      return new ActiveMQConnectionFactory((useFailover ? "failover:" : "") + broker.getTransportConnectors().get(0).getPublishableConnectString());
+      return new ActiveMQConnectionFactory((useFailover ? "failover:" : "") + newURI("localhost", 0));
    }
 
    @Override
    protected void setUp() throws Exception {
       bindAddress = "tcp://localhost:0";
+      disableWrapper = true;
       super.setUp();
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/RemoveDestinationTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/RemoveDestinationTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/RemoveDestinationTest.java
index 894abe3..ff1c6c6 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/RemoveDestinationTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/RemoveDestinationTest.java
@@ -55,6 +55,7 @@ public class RemoveDestinationTest {
 
    @Before
    public void setUp() throws Exception {
+      BrokerService.disableWrapper = true;
       broker = BrokerFactory.createBroker(new URI(BROKER_URL));
       broker.start();
       broker.waitUntilStarted();
@@ -62,6 +63,7 @@ public class RemoveDestinationTest {
 
    @After
    public void tearDown() throws Exception {
+      BrokerService.disableWrapper = false;
       broker.stop();
       broker.waitUntilStopped();
       broker = null;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TimeStampTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TimeStampTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TimeStampTest.java
index 87c5fc9..d423442 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TimeStampTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TimeStampTest.java
@@ -26,6 +26,7 @@ import javax.jms.Session;
 
 import junit.framework.TestCase;
 
+import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper;
 import org.apache.activemq.broker.BrokerPlugin;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
@@ -34,6 +35,16 @@ import org.apache.activemq.broker.view.ConnectionDotFilePlugin;
 
 public class TimeStampTest extends TestCase {
 
+   @Override
+   public void setUp() throws Exception {
+      BrokerService.disableWrapper = true;
+   }
+   @Override
+   public void tearDown() {
+      ArtemisBrokerHelper.stopArtemisBroker();
+      BrokerService.disableWrapper = false;
+   }
+
    public void test() throws Exception {
       BrokerService broker = new BrokerService();
       broker.setPersistent(false);
@@ -91,6 +102,5 @@ public class TimeStampTest extends TestCase {
       consumer.close();
       session.close();
       connection.close();
-      broker.stop();
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TransactionContextTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TransactionContextTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TransactionContextTest.java
index 8d239e7..d2e917d 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TransactionContextTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TransactionContextTest.java
@@ -23,9 +23,14 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.TransactionRolledBackException;
 
+import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper;
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TransactionContextTest {
@@ -36,13 +41,22 @@ public class TransactionContextTest {
 
    @Before
    public void setup() throws Exception {
-      connection = factory.createActiveMQConnection();
-      underTest = new TransactionContext(connection);
+      try {
+         connection = factory.createActiveMQConnection();
+         underTest = new TransactionContext(connection);
+      }
+      catch (Exception e) {
+         e.printStackTrace();
+         throw e;
+      }
    }
 
    @After
    public void tearDown() throws Exception {
-      connection.close();
+      if (connection != null) {
+         connection.close();
+      }
+      TcpTransportFactory.clearService();
    }
 
    @Test
@@ -104,6 +118,7 @@ public class TransactionContextTest {
 
    @Test
    public void testSyncIndexCleared() throws Exception {
+      System.out.println("================================= test testSyncIndexCleared ===========");
       final AtomicInteger beforeEndCountA = new AtomicInteger(0);
       final AtomicInteger rollbackCountA = new AtomicInteger(0);
       Synchronization sync = new Synchronization() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
index d207da7..a9a564b 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
@@ -26,10 +26,11 @@ import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
 import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.Subscription;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ConsumerControl;
@@ -349,8 +350,10 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport {
       assertEquals("broker config prefetch in effect", 0, consumer.info.getCurrentPrefetchSize());
 
       // verify sub view broker
-      Subscription sub = broker.getRegionBroker().getDestinationMap().get(ActiveMQDestination.transform(brokerZeroQueue)).getConsumers().get(0);
-      assertEquals("broker sub prefetch is correct", 0, sub.getConsumerInfo().getCurrentPrefetchSize());
+      // I comment out this because it checks broker internal
+      // which doesn't apply to artemis broker.
+      //Subscription sub = broker.getRegionBroker().getDestinationMap().get(ActiveMQDestination.transform(brokerZeroQueue)).getConsumers().get(0);
+      //assertEquals("broker sub prefetch is correct", 0, sub.getConsumerInfo().getCurrentPrefetchSize());
 
       // manipulate Prefetch (like failover and stomp)
       ConsumerControl consumerControl = new ConsumerControl();
@@ -361,22 +364,22 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport {
       Object reply = ((ActiveMQConnection) connection).getTransport().request(consumerControl);
       assertTrue("good request", !(reply instanceof ExceptionResponse));
       assertEquals("broker config prefetch in effect", 0, consumer.info.getCurrentPrefetchSize());
-      assertEquals("broker sub prefetch is correct", 0, sub.getConsumerInfo().getCurrentPrefetchSize());
    }
 
    @Override
-   protected BrokerService createBroker() throws Exception {
-      BrokerService brokerService = super.createBroker();
-      PolicyMap policyMap = new PolicyMap();
-      PolicyEntry zeroPrefetchPolicy = new PolicyEntry();
-      zeroPrefetchPolicy.setQueuePrefetch(0);
-      policyMap.put(ActiveMQDestination.transform(brokerZeroQueue), zeroPrefetchPolicy);
-      brokerService.setDestinationPolicy(policyMap);
-      return brokerService;
+   public EmbeddedJMS createArtemisBroker() throws Exception {
+      Configuration config0 = createConfig("localhost", 0);
+      String coreQueueAddress = "jms.queue." + brokerZeroQueue.getQueueName();
+      AddressSettings addrSettings = new AddressSettings();
+      addrSettings.setQueuePrefetch(0);
+      config0.getAddressesSettings().put(coreQueueAddress, addrSettings);
+      EmbeddedJMS newbroker = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+      return newbroker;
    }
 
    @Override
    protected void setUp() throws Exception {
+      disableWrapper = true;
       bindAddress = "tcp://localhost:0";
       super.setUp();
 
@@ -388,11 +391,12 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport {
    @Override
    protected void startBroker() throws Exception {
       super.startBroker();
-      bindAddress = broker.getTransportConnectors().get(0).getConnectUri().toString();
+      bindAddress = newURI("localhost", 0);
    }
 
    @Override
    protected void tearDown() throws Exception {
+      BrokerService.disableWrapper = false;
       connection.close();
       super.tearDown();
    }


Mime
View raw message