activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject [4/7] activemq-artemis git commit: ARTEMIS-1123 Major AMQP Test Suite refactoring
Date Fri, 28 Apr 2017 09:17:01 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageTypesTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageTypesTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageTypesTest.java
new file mode 100644
index 0000000..9c7488b
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageTypesTest.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.AddressControl;
+import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.Base64;
+import org.apache.activemq.artemis.utils.ByteUtil;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test that various message types are handled as expected with an AMQP JMS client.
+ */
+public class JMSMessageTypesTest extends JMSClientTestSupport {
+
+   /** Number of messages each send/receive test in this class produces and then expects to consume. */
+   static final int NUM_MESSAGES = 10;
+
+   /**
+    * Sends a bytes-type message through the {@link AddressControl} management API and checks that
+    * a JMS consumer receives it as a {@link BytesMessage} whose body decodes to {@code "test"}.
+    *
+    * @throws Exception if a broker, management or JMS operation fails
+    */
+   @Test(timeout = 60000)
+   public void testAddressControlSendMessage() throws Exception {
+      SimpleString address = RandomUtil.randomSimpleString();
+      server.createQueue(address, RoutingType.ANYCAST, address, null, true, false);
+
+      AddressControl addressControl = ManagementControlHelper.createAddressControl(address, mBeanServer);
+      Assert.assertEquals(1, addressControl.getQueueNames().length);
+      // Encode (and later decode) with an explicit charset so the test does not depend on the platform default.
+      byte[] payload = "test".getBytes(java.nio.charset.StandardCharsets.UTF_8);
+      addressControl.sendMessage(null, org.apache.activemq.artemis.api.core.Message.BYTES_TYPE, Base64.encodeBytes(payload), false, fullUser, fullPass);
+
+      Wait.waitFor(() -> addressControl.getMessageCount() == 1);
+
+      Assert.assertEquals(1, addressControl.getMessageCount());
+
+      Connection connection = createConnection("myClientId");
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         javax.jms.Queue queue = session.createQueue(address.toString());
+         MessageConsumer consumer = session.createConsumer(queue);
+         Message message = consumer.receive(500);
+         assertNotNull(message);
+         BytesMessage bytesMessage = (BytesMessage) message;
+         byte[] buffer = new byte[(int) bytesMessage.getBodyLength()];
+         bytesMessage.readBytes(buffer);
+         assertEquals("test", new String(buffer, java.nio.charset.StandardCharsets.UTF_8));
+         session.close();
+      } finally {
+         // Single close point, reached on both success and failure.
+         if (connection != null) {
+            connection.close();
+         }
+      }
+   }
+
+   /**
+    * Sends a text-type message through the {@link AddressControl} management API and checks that
+    * a JMS consumer receives it as a {@link TextMessage} with the body {@code "test"}.
+    *
+    * @throws Exception if a broker, management or JMS operation fails
+    */
+   @Test(timeout = 60000)
+   public void testAddressControlSendMessageWithText() throws Exception {
+      SimpleString address = RandomUtil.randomSimpleString();
+      server.createQueue(address, RoutingType.ANYCAST, address, null, true, false);
+
+      AddressControl addressControl = ManagementControlHelper.createAddressControl(address, mBeanServer);
+      Assert.assertEquals(1, addressControl.getQueueNames().length);
+      addressControl.sendMessage(null, org.apache.activemq.artemis.api.core.Message.TEXT_TYPE, "test", false, fullUser, fullPass);
+
+      Wait.waitFor(() -> addressControl.getMessageCount() == 1);
+
+      Assert.assertEquals(1, addressControl.getMessageCount());
+
+      Connection connection = createConnection("myClientId");
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         javax.jms.Queue queue = session.createQueue(address.toString());
+         MessageConsumer consumer = session.createConsumer(queue);
+         Message message = consumer.receive(500);
+         assertNotNull(message);
+         String text = ((TextMessage) message).getText();
+         assertEquals("test", text);
+         session.close();
+      } finally {
+         // Single close point, reached on both success and failure.
+         if (connection != null) {
+            connection.close();
+         }
+      }
+   }
+
+   /**
+    * Round-trips {@code NUM_MESSAGES} bytes messages through the test queue and verifies that
+    * every received body equals the 16-byte payload that was sent.
+    *
+    * @throws Throwable if any JMS operation or assertion fails
+    */
+   @Test(timeout = 60000)
+   public void testBytesMessageSendReceive() throws Throwable {
+      final long startMillis = System.currentTimeMillis();
+
+      Connection connection = createConnection();
+      Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Queue destination = producerSession.createQueue(getQueueName());
+
+      // Payload is the byte sequence 0x00..0x0f.
+      final byte[] payload = new byte[0xf + 1];
+      for (int index = 0; index < payload.length; index++) {
+         payload[index] = (byte) index;
+      }
+
+      MessageProducer sender = producerSession.createProducer(destination);
+      int sent = 0;
+      while (sent < NUM_MESSAGES) {
+         System.out.println("Sending " + sent);
+         BytesMessage outgoing = producerSession.createBytesMessage();
+
+         outgoing.writeBytes(payload);
+         outgoing.setIntProperty("count", sent);
+         sender.send(outgoing);
+         sent++;
+      }
+
+      Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      final MessageConsumer receiver = consumerSession.createConsumer(destination);
+
+      for (int expected = 0; expected < NUM_MESSAGES; expected++) {
+         BytesMessage incoming = (BytesMessage) receiver.receive(5000);
+         Assert.assertNotNull("Could not receive message count=" + expected + " on consumer", incoming);
+
+         // Reposition the message to the start of its body before reading.
+         incoming.reset();
+
+         byte[] body = new byte[(int) incoming.getBodyLength()];
+         incoming.readBytes(body);
+
+         System.out.println("Received " + ByteUtil.bytesToHex(body, 1) + " count - " + incoming.getIntProperty("count"));
+
+         Assert.assertArrayEquals(payload, body);
+      }
+
+      final long elapsedSeconds = (System.currentTimeMillis() - startMillis) / 1000;
+      System.out.println("taken = " + elapsedSeconds);
+   }
+
+   /**
+    * Sends {@code NUM_MESSAGES} body-less messages, each tagged with its index in the int
+    * property {@code "count"}, and checks that each one is received with that property intact.
+    *
+    * @throws Throwable if any JMS operation or assertion fails
+    */
+   @Test(timeout = 60000)
+   public void testMessageSendReceive() throws Throwable {
+      long time = System.currentTimeMillis();
+
+      Connection connection = createConnection();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Queue queue = session.createQueue(getQueueName());
+
+      MessageProducer producer = session.createProducer(queue);
+      for (int i = 0; i < NUM_MESSAGES; i++) {
+         System.out.println("Sending " + i);
+         Message message = session.createMessage();
+
+         message.setIntProperty("count", i);
+         producer.send(message);
+      }
+
+      Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      final MessageConsumer consumer = sessionConsumer.createConsumer(queue);
+
+      for (int i = 0; i < NUM_MESSAGES; i++) {
+         Message m = consumer.receive(5000);
+         Assert.assertNotNull("Could not receive message count=" + i + " on consumer", m);
+         // Verify the property that was set on send; without this the property is never checked.
+         Assert.assertEquals(i, m.getIntProperty("count"));
+      }
+
+      long taken = (System.currentTimeMillis() - time) / 1000;
+      System.out.println("taken = " + taken);
+   }
+
+   /**
+    * Sends {@code NUM_MESSAGES} map messages, each carrying its index both as the map entry
+    * {@code "i"} and as the int property {@code "count"}, then checks both values on receipt.
+    *
+    * @throws Throwable if any JMS operation or assertion fails
+    */
+   @Test(timeout = 60000)
+   public void testMapMessageSendReceive() throws Throwable {
+      final long startMillis = System.currentTimeMillis();
+
+      Connection connection = createConnection();
+      Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Queue destination = producerSession.createQueue(getQueueName());
+
+      MessageProducer sender = producerSession.createProducer(destination);
+      for (int index = 0; index < NUM_MESSAGES; index++) {
+         System.out.println("Sending " + index);
+         MapMessage outgoing = producerSession.createMapMessage();
+
+         outgoing.setInt("i", index);
+         outgoing.setIntProperty("count", index);
+         sender.send(outgoing);
+      }
+
+      Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      final MessageConsumer receiver = consumerSession.createConsumer(destination);
+
+      for (int expected = 0; expected < NUM_MESSAGES; expected++) {
+         MapMessage incoming = (MapMessage) receiver.receive(5000);
+         Assert.assertNotNull("Could not receive message count=" + expected + " on consumer", incoming);
+
+         Assert.assertEquals(expected, incoming.getInt("i"));
+         Assert.assertEquals(expected, incoming.getIntProperty("count"));
+      }
+
+      final long elapsedSeconds = (System.currentTimeMillis() - startMillis) / 1000;
+      System.out.println("taken = " + elapsedSeconds);
+   }
+
+   /**
+    * Sends {@code NUM_MESSAGES} text messages with the body {@code "text" + index} (also stored in
+    * the string property {@code "text"}) and checks the body of each message on receipt.
+    *
+    * @throws Throwable if any JMS operation or assertion fails
+    */
+   @Test(timeout = 60000)
+   public void testTextMessageSendReceive() throws Throwable {
+      final long startMillis = System.currentTimeMillis();
+
+      Connection connection = createConnection();
+      Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Queue destination = producerSession.createQueue(getQueueName());
+
+      MessageProducer sender = producerSession.createProducer(destination);
+      int sent = 0;
+      while (sent < NUM_MESSAGES) {
+         System.out.println("Sending " + sent);
+         TextMessage outgoing = producerSession.createTextMessage("text" + sent);
+         outgoing.setStringProperty("text", "text" + sent);
+         sender.send(outgoing);
+         sent++;
+      }
+
+      Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      final MessageConsumer receiver = consumerSession.createConsumer(destination);
+
+      for (int expected = 0; expected < NUM_MESSAGES; expected++) {
+         TextMessage incoming = (TextMessage) receiver.receive(5000);
+         Assert.assertNotNull("Could not receive message count=" + expected + " on consumer", incoming);
+         Assert.assertEquals("text" + expected, incoming.getText());
+      }
+
+      final long elapsedSeconds = (System.currentTimeMillis() - startMillis) / 1000;
+      System.out.println("taken = " + elapsedSeconds);
+   }
+
+   /**
+    * Sends {@code NUM_MESSAGES} stream messages, each holding an int (the index), the boolean
+    * {@code true} and the string {@code "test"}, and reads the three values back in that order.
+    *
+    * @throws Throwable if any JMS operation or assertion fails
+    */
+   @Test(timeout = 60000)
+   public void testStreamMessageSendReceive() throws Throwable {
+      Connection connection = createConnection();
+      Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Queue destination = producerSession.createQueue(getQueueName());
+
+      MessageProducer sender = producerSession.createProducer(destination);
+      for (int index = 0; index < NUM_MESSAGES; index++) {
+         StreamMessage outgoing = producerSession.createStreamMessage();
+         outgoing.writeInt(index);
+         outgoing.writeBoolean(true);
+         outgoing.writeString("test");
+         sender.send(outgoing);
+      }
+
+      Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      final MessageConsumer receiver = consumerSession.createConsumer(destination);
+
+      for (int expected = 0; expected < NUM_MESSAGES; expected++) {
+         StreamMessage incoming = (StreamMessage) receiver.receive(5000);
+         Assert.assertNotNull("Could not receive message count=" + expected + " on consumer", incoming);
+
+         // Values must be read in the same order they were written.
+         Assert.assertEquals(expected, incoming.readInt());
+         Assert.assertEquals(true, incoming.readBoolean());
+         Assert.assertEquals("test", incoming.readString());
+      }
+   }
+
+   /**
+    * Sends an {@link ObjectMessage} whose payload is an {@code ArrayList<String>} and checks that
+    * the list read back from the received message contains the same first element.
+    *
+    * @throws Throwable if any JMS operation or assertion fails
+    */
+   @Test(timeout = 60000)
+   public void testObjectMessageWithArrayListPayload() throws Throwable {
+      ArrayList<String> payload = new ArrayList<>();
+      payload.add("aString");
+
+      Connection connection = createConnection();
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = session.createQueue(getQueueName());
+         MessageProducer producer = session.createProducer(queue);
+         ObjectMessage objectMessage = session.createObjectMessage(payload);
+         producer.send(objectMessage);
+         session.close();
+
+         // Receive on a fresh session so the consumer shares nothing with the producing session.
+         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer cons = session.createConsumer(queue);
+         connection.start();
+
+         objectMessage = (ObjectMessage) cons.receive(5000);
+         assertNotNull(objectMessage);
+         @SuppressWarnings("unchecked")
+         ArrayList<String> received = (ArrayList<String>) objectMessage.getObject();
+         // JUnit argument order is (expected, actual); keeps failure messages accurate.
+         assertEquals("aString", received.get(0));
+      } finally {
+         // Close the connection even when an assertion above fails.
+         connection.close();
+      }
+   }
+
+   /**
+    * Sends {@code NUM_MESSAGES} object messages wrapping an {@link AnythingSerializable} that holds
+    * the message index, and checks the index carried by each received object.
+    *
+    * @throws Throwable if any JMS operation or assertion fails
+    */
+   @Test(timeout = 60000)
+   public void testObjectMessageUsingCustomType() throws Throwable {
+      final long startMillis = System.currentTimeMillis();
+
+      Connection connection = createConnection();
+      Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Queue destination = producerSession.createQueue(getQueueName());
+
+      MessageProducer sender = producerSession.createProducer(destination);
+      for (int index = 0; index < NUM_MESSAGES; index++) {
+         System.out.println("Sending " + index);
+         AnythingSerializable body = new AnythingSerializable(index);
+         ObjectMessage outgoing = producerSession.createObjectMessage(body);
+         sender.send(outgoing);
+      }
+
+      Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      final MessageConsumer receiver = consumerSession.createConsumer(destination);
+
+      for (int expected = 0; expected < NUM_MESSAGES; expected++) {
+         ObjectMessage incoming = (ObjectMessage) receiver.receive(5000);
+         Assert.assertNotNull("Could not receive message count=" + expected + " on consumer", incoming);
+
+         AnythingSerializable decoded = (AnythingSerializable) incoming.getObject();
+         Assert.assertEquals(expected, decoded.getCount());
+      }
+
+      final long elapsedSeconds = (System.currentTimeMillis() - startMillis) / 1000;
+      System.out.println("taken = " + elapsedSeconds);
+   }
+
+   /**
+    * Minimal serializable value holder used as a custom {@code ObjectMessage} payload.
+    */
+   public static class AnythingSerializable implements Serializable {
+      private static final long serialVersionUID = 5972085029690947807L;
+
+      // Assigned once in the constructor and never modified afterwards.
+      private final int count;
+
+      /**
+       * @param count the value this instance carries
+       */
+      public AnythingSerializable(int count) {
+         this.count = count;
+      }
+
+      /**
+       * @return the value supplied at construction time
+       */
+      public int getCount() {
+         return count;
+      }
+   }
+
+   /**
+    * Sends the same text message twice with boolean, string, double, float, int and byte
+    * properties set, and checks that the first received copy carries the body and every property
+    * value unchanged and that the second copy also arrives.
+    *
+    * @throws Exception if any JMS operation fails
+    */
+   @Test(timeout = 60000)
+   public void testPropertiesArePreserved() throws Exception {
+      Connection connection = createConnection();
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         Queue queue = session.createQueue(getQueueName());
+         MessageProducer producer = session.createProducer(queue);
+         TextMessage message = session.createTextMessage();
+         message.setText("msg:0");
+         message.setBooleanProperty("true", true);
+         message.setBooleanProperty("false", false);
+         message.setStringProperty("foo", "bar");
+         message.setDoubleProperty("double", 66.6);
+         message.setFloatProperty("float", 56.789f);
+         message.setIntProperty("int", 8);
+         message.setByteProperty("byte", (byte) 10);
+
+         producer.send(message);
+         producer.send(message);
+
+         connection.start();
+
+         MessageConsumer messageConsumer = session.createConsumer(queue);
+         TextMessage received = (TextMessage) messageConsumer.receive(5000);
+         Assert.assertNotNull(received);
+         // JUnit argument order is (expected, actual); keeps failure messages accurate.
+         Assert.assertEquals("msg:0", received.getText());
+         Assert.assertEquals(true, received.getBooleanProperty("true"));
+         Assert.assertEquals(false, received.getBooleanProperty("false"));
+         Assert.assertEquals("bar", received.getStringProperty("foo"));
+         Assert.assertEquals(66.6, received.getDoubleProperty("double"), 0.0001);
+         Assert.assertEquals(56.789f, received.getFloatProperty("float"), 0.0001);
+         Assert.assertEquals(8, received.getIntProperty("int"));
+         Assert.assertEquals((byte) 10, received.getByteProperty("byte"));
+
+         received = (TextMessage) messageConsumer.receive(5000);
+         Assert.assertNotNull(received);
+      } finally {
+         // Close the connection even when an assertion above fails.
+         connection.close();
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSQueueBrowserTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSQueueBrowserTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSQueueBrowserTest.java
new file mode 100644
index 0000000..45bec32
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSQueueBrowserTest.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.Enumeration;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for various QueueBrowser scenarios with an AMQP JMS client.
+ */
+public class JMSQueueBrowserTest extends JMSClientTestSupport {
+
+   /** SLF4J logger for this test class; the tests use it to log each browsed message at debug level. */
+   protected static final Logger LOG = LoggerFactory.getLogger(JMSQueueBrowserTest.class);
+
+   /**
+    * Browses a queue holding {@code MSG_COUNT} messages over a connection whose prefetch policy is
+    * set to zero, and checks that exactly that many messages are enumerated.
+    *
+    * @throws Exception if any JMS or broker operation fails
+    */
+   @Test(timeout = 60000)
+   public void testBrowseAllInQueueZeroPrefetch() throws Exception {
+
+      final int MSG_COUNT = 5;
+
+      JmsConnection connection = (JmsConnection) createConnection();
+      // Set every prefetch value on the connection's policy to zero before the connection starts.
+      ((JmsDefaultPrefetchPolicy) connection.getPrefetchPolicy()).setAll(0);
+
+      connection.start();
+
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      assertNotNull(session);
+      javax.jms.Queue queue = session.createQueue(getQueueName());
+      sendMessages(name.getMethodName(), MSG_COUNT, false);
+
+      Queue queueView = getProxyToQueue(getQueueName());
+      assertTrue("Messages did not arrive", Wait.waitFor(() -> queueView.getMessageCount() == MSG_COUNT));
+
+      QueueBrowser browser = session.createBrowser(queue);
+      assertNotNull(browser);
+      Enumeration<?> enumeration = browser.getEnumeration();
+      int count = 0;
+      // Stop after MSG_COUNT elements so the hasMoreElements() check below is made explicitly.
+      while (count < MSG_COUNT && enumeration.hasMoreElements()) {
+         Message msg = (Message) enumeration.nextElement();
+         assertNotNull(msg);
+         LOG.debug("Recv: {}", msg);
+         count++;
+      }
+
+      LOG.debug("Received all expected message, checking that hasMoreElements returns false");
+      assertFalse(enumeration.hasMoreElements());
+      // Compare against the constant rather than a duplicated literal.
+      assertEquals(MSG_COUNT, count);
+   }
+
+   /**
+    * Checks that a {@link QueueBrowser} can be created for the test queue and that the
+    * broker-side queue reports zero messages afterwards.
+    *
+    * @throws Exception if any JMS or broker operation fails
+    */
+   @Test(timeout = 40000)
+   public void testCreateQueueBrowser() throws Exception {
+      Connection connection = createConnection();
+      connection.start();
+
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      assertNotNull(session);
+      javax.jms.Queue destination = session.createQueue(getQueueName());
+      // Open and immediately close a consumer (presumably so the broker creates the queue - TODO confirm).
+      MessageConsumer initialConsumer = session.createConsumer(destination);
+      initialConsumer.close();
+
+      QueueBrowser queueBrowser = session.createBrowser(destination);
+      assertNotNull(queueBrowser);
+
+      Queue brokerQueue = getProxyToQueue(getQueueName());
+      assertEquals(0, brokerQueue.getMessageCount());
+   }
+
+   /**
+    * Checks that browsing a queue that holds no messages yields an enumeration with no elements.
+    *
+    * @throws Exception if any JMS or broker operation fails
+    */
+   @Test(timeout = 40000)
+   public void testNoMessagesBrowserHasNoElements() throws Exception {
+      Connection connection = createConnection();
+      connection.start();
+
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      assertNotNull(session);
+      javax.jms.Queue destination = session.createQueue(getQueueName());
+      // Open and immediately close a consumer (presumably so the broker creates the queue - TODO confirm).
+      MessageConsumer initialConsumer = session.createConsumer(destination);
+      initialConsumer.close();
+
+      QueueBrowser queueBrowser = session.createBrowser(destination);
+      assertNotNull(queueBrowser);
+
+      Queue brokerQueue = getProxyToQueue(getQueueName());
+      assertEquals(0, brokerQueue.getMessageCount());
+
+      Enumeration<?> messages = queueBrowser.getEnumeration();
+      assertFalse(messages.hasMoreElements());
+   }
+
+   /**
+    * Sends one text message, browses the queue, and then checks that the message can still be
+    * consumed normally after the browser is closed.
+    *
+    * <p>The method name keeps its original spelling ("Brose") because it is the test's identifier.
+    *
+    * @throws Exception if any JMS operation fails
+    */
+   @Test(timeout = 30000)
+   public void testBroseOneInQueue() throws Exception {
+      Connection connection = createConnection();
+      connection.start();
+
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      javax.jms.Queue destination = session.createQueue(getQueueName());
+      MessageProducer sender = session.createProducer(destination);
+      TextMessage greeting = session.createTextMessage("hello");
+      sender.send(greeting);
+      sender.close();
+
+      QueueBrowser queueBrowser = session.createBrowser(destination);
+      for (Enumeration<?> messages = queueBrowser.getEnumeration(); messages.hasMoreElements(); ) {
+         Message browsed = (Message) messages.nextElement();
+         assertTrue(browsed instanceof TextMessage);
+         LOG.debug("Browsed message {} from Queue {}", browsed, destination);
+      }
+
+      queueBrowser.close();
+
+      // The browsed message must still be available to a regular consumer.
+      MessageConsumer receiver = session.createConsumer(destination);
+      Message consumed = receiver.receive(5000);
+      assertNotNull(consumed);
+      assertTrue(consumed instanceof TextMessage);
+   }
+
+   /**
+    * Browses a queue holding five messages, pausing 50 ms after each element, and checks that
+    * exactly five messages are enumerated.
+    *
+    * @throws Exception if any JMS or broker operation fails
+    */
+   @Test(timeout = 60000)
+   public void testBrowseAllInQueue() throws Exception {
+      Connection connection = createConnection();
+      connection.start();
+
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      assertNotNull(session);
+      javax.jms.Queue destination = session.createQueue(getQueueName());
+      sendMessages(name.getMethodName(), 5, false);
+
+      // Wait until the broker-side queue reports all five messages before browsing.
+      Queue brokerQueue = getProxyToQueue(getQueueName());
+      boolean arrived = Wait.waitFor(() -> brokerQueue.getMessageCount() == 5);
+      assertTrue("Messages did not arrive", arrived);
+
+      QueueBrowser queueBrowser = session.createBrowser(destination);
+      assertNotNull(queueBrowser);
+      Enumeration<?> messages = queueBrowser.getEnumeration();
+      int browsed = 0;
+      while (messages.hasMoreElements()) {
+         Message current = (Message) messages.nextElement();
+         assertNotNull(current);
+         LOG.debug("Recv: {}", current);
+         browsed++;
+         TimeUnit.MILLISECONDS.sleep(50);
+      }
+      assertFalse(messages.hasMoreElements());
+      assertEquals(5, browsed);
+   }
+
+   /**
+    * Browses a queue holding five messages over a connection whose prefetch policy is set to one,
+    * and checks that exactly five messages are enumerated.
+    *
+    * @throws Exception if any JMS or broker operation fails
+    */
+   @Test(timeout = 60000)
+   public void testBrowseAllInQueuePrefetchOne() throws Exception {
+      JmsConnection connection = (JmsConnection) createConnection();
+      // Apply the prefetch of one that the test name promises; without this the test only
+      // exercises the client's default prefetch.
+      ((JmsDefaultPrefetchPolicy) connection.getPrefetchPolicy()).setAll(1);
+
+      connection.start();
+
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      assertNotNull(session);
+      javax.jms.Queue queue = session.createQueue(getQueueName());
+      sendMessages(name.getMethodName(), 5, false);
+
+      Queue queueView = getProxyToQueue(getQueueName());
+      assertTrue("Messages did not arrive", Wait.waitFor(() -> queueView.getMessageCount() == 5));
+
+      QueueBrowser browser = session.createBrowser(queue);
+      assertNotNull(browser);
+      Enumeration<?> enumeration = browser.getEnumeration();
+      int count = 0;
+      while (enumeration.hasMoreElements()) {
+         Message msg = (Message) enumeration.nextElement();
+         assertNotNull(msg);
+         LOG.debug("Recv: {}", msg);
+         count++;
+      }
+      assertFalse(enumeration.hasMoreElements());
+      assertEquals(5, count);
+   }
+
+   /**
+    * Browses a queue holding five messages from a transacted session and checks that exactly
+    * five messages are enumerated.
+    *
+    * @throws Exception if any JMS or broker operation fails
+    */
+   @Test(timeout = 40000)
+   public void testBrowseAllInQueueTxSession() throws Exception {
+      Connection connection = createConnection();
+      connection.start();
+
+      Session txSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+      assertNotNull(txSession);
+      javax.jms.Queue destination = txSession.createQueue(getQueueName());
+      sendMessages(name.getMethodName(), 5, false);
+
+      // Wait until the broker-side queue reports all five messages before browsing.
+      Queue brokerQueue = getProxyToQueue(getQueueName());
+      boolean arrived = Wait.waitFor(() -> brokerQueue.getMessageCount() == 5);
+      assertTrue("Messages did not arrive", arrived);
+
+      QueueBrowser queueBrowser = txSession.createBrowser(destination);
+      assertNotNull(queueBrowser);
+      Enumeration<?> messages = queueBrowser.getEnumeration();
+      int browsed = 0;
+      for (; messages.hasMoreElements(); browsed++) {
+         Message current = (Message) messages.nextElement();
+         assertNotNull(current);
+         LOG.debug("Recv: {}", current);
+      }
+      assertFalse(messages.hasMoreElements());
+      assertEquals(5, browsed);
+   }
+
+   /**
+    * Checks that browsing inside a transacted session neither sees nor disturbs that session's
+    * uncommitted sends: the browser enumerates only the five pre-existing messages, and the five
+    * messages sent in the transaction appear on the queue only after {@code commit()}.
+    *
+    * @throws Exception if any JMS or broker operation fails
+    */
+   @Test(timeout = 40000)
+   public void testQueueBrowserInTxSessionLeavesOtherWorkUnaffected() throws Exception {
+      Connection connection = createConnection();
+      connection.start();
+
+      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+      assertNotNull(session);
+      javax.jms.Queue queue = session.createQueue(getQueueName());
+      sendMessages(name.getMethodName(), 5, false);
+
+      // Wait until the broker-side queue reports the five messages sent above.
+      Queue queueView = getProxyToQueue(getQueueName());
+      assertTrue("Messages did not arrive", Wait.waitFor(() -> queueView.getMessageCount() == 5));
+
+      // Send some TX work but don't commit.
+      MessageProducer txProducer = session.createProducer(queue);
+      for (int i = 0; i < 5; ++i) {
+         txProducer.send(session.createMessage());
+      }
+
+      // The uncommitted sends must not be counted on the queue yet.
+      assertEquals(5, queueView.getMessageCount());
+
+      QueueBrowser browser = session.createBrowser(queue);
+      assertNotNull(browser);
+      Enumeration<?> enumeration = browser.getEnumeration();
+      int count = 0;
+      while (enumeration.hasMoreElements()) {
+         Message msg = (Message) enumeration.nextElement();
+         assertNotNull(msg);
+         LOG.debug("Recv: {}", msg);
+         count++;
+      }
+
+      // Only the five messages that were on the queue before the transaction are browsed.
+      assertFalse(enumeration.hasMoreElements());
+      assertEquals(5, count);
+
+      browser.close();
+
+      // Now check that all browser work did not affect the session transaction.
+      assertEquals(5, queueView.getMessageCount());
+      session.commit();
+      // After commit the five transactional sends are added to the original five.
+      assertEquals(10, queueView.getMessageCount());
+   }
+
+   /**
+    * Browses a queue holding thirty messages and checks that exactly thirty are enumerated.
+    *
+    * <p>NOTE(review): despite the name, no prefetch value is configured here, so the connection
+    * uses whatever prefetch the client defaults to - confirm whether a small prefetch was intended.
+    *
+    * @throws Exception if any JMS or broker operation fails
+    */
+   @Test(timeout = 60000)
+   public void testBrowseAllInQueueSmallPrefetch() throws Exception {
+      Connection connection = createConnection();
+      connection.start();
+
+      final int MSG_COUNT = 30;
+
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      assertNotNull(session);
+      javax.jms.Queue destination = session.createQueue(getQueueName());
+      sendMessages(name.getMethodName(), MSG_COUNT, false);
+
+      // Wait until the broker-side queue reports every message before browsing.
+      Queue brokerQueue = getProxyToQueue(getQueueName());
+      boolean arrived = Wait.waitFor(() -> brokerQueue.getMessageCount() == MSG_COUNT);
+      assertTrue("Messages did not arrive", arrived);
+
+      QueueBrowser queueBrowser = session.createBrowser(destination);
+      assertNotNull(queueBrowser);
+      Enumeration<?> messages = queueBrowser.getEnumeration();
+      int browsed = 0;
+      for (; messages.hasMoreElements(); browsed++) {
+         Message current = (Message) messages.nextElement();
+         assertNotNull(current);
+         LOG.debug("Recv: {}", current);
+      }
+      assertFalse(messages.hasMoreElements());
+      assertEquals(MSG_COUNT, browsed);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTemporaryDestinationTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTemporaryDestinationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTemporaryDestinationTest.java
new file mode 100644
index 0000000..776d553
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTemporaryDestinationTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Test;
+
+public class JMSTemporaryDestinationTest extends JMSClientTestSupport {
+
+   /**
+    * Creates a temporary queue, sends one text message to it and checks that a consumer created
+    * on the same session receives a message within five seconds.
+    */
+   @Test(timeout = 60000)
+   public void testCreateTemporaryQueue() throws Throwable {
+      Connection connection = createConnection();
+
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         TemporaryQueue tempQueue = session.createTemporaryQueue();
+         System.out.println("queue:" + tempQueue.getQueueName());
+
+         // The message is sent before the consumer exists and before the connection is started.
+         MessageProducer producer = session.createProducer(tempQueue);
+         TextMessage outbound = session.createTextMessage();
+         outbound.setText("Message temporary");
+         producer.send(outbound);
+
+         MessageConsumer consumer = session.createConsumer(tempQueue);
+         connection.start();
+
+         TextMessage inbound = (TextMessage) consumer.receive(5000);
+         assertNotNull(inbound);
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Creates a temporary queue, confirms the broker exposes it, deletes it through the JMS API
+    * and then polls until the broker no longer returns a queue for that name.
+    */
+   @Test(timeout = 30000)
+   public void testDeleteTemporaryQueue() throws Exception {
+      Connection connection = createConnection();
+
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         final javax.jms.Queue queue = session.createTemporaryQueue();
+         assertNotNull(queue);
+         assertTrue(queue instanceof TemporaryQueue);
+
+         // The broker must know the queue before the delete is issued.
+         Queue queueView = getProxyToQueue(queue.getQueueName());
+         assertNotNull(queueView);
+
+         ((TemporaryQueue) queue).delete();
+
+         // Poll the broker until the lookup for the deleted queue comes back empty.
+         assertTrue("Temp Queue should be deleted.", Wait.waitFor(() -> getProxyToQueue(queue.getQueueName()) == null, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(50)));
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Creates a temporary topic with a consumer and a producer on the same session, publishes one
+    * text message and checks that the consumer receives a message within five seconds.
+    * <p>
+    * The connection is closed in a {@code finally} block, matching the other tests in this class,
+    * so that it is released even when an assertion or JMS call fails.
+    */
+   @Test(timeout = 60000)
+   public void testCreateTemporaryTopic() throws Throwable {
+      Connection connection = createConnection();
+
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         TemporaryTopic topic = session.createTemporaryTopic();
+
+         System.out.println("topic:" + topic.getTopicName());
+         // The consumer is created before the send so the subscription exists when the message is published.
+         MessageConsumer consumer = session.createConsumer(topic);
+         MessageProducer producer = session.createProducer(topic);
+
+         TextMessage message = session.createTextMessage();
+         message.setText("Message temporary");
+         producer.send(message);
+
+         connection.start();
+
+         message = (TextMessage) consumer.receive(5000);
+
+         assertNotNull(message);
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Creates a temporary topic, confirms the broker returns a queue view for its name, deletes it
+    * through the JMS API and then polls until the broker no longer returns one.
+    * <p>
+    * The failure message names the topic (it previously said "Temp Queue", copied from the queue
+    * variant of this test).
+    */
+   @Test(timeout = 30000)
+   public void testDeleteTemporaryTopic() throws Exception {
+      Connection connection = createConnection();
+
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         final javax.jms.Topic topic = session.createTemporaryTopic();
+         assertNotNull(topic);
+         assertTrue(topic instanceof TemporaryTopic);
+
+         // The broker must know the topic name before the delete is issued.
+         Queue queueView = getProxyToQueue(topic.getTopicName());
+         assertNotNull(queueView);
+
+         TemporaryTopic tempTopic = (TemporaryTopic) topic;
+         tempTopic.delete();
+
+         // Poll the broker until the lookup for the deleted topic comes back empty.
+         assertTrue("Temp Topic should be deleted.", Wait.waitFor(() -> getProxyToQueue(topic.getTopicName()) == null, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(50)));
+      } finally {
+         connection.close();
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTopicConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTopicConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTopicConsumerTest.java
new file mode 100644
index 0000000..52bd247
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTopicConsumerTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.Bindings;
+import org.apache.activemq.artemis.core.remoting.CloseListener;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JMSTopicConsumerTest extends JMSClientTestSupport {
+
+   /**
+    * Publishes a single text message to a topic and checks that a subscriber created on the same
+    * session receives it with the expected body.
+    */
+   @Test(timeout = 60000)
+   public void testSendAndReceiveOnTopic() throws Exception {
+      Connection connection = createConnection("myClientId");
+
+      try {
+         TopicSession session = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Topic topic = session.createTopic(getTopicName());
+         TopicSubscriber subscriber = session.createSubscriber(topic);
+         TopicPublisher publisher = session.createPublisher(topic);
+
+         // The subscriber exists before the publish; delivery starts only once the connection is started.
+         TextMessage sent = session.createTextMessage("test-message");
+         publisher.send(sent);
+         publisher.close();
+
+         connection.start();
+
+         TextMessage received = (TextMessage) subscriber.receive(1000);
+         assertNotNull(received);
+         assertNotNull(received.getText());
+         assertEquals("test-message", received.getText());
+      } finally {
+         connection.close();
+      }
+   }
+
+   @Test(timeout = 60000)
+   public void testSendWithMultipleReceiversOnTopic() throws Exception {
+      Connection connection = createConnection();
+
+      try {
+         TopicSession session = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Topic topic = session.createTopic(getTopicName());
+         TopicSubscriber consumer1 = session.createSubscriber(topic);
+         TopicSubscriber consumer2 = session.createSubscriber(topic);
+         TopicPublisher producer = session.createPublisher(topic);
+
+         TextMessage message = session.createTextMessage("test-message");
+         producer.send(message);
+
+         producer.close();
+         connection.start();
+
+         message = (TextMessage) consumer1.receive(1000);
+
+         assertNotNull(message);
+         assertNotNull(message.getText());
+         assertEquals("test-message", message.getText());
+
+         message = (TextMessage) consumer2.receive(1000);
+
+         assertNotNull(message);
+         assertNotNull(message.getText());
+         assertEquals("test-message", message.getText());
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Creates a durable subscription, reconnects with the same client id, and checks that the
+    * binding named {@code myClientId.myDurSub} exists until {@code unsubscribe} is called and is
+    * gone afterwards.
+    */
+   @Test(timeout = 60000)
+   public void testDurableSubscriptionUnsubscribe() throws Exception {
+      final SimpleString bindingName = new SimpleString("myClientId.myDurSub");
+      Connection connection = createConnection("myClientId");
+
+      try {
+         // First connection: create the durable subscription, then disconnect.
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Topic topic = session.createTopic(getTopicName());
+         TopicSubscriber myDurSub = session.createDurableSubscriber(topic, "myDurSub");
+         session.close();
+         connection.close();
+
+         // Second connection: re-attach to the subscription and close the subscriber again,
+         // which is required before the subscription can be unsubscribed.
+         connection = createConnection("myClientId");
+         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         myDurSub = session.createDurableSubscriber(topic, "myDurSub");
+         myDurSub.close();
+
+         Assert.assertNotNull(server.getPostOffice().getBinding(bindingName));
+         session.unsubscribe("myDurSub");
+         Assert.assertNull(server.getPostOffice().getBinding(bindingName));
+
+         session.close();
+         connection.close();
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Creates a non-durable topic subscriber and checks that the number of bindings on the topic
+    * address drops from two to one after the client connection has been closed.
+    * <p>
+    * The test waits for the broker-side close notification and fails with an explicit message if
+    * it does not arrive within five seconds, instead of silently continuing to the binding check.
+    */
+   @Test(timeout = 60000)
+   public void testTemporarySubscriptionDeleted() throws Exception {
+      Connection connection = createConnection();
+
+      try {
+         TopicSession session = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Topic topic = session.createTopic(getTopicName());
+         TopicSubscriber myNonDurSub = session.createSubscriber(topic);
+         assertNotNull(myNonDurSub);
+
+         // While the subscriber is attached the address is expected to carry two bindings.
+         Bindings bindingsForAddress = server.getPostOffice().getBindingsForAddress(new SimpleString(getTopicName()));
+         Assert.assertEquals(2, bindingsForAddress.getBindings().size());
+         session.close();
+
+         // Register on the broker-side connection before closing the client so the notification cannot be missed.
+         final CountDownLatch latch = new CountDownLatch(1);
+         server.getRemotingService().getConnections().iterator().next().addCloseListener(new CloseListener() {
+            @Override
+            public void connectionClosed() {
+               latch.countDown();
+            }
+         });
+
+         connection.close();
+         // await() returns false on timeout; surface that directly rather than as a binding-count mismatch.
+         assertTrue("Broker did not report the connection as closed", latch.await(5, TimeUnit.SECONDS));
+         bindingsForAddress = server.getPostOffice().getBindingsForAddress(new SimpleString(getTopicName()));
+         Assert.assertEquals(1, bindingsForAddress.getBindings().size());
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Attaches three durable subscribers to one topic, publishes a numbered series of text
+    * messages and checks that every subscriber receives every message in publish order.
+    * <p>
+    * Assertions pass the expected body first and the received body second, as JUnit's
+    * {@code assertEquals(expected, actual)} requires, so failure messages read correctly.
+    */
+   @Test(timeout = 60000)
+   public void testMultipleDurableConsumersSendAndReceive() throws Exception {
+      Connection connection = createConnection("myClientId");
+
+      try {
+         TopicSession session = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Topic topic = session.createTopic(getTopicName());
+
+         int numMessages = 100;
+         TopicSubscriber sub1 = session.createDurableSubscriber(topic, "myPubId1");
+         TopicSubscriber sub2 = session.createDurableSubscriber(topic, "myPubId2");
+         TopicSubscriber sub3 = session.createDurableSubscriber(topic, "myPubId3");
+         TopicSubscriber[] subscribers = {sub1, sub2, sub3};
+
+         // Messages are published from a separate session on the same connection.
+         Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = sendSession.createProducer(topic);
+         connection.start();
+         for (int i = 0; i < numMessages; i++) {
+            producer.send(sendSession.createTextMessage("message:" + i));
+         }
+
+         // For each message number, every subscriber must deliver that message next.
+         for (int i = 0; i < numMessages; i++) {
+            for (TopicSubscriber subscriber : subscribers) {
+               TextMessage receive = (TextMessage) subscriber.receive(5000);
+               Assert.assertNotNull(receive);
+               Assert.assertEquals("message:" + i, receive.getText());
+            }
+         }
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Publishes and consumes a numbered series of messages through a durable subscription, then
+    * closes the connection, reconnects with the same client id and subscription name, and repeats
+    * the publish/consume cycle.
+    * <p>
+    * Assertions pass the expected body first and the received body second, as JUnit's
+    * {@code assertEquals(expected, actual)} requires, so failure messages read correctly.
+    */
+   @Test(timeout = 60000)
+   public void testDurableSubscriptionReconnection() throws Exception {
+      Connection connection = createConnection("myClientId");
+
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Topic topic = session.createTopic(getTopicName());
+
+         int numMessages = 100;
+         TopicSubscriber sub = session.createDurableSubscriber(topic, "myPubId");
+
+         Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = sendSession.createProducer(topic);
+         connection.start();
+
+         // First round: publish and consume on the original connection.
+         for (int i = 0; i < numMessages; i++) {
+            producer.send(sendSession.createTextMessage("message:" + i));
+         }
+
+         for (int i = 0; i < numMessages; i++) {
+            TextMessage receive = (TextMessage) sub.receive(5000);
+            Assert.assertNotNull(receive);
+            Assert.assertEquals("message:" + i, receive.getText());
+         }
+
+         // Drop the connection and re-attach to the same durable subscription.
+         connection.close();
+         connection = createConnection("myClientId");
+         connection.setExceptionListener(new ExceptionListener() {
+            @Override
+            public void onException(JMSException exception) {
+               exception.printStackTrace();
+            }
+         });
+         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         sub = session.createDurableSubscriber(topic, "myPubId");
+
+         sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         producer = sendSession.createProducer(topic);
+         connection.start();
+
+         // Second round: the re-attached subscription must deliver the new series in order.
+         for (int i = 0; i < numMessages; i++) {
+            producer.send(sendSession.createTextMessage("message:" + i));
+         }
+         for (int i = 0; i < numMessages; i++) {
+            TextMessage receive = (TextMessage) sub.receive(5000);
+            Assert.assertNotNull(receive);
+            Assert.assertEquals("message:" + i, receive.getText());
+         }
+      } finally {
+         connection.close();
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTransactionTest.java
new file mode 100644
index 0000000..c7f73c1
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTransactionTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JMSTransactionTest extends JMSClientTestSupport {
+
+   /**
+    * Sends ten text messages in a transacted session, commits, and checks that the broker's view
+    * of the queue reaches a message count of ten.
+    */
+   @Test(timeout = 60000)
+   public void testProduceMessageAndCommit() throws Throwable {
+      Connection connection = createConnection();
+      Session txSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+      javax.jms.Queue queue = txSession.createQueue(getQueueName());
+
+      System.out.println("queue:" + queue.getQueueName());
+      MessageProducer producer = txSession.createProducer(queue);
+      for (int index = 0; index < 10; index++) {
+         TextMessage outbound = txSession.createTextMessage();
+         outbound.setText("Message:" + index);
+         producer.send(outbound);
+      }
+
+      // Commit the sends, then release the session.
+      txSession.commit();
+      txSession.close();
+
+      Queue queueView = getProxyToQueue(getQueueName());
+
+      assertTrue("Message didn't arrive on queue", Wait.waitFor(() -> queueView.getMessageCount() == 10));
+   }
+
+   /**
+    * Sends ten text messages in a transacted session, rolls the transaction back, and checks
+    * that the broker's view of the queue reports a message count of zero.
+    */
+   @Test(timeout = 60000)
+   public void testProduceMessageAndRollback() throws Throwable {
+      Connection connection = createConnection();
+      Session txSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+      javax.jms.Queue queue = txSession.createQueue(getQueueName());
+
+      System.out.println("queue:" + queue.getQueueName());
+      MessageProducer producer = txSession.createProducer(queue);
+      for (int index = 0; index < 10; index++) {
+         TextMessage outbound = txSession.createTextMessage();
+         outbound.setText("Message:" + index);
+         producer.send(outbound);
+      }
+
+      // Discard the sends instead of committing them.
+      txSession.rollback();
+      txSession.close();
+
+      Queue queueView = getProxyToQueue(getQueueName());
+      assertTrue("Messages arrived on queue", Wait.waitFor(() -> queueView.getMessageCount() == 0));
+   }
+
+   /**
+    * Sends messages in a transacted session and closes the session without committing, then
+    * checks that the broker's view of the queue reports a message count of zero.
+    */
+   @Test(timeout = 60000)
+   public void testProducedMessageAreRolledBackOnSessionClose() throws Exception {
+      final int numMessages = 10;
+
+      Connection connection = createConnection();
+      Session txSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+      javax.jms.Queue queue = txSession.createQueue(getQueueName());
+
+      MessageProducer producer = txSession.createProducer(queue);
+
+      // NOTE(review): this random payload is generated but never attached to any message.
+      byte[] bytes = new byte[2048];
+      new Random().nextBytes(bytes);
+
+      for (int index = 0; index < numMessages; index++) {
+         TextMessage outbound = txSession.createTextMessage();
+         outbound.setText("msg:" + index);
+         producer.send(outbound);
+      }
+
+      // Close with the transaction still open: no commit and no explicit rollback.
+      txSession.close();
+
+      Queue queueView = getProxyToQueue(getQueueName());
+      assertTrue("Messages arrived on queue", Wait.waitFor(() -> queueView.getMessageCount() == 0));
+   }
+
+   /**
+    * Sends ten messages with an auto-acknowledge session, consumes them in order inside a
+    * transacted session, commits, and checks that the broker's view of the queue drops to zero.
+    */
+   @Test(timeout = 60000)
+   public void testConsumeMessagesAndCommit() throws Throwable {
+      Connection connection = createConnection();
+
+      // Phase one: populate the queue outside of any transaction.
+      Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      javax.jms.Queue queue = sendSession.createQueue(getQueueName());
+
+      System.out.println("queue:" + queue.getQueueName());
+      MessageProducer producer = sendSession.createProducer(queue);
+      for (int index = 0; index < 10; index++) {
+         TextMessage outbound = sendSession.createTextMessage();
+         outbound.setText("Message:" + index);
+         producer.send(outbound);
+      }
+      sendSession.close();
+
+      // Phase two: consume everything inside a transaction and commit it.
+      Session txSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+      MessageConsumer consumer = txSession.createConsumer(queue);
+      connection.start();
+
+      for (int index = 0; index < 10; index++) {
+         TextMessage inbound = (TextMessage) consumer.receive(5000);
+         Assert.assertNotNull(inbound);
+         Assert.assertEquals("Message:" + index, inbound.getText());
+      }
+      txSession.commit();
+      txSession.close();
+
+      Queue queueView = getProxyToQueue(getQueueName());
+      assertTrue("Messages not consumed", Wait.waitFor(() -> queueView.getMessageCount() == 0));
+   }
+
+   /**
+    * Sends ten messages with an auto-acknowledge session, consumes them in order inside a
+    * transacted session, rolls back, and checks that the broker's view of the queue still
+    * reports all ten messages.
+    */
+   @Test(timeout = 60000)
+   public void testConsumeMessagesAndRollback() throws Throwable {
+      Connection connection = createConnection();
+
+      // Phase one: populate the queue outside of any transaction.
+      Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      javax.jms.Queue queue = sendSession.createQueue(getQueueName());
+
+      MessageProducer producer = sendSession.createProducer(queue);
+      for (int index = 0; index < 10; index++) {
+         TextMessage outbound = sendSession.createTextMessage();
+         outbound.setText("Message:" + index);
+         producer.send(outbound);
+      }
+      sendSession.close();
+
+      // Phase two: consume everything inside a transaction, then roll it back.
+      Session txSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+      MessageConsumer consumer = txSession.createConsumer(queue);
+      connection.start();
+
+      for (int index = 0; index < 10; index++) {
+         TextMessage inbound = (TextMessage) consumer.receive(5000);
+         Assert.assertNotNull(inbound);
+         Assert.assertEquals("Message:" + index, inbound.getText());
+      }
+
+      txSession.rollback();
+
+      Queue queueView = getProxyToQueue(getQueueName());
+      assertTrue("Messages consumed", Wait.waitFor(() -> queueView.getMessageCount() == 10));
+   }
+
+   /**
+    * Commits a batch of numbered messages, consumes the first few in a transaction and rolls
+    * back, then consumes the whole batch again and commits. Checks that the rollback left the
+    * queue count unchanged, that every message number was seen on the second pass, and that the
+    * queue is empty after the final commit.
+    */
+   @Test(timeout = 60000)
+   public void testRollbackSomeThenReceiveAndCommit() throws Exception {
+      final int MSG_COUNT = 5;
+      final int consumeBeforeRollback = 2;
+
+      Connection connection = createConnection();
+      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+      javax.jms.Queue queue = session.createQueue(getQueueName());
+
+      // Each message carries a 1-based MESSAGE_NUMBER property used for the checks below.
+      MessageProducer producer = session.createProducer(queue);
+      for (int index = 0; index < MSG_COUNT; index++) {
+         TextMessage outbound = session.createTextMessage();
+         outbound.setText("Message:" + index);
+         outbound.setIntProperty("MESSAGE_NUMBER", index + 1);
+         producer.send(outbound);
+      }
+
+      session.commit();
+
+      Queue queueView = getProxyToQueue(getQueueName());
+      assertTrue("Messages not enqueued", Wait.waitFor(() -> queueView.getMessageCount() == MSG_COUNT));
+
+      MessageConsumer consumer = session.createConsumer(queue);
+
+      // Consume only the first few messages, in order, and then abandon that work.
+      for (int expected = 1; expected <= consumeBeforeRollback; expected++) {
+         Message partial = consumer.receive(1000);
+         assertNotNull(partial);
+         assertEquals("Unexpected message number", expected, partial.getIntProperty("MESSAGE_NUMBER"));
+      }
+
+      session.rollback();
+
+      assertEquals(MSG_COUNT, queueView.getMessageCount());
+
+      // Second pass: every message number must show up once the full batch is consumed.
+      Set<Integer> outstanding = new HashSet<>();
+      for (int number = 1; number <= MSG_COUNT; number++) {
+         outstanding.add(number);
+      }
+
+      for (int received = 1; received <= MSG_COUNT; received++) {
+         Message redelivered = consumer.receive(1000);
+         assertNotNull(redelivered);
+         int msgNum = redelivered.getIntProperty("MESSAGE_NUMBER");
+         outstanding.remove(msgNum);
+      }
+
+      session.commit();
+
+      assertTrue("Did not consume all expected messages, missing messages: " + outstanding, outstanding.isEmpty());
+      assertEquals("Queue should have no messages left after commit", 0, queueView.getMessageCount());
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonFullQualifiedNameTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonFullQualifiedNameTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonFullQualifiedNameTest.java
deleted file mode 100644
index 22ba64d..0000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonFullQualifiedNameTest.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.integration.amqp;
-
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.api.core.TransportConfiguration;
-import org.apache.activemq.artemis.api.core.client.ClientProducer;
-import org.apache.activemq.artemis.api.core.client.ClientSession;
-import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
-import org.apache.activemq.artemis.api.core.client.ServerLocator;
-import org.apache.activemq.artemis.core.config.Configuration;
-import org.apache.activemq.artemis.core.postoffice.Binding;
-import org.apache.activemq.artemis.core.postoffice.Bindings;
-import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
-import org.apache.activemq.artemis.core.server.QueueQueryResult;
-import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
-import org.apache.activemq.artemis.utils.CompositeAddress;
-import org.apache.qpid.jms.JmsConnectionFactory;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import javax.jms.Connection;
-import javax.jms.InvalidDestinationException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.Topic;
-import java.util.HashMap;
-import java.util.Map;
-
-public class ProtonFullQualifiedNameTest extends ProtonTestBase {
-
-   private static final String amqpConnectionUri = "amqp://localhost:5672";
-
-   private SimpleString anycastAddress = new SimpleString("address.anycast");
-   private SimpleString multicastAddress = new SimpleString("address.multicast");
-
-   private SimpleString anycastQ1 = new SimpleString("q1");
-   private SimpleString anycastQ2 = new SimpleString("q2");
-   private SimpleString anycastQ3 = new SimpleString("q3");
-
-   JmsConnectionFactory factory = new JmsConnectionFactory(amqpConnectionUri);
-   private ServerLocator locator;
-
-   @Override
-   @Before
-   public void setUp() throws Exception {
-      super.setUp();
-
-      Configuration serverConfig = server.getConfiguration();
-
-      Map<String, AddressSettings> settings = serverConfig.getAddressesSettings();
-      assertNotNull(settings);
-      AddressSettings addressSetting = settings.get("#");
-      if (addressSetting == null) {
-         addressSetting = new AddressSettings();
-         settings.put("#", addressSetting);
-      }
-      addressSetting.setAutoCreateQueues(true);
-      locator = createNettyNonHALocator();
-   }
-
-   @Override
-   @After
-   public void tearDown() throws Exception {
-      super.tearDown();
-   }
-
-   @Override
-   protected void configureServer(Configuration serverConfig) {
-      serverConfig.addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, new HashMap<String, Object>(), "netty", new HashMap<String, Object>()));
-   }
-
-   @Test
-   //there isn't much use of FQQN for topics
-   //however we can test query functionality
-   public void testTopic() throws Exception {
-
-      Connection connection = factory.createConnection();
-      try {
-         connection.setClientID("FQQNconn");
-         connection.start();
-         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         Topic topic = session.createTopic(multicastAddress.toString());
-
-         MessageConsumer consumer1 = session.createConsumer(topic);
-         MessageConsumer consumer2 = session.createConsumer(topic);
-         MessageConsumer consumer3 = session.createConsumer(topic);
-
-         MessageProducer producer = session.createProducer(topic);
-
-         producer.send(session.createMessage());
-
-         //each consumer receives one
-         Message m = consumer1.receive(2000);
-         assertNotNull(m);
-         m = consumer2.receive(2000);
-         assertNotNull(m);
-         m = consumer3.receive(2000);
-         assertNotNull(m);
-
-         Bindings bindings = server.getPostOffice().getBindingsForAddress(multicastAddress);
-         for (Binding b : bindings.getBindings()) {
-            System.out.println("checking binidng " + b.getUniqueName() + " " + ((LocalQueueBinding)b).getQueue().getDeliveringMessages());
-            SimpleString qName = b.getUniqueName();
-            //do FQQN query
-            QueueQueryResult result = server.queueQuery(CompositeAddress.toFullQN(multicastAddress, qName));
-            assertTrue(result.isExists());
-            assertEquals(result.getName(), CompositeAddress.toFullQN(multicastAddress, qName));
-            //do qname query
-            result = server.queueQuery(qName);
-            assertTrue(result.isExists());
-            assertEquals(result.getName(), qName);
-         }
-      } finally {
-         connection.close();
-      }
-   }
-
-   @Test
-   public void testQueue() throws Exception {
-      server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastQ1, null, true, false, -1, false, true);
-      server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastQ2, null, true, false, -1, false, true);
-      server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastQ3, null, true, false, -1, false, true);
-
-      Connection connection = factory.createConnection();
-      try {
-         connection.start();
-         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-         Queue q1 = session.createQueue(CompositeAddress.toFullQN(anycastAddress, anycastQ1).toString());
-         Queue q2 = session.createQueue(CompositeAddress.toFullQN(anycastAddress, anycastQ2).toString());
-         Queue q3 = session.createQueue(CompositeAddress.toFullQN(anycastAddress, anycastQ3).toString());
-
-         //send 3 messages to anycastAddress
-         ClientSessionFactory cf = createSessionFactory(locator);
-         ClientSession coreSession = cf.createSession();
-
-         //send 3 messages
-         ClientProducer coreProducer = coreSession.createProducer(anycastAddress);
-         sendMessages(coreSession, coreProducer, 3);
-
-         MessageConsumer consumer1 = session.createConsumer(q1);
-         MessageConsumer consumer2 = session.createConsumer(q2);
-         MessageConsumer consumer3 = session.createConsumer(q3);
-
-         //each consumer receives one
-         assertNotNull(consumer1.receive(2000));
-         assertNotNull(consumer2.receive(2000));
-         assertNotNull(consumer3.receive(2000));
-
-         connection.close();
-         //queues are empty now
-         for (SimpleString q : new SimpleString[]{anycastQ1, anycastQ2, anycastQ3}) {
-            //FQQN query
-            QueueQueryResult query = server.queueQuery(CompositeAddress.toFullQN(anycastAddress, q));
-            assertTrue(query.isExists());
-            assertEquals(anycastAddress, query.getAddress());
-            assertEquals(CompositeAddress.toFullQN(anycastAddress, q), query.getName());
-            assertEquals(0, query.getMessageCount());
-            //try query again using qName
-            query = server.queueQuery(q);
-            assertEquals(q, query.getName());
-         }
-      } finally {
-         connection.close();
-      }
-   }
-
-   @Test
-   public void testQueueSpecial() throws Exception {
-      server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastQ1, null, true, false, -1, false, true);
-
-      Connection connection = factory.createConnection();
-      try {
-         connection.start();
-         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-         //::queue ok!
-         String specialName = CompositeAddress.toFullQN(new SimpleString(""), anycastQ1).toString();
-         Queue q1 = session.createQueue(specialName);
-
-         ClientSessionFactory cf = createSessionFactory(locator);
-         ClientSession coreSession = cf.createSession();
-
-         ClientProducer coreProducer = coreSession.createProducer(anycastAddress);
-         sendMessages(coreSession, coreProducer, 1);
-
-         System.out.println("create consumer: " + q1);
-         MessageConsumer consumer1 = session.createConsumer(q1);
-
-         assertNotNull(consumer1.receive(2000));
-
-         //queue::
-         specialName = CompositeAddress.toFullQN(anycastQ1, new SimpleString("")).toString();
-         q1 = session.createQueue(specialName);
-         try {
-            session.createConsumer(q1);
-            fail("should get exception");
-         } catch (InvalidDestinationException e) {
-            //expected
-         }
-
-         //::
-         specialName = CompositeAddress.toFullQN(new SimpleString(""), new SimpleString("")).toString();
-         q1 = session.createQueue(specialName);
-         try {
-            session.createConsumer(q1);
-            fail("should get exception");
-         } catch (InvalidDestinationException e) {
-            //expected
-         }
-
-      } finally {
-         connection.close();
-      }
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonMaxFrameSizeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonMaxFrameSizeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonMaxFrameSizeTest.java
deleted file mode 100644
index 851ee2f..0000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonMaxFrameSizeTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.integration.amqp;
-
-import java.net.URI;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.activemq.transport.amqp.client.AmqpClient;
-import org.apache.activemq.transport.amqp.client.AmqpConnection;
-import org.apache.activemq.transport.amqp.client.AmqpMessage;
-import org.apache.activemq.transport.amqp.client.AmqpReceiver;
-import org.apache.activemq.transport.amqp.client.AmqpSender;
-import org.apache.activemq.transport.amqp.client.AmqpSession;
-import org.apache.qpid.proton.amqp.messaging.Data;
-import org.apache.qpid.proton.message.impl.MessageImpl;
-import org.junit.Test;
-
-public class ProtonMaxFrameSizeTest extends ProtonTestBase {
-
-   private static final int FRAME_SIZE = 512;
-
-   @Override
-   protected void configureAmqp(Map<String, Object> params) {
-      params.put("maxFrameSize", FRAME_SIZE);
-   }
-
-   @Test
-   public void testMultipleTransfers() throws Exception {
-
-      String testQueueName = "ConnectionFrameSize";
-      int nMsgs = 200;
-
-      AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
-
-      AmqpConnection amqpConnection = client.createConnection();
-
-      try {
-         amqpConnection.connect();
-
-         AmqpSession session = amqpConnection.createSession();
-         AmqpSender sender = session.createSender(testQueueName);
-
-         final int payload = FRAME_SIZE * 16;
-
-         for (int i = 0; i < nMsgs; ++i) {
-            AmqpMessage message = createAmqpMessage((byte) 'A', payload);
-            sender.send(message);
-         }
-
-         int count = getMessageCount(server.getPostOffice(), testQueueName);
-         assertEquals(nMsgs, count);
-
-         AmqpReceiver receiver = session.createReceiver(testQueueName);
-         receiver.flow(nMsgs);
-
-         for (int i = 0; i < nMsgs; ++i) {
-            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
-            assertNotNull("failed at " + i, message);
-            MessageImpl wrapped = (MessageImpl) message.getWrappedMessage();
-            Data data = (Data) wrapped.getBody();
-            System.out.println("received : message: " + data.getValue().getLength());
-            assertEquals(payload, data.getValue().getLength());
-            message.accept();
-         }
-
-      } finally {
-         amqpConnection.close();
-      }
-   }
-
-   private AmqpMessage createAmqpMessage(byte value, int payloadSize) {
-      AmqpMessage message = new AmqpMessage();
-      byte[] payload = new byte[payloadSize];
-      for (int i = 0; i < payload.length; i++) {
-         payload[i] = value;
-      }
-      message.setBytes(payload);
-      return message;
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java
deleted file mode 100644
index 42f30ac..0000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.integration.amqp;
-
-import javax.jms.Connection;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-import java.util.Map;
-
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.postoffice.Bindings;
-import org.apache.activemq.artemis.core.server.impl.AddressInfo;
-import org.apache.qpid.jms.JmsConnectionFactory;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ProtonPubSubTest extends ProtonTestBase {
-
-   private final String prefix = "foo.bar.";
-   private final String pubAddress = "pubAddress";
-   private final String prefixedPubAddress = prefix + "pubAddress";
-   private final SimpleString ssPubAddress = new SimpleString(pubAddress);
-   private final SimpleString ssprefixedPubAddress = new SimpleString(prefixedPubAddress);
-   private Connection connection;
-   private JmsConnectionFactory factory;
-
-   @Override
-   protected void configureAmqp(Map<String, Object> params) {
-      params.put("pubSubPrefix", prefix);
-   }
-
-   @Override
-   @Before
-   public void setUp() throws Exception {
-      super.setUp();
-      server.addAddressInfo(new AddressInfo(ssPubAddress, RoutingType.MULTICAST));
-      server.addAddressInfo(new AddressInfo(ssprefixedPubAddress, RoutingType.MULTICAST));
-      server.createQueue(ssPubAddress, RoutingType.MULTICAST, ssPubAddress, new SimpleString("foo=bar"), false, true);
-      server.createQueue(ssprefixedPubAddress, RoutingType.MULTICAST, ssprefixedPubAddress, new SimpleString("foo=bar"), false, true);
-      factory = new JmsConnectionFactory("amqp://localhost:5672");
-      factory.setClientID("myClientID");
-      connection = factory.createConnection();
-      connection.setExceptionListener(new ExceptionListener() {
-         @Override
-         public void onException(JMSException exception) {
-            exception.printStackTrace();
-         }
-      });
-
-   }
-
-   @Override
-   @After
-   public void tearDown() throws Exception {
-      try {
-         Thread.sleep(250);
-         if (connection != null) {
-            connection.close();
-         }
-      } finally {
-         super.tearDown();
-      }
-   }
-
-   @Test
-   public void testNonDurablePubSub() throws Exception {
-      int numMessages = 100;
-      Topic topic = createTopic(pubAddress);
-      TopicSession session = ((TopicConnection) connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageConsumer sub = session.createSubscriber(topic);
-
-      Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageProducer producer = sendSession.createProducer(topic);
-      connection.start();
-      for (int i = 0; i < numMessages; i++) {
-         producer.send(sendSession.createTextMessage("message:" + i));
-      }
-      for (int i = 0; i < numMessages; i++) {
-         TextMessage receive = (TextMessage) sub.receive(5000);
-         Assert.assertNotNull(receive);
-         Assert.assertEquals(receive.getText(), "message:" + i);
-      }
-   }
-
-   @Test
-   public void testNonDurablePubSubQueueDeleted() throws Exception {
-      int numMessages = 100;
-      Topic topic = createTopic(pubAddress);
-      TopicSession session = ((TopicConnection) connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageConsumer sub = session.createSubscriber(topic);
-      Bindings bindingsForAddress = server.getPostOffice().getBindingsForAddress(new SimpleString(pubAddress));
-      assertEquals(2, bindingsForAddress.getBindings().size());
-      sub.close();
-      Thread.sleep(1000);
-      assertEquals(1, bindingsForAddress.getBindings().size());
-   }
-
-   @Test
-   public void testNonDurableMultiplePubSub() throws Exception {
-      int numMessages = 100;
-      Topic topic = createTopic(pubAddress);
-      TopicSession session = ((TopicConnection) connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageConsumer sub = session.createSubscriber(topic);
-      MessageConsumer sub2 = session.createSubscriber(topic);
-      MessageConsumer sub3 = session.createSubscriber(topic);
-
-      Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageProducer producer = sendSession.createProducer(topic);
-      connection.start();
-      for (int i = 0; i < numMessages; i++) {
-         producer.send(sendSession.createTextMessage("message:" + i));
-      }
-      for (int i = 0; i < numMessages; i++) {
-         TextMessage receive = (TextMessage) sub.receive(5000);
-         Assert.assertNotNull(receive);
-         Assert.assertEquals(receive.getText(), "message:" + i);
-         receive = (TextMessage) sub2.receive(5000);
-         Assert.assertNotNull(receive);
-         Assert.assertEquals(receive.getText(), "message:" + i);
-         receive = (TextMessage) sub3.receive(5000);
-         Assert.assertNotNull(receive);
-         Assert.assertEquals(receive.getText(), "message:" + i);
-      }
-   }
-
-   @Test
-   public void testDurablePubSub() throws Exception {
-      int numMessages = 100;
-      Topic topic = createTopic(pubAddress);
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      TopicSubscriber sub = session.createDurableSubscriber(topic, "myPubId");
-
-      Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageProducer producer = sendSession.createProducer(topic);
-      connection.start();
-      for (int i = 0; i < numMessages; i++) {
-         producer.send(sendSession.createTextMessage("message:" + i));
-      }
-      for (int i = 0; i < numMessages; i++) {
-         TextMessage receive = (TextMessage) sub.receive(5000);
-         Assert.assertNotNull(receive);
-         Assert.assertEquals(receive.getText(), "message:" + i);
-      }
-   }
-
-   @Test
-   public void testDurableMultiplePubSub() throws Exception {
-      int numMessages = 100;
-      Topic topic = createTopic(pubAddress);
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      TopicSubscriber sub = session.createDurableSubscriber(topic, "myPubId");
-      TopicSubscriber sub2 = session.createDurableSubscriber(topic, "myPubId2");
-      TopicSubscriber sub3 = session.createDurableSubscriber(topic, "myPubId3");
-
-      Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageProducer producer = sendSession.createProducer(topic);
-      connection.start();
-      for (int i = 0; i < numMessages; i++) {
-         producer.send(sendSession.createTextMessage("message:" + i));
-      }
-      for (int i = 0; i < numMessages; i++) {
-         TextMessage receive = (TextMessage) sub.receive(5000);
-         Assert.assertNotNull(receive);
-         Assert.assertEquals(receive.getText(), "message:" + i);
-         receive = (TextMessage) sub2.receive(5000);
-         Assert.assertNotNull(receive);
-         Assert.assertEquals(receive.getText(), "message:" + i);
-         receive = (TextMessage) sub3.receive(5000);
-         Assert.assertNotNull(receive);
-         Assert.assertEquals(receive.getText(), "message:" + i);
-      }
-   }
-
-   @Test
-   public void testDurablePubSubReconnect() throws Exception {
-      int numMessages = 100;
-      Topic topic = createTopic(pubAddress);
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      TopicSubscriber sub = session.createDurableSubscriber(topic, "myPubId");
-
-      Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageProducer producer = sendSession.createProducer(topic);
-      connection.start();
-      for (int i = 0; i < numMessages; i++) {
-         producer.send(sendSession.createTextMessage("message:" + i));
-      }
-      for (int i = 0; i < numMessages; i++) {
-         TextMessage receive = (TextMessage) sub.receive(5000);
-         Assert.assertNotNull(receive);
-         Assert.assertEquals(receive.getText(), "message:" + i);
-      }
-      connection.close();
-      connection = factory.createConnection();
-      connection.setExceptionListener(new ExceptionListener() {
-         @Override
-         public void onException(JMSException exception) {
-            exception.printStackTrace();
-         }
-      });
-      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      sub = session.createDurableSubscriber(topic, "myPubId");
-
-      sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      producer = sendSession.createProducer(topic);
-      connection.start();
-      for (int i = 0; i < numMessages; i++) {
-         producer.send(sendSession.createTextMessage("message:" + i));
-      }
-      for (int i = 0; i < numMessages; i++) {
-         TextMessage receive = (TextMessage) sub.receive(5000);
-         Assert.assertNotNull(receive);
-         Assert.assertEquals(receive.getText(), "message:" + i);
-      }
-   }
-
-   @Test
-   public void testDurablePubSubUnsubscribe() throws Exception {
-      int numMessages = 100;
-      Topic topic = createTopic(pubAddress);
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      TopicSubscriber sub = session.createDurableSubscriber(topic, "myPubId");
-
-      Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageProducer producer = sendSession.createProducer(topic);
-      connection.start();
-      for (int i = 0; i < numMessages; i++) {
-         producer.send(sendSession.createTextMessage("message:" + i));
-      }
-      for (int i = 0; i < numMessages; i++) {
-         TextMessage receive = (TextMessage) sub.receive(5000);
-         Assert.assertNotNull(receive);
-         Assert.assertEquals(receive.getText(), "message:" + i);
-      }
-      sub.close();
-      session.unsubscribe("myPubId");
-   }
-
-   private javax.jms.Topic createTopic(String address) throws Exception {
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      try {
-         return session.createTopic(address);
-      } finally {
-         session.close();
-      }
-   }
-}


Mime
View raw message