activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject [5/7] activemq-artemis git commit: ARTEMIS-1123 Major AMQP Test Suite refactoring
Date Fri, 28 Apr 2017 09:17:02 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java
index a360eb8..216b0ec 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java
@@ -30,6 +30,8 @@ import org.junit.After;
  */
 public class AmqpTestSupport extends ActiveMQTestBase {
 
+   protected static final int AMQP_PORT = 5672;
+
    protected LinkedList<AmqpConnection> connections = new LinkedList<>();
 
    protected boolean useSSL;
@@ -65,7 +67,7 @@ public class AmqpTestSupport extends ActiveMQTestBase {
       boolean webSocket = false;
 
       try {
-         int port = 61616;
+         int port = AMQP_PORT;
 
          String uri = null;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
index 3b231fa..493079a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.activemq.artemis.tests.integration.amqp;
 
 import java.io.IOException;
@@ -33,6 +32,7 @@ import javax.jms.Session;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
 import org.apache.activemq.transport.amqp.client.AmqpMessage;
@@ -45,6 +45,7 @@ import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.transaction.TransactionalState;
 import org.apache.qpid.proton.amqp.transport.DeliveryState;
 import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Sender;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -98,7 +99,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
       sender.setStateInspector(new AmqpValidator() {
 
          @Override
-         public void inspectDeliveryUpdate(Delivery delivery) {
+         public void inspectDeliveryUpdate(Sender sender, Delivery delivery) {
             if (delivery.remotelySettled()) {
                DeliveryState state = delivery.getRemoteState();
                if (state instanceof TransactionalState) {
@@ -161,7 +162,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
 
       session.commit();
 
-      assertEquals(1, queue.getMessageCount());
+      assertTrue("Message was not queued", Wait.waitFor(() -> queue.getMessageCount() == 1));
 
       sender.close();
       connection.close();
@@ -205,7 +206,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
       message.setText("Test-Message");
       sender.send(message);
 
-      assertEquals(1, queue.getMessageCount());
+      assertTrue("Message did not arrive", Wait.waitFor(() -> queue.getMessageCount() == 1));
 
       AmqpReceiver receiver = session.createReceiver(getQueueName());
 
@@ -237,7 +238,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
       message.setText("Test-Message");
       sender.send(message);
 
-      assertEquals(1, queue.getMessageCount());
+      assertTrue("Message did not arrive", Wait.waitFor(() -> queue.getMessageCount() == 1));
 
       AmqpReceiver receiver = session.createReceiver(getQueueName());
 
@@ -281,7 +282,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
       message.setText("Test-Message");
       sender.send(message);
 
-      assertEquals(1, queue.getMessageCount());
+      assertTrue("Message did not arrive", Wait.waitFor(() -> queue.getMessageCount() == 1));
 
       AmqpReceiver receiver = session.createReceiver(getQueueName());
 
@@ -853,10 +854,10 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
 
    @Test(timeout = 120000)
    public void testSendPersistentTX() throws Exception {
-      int MESSAGE_COUNT = 100000;
+      int MESSAGE_COUNT = 2000;
       AtomicInteger errors = new AtomicInteger(0);
       server.createQueue(SimpleString.toSimpleString("q1"), RoutingType.ANYCAST, SimpleString.toSimpleString("q1"), null, true, false, 1, false, true);
-      ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:61616");
+      ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + AMQP_PORT);
       Connection sendConnection = factory.createConnection();
       Connection consumerConnection = factory.createConnection();
       try {
@@ -939,7 +940,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
       receiver.setStateInspector(new AmqpValidator() {
 
          @Override
-         public void inspectDeliveryUpdate(Delivery delivery) {
+         public void inspectDeliveryUpdate(Sender sender, Delivery delivery) {
             if (delivery.remotelySettled()) {
                LOG.info("Receiver got delivery update for: {}", delivery);
                if (!(delivery.getRemoteState() instanceof TransactionalState)) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java
index a0f0393..e10c73d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -16,8 +16,13 @@
  */
 package org.apache.activemq.artemis.tests.integration.amqp;
 
-import org.apache.activemq.artemis.api.core.SimpleString;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.QUEUE_CAPABILITY;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.TOPIC_CAPABILITY;
+
+import java.util.concurrent.TimeUnit;
+
 import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -29,12 +34,6 @@ import org.apache.activemq.transport.amqp.client.AmqpSession;
 import org.apache.qpid.proton.amqp.messaging.Source;
 import org.junit.Test;
 
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.QUEUE_CAPABILITY;
-import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.TOPIC_CAPABILITY;
-
-
 public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport  {
 
    SimpleString address = new SimpleString("testAddress");
@@ -46,7 +45,7 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport  {
       server.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
       server.createQueue(address, RoutingType.ANYCAST, address, null, true, false);
 
-      sendMessages(1, address.toString());
+      sendMessages(address.toString(), 1);
 
       AmqpClient client = createAmqpClient();
       AmqpConnection connection = addConnection(client.connect());
@@ -68,7 +67,7 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport  {
       server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false);
       server.createQueue(address, RoutingType.ANYCAST, address, null, true, false);
 
-      sendMessages(2, address.toString());
+      sendMessages(address.toString(), 2);
 
       AmqpClient client = createAmqpClient();
       AmqpConnection connection = addConnection(client.connect());
@@ -89,7 +88,7 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport  {
       server.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
       server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false);
 
-      sendMessages(1, address.toString());
+      sendMessages(address.toString(), 1);
 
       AmqpClient client = createAmqpClient();
       AmqpConnection connection = addConnection(client.connect());
@@ -111,7 +110,7 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport  {
       server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false);
       server.createQueue(address, RoutingType.ANYCAST, queue2, null, true, false);
 
-      sendMessages(1, address.toString());
+      sendMessages(address.toString(), 1);
 
       AmqpClient client = createAmqpClient();
       AmqpConnection connection = addConnection(client.connect());
@@ -132,7 +131,7 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport  {
       server.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
       server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false);
 
-      sendMessages(1, address.toString());
+      sendMessages(address.toString(), 1);
 
       AmqpClient client = createAmqpClient();
       AmqpConnection connection = addConnection(client.connect());
@@ -152,7 +151,7 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport  {
    public void testConsumeWhenOnlyMulticast() throws Exception {
       server.addAddressInfo(new AddressInfo(address, RoutingType.MULTICAST));
 
-      sendMessages(1, address.toString());
+      sendMessages(address.toString(), 1);
 
       AmqpClient client = createAmqpClient();
       AmqpConnection connection = addConnection(client.connect());
@@ -195,7 +194,7 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport  {
       AmqpConnection connection = addConnection(client.connect());
       AmqpSession session = connection.createSession();
       AmqpReceiver receiver = session.createReceiver(address.toString());
-      sendMessages(1, address.toString());
+      sendMessages(address.toString(), 1);
       receiver.flow(1);
       AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
       assertNotNull(amqpMessage);
@@ -223,7 +222,6 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport  {
       connection.close();
    }
 
-
    protected Source createJmsSource(boolean topic) {
 
       Source source = new Source();
@@ -236,5 +234,4 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport  {
 
       return source;
    }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java
index b413ad8..bdf6258 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -16,8 +16,13 @@
  */
 package org.apache.activemq.artemis.tests.integration.amqp;
 
-import org.apache.activemq.artemis.api.core.SimpleString;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.QUEUE_CAPABILITY;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.TOPIC_CAPABILITY;
+
+import java.util.concurrent.TimeUnit;
+
 import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
@@ -28,12 +33,6 @@ import org.apache.activemq.transport.amqp.client.AmqpSession;
 import org.apache.qpid.proton.amqp.messaging.Source;
 import org.junit.Test;
 
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.QUEUE_CAPABILITY;
-import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.TOPIC_CAPABILITY;
-
-
 public class BrokerDefinedMulticastConsumerTest extends AmqpClientTestSupport  {
 
    SimpleString address = new SimpleString("testAddress");
@@ -45,7 +44,7 @@ public class BrokerDefinedMulticastConsumerTest extends AmqpClientTestSupport  {
       server.addAddressInfo(new AddressInfo(address, RoutingType.MULTICAST));
       server.createQueue(address, RoutingType.MULTICAST, address, null, true, false);
 
-      sendMessages(1, address.toString());
+      sendMessages(address.toString(), 1);
 
       AmqpClient client = createAmqpClient();
       AmqpConnection connection = addConnection(client.connect());
@@ -65,7 +64,7 @@ public class BrokerDefinedMulticastConsumerTest extends AmqpClientTestSupport  {
    public void testConsumeWhenOnlyAnycast() throws Exception {
       server.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
 
-      sendMessages(1, address.toString());
+      sendMessages(address.toString(), 1);
 
       AmqpClient client = createAmqpClient();
       AmqpConnection connection = addConnection(client.connect());
@@ -102,7 +101,6 @@ public class BrokerDefinedMulticastConsumerTest extends AmqpClientTestSupport  {
       connection.close();
    }
 
-
    protected Source createJmsSource(boolean topic) {
 
       Source source = new Source();
@@ -115,5 +113,4 @@ public class BrokerDefinedMulticastConsumerTest extends AmqpClientTestSupport  {
 
       return source;
    }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java
index 377cf86..3e504d7 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.artemis.tests.integration.amqp;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
@@ -25,8 +27,6 @@ import org.apache.activemq.transport.amqp.client.AmqpReceiver;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
 import org.junit.Test;
 
-import java.util.concurrent.TimeUnit;
-
 public class ClientDefinedAnycastConsumerTest  extends AmqpClientTestSupport  {
 
    SimpleString address = new SimpleString("testAddress");
@@ -39,7 +39,7 @@ public class ClientDefinedAnycastConsumerTest  extends AmqpClientTestSupport  {
       AmqpSession session = connection.createSession();
 
       AmqpReceiver receiver = session.createReceiver(address.toString());
-      sendMessages(1, address.toString());
+      sendMessages(address.toString(), 1);
       receiver.flow(1);
       AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
       assertNotNull(amqpMessage);
@@ -48,5 +48,4 @@ public class ClientDefinedAnycastConsumerTest  extends AmqpClientTestSupport  {
       receiver.close();
       connection.close();
    }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java
index 84bdb86..51c70ee 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -16,8 +16,12 @@
  */
 package org.apache.activemq.artemis.tests.integration.amqp;
 
-import org.apache.activemq.artemis.api.core.SimpleString;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.TOPIC_CAPABILITY;
+
+import java.util.concurrent.TimeUnit;
+
 import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.artemis.tests.util.Wait;
@@ -30,10 +34,6 @@ import org.apache.qpid.proton.amqp.messaging.Source;
 import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
 import org.junit.Test;
 
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.TOPIC_CAPABILITY;
-
 public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport  {
 
    SimpleString address = new SimpleString("testAddress");
@@ -52,7 +52,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport  {
       AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2");
       receiver.flow(1);
       receiver2.flow(1);
-      sendMessages(2, address.toString());
+      sendMessages(address.toString(), 2);
       AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
       assertNotNull(amqpMessage);
       amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
@@ -86,7 +86,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport  {
       AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|1");
       receiver.flow(1);
       receiver2.flow(1);
-      sendMessages(2, address.toString());
+      sendMessages(address.toString(), 2);
       AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
       assertNotNull(amqpMessage);
       amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
@@ -114,7 +114,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport  {
       AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2");
       receiver.flow(1);
       receiver2.flow(1);
-      sendMessages(2, address.toString());
+      sendMessages(address.toString(), 2);
       AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
       assertNotNull(amqpMessage);
       amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
@@ -145,7 +145,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport  {
       AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2");
       receiver.flow(1);
       receiver2.flow(1);
-      sendMessages(2, address.toString());
+      sendMessages(address.toString(), 2);
       AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
       assertNotNull(amqpMessage);
       amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
@@ -178,7 +178,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport  {
       AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2");
       receiver.flow(1);
       receiver2.flow(1);
-      sendMessages(2, address.toString());
+      sendMessages(address.toString(), 2);
       AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
       assertNotNull(amqpMessage);
       amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
@@ -206,7 +206,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport  {
       AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2");
       receiver.flow(1);
       receiver2.flow(1);
-      sendMessages(2, address.toString());
+      sendMessages(address.toString(), 2);
       AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
       assertNotNull(amqpMessage);
       amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
@@ -244,7 +244,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport  {
       AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2");
       receiver.flow(1);
       receiver2.flow(1);
-      sendMessages(2, address.toString());
+      sendMessages(address.toString(), 2);
       AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
       assertNotNull(amqpMessage);
       amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
@@ -282,7 +282,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport  {
       AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2");
       receiver.flow(1);
       receiver2.flow(1);
-      sendMessages(2, address.toString());
+      sendMessages(address.toString(), 2);
       AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
       assertNotNull(amqpMessage);
       amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
@@ -313,7 +313,10 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport  {
          fail("Exception expected");
       } catch (Exception e) {
          //expected
+      } finally {
+         receiver.close();
       }
+
       connection.close();
    }
 
@@ -331,7 +334,10 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport  {
          fail("Exception expected");
       } catch (Exception e) {
          //expected
+      } finally {
+         receiver.close();
       }
+
       connection.close();
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
new file mode 100644
index 0000000..7de05aa
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.net.URI;
+import java.util.LinkedList;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.After;
+import org.junit.Before;
+
+/**
+ * Base class for AMQP tests that use the Qpid JMS client. Connections created through
+ * the createConnection helpers are tracked and closed again in {@link #tearDown()}.
+ */
+public abstract class JMSClientTestSupport extends AmqpClientTestSupport {
+
+   /** JMS connections opened by the current test; closed and cleared in {@link #tearDown()}. */
+   protected LinkedList<Connection> jmsConnections = new LinkedList<>();
+
+   @Before
+   @Override
+   public void setUp() throws Exception {
+      super.setUp();
+
+      // Bug in Qpid JMS not shutting down a connection thread on certain errors
+      // TODO - Reevaluate after Qpid JMS 0.23.0 is released.
+      disableCheckThread();
+   }
+
+   /** Closes every tracked JMS connection, then runs the parent tear-down. */
+   @After
+   @Override
+   public void tearDown() throws Exception {
+      for (Connection connection : jmsConnections) {
+         try {
+            connection.close();
+         } catch (Throwable ignored) {
+            // Best effort: report the failure and keep closing the remaining connections.
+            ignored.printStackTrace();
+         }
+      }
+      jmsConnections.clear();
+
+      super.tearDown();
+   }
+
+   /** Registers {@code connection} so that tearDown closes it, and returns the same instance. */
+   protected Connection trackJMSConnection(Connection connection) {
+      jmsConnections.add(connection);
+      return connection;
+   }
+
+   /** Returns the query string appended to the broker URI; empty here, subclasses may override. */
+   protected String getJmsConnectionURIOptions() {
+      return "";
+   }
+
+   /**
+    * Builds the Qpid JMS URI for 127.0.0.1:AMQP_PORT. The scheme is amqp, amqps, amqpws or amqpwss
+    * (per {@code isUseSSL()} and the local webSocket flag); non-empty URI options are appended after '?'.
+    * @throws RuntimeException wrapping the original failure if the URI cannot be built
+    */
+   protected URI getBrokerQpidJMSConnectionURI() {
+      boolean webSocket = false; // always false here, so the ws/wss schemes are currently unused
+      try {
+         String scheme = (webSocket ? "amqpws" : "amqp") + (isUseSSL() ? "s" : "");
+         String uri = scheme + "://127.0.0.1:" + AMQP_PORT;
+         String options = getJmsConnectionURIOptions();
+         if (!options.isEmpty()) {
+            uri = uri + "?" + options;
+         }
+
+         return new URI(uri);
+      } catch (Exception e) {
+         // Keep the cause so that a malformed URI or option string can be diagnosed.
+         throw new RuntimeException("Failed to build the Qpid JMS connection URI", e);
+      }
+   }
+
+   // Convenience overloads: all target getBrokerQpidJMSConnectionURI(); omitted arguments are null, start defaults to true.
+   protected Connection createConnection() throws JMSException {
+      return createConnection(getBrokerQpidJMSConnectionURI(), null, null, null, true);
+   }
+
+   protected Connection createConnection(boolean start) throws JMSException {
+      return createConnection(getBrokerQpidJMSConnectionURI(), null, null, null, start);
+   }
+
+   protected Connection createConnection(String clientId) throws JMSException {
+      return createConnection(getBrokerQpidJMSConnectionURI(), null, null, clientId, true);
+   }
+
+   protected Connection createConnection(String clientId, boolean start) throws JMSException {
+      return createConnection(getBrokerQpidJMSConnectionURI(), null, null, clientId, start);
+   }
+
+   protected Connection createConnection(String username, String password) throws JMSException {
+      return createConnection(getBrokerQpidJMSConnectionURI(), username, password, null, true);
+   }
+
+   protected Connection createConnection(String username, String password, String clientId) throws JMSException {
+      return createConnection(getBrokerQpidJMSConnectionURI(), username, password, clientId, true);
+   }
+
+   protected Connection createConnection(String username, String password, String clientId, boolean start) throws JMSException {
+      return createConnection(getBrokerQpidJMSConnectionURI(), username, password, clientId, start);
+   }
+
+   /** Creates and tracks a connection to {@code remoteURI}; sets a non-empty client ID and starts it if asked. */
+   private Connection createConnection(URI remoteURI, String username, String password, String clientId, boolean start) throws JMSException {
+      JmsConnectionFactory factory = new JmsConnectionFactory(remoteURI);
+
+      Connection connection = trackJMSConnection(factory.createConnection(username, password));
+
+      connection.setExceptionListener(new ExceptionListener() {
+         @Override
+         public void onException(JMSException exception) {
+            exception.printStackTrace();
+         }
+      });
+
+      if (clientId != null && !clientId.isEmpty()) {
+         connection.setClientID(clientId);
+      }
+
+      if (start) {
+         connection.start();
+      }
+
+      return connection;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSConnectionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSConnectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSConnectionTest.java
new file mode 100644
index 0000000..e261468
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSConnectionTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.InvalidClientIDException;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Test;
+
+/**
+ * Connection-level tests that exercise the broker through JMS connections obtained from
+ * {@link JMSClientTestSupport}.
+ */
+public class JMSConnectionTest extends JMSClientTestSupport {
+
+   /**
+    * Opens a connection with one consumer, waits for the server to report one connection and
+    * one consumer, then closes the connection and waits for both counts to return to zero.
+    */
+   @Test(timeout = 60000)
+   public void testConnection() throws Exception {
+      Connection connection = createConnection();
+
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         javax.jms.Queue queue = session.createQueue(getQueueName());
+         session.createConsumer(queue);
+
+         Queue queueView = getProxyToQueue(getQueueName());
+
+         assertTrue("Connection not counted", Wait.waitFor(() -> server.getConnectionCount() == 1));
+         assertTrue("Consumer not counted", Wait.waitFor(() -> queueView.getConsumerCount() == 1));
+
+         assertEquals(1, queueView.getConsumerCount());
+
+         connection.close();
+
+         assertTrue("Consumer not closed", Wait.waitFor(() -> queueView.getConsumerCount() == 0));
+         assertTrue("Connection not released", Wait.waitFor(() -> server.getConnectionCount() == 0));
+      } finally {
+         // Covers the case where an assertion failed before the explicit close above.
+         connection.close();
+      }
+   }
+
+   /**
+    * Checks that a client ID cannot be changed once set on a connection, cannot be shared by
+    * two open connections, and can be used again after the connections holding it are closed.
+    */
+   @Test(timeout = 60000)
+   public void testClientIDsAreExclusive() throws Exception {
+      Connection testConn1 = createConnection(false);
+      Connection testConn2 = createConnection(false);
+
+      try {
+         testConn1.setClientID("client-id1");
+         try {
+            testConn1.setClientID("client-id2");
+            fail("didn't get expected exception");
+         } catch (javax.jms.IllegalStateException e) {
+            // expected
+         }
+
+         try {
+            testConn2.setClientID("client-id1");
+            fail("didn't get expected exception");
+         } catch (InvalidClientIDException e) {
+            // expected
+         }
+      } finally {
+         testConn1.close();
+         testConn2.close();
+      }
+
+      try {
+         testConn1 = createConnection(false);
+         testConn2 = createConnection(false);
+         testConn1.setClientID("client-id1");
+         testConn2.setClientID("client-id2");
+      } finally {
+         testConn1.close();
+         testConn2.close();
+      }
+   }
+
+   /**
+    * Creates, starts and closes connections from many threads at once. Any
+    * {@link JMSException} raised on a worker thread is recorded and fails the test.
+    */
+   @Test(timeout = 60000)
+   public void testParallelConnections() throws Exception {
+      final int numThreads = 40;
+      // Worker failures are collected here; printing alone would let the test pass
+      // even if every connection attempt failed.
+      final ConcurrentLinkedQueue<JMSException> failures = new ConcurrentLinkedQueue<>();
+      final ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
+
+      try {
+         for (int i = 0; i < numThreads; i++) {
+            executorService.execute(() -> {
+               try {
+                  Connection connection = createConnection(fullUser, fullPass);
+                  connection.start();
+                  connection.close();
+               } catch (JMSException e) {
+                  e.printStackTrace();
+                  failures.add(e);
+               }
+            });
+         }
+
+         executorService.shutdown();
+         assertTrue("executor done on time", executorService.awaitTermination(30, TimeUnit.SECONDS));
+      } finally {
+         // Has nothing left to do after a clean shutdown; otherwise stops workers still running.
+         executorService.shutdownNow();
+      }
+
+      assertTrue("Connections failed on worker threads: " + failures, failures.isEmpty());
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSConnectionWithSecurityTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSConnectionWithSecurityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSConnectionWithSecurityTest.java
new file mode 100644
index 0000000..bfd31ac
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSConnectionWithSecurityTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.JMSSecurityException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.junit.Test;
+
+/**
+ * Connection tests that run with {@link #isSecurityEnabled()} returning {@code true}.
+ */
+public class JMSConnectionWithSecurityTest extends JMSClientTestSupport {
+
+   @Override
+   protected boolean isSecurityEnabled() {
+      return true;
+   }
+
+   /** Connecting with empty credentials must be rejected with a security exception. */
+   @Test(timeout = 10000)
+   public void testNoUserOrPassword() throws Exception {
+      assertConnectRejected("", "", "Failed to authenticate connection with no user / password.");
+   }
+
+   /** Connecting as a user name that is not expected to exist must be rejected. */
+   @Test(timeout = 10000)
+   public void testUnknownUser() throws Exception {
+      assertConnectRejected("nosuchuser", "blah", "Failed to authenticate connection with unknown user ID");
+   }
+
+   /** Connecting as {@code fullUser} with a wrong password must be rejected. */
+   @Test(timeout = 10000)
+   public void testKnownUserWrongPassword() throws Exception {
+      assertConnectRejected(fullUser, "wrongPassword", "Failed to authenticate connection with incorrect password.");
+   }
+
+   /** Repeats the wrong-password attempt 25 times, closing any connection that was created. */
+   @Test(timeout = 30000)
+   public void testRepeatedWrongPasswordAttempts() throws Exception {
+      for (int attempt = 0; attempt < 25; attempt++) {
+         Connection rejected = null;
+         try {
+            rejected = createConnection(fullUser, "wrongPassword", null, false);
+            rejected.start();
+            fail("Expected JMSException");
+         } catch (JMSSecurityException expected) {
+            IntegrationTestLogger.LOGGER.debug("Failed to authenticate connection with incorrect password.");
+         } finally {
+            if (rejected != null) {
+               rejected.close();
+            }
+         }
+      }
+   }
+
+   /** Sends one text message as {@code fullUser} and reads it back on the same session. */
+   @Test(timeout = 30000)
+   public void testSendReceive() throws Exception {
+      Connection connection = createConnection(fullUser, fullPass);
+
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         javax.jms.Queue queue = session.createQueue(getQueueName());
+         MessageProducer producer = session.createProducer(queue);
+
+         TextMessage outbound = session.createTextMessage();
+         String messageText = "hello  sent at " + new java.util.Date().toString();
+         outbound.setText(messageText);
+         producer.send(outbound);
+
+         // Read back the message that was just sent.
+         MessageConsumer consumer = session.createConsumer(queue);
+         connection.start();
+         Message inbound = consumer.receive(5000);
+         assertNotNull(inbound);
+         assertTrue(inbound instanceof TextMessage);
+         TextMessage inboundText = (TextMessage) inbound;
+         assertEquals(messageText, inboundText.getText());
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Attempts to create a temporary queue as {@code guestUser}, tolerating a JMS exception,
+    * and then checks that the connection can still create a session.
+    */
+   @Test(timeout = 30000)
+   public void testCreateTemporaryQueueNotAuthorized() throws JMSException {
+      Connection connection = createConnection(guestUser, guestPass);
+
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         try {
+            session.createTemporaryQueue();
+         } catch (JMSSecurityException expected) {
+            // the anticipated outcome; nothing to do
+         } catch (JMSException unexpectedType) {
+            IntegrationTestLogger.LOGGER.info("Client should have thrown a JMSSecurityException but only threw JMSException");
+         }
+
+         // The rejected request must not have broken the connection.
+         assertNotNull(connection.createSession(false, Session.AUTO_ACKNOWLEDGE));
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Attempts to create a temporary topic as {@code guestUser}, tolerating a JMS exception,
+    * and then checks that the connection can still create a session.
+    */
+   @Test(timeout = 30000)
+   public void testCreateTemporaryTopicNotAuthorized() throws JMSException {
+      Connection connection = createConnection(guestUser, guestPass);
+
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         try {
+            session.createTemporaryTopic();
+         } catch (JMSSecurityException expected) {
+            // the anticipated outcome; nothing to do
+         } catch (JMSException unexpectedType) {
+            IntegrationTestLogger.LOGGER.info("Client should have thrown a JMSSecurityException but only threw JMSException");
+         }
+
+         // The rejected request must not have broken the connection.
+         assertNotNull(connection.createSession(false, Session.AUTO_ACKNOWLEDGE));
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Creates an unstarted connection with the given credentials, starts it and expects a
+    * {@link JMSSecurityException}; the given message is logged at debug level when it occurs.
+    */
+   private void assertConnectRejected(String user, String password, String debugMessage) throws Exception {
+      try {
+         Connection rejected = createConnection(user, password, null, false);
+         rejected.start();
+         fail("Expected JMSException");
+      } catch (JMSSecurityException expected) {
+         IntegrationTestLogger.LOGGER.debug(debugMessage);
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSDurableConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSDurableConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSDurableConsumerTest.java
new file mode 100644
index 0000000..26097f6
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSDurableConsumerTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Test;
+
+/**
+ * Tests for durable topic subscribers created through JMS connections obtained from
+ * {@link JMSClientTestSupport}.
+ */
+public class JMSDurableConsumerTest extends JMSClientTestSupport {
+
+   // Durable subscription name used by every test in this class.
+   private static final String SUBSCRIPTION_NAME = "DurbaleTopic";
+
+   /**
+    * Publishes one persistent message to a topic with a durable subscriber that uses a
+    * {@link MessageListener} and waits for the listener to receive it.
+    */
+   @Test(timeout = 30000)
+   public void testDurableConsumerAsync() throws Exception {
+      final CountDownLatch latch = new CountDownLatch(1);
+      final AtomicReference<Message> received = new AtomicReference<>();
+      String durableClientId = getTopicName() + "-ClientId";
+
+      Connection connection = createConnection(durableClientId);
+
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Topic topic = session.createTopic(getTopicName());
+         MessageConsumer consumer = session.createDurableSubscriber(topic, SUBSCRIPTION_NAME);
+         consumer.setMessageListener(new MessageListener() {
+
+            @Override
+            public void onMessage(Message message) {
+               received.set(message);
+               latch.countDown();
+            }
+         });
+
+         MessageProducer producer = session.createProducer(topic);
+         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+         connection.start();
+
+         TextMessage message = session.createTextMessage();
+         message.setText("hello");
+         producer.send(message);
+
+         assertTrue(latch.await(10, TimeUnit.SECONDS));
+         assertNotNull("Should have received a message by now.", received.get());
+         assertTrue("Should be an instance of TextMessage", received.get() instanceof TextMessage);
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Publishes one persistent message to a topic with a durable subscriber and polls the
+    * subscriber with {@code receiveNoWait()} until the message arrives.
+    */
+   @Test(timeout = 30000)
+   public void testDurableConsumerSync() throws Exception {
+      String durableClientId = getTopicName() + "-ClientId";
+
+      Connection connection = createConnection(durableClientId);
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Topic topic = session.createTopic(getTopicName());
+         final MessageConsumer consumer = session.createDurableSubscriber(topic, SUBSCRIPTION_NAME);
+         MessageProducer producer = session.createProducer(topic);
+         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+         connection.start();
+
+         TextMessage message = session.createTextMessage();
+         message.setText("hello");
+         producer.send(message);
+
+         final AtomicReference<Message> msg = new AtomicReference<>();
+         // Poll every 200 ms for up to 25 s, keeping the first non-null message.
+         assertTrue(Wait.waitFor(() -> {
+            msg.set(consumer.receiveNoWait());
+            return msg.get() != null;
+         }, TimeUnit.SECONDS.toMillis(25), TimeUnit.MILLISECONDS.toMillis(200)));
+
+         assertNotNull("Should have received a message by now.", msg.get());
+         assertTrue("Should be an instance of TextMessage", msg.get() instanceof TextMessage);
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Creates a durable subscriber, closes it and then unsubscribes, checking the server's
+    * total consumer count after each step.
+    */
+   @Test(timeout = 30000)
+   public void testDurableConsumerUnsubscribe() throws Exception {
+      String durableClientId = getTopicName() + "-ClientId";
+
+      Connection connection = createConnection(durableClientId);
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Topic topic = session.createTopic(getTopicName());
+         MessageConsumer consumer = session.createDurableSubscriber(topic, SUBSCRIPTION_NAME);
+
+         assertTrue(waitForTotalConsumerCount(1));
+
+         consumer.close();
+
+         assertTrue(waitForTotalConsumerCount(0));
+
+         session.unsubscribe(SUBSCRIPTION_NAME);
+         // NOTE(review): this re-checks the consumer count only; on its own it does not
+         // show that the subscription was removed.
+         assertTrue(waitForTotalConsumerCount(0));
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Unsubscribing a name for which this test created no subscription must raise a
+    * {@link JMSException}.
+    */
+   @Test(timeout = 30000)
+   public void testDurableConsumerUnsubscribeWhileNoSubscription() throws Exception {
+      Connection connection = createConnection();
+
+      try {
+         connection.start();
+
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         assertTrue(waitForTotalConsumerCount(0));
+
+         try {
+            session.unsubscribe(SUBSCRIPTION_NAME);
+            fail("Should have thrown as no such subscription exists.");
+         } catch (JMSException ex) {
+            // expected
+         }
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Unsubscribing while the durable subscriber is still open must raise a
+    * {@link JMSException}.
+    */
+   @Test(timeout = 30000)
+   public void testDurableConsumerUnsubscribeWhileActive() throws Exception {
+      String durableClientId = getTopicName() + "-ClientId";
+
+      Connection connection = createConnection(durableClientId);
+      try {
+         connection.start();
+
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Topic topic = session.createTopic(getTopicName());
+         MessageConsumer consumer = session.createDurableSubscriber(topic, SUBSCRIPTION_NAME);
+
+         assertNotNull(consumer);
+         assertNull(consumer.receive(10));
+
+         try {
+            session.unsubscribe(SUBSCRIPTION_NAME);
+            fail("Should have thrown as subscription is in use.");
+         } catch (JMSException ex) {
+            // expected
+         }
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Polls every 250 ms, for up to 20 s, until the server reports the given total consumer
+    * count.
+    *
+    * @param expected the consumer count to wait for
+    * @return the result of {@code Wait.waitFor} for that condition
+    */
+   private boolean waitForTotalConsumerCount(int expected) throws Exception {
+      return Wait.waitFor(() -> server.getTotalConsumerCount() == expected,
+                          TimeUnit.SECONDS.toMillis(20), TimeUnit.MILLISECONDS.toMillis(250));
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java
new file mode 100644
index 0000000..c5372ac
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JMSMessageConsumerTest extends JMSClientTestSupport {
+
+   protected static final Logger LOG = LoggerFactory.getLogger(JMSMessageConsumerTest.class);
+
+   /**
+    * Sends one message without and one with the {@code color} property set to {@code RED},
+    * then checks that a consumer using the selector {@code color = 'RED'} receives the
+    * second message.
+    */
+   @Test(timeout = 60000)
+   public void testSelector() throws Exception {
+      Connection connection = createConnection();
+
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         javax.jms.Queue queue = session.createQueue(getQueueName());
+         MessageProducer producer = session.createProducer(queue);
+
+         TextMessage message = session.createTextMessage();
+         message.setText("msg:0");
+         producer.send(message);
+         message = session.createTextMessage();
+         message.setText("msg:1");
+         message.setStringProperty("color", "RED");
+         producer.send(message);
+
+         connection.start();
+
+         MessageConsumer messageConsumer = session.createConsumer(queue, "color = 'RED'");
+         TextMessage m = (TextMessage) messageConsumer.receive(5000);
+         assertNotNull(m);
+         assertEquals("msg:1", m.getText());
+         // Expected value goes first so a failure reports the right way round.
+         assertEquals("RED", m.getStringProperty("color"));
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Sends one message without and one with a JMSType, confirms through a browser that both
+    * are on the queue, then checks that a {@code JMSType} selector delivers the typed one.
+    */
+   @SuppressWarnings("rawtypes")
+   @Test(timeout = 30000)
+   public void testSelectorsWithJMSType() throws Exception {
+      final String type = "myJMSType";
+      Connection connection = createConnection();
+
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         javax.jms.Queue queue = session.createQueue(getQueueName());
+         MessageProducer producer = session.createProducer(queue);
+
+         TextMessage untyped = session.createTextMessage();
+         untyped.setText("text");
+         producer.send(untyped, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+
+         TextMessage typed = session.createTextMessage();
+         typed.setJMSType(type);
+         typed.setText("text + type");
+         producer.send(typed, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+
+         // Browse the queue and count what is there before consuming.
+         QueueBrowser browser = session.createBrowser(queue);
+         int browsed = 0;
+         for (Enumeration pending = browser.getEnumeration(); pending.hasMoreElements(); browsed++) {
+            Message next = (Message) pending.nextElement();
+            assertTrue(next instanceof TextMessage);
+         }
+
+         assertEquals(2, browsed);
+
+         MessageConsumer consumer = session.createConsumer(queue, "JMSType = '" + type + "'");
+         Message selected = consumer.receive(2000);
+         assertNotNull(selected);
+         assertTrue(selected instanceof TextMessage);
+         assertEquals("Unexpected JMSType value", type, selected.getJMSType());
+         assertEquals("Unexpected message content", "text + type", ((TextMessage) selected).getText());
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Sends one message at priority 5 and one at priority 9, confirms through a browser that
+    * both are on the queue, then checks that the selector {@code JMSPriority > 8} delivers
+    * the priority 9 message.
+    */
+   @SuppressWarnings("rawtypes")
+   @Test(timeout = 30000)
+   public void testSelectorsWithJMSPriority() throws Exception {
+      Connection connection = createConnection();
+
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         javax.jms.Queue queue = session.createQueue(getQueueName());
+         MessageProducer producer = session.createProducer(queue);
+
+         TextMessage normalPriority = session.createTextMessage();
+         normalPriority.setText("hello");
+         producer.send(normalPriority, DeliveryMode.PERSISTENT, 5, 0);
+
+         TextMessage highPriority = session.createTextMessage();
+         highPriority.setText("hello + 9");
+         producer.send(highPriority, DeliveryMode.PERSISTENT, 9, 0);
+
+         // Browse the queue and count what is there before consuming.
+         QueueBrowser browser = session.createBrowser(queue);
+         int browsed = 0;
+         for (Enumeration pending = browser.getEnumeration(); pending.hasMoreElements(); browsed++) {
+            Message next = (Message) pending.nextElement();
+            assertTrue(next instanceof TextMessage);
+         }
+
+         assertEquals(2, browsed);
+
+         MessageConsumer consumer = session.createConsumer(queue, "JMSPriority > 8");
+         Message selected = consumer.receive(2000);
+         assertNotNull(selected);
+         assertTrue(selected instanceof TextMessage);
+         assertEquals("hello + 9", ((TextMessage) selected).getText());
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Sends two messages and consumes with a selector on the first message's JMSMessageID,
+    * expecting the first message to be delivered and nothing else afterwards.
+    */
+   @Test(timeout = 60000)
+   public void testJMSSelectorFiltersJMSMessageID() throws Exception {
+      Connection connection = createConnection();
+
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         javax.jms.Queue queue = session.createQueue(getQueueName());
+         MessageProducer producer = session.createProducer(queue);
+
+         // This is the message the selector should match.
+         TextMessage wanted = session.createTextMessage();
+         producer.send(wanted);
+
+         // This one should be filtered out.
+         TextMessage filtered = session.createTextMessage();
+         producer.send(filtered);
+
+         connection.start();
+
+         String wantedId = wanted.getJMSMessageID();
+         MessageConsumer messageConsumer = session.createConsumer(queue, "JMSMessageID = '" + wantedId + "'");
+
+         TextMessage delivered = (TextMessage) messageConsumer.receive(5000);
+         assertNotNull(delivered);
+         assertEquals(wantedId, delivered.getJMSMessageID());
+
+         // Nothing further may arrive through the selector.
+         assertNull(messageConsumer.receive(1000));
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Sets every prefetch value to zero, sends two messages and checks that two consumers on
+    * the same queue each receive one of them, with nothing left over afterwards.
+    */
+   @Test(timeout = 60000)
+   public void testZeroPrefetchWithTwoConsumers() throws Exception {
+      JmsConnection connection = (JmsConnection) createConnection();
+
+      try {
+         ((JmsDefaultPrefetchPolicy) connection.getPrefetchPolicy()).setAll(0);
+         connection.start();
+
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         javax.jms.Queue queue = session.createQueue(getQueueName());
+
+         MessageProducer producer = session.createProducer(queue);
+         producer.send(session.createTextMessage("Msg1"));
+         producer.send(session.createTextMessage("Msg2"));
+
+         // now lets receive it
+         MessageConsumer consumer1 = session.createConsumer(queue);
+         MessageConsumer consumer2 = session.createConsumer(queue);
+         TextMessage answer = (TextMessage) consumer1.receive(5000);
+         assertNotNull(answer);
+         // Expected value goes first so a failure reports the right way round.
+         assertEquals("Should have received a message!", "Msg1", answer.getText());
+         answer = (TextMessage) consumer2.receive(5000);
+         assertNotNull(answer);
+         assertEquals("Should have received a message!", "Msg2", answer.getText());
+
+         answer = (TextMessage) consumer2.receiveNoWait();
+         assertNull("Should have not received a message!", answer);
+      } finally {
+         // Close here, as the sibling tests do, even when an assertion fails.
+         connection.close();
+      }
+   }
+
+   /** Runs the bulk produce/consume scenario on a topic with a CLIENT_ACKNOWLEDGE session. */
+   @Test(timeout = 30000)
+   public void testProduceAndConsumeLargeNumbersOfTopicMessagesClientAck() throws Exception {
+      final boolean useTopic = true;
+      doTestProduceAndConsumeLargeNumbersOfMessages(useTopic, Session.CLIENT_ACKNOWLEDGE);
+   }
+
+   /** Runs the bulk produce/consume scenario on a queue with a CLIENT_ACKNOWLEDGE session. */
+   @Test(timeout = 30000)
+   public void testProduceAndConsumeLargeNumbersOfQueueMessagesClientAck() throws Exception {
+      final boolean useTopic = false;
+      doTestProduceAndConsumeLargeNumbersOfMessages(useTopic, Session.CLIENT_ACKNOWLEDGE);
+   }
+
+   /** Runs the bulk produce/consume scenario on a topic with an AUTO_ACKNOWLEDGE session. */
+   @Test(timeout = 30000)
+   public void testProduceAndConsumeLargeNumbersOfTopicMessagesAutoAck() throws Exception {
+      final boolean useTopic = true;
+      doTestProduceAndConsumeLargeNumbersOfMessages(useTopic, Session.AUTO_ACKNOWLEDGE);
+   }
+
+   /** Runs the bulk produce/consume scenario on a queue with an AUTO_ACKNOWLEDGE session. */
+   @Test(timeout = 30000)
+   public void testProduceAndConsumeLargeNumbersOfQueueMessagesAutoAck() throws Exception {
+      final boolean useTopic = false;
+      doTestProduceAndConsumeLargeNumbersOfMessages(useTopic, Session.AUTO_ACKNOWLEDGE);
+   }
+
+   /**
+    * Sends 1000 text messages with forced asynchronous sends and waits up to 15 seconds for a
+    * message listener on the same destination to acknowledge and count all of them.
+    *
+    * @param topic   {@code true} to use the test topic, {@code false} to use the test queue
+    * @param ackMode the acknowledgement mode the session is created with
+    * @throws Exception if any JMS operation fails or the wait is interrupted
+    */
+   public void doTestProduceAndConsumeLargeNumbersOfMessages(boolean topic, int ackMode) throws Exception {
+
+      final int MSG_COUNT = 1000;
+      final CountDownLatch done = new CountDownLatch(MSG_COUNT);
+
+      JmsConnection connection = (JmsConnection) createConnection();
+
+      try {
+         connection.setForceAsyncSend(true);
+         connection.start();
+
+         Session session = connection.createSession(false, ackMode);
+         final Destination destination;
+         if (topic) {
+            destination = session.createTopic(getTopicName());
+         } else {
+            destination = session.createQueue(getQueueName());
+         }
+
+         MessageConsumer consumer = session.createConsumer(destination);
+         consumer.setMessageListener(new MessageListener() {
+
+            @Override
+            public void onMessage(Message message) {
+               try {
+                  message.acknowledge();
+                  done.countDown();
+               } catch (JMSException ex) {
+                  // A failed acknowledge is not counted, so the latch wait below times out.
+                  LOG.info("Caught exception.", ex);
+               }
+            }
+         });
+
+         MessageProducer producer = session.createProducer(destination);
+
+         TextMessage textMessage = session.createTextMessage();
+         textMessage.setText("messageText");
+
+         for (int i = 0; i < MSG_COUNT; i++) {
+            producer.send(textMessage);
+         }
+
+         assertTrue("Did not receive all messages: " + MSG_COUNT, done.await(15, TimeUnit.SECONDS));
+      } finally {
+         // Close here, as the sibling tests do, even when the wait fails.
+         connection.close();
+      }
+   }
+
+   /**
+    * Enqueues ten messages, then opens and closes a consumer without receiving anything and
+    * checks that the queue still reports all ten messages.
+    */
+   @Test(timeout = 60000)
+   public void testPrefetchedMessagesAreNotConsumedOnConsumerClose() throws Exception {
+      final int NUM_MESSAGES = 10;
+
+      Connection connection = createConnection();
+
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         javax.jms.Queue queue = session.createQueue(getQueueName());
+         MessageProducer producer = session.createProducer(queue);
+
+         // NOTE(review): this payload is generated but never attached to a message.
+         byte[] bytes = new byte[2048];
+         new Random().nextBytes(bytes);
+         for (int i = 0; i < NUM_MESSAGES; i++) {
+            TextMessage message = session.createTextMessage();
+            message.setText("msg:" + i);
+            producer.send(message);
+         }
+
+         connection.close();
+
+         Queue queueView = getProxyToQueue(getQueueName());
+         assertTrue("Not all messages were enqueud", Wait.waitFor(() -> queueView.getMessageCount() == NUM_MESSAGES));
+
+         // Create a consumer and prefetch the messages
+         connection = createConnection();
+         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer consumer = session.createConsumer(queue);
+
+         // Short pause before closing; presumably gives the consumer time to prefetch.
+         Thread.sleep(100);
+
+         consumer.close();
+         connection.close();
+
+         // The messages must still be on the queue once the consumer is gone.
+         assertTrue("Messages were removed from the queue by closing the consumer", Wait.waitFor(() -> queueView.getMessageCount() == NUM_MESSAGES));
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Sends {@code numMessages} non-persistent messages while a second connection, running on its
+    * own thread, receives them; afterwards checks that the queue is empty and prints a rough
+    * throughput figure.
+    *
+    * @throws Throwable the first failure recorded by the consumer thread, or any failure on the
+    *                   producing side
+    */
+   @Test(timeout = 60000)
+   public void testMessagesReceivedInParallel() throws Throwable {
+      final int numMessages = 50000;
+      long time = System.currentTimeMillis();
+
+      // Written by the consumer thread and only read after t.join() below.
+      final ArrayList<Throwable> exceptions = new ArrayList<>();
+
+      Thread t = new Thread(new Runnable() {
+         @Override
+         public void run() {
+            Connection connectionConsumer = null;
+            try {
+               connectionConsumer = createConnection();
+               connectionConsumer.start();
+               Session sessionConsumer = connectionConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE);
+               final javax.jms.Queue queue = sessionConsumer.createQueue(getQueueName());
+               final MessageConsumer consumer = sessionConsumer.createConsumer(queue);
+
+               for (int received = 1; received <= numMessages; received++) {
+                  Message m = consumer.receive(5000);
+                  Assert.assertNotNull("Could not receive message count=" + (numMessages - received + 1) + " on consumer", m);
+
+                  // Report progress only once the message has actually arrived.
+                  if (received % 1000 == 0) {
+                     System.out.println("received " + received + " messages");
+                  }
+               }
+            } catch (Throwable e) {
+               // Also covers a JMSException from receive(): record it so the test fails with the root cause.
+               exceptions.add(e);
+               e.printStackTrace();
+            } finally {
+               if (connectionConsumer != null) {
+                  try {
+                     connectionConsumer.close();
+                  } catch (Throwable ignored) {
+                     // Best effort: a failure to close must not mask the recorded result.
+                  }
+               }
+            }
+         }
+      });
+
+      Connection connection = createConnection();
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         javax.jms.Queue queue = session.createQueue(getQueueName());
+
+         t.start();
+
+         MessageProducer p = session.createProducer(queue);
+         p.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         for (int i = 0; i < numMessages; i++) {
+            BytesMessage message = session.createBytesMessage();
+            message.writeUTF("Hello world!!!!" + i);
+            message.setIntProperty("count", i);
+            p.send(message);
+         }
+
+         // Wait for the consumer thread to completely read the Queue
+         t.join();
+
+         if (!exceptions.isEmpty()) {
+            throw exceptions.get(0);
+         }
+
+         Queue queueView = getProxyToQueue(getQueueName());
+
+         connection.close();
+         assertTrue("Not all messages consumed", Wait.waitFor(() -> queueView.getMessageCount() == 0));
+      } finally {
+         // Releases the producer connection when sending, joining or an assertion failed.
+         connection.close();
+      }
+
+      long taken = (System.currentTimeMillis() - time);
+      System.out.println("Microbenchamrk ran in " + taken + " milliseconds, sending/receiving " + numMessages);
+
+      double messagesPerSecond = ((double) numMessages / (double) taken) * 1000;
+
+      System.out.println(((int) messagesPerSecond) + " messages per second");
+   }
+
+   /**
+    * Enqueues a batch of text messages, then receives them in order on a CLIENT_ACKNOWLEDGE
+    * session, acknowledging each one, and verifies that the queue ends up empty.
+    *
+    * @throws Exception if any JMS operation fails or the test is interrupted while sleeping
+    */
+   @Test(timeout = 60000)
+   public void testClientAckMessages() throws Exception {
+      final int numMessages = 10;
+
+      Connection connection = createConnection();
+
+      try {
+         long time = System.currentTimeMillis();
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         javax.jms.Queue queue = session.createQueue(getQueueName());
+         MessageProducer producer = session.createProducer(queue);
+
+         for (int i = 0; i < numMessages; i++) {
+            TextMessage message = session.createTextMessage();
+            message.setText("msg:" + i);
+            producer.send(message);
+         }
+         connection.close();
+         Queue queueView = getProxyToQueue(getQueueName());
+
+         assertTrue("Not all messages enqueued", Wait.waitFor(() -> queueView.getMessageCount() == numMessages));
+
+         // Now create a new connection and receive and acknowledge
+         connection = createConnection();
+         session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         MessageConsumer consumer = session.createConsumer(queue);
+
+         for (int i = 0; i < numMessages; i++) {
+            Message msg = consumer.receive(5000);
+            Assert.assertNotNull("" + i, msg);
+            Assert.assertTrue("" + msg, msg instanceof TextMessage);
+            String text = ((TextMessage) msg).getText();
+            // Expected value first, so a failure reports expected and actual the right way round.
+            Assert.assertEquals("msg:" + i, text);
+            msg.acknowledge();
+         }
+
+         consumer.close();
+         connection.close();
+
+         // Wait for Acks to be processed and message removed from queue.
+         Thread.sleep(500);
+
+         assertTrue("Not all messages consumed", Wait.waitFor(() -> queueView.getMessageCount() == 0));
+         long taken = (System.currentTimeMillis() - time) / 1000;
+         System.out.println("taken = " + taken);
+      } finally {
+         // Closes whichever connection is current if the test failed part-way through.
+         connection.close();
+      }
+   }
+
+   /**
+    * Sends a batch of messages to a fixed queue, then repeatedly opens a session with a single
+    * consumer, performs one timed receive and closes the session again; finally performs the same
+    * number of timed receives on one last consumer. The results of the receives are not checked.
+    *
+    * @throws Throwable if any JMS operation fails
+    */
+   @Test(timeout = 240000)
+   public void testTimedOutWaitingForWriteLogOnConsumer() throws Throwable {
+      String name = "exampleQueue1";
+
+      final int numMessages = 40;
+
+      Connection connection = createConnection();
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         javax.jms.Queue queue = session.createQueue(name);
+         MessageProducer producer = session.createProducer(queue);
+         for (int i = 0; i < numMessages; i++) {
+            TextMessage message = session.createTextMessage();
+            message.setText("Message temporary");
+            producer.send(message);
+         }
+         // The producer is finished here; it is not used (or closed) again below.
+         producer.close();
+         session.close();
+
+         // One short-lived session and consumer per iteration.
+         for (int i = 0; i < numMessages; i++) {
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            queue = session.createQueue(name);
+            MessageConsumer c = session.createConsumer(queue);
+            c.receive(1000);
+            session.close();
+         }
+
+         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         queue = session.createQueue(name);
+         MessageConsumer c = session.createConsumer(queue);
+         for (int i = 0; i < numMessages; i++) {
+            c.receive(1000);
+         }
+         session.close();
+      } finally {
+         connection.close();
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageGroupsTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageGroupsTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageGroupsTest.java
new file mode 100644
index 0000000..628c814
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageGroupsTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Sends batches of messages carrying the JMSXGroupID and JMSXGroupSeq properties and reads them
+ * back over the same connection, logging the group properties of every message received.
+ */
+public class JMSMessageGroupsTest extends JMSClientTestSupport {
+
+   protected static final Logger LOG = LoggerFactory.getLogger(JMSMessageGroupsTest.class);
+
+   private static final int ITERATIONS = 10;
+   private static final int MESSAGE_COUNT = 10;
+   private static final int MESSAGE_SIZE = 10 * 1024;
+   private static final int RECEIVE_TIMEOUT = 3000;
+   private static final String JMSX_GROUP_ID = "JmsGroupsTest";
+
+   /**
+    * Runs several send/receive rounds, each on a fresh connection, with one sequence counter
+    * shared across all rounds.
+    *
+    * @throws Exception if any JMS operation fails
+    */
+   @Test(timeout = 60000)
+   public void testGroupSeqIsNeverLost() throws Exception {
+      AtomicInteger sequenceCounter = new AtomicInteger();
+
+      for (int i = 0; i < ITERATIONS; ++i) {
+         Connection connection = createConnection();
+         try {
+            sendMessagesToBroker(connection, MESSAGE_COUNT, sequenceCounter);
+            readMessagesOnBroker(connection, MESSAGE_COUNT);
+         } finally {
+            connection.close();
+         }
+      }
+   }
+
+   /**
+    * Receives {@code count} messages from the test queue and logs their group properties.
+    *
+    * @param connection connection used to create the consuming session
+    * @param count      number of messages expected; a receive that times out fails the test
+    * @throws Exception if any JMS operation fails
+    */
+   protected void readMessagesOnBroker(Connection connection, int count) throws Exception {
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Queue queue = session.createQueue(getQueueName());
+      MessageConsumer consumer = session.createConsumer(queue);
+
+      for (int i = 0; i < count; ++i) {
+         Message message = consumer.receive(RECEIVE_TIMEOUT);
+         assertNotNull(message);
+         LOG.debug("Read message #{}: type = {}", i, message.getClass().getSimpleName());
+         // NOTE(review): the group properties are only logged, never asserted - confirm whether
+         // the test is meant to fail when JMSXGroupSeq is missing.
+         String gid = message.getStringProperty("JMSXGroupID");
+         String seq = message.getStringProperty("JMSXGroupSeq");
+         LOG.debug("Message assigned JMSXGroupID := {}", gid);
+         LOG.debug("Message assigned JMSXGroupSeq := {}", seq);
+      }
+
+      session.close();
+   }
+
+   /**
+    * Sends {@code count} persistent bytes messages to the test queue, all tagged with the same
+    * group id and with consecutive JMSXGroupSeq values taken from {@code sequence}.
+    *
+    * @param connection connection used to create the producing session
+    * @param count      number of messages to send
+    * @param sequence   counter supplying the JMSXGroupSeq value; incremented once per message
+    * @throws Exception if any JMS operation fails
+    */
+   protected void sendMessagesToBroker(Connection connection, int count, AtomicInteger sequence) throws Exception {
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Queue queue = session.createQueue(getQueueName());
+      MessageProducer producer = session.createProducer(queue);
+
+      // Payload is the repeating ASCII digits '0'..'9'.
+      byte[] buffer = new byte[MESSAGE_SIZE];
+      for (int i = 0; i < buffer.length; i++) {
+         buffer[i] = (byte) ('0' + (i % 10));
+      }
+
+      LOG.debug("Sending {} messages to destination: {}", count, queue);
+      for (int i = 1; i <= count; i++) {
+         BytesMessage message = session.createBytesMessage();
+         message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
+         message.setStringProperty("JMSXGroupID", JMSX_GROUP_ID);
+         message.setIntProperty("JMSXGroupSeq", sequence.incrementAndGet());
+         message.writeBytes(buffer);
+         producer.send(message);
+      }
+
+      session.close();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageProducerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageProducerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageProducerTest.java
new file mode 100644
index 0000000..2287238
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageProducerTest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.Random;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TextMessage;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Producer-side JMS tests: anonymous producers, a larger text body, time-to-live expiry and the
+ * JMSReplyTo header.
+ */
+public class JMSMessageProducerTest extends JMSClientTestSupport {
+
+   /**
+    * Sends one message to each of two queues through an anonymous producer and checks that each
+    * queue delivers a text message.
+    *
+    * @throws Exception if any JMS operation fails
+    */
+   @Test(timeout = 30000)
+   public void testAnonymousProducer() throws Exception {
+      Connection connection = createConnection();
+
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue1 = session.createQueue(getQueueName(1));
+         Queue queue2 = session.createQueue(getQueueName(2));
+         MessageProducer p = session.createProducer(null);
+
+         TextMessage message = session.createTextMessage();
+         message.setText("hello");
+         p.send(queue1, message);
+         p.send(queue2, message);
+
+         {
+            MessageConsumer consumer = session.createConsumer(queue1);
+            Message msg = consumer.receive(2000);
+            assertNotNull(msg);
+            assertTrue(msg instanceof TextMessage);
+            consumer.close();
+         }
+         {
+            MessageConsumer consumer = session.createConsumer(queue2);
+            Message msg = consumer.receive(2000);
+            assertNotNull(msg);
+            assertTrue(msg instanceof TextMessage);
+            consumer.close();
+         }
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Sends one message to every numbered queue through a single anonymous producer, checks that
+    * the un-numbered queue received nothing, then checks that each numbered queue delivers the
+    * message addressed to it.
+    *
+    * @throws Exception if any JMS operation fails
+    */
+   @Test(timeout = 60000)
+   public void testAnonymousProducerAcrossManyDestinations() throws Exception {
+      Connection connection = createConnection();
+
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer p = session.createProducer(null);
+
+         for (int i = 1; i <= getPrecreatedQueueSize(); i++) {
+            javax.jms.Queue target = session.createQueue(getQueueName(i));
+            TextMessage message = session.createTextMessage("message for " + target.getQueueName());
+            p.send(target, message);
+         }
+
+         connection.start();
+
+         MessageConsumer messageConsumer = session.createConsumer(session.createQueue(getQueueName()));
+         Message m = messageConsumer.receive(200);
+         Assert.assertNull(m);
+
+         for (int i = 1; i <= getPrecreatedQueueSize(); i++) {
+            javax.jms.Queue target = session.createQueue(getQueueName(i));
+            MessageConsumer consumer = session.createConsumer(target);
+            TextMessage tm = (TextMessage) consumer.receive(2000);
+            assertNotNull(tm);
+            assertEquals("message for " + target.getQueueName(), tm.getText());
+            consumer.close();
+         }
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Sends a text message with a 10240 character body and checks that the same body is received.
+    *
+    * @throws Exception if any JMS operation fails
+    */
+   @Test(timeout = 60000)
+   public void testSendingBigMessage() throws Exception {
+      Connection connection = createConnection();
+
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = session.createQueue(getQueueName());
+         MessageProducer sender = session.createProducer(queue);
+
+         String body = createMessage(10240);
+         sender.send(session.createTextMessage(body));
+         connection.start();
+
+         MessageConsumer consumer = session.createConsumer(queue);
+         TextMessage m = (TextMessage) consumer.receive(5000);
+
+         // Fail with an assertion rather than a NullPointerException when nothing arrives.
+         assertNotNull(m);
+         assertEquals(body, m.getText());
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Sends a message with a time-to-live of one millisecond and checks that it can be received
+    * from the dead letter address while the original queue has nothing to deliver.
+    *
+    * @throws Exception if any JMS operation fails
+    */
+   @Test(timeout = 60000)
+   public void testSendWithTimeToLiveExpiresToDLQ() throws Exception {
+      Connection connection = createConnection();
+
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = session.createQueue(getQueueName());
+
+         MessageProducer sender = session.createProducer(queue);
+         sender.setTimeToLive(1);
+
+         Message message = session.createMessage();
+         sender.send(message);
+         connection.start();
+
+         MessageConsumer consumer = session.createConsumer(session.createQueue(getDeadLetterAddress()));
+         Message m = consumer.receive(10000);
+         assertNotNull(m);
+         consumer.close();
+
+         consumer = session.createConsumer(queue);
+         m = consumer.receiveNoWait();
+         assertNull(m);
+         consumer.close();
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Sends a message whose JMSReplyTo is a regular queue and checks that the received message
+    * still carries a reply-to destination.
+    *
+    * @throws Throwable if any JMS operation fails
+    */
+   @Test(timeout = 60000)
+   public void testReplyToUsingQueue() throws Throwable {
+      Connection connection = createConnection();
+
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         TemporaryQueue queue = session.createTemporaryQueue();
+         MessageProducer p = session.createProducer(queue);
+
+         TextMessage message = session.createTextMessage();
+         message.setText("Message temporary");
+         message.setJMSReplyTo(session.createQueue(getQueueName()));
+         p.send(message);
+
+         MessageConsumer cons = session.createConsumer(queue);
+         connection.start();
+
+         message = (TextMessage) cons.receive(5000);
+         assertNotNull(message);
+         Destination jmsReplyTo = message.getJMSReplyTo();
+         assertNotNull(jmsReplyTo);
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Sends a message whose JMSReplyTo is a temporary queue and checks that the received message
+    * still carries a reply-to destination.
+    *
+    * @throws Throwable if any JMS operation fails
+    */
+   @Test(timeout = 60000)
+   public void testReplyToUsingTempQueue() throws Throwable {
+      Connection connection = createConnection();
+
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         TemporaryQueue queue = session.createTemporaryQueue();
+         MessageProducer p = session.createProducer(queue);
+
+         TextMessage message = session.createTextMessage();
+         message.setText("Message temporary");
+         message.setJMSReplyTo(session.createTemporaryQueue());
+         p.send(message);
+
+         MessageConsumer cons = session.createConsumer(queue);
+         connection.start();
+
+         message = (TextMessage) cons.receive(5000);
+         // Check for a timed-out receive before touching the message.
+         assertNotNull(message);
+         Destination jmsReplyTo = message.getJMSReplyTo();
+         assertNotNull(jmsReplyTo);
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Builds a random string of digits and upper-case letters.
+    *
+    * @param messageSize number of characters to generate
+    * @return a string of exactly {@code messageSize} characters
+    */
+   private static String createMessage(int messageSize) {
+      final String AB = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+      Random rnd = new Random();
+      StringBuilder sb = new StringBuilder(messageSize);
+      for (int j = 0; j < messageSize; j++) {
+         sb.append(AB.charAt(rnd.nextInt(AB.length())));
+      }
+      return sb.toString();
+   }
+}


Mime
View raw message