activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject [03/15] activemq-artemis git commit: ARTEMIS-751 Simplification of the AMQP implementation
Date Tue, 27 Sep 2016 13:54:30 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
new file mode 100644
index 0000000..53e676f
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
@@ -0,0 +1,1548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.contains;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.DELAYED_DELIVERY;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PRODUCT;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.VERSION;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.InvalidClientIDException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.QueueBrowser;
+import javax.jms.ResourceAllocationException;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.Bindings;
+import org.apache.activemq.artemis.core.remoting.CloseListener;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.utils.ByteUtil;
+import org.apache.activemq.artemis.utils.VersionLoader;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.activemq.transport.amqp.client.AmqpValidator;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
+
+@RunWith(Parameterized.class)
+public class ProtonTest extends ProtonTestBase {
+
+   private static final String amqpConnectionUri = "amqp://localhost:5672";
+
+   private static final String tcpAmqpConnectionUri = "tcp://localhost:5672";
+
+   private static final String userName = "guest";
+
+   private static final String password = "guest";
+
+
+   private static final String brokerName = "my-broker";
+
+   private static final long maxSizeBytes = 1 * 1024 * 1024;
+
+   private static final long maxSizeBytesRejectThreshold = 2 * 1024 * 1024;
+
+   private int messagesSent = 0;
+
+   // this will ensure that all tests in this class are run twice,
+   // once with "true" passed to the class' constructor and once with "false"
+   @Parameterized.Parameters(name = "{0}")
+   public static Collection getParameters() {
+
+      // these 3 are for comparison
+      return Arrays.asList(new Object[][]{{"AMQP", 0}, {"AMQP_ANONYMOUS", 3}});
+   }
+
+   ConnectionFactory factory;
+
+   private final int protocol;
+
+   public ProtonTest(String name, int protocol) {
+      this.coreAddress = "jms.queue.exampleQueue";
+      this.protocol = protocol;
+      if (protocol == 0 || protocol == 3) {
+         this.address = coreAddress;
+      }
+      else {
+         this.address = "exampleQueue";
+      }
+   }
+
+   private final String coreAddress;
+   private final String address;
+   private Connection connection;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+
+      server.createQueue(new SimpleString(coreAddress), new SimpleString(coreAddress), null, true, false);
+      server.createQueue(new SimpleString(coreAddress + "1"), new SimpleString(coreAddress + "1"), null, true, false);
+      server.createQueue(new SimpleString(coreAddress + "2"), new SimpleString(coreAddress + "2"), null, true, false);
+      server.createQueue(new SimpleString(coreAddress + "3"), new SimpleString(coreAddress + "3"), null, true, false);
+      server.createQueue(new SimpleString(coreAddress + "4"), new SimpleString(coreAddress + "4"), null, true, false);
+      server.createQueue(new SimpleString(coreAddress + "5"), new SimpleString(coreAddress + "5"), null, true, false);
+      server.createQueue(new SimpleString(coreAddress + "6"), new SimpleString(coreAddress + "6"), null, true, false);
+      server.createQueue(new SimpleString(coreAddress + "7"), new SimpleString(coreAddress + "7"), null, true, false);
+      server.createQueue(new SimpleString(coreAddress + "8"), new SimpleString(coreAddress + "8"), null, true, false);
+      server.createQueue(new SimpleString(coreAddress + "9"), new SimpleString(coreAddress + "9"), null, true, false);
+      server.createQueue(new SimpleString(coreAddress + "10"), new SimpleString(coreAddress + "10"), null, true, false);
+      server.createQueue(new SimpleString("amqp_testtopic"), new SimpleString("amqp_testtopic"), null, true, false);
+      server.createQueue(new SimpleString("amqp_testtopic" + "1"), new SimpleString("amqp_testtopic" + "1"), null, true, false);
+      server.createQueue(new SimpleString("amqp_testtopic" + "2"), new SimpleString("amqp_testtopic" + "2"), null, true, false);
+      server.createQueue(new SimpleString("amqp_testtopic" + "3"), new SimpleString("amqp_testtopic" + "3"), null, true, false);
+      server.createQueue(new SimpleString("amqp_testtopic" + "4"), new SimpleString("amqp_testtopic" + "4"), null, true, false);
+      server.createQueue(new SimpleString("amqp_testtopic" + "5"), new SimpleString("amqp_testtopic" + "5"), null, true, false);
+      server.createQueue(new SimpleString("amqp_testtopic" + "6"), new SimpleString("amqp_testtopic" + "6"), null, true, false);
+      server.createQueue(new SimpleString("amqp_testtopic" + "7"), new SimpleString("amqp_testtopic" + "7"), null, true, false);
+      server.createQueue(new SimpleString("amqp_testtopic" + "8"), new SimpleString("amqp_testtopic" + "8"), null, true, false);
+      server.createQueue(new SimpleString("amqp_testtopic" + "9"), new SimpleString("amqp_testtopic" + "9"), null, true, false);
+      server.createQueue(new SimpleString("amqp_testtopic" + "10"), new SimpleString("amqp_testtopic" + "10"), null, true, false);
+      connection = createConnection();
+
+   }
+
+   @Override
+   @After
+   public void tearDown() throws Exception {
+      try {
+         Thread.sleep(250);
+         if (connection != null) {
+            connection.close();
+         }
+      }
+      finally {
+         super.tearDown();
+      }
+   }
+
+   @Test
+   public void testDurableSubscriptionUnsubscribe() throws Exception {
+      Connection connection = createConnection("myClientId");
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Topic topic = session.createTopic("amqp_testtopic");
+         TopicSubscriber myDurSub = session.createDurableSubscriber(topic, "myDurSub");
+         session.close();
+         connection.close();
+         connection = createConnection("myClientId");
+         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         myDurSub = session.createDurableSubscriber(topic, "myDurSub");
+         myDurSub.close();
+         Assert.assertNotNull(server.getPostOffice().getBinding(new SimpleString("myClientId:myDurSub")));
+         session.unsubscribe("myDurSub");
+         Assert.assertNull(server.getPostOffice().getBinding(new SimpleString("myClientId:myDurSub")));
+         session.close();
+         connection.close();
+      }
+      finally {
+         if (connection != null) {
+            connection.close();
+         }
+      }
+   }
+
+   @Test
+   public void testTemporarySubscriptionDeleted() throws Exception {
+      try {
+         TopicSession session = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Topic topic = session.createTopic("amqp_testtopic");
+         TopicSubscriber myDurSub = session.createSubscriber(topic);
+         Bindings bindingsForAddress = server.getPostOffice().getBindingsForAddress(new SimpleString("amqp_testtopic"));
+         Assert.assertEquals(2, bindingsForAddress.getBindings().size());
+         session.close();
+         final CountDownLatch latch = new CountDownLatch(1);
+         server.getRemotingService().getConnections().iterator().next().addCloseListener(new CloseListener() {
+            @Override
+            public void connectionClosed() {
+               latch.countDown();
+            }
+         });
+         connection.close();
+         latch.await(5, TimeUnit.SECONDS);
+         bindingsForAddress = server.getPostOffice().getBindingsForAddress(new SimpleString("amqp_testtopic"));
+         Assert.assertEquals(1, bindingsForAddress.getBindings().size());
+      }
+      finally {
+         if (connection != null) {
+            connection.close();
+         }
+      }
+   }
+
+   @Test
+   public void testBrokerContainerId() throws Exception {
+      AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+      AmqpConnection amqpConnection = client.connect();
+      try {
+         assertTrue(brokerName.equals(amqpConnection.getEndpoint().getRemoteContainer()));
+      }
+      finally {
+         amqpConnection.close();
+      }
+   }
+
+   @Test
+   public void testBrokerConnectionProperties() throws Exception {
+      AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+      AmqpConnection amqpConnection = client.connect();
+      try {
+         Map<Symbol, Object> properties = amqpConnection.getEndpoint().getRemoteProperties();
+         assertTrue(properties != null);
+         if (properties != null) {
+            assertTrue("apache-activemq-artemis".equals(properties.get(Symbol.valueOf("product"))));
+            assertTrue(VersionLoader.getVersion().getFullVersion().equals(properties.get(Symbol.valueOf("version"))));
+         }
+      }
+      finally {
+         amqpConnection.close();
+      }
+   }
+
+   @Test(timeout = 60000)
+   public void testConnectionCarriesExpectedCapabilities() throws Exception {
+      AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+      assertNotNull(client);
+
+      client.setValidator(new AmqpValidator() {
+
+         @Override
+         public void inspectOpenedResource(org.apache.qpid.proton.engine.Connection connection) {
+
+            Symbol[] offered = connection.getRemoteOfferedCapabilities();
+
+            if (!contains(offered, DELAYED_DELIVERY)) {
+               markAsInvalid("Broker did not indicate it support delayed message delivery");
+               return;
+            }
+
+            Map<Symbol, Object> properties = connection.getRemoteProperties();
+            if (!properties.containsKey(PRODUCT)) {
+               markAsInvalid("Broker did not send a queue product name value");
+               return;
+            }
+
+            if (!properties.containsKey(VERSION)) {
+               markAsInvalid("Broker did not send a queue version value");
+               return;
+            }
+         }
+      });
+
+      AmqpConnection connection = client.connect();
+      try {
+         assertNotNull(connection);
+         connection.getStateInspector().assertValid();
+      }
+      finally {
+         connection.close();
+      }
+   }
+
+   @Test(timeout = 60000)
+   public void testSendWithDeliveryTimeHoldsMessage() throws Exception {
+      AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+      assertNotNull(client);
+
+      AmqpConnection connection = client.connect();
+      try {
+         AmqpSession session = connection.createSession();
+
+         AmqpSender sender = session.createSender(address);
+         AmqpReceiver receiver = session.createReceiver(address);
+
+         AmqpMessage message = new AmqpMessage();
+         long deliveryTime = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(5);
+         message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
+         message.setText("Test-Message");
+         sender.send(message);
+
+         // Now try and get the message
+         receiver.flow(1);
+
+         // Shouldn't get this since we delayed the message.
+         assertNull(receiver.receive(5, TimeUnit.SECONDS));
+      }
+      finally {
+         connection.close();
+      }
+   }
+
+   @Test(timeout = 60000)
+   public void testSendWithDeliveryTimeDeliversMessageAfterDelay() throws Exception {
+      AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+      assertNotNull(client);
+
+      AmqpConnection connection = client.connect();
+      try {
+         AmqpSession session = connection.createSession();
+
+         AmqpSender sender = session.createSender(address);
+         AmqpReceiver receiver = session.createReceiver(address);
+
+         AmqpMessage message = new AmqpMessage();
+         long deliveryTime = System.currentTimeMillis() + 2000;
+         message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
+         message.setText("Test-Message");
+         sender.send(message);
+
+         // Now try and get the message
+         receiver.flow(1);
+
+         AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
+         assertNotNull(received);
+         received.accept();
+         Long msgDeliveryTime = (Long) received.getMessageAnnotation("x-opt-delivery-time");
+         assertNotNull(msgDeliveryTime);
+         assertEquals(deliveryTime, msgDeliveryTime.longValue());
+      }
+      finally {
+         connection.close();
+      }
+   }
+
+   @Test
+   public void testCreditsAreAllocatedOnlyOnceOnLinkCreate() throws Exception {
+      // Only allow 1 credit to be submitted at a time.
+      Field maxCreditAllocation = ProtonServerReceiverContext.class.getDeclaredField("maxCreditAllocation");
+      maxCreditAllocation.setAccessible(true);
+      int originalMaxCreditAllocation = maxCreditAllocation.getInt(null);
+      maxCreditAllocation.setInt(null, 1);
+
+      String destinationAddress = address + 1;
+      AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+      AmqpConnection amqpConnection = client.connect();
+      try {
+         AmqpSession session = amqpConnection.createSession();
+         AmqpSender sender = session.createSender(destinationAddress);
+         assertTrue(sender.getSender().getCredit() == 1);
+      }
+      finally {
+         amqpConnection.close();
+         maxCreditAllocation.setInt(null, originalMaxCreditAllocation);
+      }
+   }
+
+   @Test
+   public void testTemporaryQueue() throws Throwable {
+
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      TemporaryQueue queue = session.createTemporaryQueue();
+      System.out.println("queue:" + queue.getQueueName());
+      MessageProducer p = session.createProducer(queue);
+
+      TextMessage message = session.createTextMessage();
+      message.setText("Message temporary");
+      p.send(message);
+
+      MessageConsumer cons = session.createConsumer(queue);
+      connection.start();
+
+      message = (TextMessage) cons.receive(5000);
+      Assert.assertNotNull(message);
+   }
+
+   @Test
+   public void testCommitProducer() throws Throwable {
+
+      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+      javax.jms.Queue queue = createQueue(address);
+      System.out.println("queue:" + queue.getQueueName());
+      MessageProducer p = session.createProducer(queue);
+      for (int i = 0; i < 10; i++) {
+         TextMessage message = session.createTextMessage();
+         message.setText("Message:" + i);
+         p.send(message);
+      }
+      session.commit();
+      session.close();
+      Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
+      Assert.assertEquals(q.getMessageCount(), 10);
+   }
+
+   @Test
+   public void testRollbackProducer() throws Throwable {
+
+      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+      javax.jms.Queue queue = createQueue(address);
+      System.out.println("queue:" + queue.getQueueName());
+      MessageProducer p = session.createProducer(queue);
+      for (int i = 0; i < 10; i++) {
+         TextMessage message = session.createTextMessage();
+         message.setText("Message:" + i);
+         p.send(message);
+      }
+      session.rollback();
+      session.close();
+      Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
+      Assert.assertEquals(q.getMessageCount(), 0);
+   }
+
+   @Test
+   public void testCommitConsumer() throws Throwable {
+
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      javax.jms.Queue queue = createQueue(address);
+      System.out.println("queue:" + queue.getQueueName());
+      MessageProducer p = session.createProducer(queue);
+      for (int i = 0; i < 10; i++) {
+         TextMessage message = session.createTextMessage();
+         message.setText("Message:" + i);
+         p.send(message);
+      }
+      session.close();
+
+      session = connection.createSession(true, Session.SESSION_TRANSACTED);
+      MessageConsumer cons = session.createConsumer(queue);
+      connection.start();
+
+      for (int i = 0; i < 10; i++) {
+         TextMessage message = (TextMessage) cons.receive(5000);
+         Assert.assertNotNull(message);
+         Assert.assertEquals("Message:" + i, message.getText());
+      }
+      session.commit();
+      Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
+      Assert.assertEquals(q.getMessageCount(), 0);
+   }
+
+
+   @Test
+   public void testRollbackConsumer() throws Throwable {
+
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      javax.jms.Queue queue = createQueue(address);
+      System.out.println("queue:" + queue.getQueueName());
+      MessageProducer p = session.createProducer(queue);
+      for (int i = 0; i < 10; i++) {
+         TextMessage message = session.createTextMessage();
+         message.setText("Message:" + i);
+         p.send(message);
+      }
+      session.close();
+
+      session = connection.createSession(true, Session.SESSION_TRANSACTED);
+      MessageConsumer cons = session.createConsumer(queue);
+      connection.start();
+
+      for (int i = 0; i < 10; i++) {
+         TextMessage message = (TextMessage) cons.receive(5000);
+         Assert.assertNotNull(message);
+         Assert.assertEquals("Message:" + i, message.getText());
+      }
+      session.rollback();
+      Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
+      Assert.assertEquals(q.getMessageCount(), 10);
+   }
+
+   @Test
+   public void testResourceLimitExceptionOnAddressFull() throws Exception {
+      setAddressFullBlockPolicy();
+      String destinationAddress = address + 1;
+      fillAddress(destinationAddress);
+
+      long addressSize = server.getPagingManager().getPageStore(new SimpleString(destinationAddress)).getAddressSize();
+      assertTrue(addressSize >= maxSizeBytesRejectThreshold);
+   }
+
+   @Test
+   public void testAddressIsBlockedForOtherProdudersWhenFull() throws Exception {
+      setAddressFullBlockPolicy();
+
+      String destinationAddress = address + 1;
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Destination d = session.createQueue(destinationAddress);
+      MessageProducer p = session.createProducer(d);
+
+      fillAddress(destinationAddress);
+
+      Exception e = null;
+      try {
+         p.send(session.createBytesMessage());
+      }
+      catch (ResourceAllocationException rae) {
+         e = rae;
+      }
+      assertTrue(e instanceof ResourceAllocationException);
+      assertTrue(e.getMessage().contains("resource-limit-exceeded"));
+
+      long addressSize = server.getPagingManager().getPageStore(new SimpleString(destinationAddress)).getAddressSize();
+      assertTrue(addressSize >= maxSizeBytesRejectThreshold);
+   }
+
+   @Test
+   public void testCreditsAreNotAllocatedWhenAddressIsFull() throws Exception {
+      setAddressFullBlockPolicy();
+
+      // Only allow 1 credit to be submitted at a time.
+      Field maxCreditAllocation = ProtonServerReceiverContext.class.getDeclaredField("maxCreditAllocation");
+      maxCreditAllocation.setAccessible(true);
+      int originalMaxCreditAllocation = maxCreditAllocation.getInt(null);
+      maxCreditAllocation.setInt(null, 1);
+
+      String destinationAddress = address + 1;
+      AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+      AmqpConnection amqpConnection = client.connect();
+      try {
+         AmqpSession session = amqpConnection.createSession();
+         AmqpSender sender = session.createSender(destinationAddress);
+
+         // Use blocking send to ensure buffered messages do not interfere with credit.
+         sender.setSendTimeout(-1);
+         sendUntilFull(sender);
+
+         // This should be -1. A single message is buffered in the client, and 0 credit has been allocated.
+         assertTrue(sender.getSender().getCredit() == -1);
+
+         long addressSize = server.getPagingManager().getPageStore(new SimpleString(destinationAddress)).getAddressSize();
+         assertTrue(addressSize >= maxSizeBytes && addressSize <= maxSizeBytesRejectThreshold);
+      }
+      finally {
+         amqpConnection.close();
+         maxCreditAllocation.setInt(null, originalMaxCreditAllocation);
+      }
+   }
+
+   @Test
+   public void testCreditsAreRefreshedWhenAddressIsUnblocked() throws Exception {
+      setAddressFullBlockPolicy();
+
+      String destinationAddress = address + 1;
+      fillAddress(destinationAddress);
+
+      AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+      AmqpConnection amqpConnection = amqpConnection = client.connect();
+      try {
+         AmqpSession session = amqpConnection.createSession();
+         AmqpSender sender = session.createSender(destinationAddress);
+
+         // Wait for a potential flow frame.
+         Thread.sleep(500);
+         assertEquals(0, sender.getSender().getCredit());
+
+         // Empty Address except for 1 message used later.
+         AmqpReceiver receiver = session.createReceiver(destinationAddress);
+         receiver.flow(100);
+
+         AmqpMessage m;
+         for (int i = 0; i < messagesSent - 1; i++) {
+            m = receiver.receive();
+            m.accept();
+         }
+
+         // Wait for address to unblock and flow frame to arrive
+         Thread.sleep(500);
+
+         assertTrue(sender.getSender().getCredit() >= 0);
+      }
+      finally {
+         amqpConnection.close();
+      }
+   }
+
+   @Test
+   public void testNewLinkAttachAreNotAllocatedCreditsWhenAddressIsBlocked() throws Exception {
+      setAddressFullBlockPolicy();
+
+      fillAddress(address + 1);
+      AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+      AmqpConnection amqpConnection = client.connect();
+      try {
+         AmqpSession session = amqpConnection.createSession();
+         AmqpSender sender = session.createSender(address + 1);
+         // Wait for a potential flow frame.
+         Thread.sleep(1000);
+         assertEquals(0, sender.getSender().getCredit());
+      }
+      finally {
+         amqpConnection.close();
+      }
+   }
+
+   @Test
+   public void testTxIsRolledBackOnRejectedPreSettledMessage() throws Throwable {
+      setAddressFullBlockPolicy();
+
+      // Create the link attach before filling the address to ensure the link is allocated credit.
+      AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+      AmqpConnection amqpConnection = client.connect();
+
+      AmqpSession session = amqpConnection.createSession();
+      AmqpSender sender = session.createSender(address);
+      sender.setPresettle(true);
+
+      fillAddress(address);
+
+      final AmqpMessage message = new AmqpMessage();
+      byte[] payload = new byte[50 * 1024];
+      message.setBytes(payload);
+
+      Exception expectedException = null;
+      try {
+         session.begin();
+         sender.send(message);
+         session.commit();
+      }
+      catch (Exception e) {
+         expectedException = e;
+      }
+      finally {
+         amqpConnection.close();
+      }
+
+      assertNotNull(expectedException);
+      assertTrue(expectedException.getMessage().contains("resource-limit-exceeded"));
+      assertTrue(expectedException.getMessage().contains("Address is full: " + address));
+   }
+
+   /**
+    * Fills an address.  Careful when using this method.  Only use when rejected messages are switched on.
+    * @param address
+    * @return
+    * @throws Exception
+    */
+   private void fillAddress(String address) throws Exception {
+      AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+      AmqpConnection amqpConnection = client.connect();
+      Exception exception = null;
+      try {
+         AmqpSession session = amqpConnection.createSession();
+         AmqpSender sender = session.createSender(address);
+         sendUntilFull(sender);
+      }
+      catch (Exception e) {
+         exception = e;
+      }
+      finally {
+         amqpConnection.close();
+      }
+
+      // Should receive a rejected error
+      assertNotNull(exception);
+      assertTrue(exception.getMessage().contains("amqp:resource-limit-exceeded"));
+   }
+
+   private void sendUntilFull(final AmqpSender sender) throws Exception {
+      final AmqpMessage message = new AmqpMessage();
+      byte[] payload = new byte[50 * 1024];
+      message.setBytes(payload);
+
+      final int maxMessages = 50;
+      final AtomicInteger sentMessages = new AtomicInteger(0);
+      final Exception[] errors = new Exception[1];
+      final CountDownLatch timeout = new CountDownLatch(1);
+
+      Runnable sendMessages = new Runnable() {
+         @Override
+         public void run() {
+            try {
+               for (int i = 0; i < maxMessages; i++) {
+                  sender.send(message);
+                  sentMessages.getAndIncrement();
+               }
+               timeout.countDown();
+            }
+            catch (IOException e) {
+               errors[0] = e;
+            }
+         }
+      };
+
+      Thread t = new Thread(sendMessages);
+      t.start();
+
+      timeout.await(5, TimeUnit.SECONDS);
+
+      messagesSent = sentMessages.get();
+      if (errors[0] != null) {
+         throw errors[0];
+      }
+   }
+
+   @Test
+   public void testLinkDetatchErrorIsCorrectWhenQueueDoesNotExists() throws Exception {
+      AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+      AmqpConnection amqpConnection = client.connect();
+      AmqpSession session = amqpConnection.createSession();
+
+      Exception expectedException = null;
+      try {
+         session.createSender("AnAddressThatDoesNotExist");
+      }
+      catch (Exception e) {
+         expectedException = e;
+      }
+
+      assertNotNull(expectedException);
+      assertTrue(expectedException.getMessage().contains("amqp:not-found"));
+      assertTrue(expectedException.getMessage().contains("target address does not exist"));
+   }
+
+   @Test
+   public void testSendingAndReceivingToQueueWithDifferentAddressAndQueueName() throws Exception {
+
+      String queueName = "TestQueueName";
+      String address = "TestAddress";
+
+      server.createQueue(new SimpleString(address), new SimpleString(queueName), null, true, false);
+
+      AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+      AmqpConnection amqpConnection = client.connect();
+      AmqpSession session = amqpConnection.createSession();
+      AmqpSender sender = session.createSender(address);
+      AmqpReceiver receiver = session.createReceiver(queueName);
+      receiver.flow(1);
+
+      AmqpMessage message = new AmqpMessage();
+      message.setText("TestPayload");
+      sender.send(message);
+
+      AmqpMessage receivedMessage = receiver.receive();
+      assertNotNull(receivedMessage);
+   }
+
+   @Test
+   public void testManagementQueryOverAMQP() throws Throwable {
+
+      AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+      AmqpConnection amqpConnection = client.connect();
+      try {
+         String destinationAddress = address + 1;
+         AmqpSession session = amqpConnection.createSession();
+         AmqpSender sender = session.createSender("jms.queue.activemq.management");
+         AmqpReceiver receiver = session.createReceiver(destinationAddress);
+         receiver.flow(10);
+
+         //create request message for getQueueNames query
+         AmqpMessage request = new AmqpMessage();
+         request.setApplicationProperty("_AMQ_ResourceName", "core.server");
+         request.setApplicationProperty("_AMQ_OperationName", "getQueueNames");
+         request.setApplicationProperty("JMSReplyTo", destinationAddress);
+         request.setText("[]");
+
+         sender.send(request);
+         AmqpMessage response = receiver.receive(50, TimeUnit.SECONDS);
+         Assert.assertNotNull(response);
+         assertNotNull(response);
+         Object section = response.getWrappedMessage().getBody();
+         assertTrue(section instanceof AmqpValue);
+         Object value = ((AmqpValue) section).getValue();
+         assertTrue(value instanceof String);
+         assertTrue(((String) value).length() > 0);
+         assertTrue(((String) value).contains(destinationAddress));
+         response.accept();
+      }
+      finally {
+         amqpConnection.close();
+      }
+   }
+
+   @Test
+   public void testReplyTo() throws Throwable {
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      TemporaryQueue queue = session.createTemporaryQueue();
+      MessageProducer p = session.createProducer(queue);
+
+      TextMessage message = session.createTextMessage();
+      message.setText("Message temporary");
+      message.setJMSReplyTo(createQueue(address));
+      p.send(message);
+
+      MessageConsumer cons = session.createConsumer(queue);
+      connection.start();
+
+      message = (TextMessage) cons.receive(5000);
+      Destination jmsReplyTo = message.getJMSReplyTo();
+      Assert.assertNotNull(jmsReplyTo);
+      Assert.assertNotNull(message);
+   }
+
+   @Test
+   public void testReplyToNonJMS() throws Throwable {
+
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      TemporaryQueue queue = session.createTemporaryQueue();
+      System.out.println("queue:" + queue.getQueueName());
+      MessageProducer p = session.createProducer(queue);
+
+      TextMessage message = session.createTextMessage();
+      message.setText("Message temporary");
+      message.setJMSReplyTo(createQueue("someaddress"));
+      p.send(message);
+
+      MessageConsumer cons = session.createConsumer(queue);
+      connection.start();
+
+      message = (TextMessage) cons.receive(5000);
+      Destination jmsReplyTo = message.getJMSReplyTo();
+      Assert.assertNotNull(jmsReplyTo);
+      Assert.assertNotNull(message);
+
+   }
+
+   /*
+   // Uncomment testLoopBrowser to validate the hunging on the test
+   @Test
+   public void testLoopBrowser() throws Throwable {
+      for (int i = 0 ; i < 1000; i++) {
+         System.out.println("#test " + i);
+         testBrowser();
+         tearDown();
+         setUp();
+      }
+   } */
+
+   /**
+    * This test eventually fails because of: https://issues.apache.org/jira/browse/QPID-4901
+    *
+    * @throws Throwable
+    */
+   //@Test // TODO: re-enable this when we can get a version free of QPID-4901 bug
+   public void testBrowser() throws Throwable {
+
+      boolean success = false;
+
+      for (int i = 0; i < 10; i++) {
+         // As this test was hunging, we added a protection here to fail it instead.
+         // it seems something on the qpid client, so this failure belongs to them and we can ignore it on
+         // our side (ActiveMQ)
+         success = runWithTimeout(new RunnerWithEX() {
+            @Override
+            public void run() throws Throwable {
+               int numMessages = 50;
+               javax.jms.Queue queue = createQueue(address);
+               Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+               MessageProducer p = session.createProducer(queue);
+               for (int i = 0; i < numMessages; i++) {
+                  TextMessage message = session.createTextMessage();
+                  message.setText("msg:" + i);
+                  p.send(message);
+               }
+
+               connection.close();
+               Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
+
+               connection = createConnection();
+               session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+               QueueBrowser browser = session.createBrowser(queue);
+               Enumeration enumeration = browser.getEnumeration();
+               int count = 0;
+               while (enumeration.hasMoreElements()) {
+                  Message msg = (Message) enumeration.nextElement();
+                  Assert.assertNotNull("" + count, msg);
+                  Assert.assertTrue("" + msg, msg instanceof TextMessage);
+                  String text = ((TextMessage) msg).getText();
+                  Assert.assertEquals(text, "msg:" + count++);
+               }
+               Assert.assertEquals(count, numMessages);
+               connection.close();
+               Assert.assertEquals(getMessageCount(q), numMessages);
+            }
+         }, 5000);
+
+         if (success) {
+            break;
+         }
+         else {
+            System.err.println("Had to make it fail!!!");
+            tearDown();
+            setUp();
+         }
+      }
+
+      // There is a bug on the qpid client library currently, we can expect having to interrupt the thread on browsers.
+      // but we can't have it on 10 iterations... something must be broken if that's the case
+      Assert.assertTrue("Test had to interrupt on all occasions.. this is beyond the expected for the test", success);
+   }
+
+   @Test
+   public void testConnection() throws Exception {
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      MessageConsumer cons = session.createConsumer(createQueue(address));
+
+      org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(SimpleString.toSimpleString(coreAddress));
+
+      assertEquals(1, serverQueue.getConsumerCount());
+
+      cons.close();
+
+      for (int i = 0; i < 100 && serverQueue.getConsumerCount() != 0; i++) {
+         Thread.sleep(500);
+      }
+
+      assertEquals(0, serverQueue.getConsumerCount());
+
+      session.close();
+
+   }
+
+   @Test
+   public void testMessagesSentTransactional() throws Exception {
+      int numMessages = 1000;
+      javax.jms.Queue queue = createQueue(address);
+      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+      MessageProducer p = session.createProducer(queue);
+      byte[] bytes = new byte[2048];
+      new Random().nextBytes(bytes);
+      for (int i = 0; i < numMessages; i++) {
+         TextMessage message = session.createTextMessage();
+         message.setText("msg:" + i);
+         p.send(message);
+      }
+      session.commit();
+      connection.close();
+      Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
+      for (long timeout = System.currentTimeMillis() + 5000; timeout > System.currentTimeMillis() && getMessageCount(q) != numMessages; ) {
+         Thread.sleep(1);
+      }
+      Assert.assertEquals(numMessages, getMessageCount(q));
+   }
+
+   @Test
+   public void testMessagesSentTransactionalRolledBack() throws Exception {
+      int numMessages = 1;
+      javax.jms.Queue queue = createQueue(address);
+      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+      MessageProducer p = session.createProducer(queue);
+      byte[] bytes = new byte[2048];
+      new Random().nextBytes(bytes);
+      for (int i = 0; i < numMessages; i++) {
+         TextMessage message = session.createTextMessage();
+         message.setText("msg:" + i);
+         p.send(message);
+      }
+      session.close();
+      connection.close();
+      Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
+      Assert.assertEquals(getMessageCount(q), 0);
+   }
+
+   @Test
+   public void testCancelMessages() throws Exception {
+      int numMessages = 10;
+      long time = System.currentTimeMillis();
+      javax.jms.Queue queue = createQueue(address);
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageProducer p = session.createProducer(queue);
+      byte[] bytes = new byte[2048];
+      new Random().nextBytes(bytes);
+      for (int i = 0; i < numMessages; i++) {
+         TextMessage message = session.createTextMessage();
+         message.setText("msg:" + i);
+         p.send(message);
+      }
+      connection.close();
+      Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
+
+      for (long timeout = System.currentTimeMillis() + 5000; timeout > System.currentTimeMillis() && getMessageCount(q) != numMessages; ) {
+         Thread.sleep(1);
+      }
+
+      Assert.assertEquals(numMessages, getMessageCount(q));
+      //now create a new connection and receive
+      connection = createConnection();
+      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageConsumer consumer = session.createConsumer(queue);
+      Thread.sleep(100);
+      consumer.close();
+      connection.close();
+      Assert.assertEquals(numMessages, getMessageCount(q));
+      long taken = (System.currentTimeMillis() - time) / 1000;
+      System.out.println("taken = " + taken);
+   }
+
+   @Test
+   public void testClientAckMessages() throws Exception {
+      int numMessages = 10;
+      long time = System.currentTimeMillis();
+      javax.jms.Queue queue = createQueue(address);
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageProducer p = session.createProducer(queue);
+      byte[] bytes = new byte[2048];
+      new Random().nextBytes(bytes);
+      for (int i = 0; i < numMessages; i++) {
+         TextMessage message = session.createTextMessage();
+         message.setText("msg:" + i);
+         p.send(message);
+      }
+      connection.close();
+      Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
+
+      for (long timeout = System.currentTimeMillis() + 5000; timeout > System.currentTimeMillis() && getMessageCount(q) != numMessages; ) {
+         Thread.sleep(1);
+      }
+      Assert.assertEquals(numMessages, getMessageCount(q));
+      //now create a new connection and receive
+      connection = createConnection();
+      session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+      MessageConsumer consumer = session.createConsumer(queue);
+      for (int i = 0; i < numMessages; i++) {
+         Message msg = consumer.receive(5000);
+         if (msg == null) {
+            System.out.println("ProtonTest.testManyMessages");
+         }
+         Assert.assertNotNull("" + i, msg);
+         Assert.assertTrue("" + msg, msg instanceof TextMessage);
+         String text = ((TextMessage) msg).getText();
+         //System.out.println("text = " + text);
+         Assert.assertEquals(text, "msg:" + i);
+         msg.acknowledge();
+      }
+
+      consumer.close();
+      connection.close();
+
+      // Wait for Acks to be processed and message removed from queue.
+      Thread.sleep(500);
+
+      Assert.assertEquals(0, getMessageCount(q));
+      long taken = (System.currentTimeMillis() - time) / 1000;
+      System.out.println("taken = " + taken);
+   }
+
+   @Test
+   public void testMessagesReceivedInParallel() throws Throwable {
+      final int numMessages = 50000;
+      long time = System.currentTimeMillis();
+      final javax.jms.Queue queue = createQueue(address);
+
+      final ArrayList<Throwable> exceptions = new ArrayList<>();
+
+      Thread t = new Thread(new Runnable() {
+         @Override
+         public void run() {
+            Connection connectionConsumer = null;
+            try {
+               // TODO the test may starve if using the same connection (dead lock maybe?)
+               connectionConsumer = createConnection();
+               //               connectionConsumer = connection;
+               connectionConsumer.start();
+               Session sessionConsumer = connectionConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE);
+               final MessageConsumer consumer = sessionConsumer.createConsumer(queue);
+
+               long n = 0;
+               int count = numMessages;
+               while (count > 0) {
+                  try {
+                     if (++n % 1000 == 0) {
+                        System.out.println("received " + n + " messages");
+                     }
+
+                     Message m = consumer.receive(5000);
+                     Assert.assertNotNull("Could not receive message count=" + count + " on consumer", m);
+                     count--;
+                  }
+                  catch (JMSException e) {
+                     e.printStackTrace();
+                     break;
+                  }
+               }
+            }
+            catch (Throwable e) {
+               exceptions.add(e);
+               e.printStackTrace();
+            }
+            finally {
+               try {
+                  // if the createconnecion wasn't commented out
+                  if (connectionConsumer != connection) {
+                     connectionConsumer.close();
+                  }
+               }
+               catch (Throwable ignored) {
+                  // NO OP
+               }
+            }
+         }
+      });
+
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      t.start();
+
+      MessageProducer p = session.createProducer(queue);
+      p.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+      for (int i = 0; i < numMessages; i++) {
+         BytesMessage message = session.createBytesMessage();
+         message.writeUTF("Hello world!!!!" + i);
+         message.setIntProperty("count", i);
+         p.send(message);
+      }
+      t.join();
+
+      for (Throwable e : exceptions) {
+         throw e;
+      }
+      Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
+
+      connection.close();
+      Assert.assertEquals(0, getMessageCount(q));
+
+      long taken = (System.currentTimeMillis() - time);
+      System.out.println("Microbenchamrk ran in " + taken + " milliseconds, sending/receiving " + numMessages);
+
+      double messagesPerSecond = ((double) numMessages / (double) taken) * 1000;
+
+      System.out.println(((int) messagesPerSecond) + " messages per second");
+
+   }
+
+   @Test
+   public void testSimpleBinary() throws Throwable {
+      final int numMessages = 500;
+      long time = System.currentTimeMillis();
+      final javax.jms.Queue queue = createQueue(address);
+
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      byte[] bytes = new byte[0xf + 1];
+      for (int i = 0; i <= 0xf; i++) {
+         bytes[i] = (byte) i;
+      }
+
+      MessageProducer p = session.createProducer(queue);
+      for (int i = 0; i < numMessages; i++) {
+         System.out.println("Sending " + i);
+         BytesMessage message = session.createBytesMessage();
+
+         message.writeBytes(bytes);
+         message.setIntProperty("count", i);
+         p.send(message);
+      }
+
+      Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      final MessageConsumer consumer = sessionConsumer.createConsumer(queue);
+
+      for (int i = 0; i < numMessages; i++) {
+         BytesMessage m = (BytesMessage) consumer.receive(5000);
+         Assert.assertNotNull("Could not receive message count=" + i + " on consumer", m);
+
+         m.reset();
+
+         long size = m.getBodyLength();
+         byte[] bytesReceived = new byte[(int) size];
+         m.readBytes(bytesReceived);
+
+         System.out.println("Received " + ByteUtil.bytesToHex(bytesReceived, 1) + " count - " + m.getIntProperty("count"));
+
+         Assert.assertArrayEquals(bytes, bytesReceived);
+      }
+
+      //      assertEquals(0, q.getMessageCount());
+      long taken = (System.currentTimeMillis() - time) / 1000;
+      System.out.println("taken = " + taken);
+   }
+
+   @Test
+   public void testSimpleDefault() throws Throwable {
+      final int numMessages = 500;
+      long time = System.currentTimeMillis();
+      final javax.jms.Queue queue = createQueue(address);
+
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      byte[] bytes = new byte[0xf + 1];
+      for (int i = 0; i <= 0xf; i++) {
+         bytes[i] = (byte) i;
+      }
+
+      MessageProducer p = session.createProducer(queue);
+      for (int i = 0; i < numMessages; i++) {
+         System.out.println("Sending " + i);
+         Message message = session.createMessage();
+
+         message.setIntProperty("count", i);
+         p.send(message);
+      }
+
+      Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      final MessageConsumer consumer = sessionConsumer.createConsumer(queue);
+
+      for (int i = 0; i < numMessages; i++) {
+         Message m = consumer.receive(5000);
+         Assert.assertNotNull("Could not receive message count=" + i + " on consumer", m);
+      }
+
+      //      assertEquals(0, q.getMessageCount());
+      long taken = (System.currentTimeMillis() - time) / 1000;
+      System.out.println("taken = " + taken);
+   }
+
+   @Test
+   public void testSimpleMap() throws Throwable {
+      final int numMessages = 100;
+      long time = System.currentTimeMillis();
+      final javax.jms.Queue queue = createQueue(address);
+
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      MessageProducer p = session.createProducer(queue);
+      for (int i = 0; i < numMessages; i++) {
+         System.out.println("Sending " + i);
+         MapMessage message = session.createMapMessage();
+
+         message.setInt("i", i);
+         message.setIntProperty("count", i);
+         p.send(message);
+      }
+
+      Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      final MessageConsumer consumer = sessionConsumer.createConsumer(queue);
+
+      for (int i = 0; i < numMessages; i++) {
+         MapMessage m = (MapMessage) consumer.receive(5000);
+         Assert.assertNotNull("Could not receive message count=" + i + " on consumer", m);
+
+         Assert.assertEquals(i, m.getInt("i"));
+         Assert.assertEquals(i, m.getIntProperty("count"));
+      }
+
+      //      assertEquals(0, q.getMessageCount());
+      long taken = (System.currentTimeMillis() - time) / 1000;
+      System.out.println("taken = " + taken);
+   }
+
+   @Test
+   public void testSimpleStream() throws Throwable {
+      final int numMessages = 100;
+      final javax.jms.Queue queue = createQueue(address);
+
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      MessageProducer p = session.createProducer(queue);
+      for (int i = 0; i < numMessages; i++) {
+         StreamMessage message = session.createStreamMessage();
+         message.writeInt(i);
+         message.writeBoolean(true);
+         message.writeString("test");
+         p.send(message);
+      }
+
+      Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      final MessageConsumer consumer = sessionConsumer.createConsumer(queue);
+
+      for (int i = 0; i < numMessages; i++) {
+         StreamMessage m = (StreamMessage) consumer.receive(5000);
+         Assert.assertNotNull("Could not receive message count=" + i + " on consumer", m);
+
+         Assert.assertEquals(i, m.readInt());
+         Assert.assertEquals(true, m.readBoolean());
+         Assert.assertEquals("test", m.readString());
+      }
+
+   }
+
+   @Test
+   public void testSimpleText() throws Throwable {
+      final int numMessages = 100;
+      long time = System.currentTimeMillis();
+      final javax.jms.Queue queue = createQueue(address);
+
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      MessageProducer p = session.createProducer(queue);
+      for (int i = 0; i < numMessages; i++) {
+         System.out.println("Sending " + i);
+         TextMessage message = session.createTextMessage("text" + i);
+         message.setStringProperty("text", "text" + i);
+         p.send(message);
+      }
+
+      Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      final MessageConsumer consumer = sessionConsumer.createConsumer(queue);
+
+      for (int i = 0; i < numMessages; i++) {
+         TextMessage m = (TextMessage) consumer.receive(5000);
+         Assert.assertNotNull("Could not receive message count=" + i + " on consumer", m);
+         Assert.assertEquals("text" + i, m.getText());
+      }
+
+      //      assertEquals(0, q.getMessageCount());
+      long taken = (System.currentTimeMillis() - time) / 1000;
+      System.out.println("taken = " + taken);
+   }
+
+   @Test
+   public void testSimpleObject() throws Throwable {
+      final int numMessages = 1;
+      long time = System.currentTimeMillis();
+      final javax.jms.Queue queue = createQueue(address);
+
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      MessageProducer p = session.createProducer(queue);
+      for (int i = 0; i < numMessages; i++) {
+         System.out.println("Sending " + i);
+         ObjectMessage message = session.createObjectMessage(new AnythingSerializable(i));
+         p.send(message);
+      }
+
+      Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      final MessageConsumer consumer = sessionConsumer.createConsumer(queue);
+
+      for (int i = 0; i < numMessages; i++) {
+         ObjectMessage msg = (ObjectMessage) consumer.receive(5000);
+         Assert.assertNotNull("Could not receive message count=" + i + " on consumer", msg);
+
+         AnythingSerializable someSerialThing = (AnythingSerializable) msg.getObject();
+         Assert.assertEquals(i, someSerialThing.getCount());
+      }
+
+      //      assertEquals(0, q.getMessageCount());
+      long taken = (System.currentTimeMillis() - time) / 1000;
+      System.out.println("taken = " + taken);
+   }
+
+   @Test
+   public void testSelector() throws Exception {
+      javax.jms.Queue queue = createQueue(address);
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageProducer p = session.createProducer(queue);
+      TextMessage message = session.createTextMessage();
+      message.setText("msg:0");
+      p.send(message);
+      message = session.createTextMessage();
+      message.setText("msg:1");
+      message.setStringProperty("color", "RED");
+      p.send(message);
+      connection.start();
+      MessageConsumer messageConsumer = session.createConsumer(queue, "color = 'RED'");
+      TextMessage m = (TextMessage) messageConsumer.receive(5000);
+      Assert.assertNotNull(m);
+      Assert.assertEquals("msg:1", m.getText());
+      Assert.assertEquals(m.getStringProperty("color"), "RED");
+      connection.close();
+   }
+
+   @Test
+   public void testProperties() throws Exception {
+      javax.jms.Queue queue = createQueue(address);
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageProducer p = session.createProducer(queue);
+      TextMessage message = session.createTextMessage();
+      message.setText("msg:0");
+      message.setBooleanProperty("true", true);
+      message.setBooleanProperty("false", false);
+      message.setStringProperty("foo", "bar");
+      message.setDoubleProperty("double", 66.6);
+      message.setFloatProperty("float", 56.789f);
+      message.setIntProperty("int", 8);
+      message.setByteProperty("byte", (byte) 10);
+      p.send(message);
+      p.send(message);
+      connection.start();
+      MessageConsumer messageConsumer = session.createConsumer(queue);
+      TextMessage m = (TextMessage) messageConsumer.receive(5000);
+      Assert.assertNotNull(m);
+      Assert.assertEquals("msg:0", m.getText());
+      Assert.assertEquals(m.getBooleanProperty("true"), true);
+      Assert.assertEquals(m.getBooleanProperty("false"), false);
+      Assert.assertEquals(m.getStringProperty("foo"), "bar");
+      Assert.assertEquals(m.getDoubleProperty("double"), 66.6, 0.0001);
+      Assert.assertEquals(m.getFloatProperty("float"), 56.789f, 0.0001);
+      Assert.assertEquals(m.getIntProperty("int"), 8);
+      Assert.assertEquals(m.getByteProperty("byte"), (byte) 10);
+      m = (TextMessage) messageConsumer.receive(5000);
+      Assert.assertNotNull(m);
+      connection.close();
+   }
+
+   @Test
+   public void testClientID() throws Exception {
+      Connection testConn1 = createConnection(false);
+      Connection testConn2 = createConnection(false);
+      try {
+         testConn1.setClientID("client-id1");
+         try {
+            testConn1.setClientID("client-id2");
+            fail("didn't get expected exception");
+         }
+         catch (javax.jms.IllegalStateException e) {
+            //expected
+         }
+
+         try {
+            testConn2.setClientID("client-id1");
+            fail("didn't get expected exception");
+         }
+         catch (InvalidClientIDException e) {
+            //expected
+         }
+      }
+      finally {
+         testConn1.close();
+         testConn2.close();
+      }
+
+      try {
+         testConn1 = createConnection(false);
+         testConn2 = createConnection(false);
+         testConn1.setClientID("client-id1");
+         testConn2.setClientID("client-id2");
+      }
+      finally {
+         testConn1.close();
+         testConn2.close();
+      }
+   }
+
+   private javax.jms.Queue createQueue(String address) throws Exception {
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      try {
+         return session.createQueue(address);
+      }
+      finally {
+         session.close();
+      }
+   }
+
+   private Connection createConnection() throws JMSException {
+      return this.createConnection(true);
+   }
+
+   private javax.jms.Connection createConnection(boolean isStart) throws JMSException {
+      Connection connection;
+      if (protocol == 3) {
+         factory = new JmsConnectionFactory(amqpConnectionUri);
+         connection = factory.createConnection();
+      }
+      else if (protocol == 0) {
+         factory = new JmsConnectionFactory(userName, password, amqpConnectionUri);
+         connection = factory.createConnection();
+      }
+      else {
+         Assert.fail("protocol = " + protocol + " not supported");
+         return null; // just to compile, the previous statement will throw an exception
+      }
+      if (isStart) {
+         connection.setExceptionListener(new ExceptionListener() {
+            @Override
+            public void onException(JMSException exception) {
+               exception.printStackTrace();
+            }
+         });
+         connection.start();
+      }
+
+      return connection;
+   }
+
+   private javax.jms.Connection createConnection(String clientId) throws JMSException {
+      Connection connection;
+      if (protocol == 3) {
+         factory = new JmsConnectionFactory(amqpConnectionUri);
+         connection = factory.createConnection();
+         connection.setExceptionListener(new ExceptionListener() {
+            @Override
+            public void onException(JMSException exception) {
+               exception.printStackTrace();
+            }
+         });
+         connection.setClientID(clientId);
+         connection.start();
+      }
+      else if (protocol == 0) {
+         factory = new JmsConnectionFactory(userName, password, amqpConnectionUri);
+         connection = factory.createConnection();
+         connection.setExceptionListener(new ExceptionListener() {
+            @Override
+            public void onException(JMSException exception) {
+               exception.printStackTrace();
+            }
+         });
+         connection.setClientID(clientId);
+         connection.start();
+      }
+      else {
+         Assert.fail("protocol = " + protocol + " not supported");
+         return null; // just to compile, the previous statement will throw an exception
+      }
+
+      return connection;
+   }
+
+
+   private void setAddressFullBlockPolicy() {
+      // For BLOCK tests
+      AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch("#");
+      addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
+      addressSettings.setMaxSizeBytes(maxSizeBytes);
+      addressSettings.setMaxSizeBytesRejectThreshold(maxSizeBytesRejectThreshold);
+      server.getAddressSettingsRepository().addMatch("#", addressSettings);
+   }
+
+   public static class AnythingSerializable implements Serializable {
+
+      private int count;
+
+      public AnythingSerializable(int count) {
+         this.count = count;
+      }
+
+      public int getCount() {
+         return count;
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java
new file mode 100644
index 0000000..d6369eb
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ProtonTestBase extends ActiveMQTestBase {
+
+   protected String brokerName = "my-broker";
+   protected ActiveMQServer server;
+
+   protected String tcpAmqpConnectionUri = "tcp://localhost:5672";
+   protected String userName = "guest";
+   protected String password = "guest";
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+
+      server = this.createServer(true, true);
+      HashMap<String, Object> params = new HashMap<>();
+      params.put(TransportConstants.PORT_PROP_NAME, "5672");
+      params.put(TransportConstants.PROTOCOLS_PROP_NAME, "AMQP");
+      HashMap<String, Object> amqpParams = new HashMap<>();
+      configureAmqp(amqpParams);
+      TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, "amqp-acceptor", amqpParams);
+
+      server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
+      server.getConfiguration().setName(brokerName);
+
+      // Default Page
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+      server.getConfiguration().getAddressesSettings().put("#", addressSettings);
+
+      server.start();
+   }
+
+   protected void configureAmqp(Map<String, Object> params) {
+   }
+
+   @Override
+   @After
+   public void tearDown() throws Exception {
+      try {
+         server.stop();
+      }
+      finally {
+         super.tearDown();
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestForHeader.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestForHeader.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestForHeader.java
new file mode 100644
index 0000000..a50af0d
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestForHeader.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.fusesource.hawtbuf.Buffer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ProtonTestForHeader extends ActiveMQTestBase {
+
+   private ActiveMQServer server;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      server = this.createServer(true, true);
+      HashMap<String, Object> params = new HashMap<>();
+      params.put(TransportConstants.PORT_PROP_NAME, "5672");
+      params.put(TransportConstants.PROTOCOLS_PROP_NAME, "AMQP");
+      TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
+
+      server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
+      server.getConfiguration().setSecurityEnabled(true);
+      server.start();
+      ActiveMQJAASSecurityManager securityManager = (ActiveMQJAASSecurityManager) server.getSecurityManager();
+      securityManager.getConfiguration().addUser("auser", "pass");
+   }
+
+   @Override
+   @After
+   public void tearDown() throws Exception {
+      try {
+         server.stop();
+      }
+      finally {
+         super.tearDown();
+      }
+   }
+
+   @Test
+   public void testSimpleBytes() throws Exception {
+      final AmqpHeader header = new AmqpHeader();
+
+      header.setProtocolId(0);
+      header.setMajor(1);
+      header.setMinor(0);
+      header.setRevision(0);
+
+      final ClientConnection connection = new ClientConnection();
+      connection.open("localhost", 5672);
+      connection.send(header);
+
+      AmqpHeader response = connection.readAmqpHeader();
+      assertNotNull(response);
+      IntegrationTestLogger.LOGGER.info("Broker responded with: " + response);
+
+      assertTrue("Broker should have closed client connection", Wait.waitFor(new Wait.Condition() {
+
+         @Override
+         public boolean isSatisfied() throws Exception {
+            try {
+               connection.send(header);
+               return false;
+            }
+            catch (Exception e) {
+               return true;
+            }
+         }
+      }, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(250)));
+   }
+
+   private class ClientConnection {
+
+      protected static final long RECEIVE_TIMEOUT = 10000;
+      protected Socket clientSocket;
+
+      public void open(String host, int port) throws IOException {
+         clientSocket = new Socket(host, port);
+         clientSocket.setTcpNoDelay(true);
+      }
+
+      public void send(AmqpHeader header) throws Exception {
+         IntegrationTestLogger.LOGGER.info("Client sending header: " + header);
+         OutputStream outputStream = clientSocket.getOutputStream();
+         header.getBuffer().writeTo(outputStream);
+         outputStream.flush();
+      }
+
+      public AmqpHeader readAmqpHeader() throws Exception {
+         clientSocket.setSoTimeout((int) RECEIVE_TIMEOUT);
+         InputStream is = clientSocket.getInputStream();
+
+         byte[] header = new byte[8];
+         int read = is.read(header);
+         if (read == header.length) {
+            return new AmqpHeader(new Buffer(header));
+         }
+         else {
+            return null;
+         }
+      }
+   }
+
+   private class AmqpHeader {
+
+      final Buffer PREFIX = new Buffer(new byte[]{'A', 'M', 'Q', 'P'});
+
+      private Buffer buffer;
+
+      AmqpHeader() {
+         this(new Buffer(new byte[]{'A', 'M', 'Q', 'P', 0, 1, 0, 0}));
+      }
+
+      AmqpHeader(Buffer buffer) {
+         this(buffer, true);
+      }
+
+      AmqpHeader(Buffer buffer, boolean validate) {
+         setBuffer(buffer, validate);
+      }
+
+      public int getProtocolId() {
+         return buffer.get(4) & 0xFF;
+      }
+
+      public void setProtocolId(int value) {
+         buffer.data[buffer.offset + 4] = (byte) value;
+      }
+
+      public int getMajor() {
+         return buffer.get(5) & 0xFF;
+      }
+
+      public void setMajor(int value) {
+         buffer.data[buffer.offset + 5] = (byte) value;
+      }
+
+      public int getMinor() {
+         return buffer.get(6) & 0xFF;
+      }
+
+      public void setMinor(int value) {
+         buffer.data[buffer.offset + 6] = (byte) value;
+      }
+
+      public int getRevision() {
+         return buffer.get(7) & 0xFF;
+      }
+
+      public void setRevision(int value) {
+         buffer.data[buffer.offset + 7] = (byte) value;
+      }
+
+      public Buffer getBuffer() {
+         return buffer;
+      }
+
+      public void setBuffer(Buffer value) {
+         setBuffer(value, true);
+      }
+
+      public void setBuffer(Buffer value, boolean validate) {
+         if (validate && !value.startsWith(PREFIX) || value.length() != 8) {
+            throw new IllegalArgumentException("Not an AMQP header buffer");
+         }
+         buffer = value.buffer();
+      }
+
+      public boolean hasValidPrefix() {
+         return buffer.startsWith(PREFIX);
+      }
+
+      @Override
+      public String toString() {
+         StringBuilder builder = new StringBuilder();
+         for (int i = 0; i < buffer.length(); ++i) {
+            char value = (char) buffer.get(i);
+            if (Character.isLetter(value)) {
+               builder.append(value);
+            }
+            else {
+               builder.append(",");
+               builder.append((int) value);
+            }
+         }
+         return builder.toString();
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java
new file mode 100644
index 0000000..0c9783e
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.Random;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.qpid.jms.JmsConnectionFactory;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SendingAndReceivingTest extends ActiveMQTestBase {
+
+   private ActiveMQServer server;
+
+   @Before
+   @Override
+   public void setUp() throws Exception {
+      super.setUp();
+      server = createServer(true, true);
+      server.start();
+   }
+
+   @After
+   @Override
+   public void tearDown() throws Exception {
+      try {
+         server.stop();
+      }
+      finally {
+         super.tearDown();
+      }
+   }
+
+   //https://issues.apache.org/jira/browse/ARTEMIS-214
+   @Test
+   public void testSendingBigMessage() throws Exception {
+      Connection connection = null;
+      ConnectionFactory connectionFactory = new JmsConnectionFactory("amqp://localhost:61616");
+
+      try {
+         connection = connectionFactory.createConnection();
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = session.createQueue("jms.queue.exampleQueue");
+         MessageProducer sender = session.createProducer(queue);
+
+         String body = createMessage(10240);
+         sender.send(session.createTextMessage(body));
+         connection.start();
+
+         MessageConsumer consumer = session.createConsumer(queue);
+         TextMessage m = (TextMessage) consumer.receive(5000);
+
+         Assert.assertEquals(body, m.getText());
+      }
+      finally {
+         if (connection != null) {
+            connection.close();
+         }
+      }
+   }
+
+   private static String createMessage(int messageSize) {
+      final String AB = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+      Random rnd = new Random();
+      StringBuilder sb = new StringBuilder(messageSize);
+      for (int j = 0; j < messageSize; j++ ) {
+         sb.append(AB.charAt(rnd.nextInt(AB.length())));
+      }
+      String body = sb.toString();
+      return body;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/jms/SendingAndReceivingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/jms/SendingAndReceivingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/jms/SendingAndReceivingTest.java
deleted file mode 100644
index d298267..0000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/jms/SendingAndReceivingTest.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.integration.amqp.jms;
-
-import java.util.Random;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
-import org.apache.qpid.jms.JmsConnectionFactory;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class SendingAndReceivingTest extends ActiveMQTestBase {
-
-   private ActiveMQServer server;
-
-   @Before
-   @Override
-   public void setUp() throws Exception {
-      super.setUp();
-      server = createServer(true, true);
-      server.start();
-   }
-
-   @After
-   @Override
-   public void tearDown() throws Exception {
-      try {
-         server.stop();
-      }
-      finally {
-         super.tearDown();
-      }
-   }
-
-   //https://issues.apache.org/jira/browse/ARTEMIS-214
-   @Test
-   public void testSendingBigMessage() throws Exception {
-      Connection connection = null;
-      ConnectionFactory connectionFactory = new JmsConnectionFactory("amqp://localhost:61616");
-
-      try {
-         connection = connectionFactory.createConnection();
-         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         Queue queue = session.createQueue("jms.queue.exampleQueue");
-         MessageProducer sender = session.createProducer(queue);
-
-         String body = createMessage(10240);
-         sender.send(session.createTextMessage(body));
-         connection.start();
-
-         MessageConsumer consumer = session.createConsumer(queue);
-         TextMessage m = (TextMessage) consumer.receive(5000);
-
-         Assert.assertEquals(body, m.getText());
-      }
-      finally {
-         if (connection != null) {
-            connection.close();
-         }
-      }
-   }
-
-   private static String createMessage(int messageSize) {
-      final String AB = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
-      Random rnd = new Random();
-      StringBuilder sb = new StringBuilder(messageSize);
-      for (int j = 0; j < messageSize; j++ ) {
-         sb.append(AB.charAt(rnd.nextInt(AB.length())));
-      }
-      String body = sb.toString();
-      return body;
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonMaxFrameSizeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonMaxFrameSizeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonMaxFrameSizeTest.java
deleted file mode 100644
index 908285e..0000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonMaxFrameSizeTest.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.integration.proton;
-
-import org.apache.activemq.transport.amqp.client.AmqpClient;
-import org.apache.activemq.transport.amqp.client.AmqpConnection;
-import org.apache.activemq.transport.amqp.client.AmqpMessage;
-import org.apache.activemq.transport.amqp.client.AmqpReceiver;
-import org.apache.activemq.transport.amqp.client.AmqpSender;
-import org.apache.activemq.transport.amqp.client.AmqpSession;
-import org.apache.qpid.proton.amqp.messaging.Data;
-import org.apache.qpid.proton.message.impl.MessageImpl;
-import org.junit.Test;
-
-import java.net.URI;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-public class ProtonMaxFrameSizeTest extends ProtonTestBase {
-
-   private static final int FRAME_SIZE = 512;
-
-   @Override
-   protected void configureAmqp(Map<String, Object> params) {
-      params.put("maxFrameSize", FRAME_SIZE);
-   }
-
-   @Test
-   public void testMultipleTransfers() throws Exception {
-
-      String testQueueName = "ConnectionFrameSize";
-      int nMsgs = 200;
-
-      AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
-
-
-      AmqpConnection amqpConnection = client.createConnection();
-
-      try {
-         amqpConnection.connect();
-
-         AmqpSession session = amqpConnection.createSession();
-         AmqpSender sender = session.createSender("jms.queue." + testQueueName);
-
-         final int payload = FRAME_SIZE * 16;
-
-         for (int i = 0; i < nMsgs; ++i) {
-            AmqpMessage message = createAmqpMessage((byte) 'A', payload);
-            sender.send(message);
-         }
-
-         int count = getMessageCount(server.getPostOffice(), "jms.queue." + testQueueName);
-         assertEquals(nMsgs, count);
-
-         AmqpReceiver receiver = session.createReceiver("jms.queue." + testQueueName);
-         receiver.flow(nMsgs);
-
-         for (int i = 0; i < nMsgs; ++i) {
-            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
-            assertNotNull("failed at " + i, message);
-            MessageImpl wrapped = (MessageImpl) message.getWrappedMessage();
-            Data data = (Data) wrapped.getBody();
-            System.out.println("received : message: " + data.getValue().getLength());
-            assertEquals(payload, data.getValue().getLength());
-            message.accept();
-         }
-
-      }
-      finally {
-         amqpConnection.close();
-      }
-   }
-
-   private AmqpMessage createAmqpMessage(byte value, int payloadSize) {
-      AmqpMessage message = new AmqpMessage();
-      byte[] payload = new byte[payloadSize];
-      for (int i = 0; i < payload.length; i++) {
-         payload[i] = value;
-      }
-      message.setBytes(payload);
-      return message;
-   }
-
-}


Mime
View raw message