activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject [6/7] activemq-artemis git commit: ARTEMIS-1123 Major AMQP Test Suite refactoring
Date Fri, 28 Apr 2017 09:17:03 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMaxFrameSizeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMaxFrameSizeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMaxFrameSizeTest.java
new file mode 100644
index 0000000..778cd40
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMaxFrameSizeTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.message.impl.MessageImpl;
+import org.junit.Test;
+
+public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport {
+
+   private static final int FRAME_SIZE = 512;
+
+   @Override
+   protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
+      params.put("maxFrameSize", FRAME_SIZE);
+   }
+
+   @Test(timeout = 60000)
+   public void testMultipleTransfers() throws Exception {
+
+      String testQueueName = "ConnectionFrameSize";
+      int nMsgs = 200;
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+
+      try {
+         connection.connect();
+
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(testQueueName);
+
+         final int payload = FRAME_SIZE * 16;
+
+         for (int i = 0; i < nMsgs; ++i) {
+            AmqpMessage message = createAmqpMessage((byte) 'A', payload);
+            sender.send(message);
+         }
+
+         int count = getMessageCount(server.getPostOffice(), testQueueName);
+         assertEquals(nMsgs, count);
+
+         AmqpReceiver receiver = session.createReceiver(testQueueName);
+         receiver.flow(nMsgs);
+
+         for (int i = 0; i < nMsgs; ++i) {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull("failed at " + i, message);
+            MessageImpl wrapped = (MessageImpl) message.getWrappedMessage();
+            Data data = (Data) wrapped.getBody();
+            System.out.println("received : message: " + data.getValue().getLength());
+            assertEquals(payload, data.getValue().getLength());
+            message.accept();
+         }
+
+      } finally {
+         connection.close();
+      }
+   }
+
+   private AmqpMessage createAmqpMessage(byte value, int payloadSize) {
+      AmqpMessage message = new AmqpMessage();
+      byte[] payload = new byte[payloadSize];
+      for (int i = 0; i < payload.length; i++) {
+         payload[i] = value;
+      }
+      message.setBytes(payload);
+      return message;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageDivertsTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageDivertsTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageDivertsTest.java
new file mode 100644
index 0000000..f895c86
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageDivertsTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+
+public class AmqpMessageDivertsTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 60000)
+   public void testQueueReceiverReadMessageWithDivert() throws Exception {
+      final String forwardingAddress = getQueueName() + "Divert";
+      final SimpleString simpleForwardingAddress = SimpleString.toSimpleString(forwardingAddress);
+      server.createQueue(simpleForwardingAddress, RoutingType.ANYCAST, simpleForwardingAddress, null, true, false);
+      server.getActiveMQServerControl().createDivert("name", "routingName", getQueueName(), forwardingAddress, true, null, null, DivertConfigurationRoutingType.ANYCAST.toString());
+
+      sendMessages(getQueueName(), 1);
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createReceiver(forwardingAddress);
+
+      Queue queueView = getProxyToQueue(forwardingAddress);
+      assertEquals(1, queueView.getMessageCount());
+
+      receiver.flow(1);
+      assertNotNull(receiver.receive(5, TimeUnit.SECONDS));
+      receiver.close();
+
+      assertEquals(1, queueView.getMessageCount());
+
+      connection.close();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java
new file mode 100644
index 0000000..fcce0ab
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
+import org.junit.Test;
+
+public class AmqpMessageRoutingTest extends JMSClientTestSupport {
+
+   @Override
+   protected boolean isAutoCreateQueues() {
+      return false;
+   }
+
+   @Override
+   protected boolean isAutoCreateAddresses() {
+      return false;
+   }
+
+   @Test(timeout = 60000)
+   public void testAnycastMessageRoutingExclusivityUsingPrefix() throws Exception {
+      final String addressA = "addressA";
+      final String queueA = "queueA";
+      final String queueB = "queueB";
+      final String queueC = "queueC";
+
+      ActiveMQServerControl serverControl = server.getActiveMQServerControl();
+      serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
+      serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
+      serverControl.createQueue(addressA, queueB, RoutingType.ANYCAST.toString());
+      serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
+
+      sendMessages("anycast://" + addressA, 1);
+
+      assertEquals(1, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
+      assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount());
+   }
+
+   @Test(timeout = 60000)
+   public void testAnycastMessageRoutingExclusivityUsingProperty() throws Exception {
+      final String addressA = "addressA";
+      final String queueA = "queueA";
+      final String queueB = "queueB";
+      final String queueC = "queueC";
+
+      ActiveMQServerControl serverControl = server.getActiveMQServerControl();
+      serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
+      serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
+      serverControl.createQueue(addressA, queueB, RoutingType.ANYCAST.toString());
+      serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
+
+      sendMessages(addressA, 1, RoutingType.ANYCAST);
+
+      assertEquals(1, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
+      assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount());
+   }
+
+   @Test(timeout = 60000)
+   public void testMulticastMessageRoutingExclusivityUsingPrefix() throws Exception {
+      final String addressA = "addressA";
+      final String queueA = "queueA";
+      final String queueB = "queueB";
+      final String queueC = "queueC";
+
+      ActiveMQServerControl serverControl = server.getActiveMQServerControl();
+      serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
+      serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
+      serverControl.createQueue(addressA, queueB, RoutingType.MULTICAST.toString());
+      serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
+
+      sendMessages("multicast://" + addressA, 1);
+
+      assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount());
+      assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
+   }
+
+   @Test(timeout = 60000)
+   public void testMulticastMessageRoutingExclusivityUsingProperty() throws Exception {
+      final String addressA = "addressA";
+      final String queueA = "queueA";
+      final String queueB = "queueB";
+      final String queueC = "queueC";
+
+      ActiveMQServerControl serverControl = server.getActiveMQServerControl();
+      serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
+      serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
+      serverControl.createQueue(addressA, queueB, RoutingType.MULTICAST.toString());
+      serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
+
+      sendMessages(addressA, 1, RoutingType.MULTICAST);
+
+      assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount());
+      assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
+   }
+
+   /**
+    * If we have an address configured with both ANYCAST and MULTICAST routing types enabled, we must ensure that any
+    * messages sent specifically to MULTICAST (e.g. JMS TopicProducer) are only delivered to MULTICAST queues (e.g.
+    * i.e. subscription queues) and **NOT** to ANYCAST queues (e.g. JMS Queue).
+    *
+    * @throws Exception
+    */
+   @Test(timeout = 60000)
+   public void testRoutingExclusivity() throws Exception {
+
+      // Create Address with both ANYCAST and MULTICAST enabled
+      String testAddress = "testRoutingExclusivity-mixed-mode";
+      SimpleString ssTestAddress = new SimpleString(testAddress);
+
+      AddressInfo addressInfo = new AddressInfo(ssTestAddress);
+      addressInfo.addRoutingType(RoutingType.MULTICAST);
+      addressInfo.addRoutingType(RoutingType.ANYCAST);
+
+      server.addAddressInfo(addressInfo);
+      server.createQueue(ssTestAddress, RoutingType.ANYCAST, ssTestAddress, null, true, false);
+
+      Connection connection = createConnection(UUIDGenerator.getInstance().generateStringUUID());
+
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         Topic topic = session.createTopic(testAddress);
+         javax.jms.Queue queue = session.createQueue(testAddress);
+
+         MessageProducer producer = session.createProducer(topic);
+
+         MessageConsumer queueConsumer = session.createConsumer(queue);
+         MessageConsumer topicConsumer = session.createConsumer(topic);
+
+         producer.send(session.createTextMessage("testMessage"));
+
+         assertNotNull(topicConsumer.receive(1000));
+         assertNull(queueConsumer.receive(1000));
+      } finally {
+         connection.close();
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java
new file mode 100644
index 0000000..3d8be49
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
+import org.apache.activemq.artemis.protocol.amqp.client.AMQPClientConnectionFactory;
+import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientConnectionManager;
+import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+public class AmqpOutboundConnectionTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 60000)
+   public void testOutboundConnection() throws Throwable {
+      final ActiveMQServer remote = createServer(AMQP_PORT + 1);
+      remote.start();
+      try {
+         Wait.waitFor(remote::isActive);
+      } catch (Exception e) {
+         remote.stop();
+         throw e;
+      }
+
+      final Map<String, Object> config = new LinkedHashMap<>();
+      config.put(TransportConstants.HOST_PROP_NAME, "localhost");
+      config.put(TransportConstants.PORT_PROP_NAME, String.valueOf(AMQP_PORT + 1));
+      ProtonClientConnectionManager lifeCycleListener = new ProtonClientConnectionManager(new AMQPClientConnectionFactory(server, "myid", Collections.singletonMap(Symbol.getSymbol("myprop"), "propvalue"), 5000), Optional.empty());
+      ProtonClientProtocolManager protocolManager = new ProtonClientProtocolManager(new ProtonProtocolManagerFactory(), server);
+      NettyConnector connector = new NettyConnector(config, lifeCycleListener, lifeCycleListener, server.getExecutorFactory().getExecutor(), server.getExecutorFactory().getExecutor(), server.getScheduledPool(), protocolManager);
+      connector.start();
+      connector.createConnection();
+
+      try {
+         Wait.waitFor(() -> remote.getConnectionCount() > 0);
+         assertEquals(1, remote.getConnectionCount());
+
+         lifeCycleListener.stop();
+
+         Wait.waitFor(() -> remote.getConnectionCount() == 0);
+         assertEquals(0, remote.getConnectionCount());
+      } finally {
+         lifeCycleListener.stop();
+         remote.stop();
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPresettledReceiverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPresettledReceiverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPresettledReceiverTest.java
index 422e23e..3fd21b1 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPresettledReceiverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPresettledReceiverTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.amqp;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
 import org.apache.activemq.transport.amqp.client.AmqpMessage;
@@ -192,7 +193,7 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport {
       message.setText("Test-Message");
       sender.send(message);
 
-      assertEquals(1, queue.getMessageCount());
+      assertTrue("Message did not arrive", Wait.waitFor(() -> queue.getMessageCount() == 1));
 
       AmqpReceiver receiver = session.createReceiver(getQueueName(), null, false, true);
 
@@ -228,7 +229,7 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport {
       message.setText("Test-Message");
       sender.send(message);
 
-      assertEquals(1, queue.getMessageCount());
+      assertTrue("Message did not arrive", Wait.waitFor(() -> queue.getMessageCount() == 1));
 
       AmqpReceiver receiver = session.createReceiver(getQueueName(), null, false, true);
 
@@ -250,21 +251,4 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport {
       sender.close();
       connection.close();
    }
-
-   public void sendMessages(String destinationName, int count) throws Exception {
-      AmqpClient client = createAmqpClient();
-      AmqpConnection connection = addConnection(client.connect());
-      try {
-         AmqpSession session = connection.createSession();
-         AmqpSender sender = session.createSender(destinationName);
-
-         for (int i = 0; i < count; ++i) {
-            AmqpMessage message = new AmqpMessage();
-            message.setMessageId("MessageID:" + i);
-            sender.send(message);
-         }
-      } finally {
-         connection.close();
-      }
-   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpProtocolHeaderHandlingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpProtocolHeaderHandlingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpProtocolHeaderHandlingTest.java
new file mode 100644
index 0000000..e16fd46
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpProtocolHeaderHandlingTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.fusesource.hawtbuf.Buffer;
+import org.junit.Test;
+
+public class AmqpProtocolHeaderHandlingTest extends AmqpClientTestSupport {
+
+   @Override
+   protected boolean isSecurityEnabled() {
+      return true;
+   }
+
+   @Test(timeout = 60000)
+   public void testNonSaslHeaderRejectedOnConnect() throws Exception {
+      final AmqpHeader header = new AmqpHeader();
+
+      header.setProtocolId(0);
+      header.setMajor(1);
+      header.setMinor(0);
+      header.setRevision(0);
+
+      final ClientConnection connection = new ClientConnection();
+      connection.open("localhost", AMQP_PORT);
+      connection.send(header);
+
+      AmqpHeader response = connection.readAmqpHeader();
+      assertNotNull(response);
+      assertEquals(3, response.getProtocolId());
+      IntegrationTestLogger.LOGGER.info("Broker responded with: " + response);
+
+      // pump some bytes down the wire until broker closes the connection
+      assertTrue("Broker should have closed client connection", Wait.waitFor(new Wait.Condition() {
+
+         @Override
+         public boolean isSatisfied() throws Exception {
+            try {
+               connection.send(header);
+               return false;
+            } catch (Exception e) {
+               return true;
+            }
+         }
+      }, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(250)));
+   }
+
+   private class ClientConnection {
+
+      protected static final long RECEIVE_TIMEOUT = 10000;
+      protected Socket clientSocket;
+
+      public void open(String host, int port) throws IOException {
+         clientSocket = new Socket(host, port);
+         clientSocket.setTcpNoDelay(true);
+      }
+
+      public void send(AmqpHeader header) throws Exception {
+         IntegrationTestLogger.LOGGER.info("Client sending header: " + header);
+         OutputStream outputStream = clientSocket.getOutputStream();
+         header.getBuffer().writeTo(outputStream);
+         outputStream.flush();
+      }
+
+      public AmqpHeader readAmqpHeader() throws Exception {
+         clientSocket.setSoTimeout((int) RECEIVE_TIMEOUT);
+         InputStream is = clientSocket.getInputStream();
+
+         byte[] header = new byte[8];
+         int read = is.read(header);
+         if (read == header.length) {
+            return new AmqpHeader(new Buffer(header));
+         } else {
+            return null;
+         }
+      }
+   }
+
+   @SuppressWarnings("unused")
+   private class AmqpHeader {
+
+      final Buffer PREFIX = new Buffer(new byte[]{'A', 'M', 'Q', 'P'});
+
+      private Buffer buffer;
+
+      AmqpHeader() {
+         this(new Buffer(new byte[]{'A', 'M', 'Q', 'P', 0, 1, 0, 0}));
+      }
+
+      AmqpHeader(Buffer buffer) {
+         this(buffer, true);
+      }
+
+      AmqpHeader(Buffer buffer, boolean validate) {
+         setBuffer(buffer, validate);
+      }
+
+      public int getProtocolId() {
+         return buffer.get(4) & 0xFF;
+      }
+
+      public void setProtocolId(int value) {
+         buffer.data[buffer.offset + 4] = (byte) value;
+      }
+
+      public int getMajor() {
+         return buffer.get(5) & 0xFF;
+      }
+
+      public void setMajor(int value) {
+         buffer.data[buffer.offset + 5] = (byte) value;
+      }
+
+      public int getMinor() {
+         return buffer.get(6) & 0xFF;
+      }
+
+      public void setMinor(int value) {
+         buffer.data[buffer.offset + 6] = (byte) value;
+      }
+
+      public int getRevision() {
+         return buffer.get(7) & 0xFF;
+      }
+
+      public void setRevision(int value) {
+         buffer.data[buffer.offset + 7] = (byte) value;
+      }
+
+      public Buffer getBuffer() {
+         return buffer;
+      }
+
+      public void setBuffer(Buffer value) {
+         setBuffer(value, true);
+      }
+
+      public void setBuffer(Buffer value, boolean validate) {
+         if (validate && !value.startsWith(PREFIX) || value.length() != 8) {
+            throw new IllegalArgumentException("Not an AMQP header buffer");
+         }
+         buffer = value.buffer();
+      }
+
+      public boolean hasValidPrefix() {
+         return buffer.startsWith(PREFIX);
+      }
+
+      @Override
+      public String toString() {
+         StringBuilder builder = new StringBuilder();
+         for (int i = 0; i < buffer.length(); ++i) {
+            char value = (char) buffer.get(i);
+            if (Character.isLetter(value)) {
+               builder.append(value);
+            } else {
+               builder.append(",");
+               builder.append((int) value);
+            }
+         }
+         return builder.toString();
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java
index 9cd8f50..e636d83 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java
@@ -22,7 +22,6 @@ import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
 import org.apache.activemq.transport.amqp.client.AmqpMessage;
 import org.apache.activemq.transport.amqp.client.AmqpReceiver;
-import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
 import org.apache.qpid.proton.message.Message;
 import org.junit.Test;
@@ -57,7 +56,6 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport {
       receiver2.flow(1);
       message.release();
 
-
       // Read the message again and validate its state
       message = receiver2.receive(10, TimeUnit.SECONDS);
       assertNotNull("did not receive message again", message);
@@ -172,21 +170,4 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport {
 
       connection.close();
    }
-
-   public void sendMessages(String destinationName, int count) throws Exception {
-      AmqpClient client = createAmqpClient();
-      AmqpConnection connection = addConnection(client.connect());
-      try {
-         AmqpSession session = connection.createSession();
-         AmqpSender sender = session.createSender(destinationName);
-
-         for (int i = 0; i < count; ++i) {
-            AmqpMessage message = new AmqpMessage();
-            message.setMessageId("MessageID:" + i);
-            sender.send(message);
-         }
-      } finally {
-         connection.close();
-      }
-   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDrainTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDrainTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDrainTest.java
index 681ffbd..edf9459 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDrainTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDrainTest.java
@@ -19,11 +19,11 @@ package org.apache.activemq.artemis.tests.integration.amqp;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
 import org.apache.activemq.transport.amqp.client.AmqpMessage;
 import org.apache.activemq.transport.amqp.client.AmqpReceiver;
-import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
 import org.junit.Test;
 
@@ -33,46 +33,81 @@ import org.junit.Test;
 public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
 
    @Test(timeout = 60000)
-   public void testReceiverCanDrainMessages() throws Exception {
+   public void testReceiverCanDrainMessagesQueue() throws Exception {
+      doTestReceiverCanDrainMessages(false);
+   }
+
+   @Test(timeout = 60000)
+   public void testReceiverCanDrainMessagesTopic() throws Exception {
+      doTestReceiverCanDrainMessages(true);
+   }
+
+   private void doTestReceiverCanDrainMessages(boolean topic) throws Exception {
+      final String destinationName;
+      if (topic) {
+         destinationName = getTopicName();
+      } else {
+         destinationName = getQueueName();
+      }
+
       int MSG_COUNT = 20;
-      sendMessages(getQueueName(), MSG_COUNT);
 
       AmqpClient client = createAmqpClient();
-      AmqpConnection connection = client.connect();
+      AmqpConnection connection = addConnection(client.connect());
       AmqpSession session = connection.createSession();
 
-      AmqpReceiver receiver = session.createReceiver(getQueueName());
+      AmqpReceiver receiver = session.createReceiver(destinationName);
+
+      sendMessages(destinationName, MSG_COUNT);
+
+      Queue queueView = getProxyToQueue(destinationName);
 
-      Queue queueView = getProxyToQueue(getQueueName());
       assertEquals(MSG_COUNT, queueView.getMessageCount());
+      assertEquals(0, queueView.getDeliveringCount());
 
       receiver.drain(MSG_COUNT);
       for (int i = 0; i < MSG_COUNT; ++i) {
          AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
-         assertNotNull(message);
+         assertNotNull("Failed to read message: " + (i + 1), message);
+         IntegrationTestLogger.LOGGER.info("Read message: " + message.getMessageId());
          message.accept();
       }
       receiver.close();
 
-      assertEquals(0, queueView.getMessageCount());
-
       connection.close();
    }
 
    @Test(timeout = 60000)
-   public void testPullWithNoMessageGetDrained() throws Exception {
+   public void testPullWithNoMessageGetDrainedQueue() throws Exception {
+      doTestPullWithNoMessageGetDrained(false);
+   }
+
+   @Test(timeout = 60000)
+   public void testPullWithNoMessageGetDrainedTopic() throws Exception {
+      doTestPullWithNoMessageGetDrained(true);
+   }
+
+   private void doTestPullWithNoMessageGetDrained(boolean topic) throws Exception {
+
+      final String destinationName;
+      if (topic) {
+         destinationName = getTopicName();
+      } else {
+         destinationName = getQueueName();
+      }
 
       AmqpClient client = createAmqpClient();
-      AmqpConnection connection = client.connect();
+      AmqpConnection connection = addConnection(client.connect());
       AmqpSession session = connection.createSession();
 
-      AmqpReceiver receiver = session.createReceiver(getQueueName());
+      AmqpReceiver receiver = session.createReceiver(destinationName);
 
       receiver.flow(10);
 
-      Queue queueView = getProxyToQueue(getQueueName());
+      Queue queueView = getProxyToQueue(destinationName);
+
       assertEquals(0, queueView.getMessageCount());
-      assertEquals(0, queueView.getDeliveringCount());
+      assertEquals(0, queueView.getMessagesAcknowledged());
 
       assertEquals(10, receiver.getReceiver().getRemoteCredit());
 
@@ -84,18 +119,36 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
    }
 
    @Test(timeout = 60000)
-   public void testPullOneFromRemote() throws Exception {
-      int MSG_COUNT = 20;
-      sendMessages(getQueueName(), MSG_COUNT);
+   public void testPullOneFromRemoteQueue() throws Exception {
+      doTestPullOneFromRemote(false);
+   }
+
+   @Test(timeout = 60000)
+   public void testPullOneFromRemoteTopic() throws Exception {
+      doTestPullOneFromRemote(true);
+   }
+
+   private void doTestPullOneFromRemote(boolean topic) throws Exception {
 
       AmqpClient client = createAmqpClient();
-      AmqpConnection connection = client.connect();
+      AmqpConnection connection = addConnection(client.connect());
       AmqpSession session = connection.createSession();
 
-      AmqpReceiver receiver = session.createReceiver(getQueueName());
+      final String destinationName;
+      if (topic) {
+         destinationName = getTopicName();
+      } else {
+         destinationName = getQueueName();
+      }
+
+      AmqpReceiver receiver = session.createReceiver(destinationName);
 
-      Queue queueView = getProxyToQueue(getQueueName());
+      int MSG_COUNT = 20;
+      sendMessages(destinationName, MSG_COUNT);
+
+      Queue queueView = getProxyToQueue(destinationName);
       assertEquals(MSG_COUNT, queueView.getMessageCount());
+      assertEquals(0, queueView.getDeliveringCount());
 
       assertEquals(0, receiver.getReceiver().getRemoteCredit());
 
@@ -107,24 +160,39 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
 
       receiver.close();
 
-      assertEquals(MSG_COUNT - 1, queueView.getMessageCount());
-      assertEquals(1, queueView.getMessagesAcknowledged());
-
       connection.close();
    }
 
    @Test(timeout = 60000)
-   public void testMultipleZeroResultPulls() throws Exception {
+   public void testMultipleZeroResultPullsQueue() throws Exception {
+      doTestMultipleZeroResultPulls(false);
+   }
+
+   @Test(timeout = 60000)
+   public void testMultipleZeroResultPullsTopic() throws Exception {
+      doTestMultipleZeroResultPulls(true);
+   }
+
+   private void doTestMultipleZeroResultPulls(boolean topic) throws Exception {
+
       AmqpClient client = createAmqpClient();
-      AmqpConnection connection = client.connect();
+      AmqpConnection connection = addConnection(client.connect());
       AmqpSession session = connection.createSession();
 
-      AmqpReceiver receiver = session.createReceiver(getQueueName());
+      final String destinationName;
+      if (topic) {
+         destinationName = getTopicName();
+      } else {
+         destinationName = getQueueName();
+      }
+
+      AmqpReceiver receiver = session.createReceiver(destinationName);
 
       receiver.flow(10);
 
-      Queue queueView = getProxyToQueue(getQueueName());
+      Queue queueView = getProxyToQueue(destinationName);
       assertEquals(0, queueView.getMessageCount());
+      assertEquals(0, queueView.getDeliveringCount());
 
       assertEquals(10, receiver.getReceiver().getRemoteCredit());
 
@@ -139,27 +207,4 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
 
       connection.close();
    }
-
-   public void sendMessages(String destinationName, int count) throws Exception {
-      AmqpClient client = createAmqpClient();
-      AmqpConnection connection = null;
-
-      try {
-         connection = client.connect();
-         AmqpSession session = connection.createSession();
-         AmqpSender sender = session.createSender(destinationName);
-
-         for (int i = 0; i < count; ++i) {
-            AmqpMessage message = new AmqpMessage();
-            message.setText("Test-Message-" + i);
-            sender.send(message);
-         }
-
-         sender.close();
-      } finally {
-         if (connection != null) {
-            connection.close();
-         }
-      }
-   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverTest.java
index b47ad50..3aff030 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverTest.java
@@ -16,13 +16,38 @@
  */
 package org.apache.activemq.artemis.tests.integration.amqp;
 
+import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
+import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS;
+import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
 import org.apache.activemq.transport.amqp.client.AmqpReceiver;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.activemq.transport.amqp.client.AmqpUnknownFilterType;
+import org.apache.activemq.transport.amqp.client.AmqpValidator;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
+import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
 import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Session;
 import org.junit.Test;
 
 /**
@@ -31,6 +56,119 @@ import org.junit.Test;
 public class AmqpReceiverTest extends AmqpClientTestSupport {
 
    @Test(timeout = 60000)
+   public void testCreateQueueReceiver() throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createReceiver(getQueueName());
+
+      Queue queue = getProxyToQueue(getQueueName());
+      assertNotNull(queue);
+
+      receiver.close();
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testCreateTopicReceiver() throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createReceiver(getTopicName());
+
+      Queue queue = getProxyToQueue(getQueueName());
+      assertNotNull(queue);
+
+      receiver.close();
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testCreateQueueReceiverWithNoLocalSet() throws Exception {
+      AmqpClient client = createAmqpClient();
+
+      client.setValidator(new AmqpValidator() {
+
+         @SuppressWarnings("unchecked")
+         @Override
+         public void inspectOpenedResource(Receiver receiver) {
+
+            if (receiver.getRemoteSource() == null) {
+               markAsInvalid("Link opened with null source.");
+            }
+
+            Source source = (Source) receiver.getRemoteSource();
+            Map<Symbol, Object> filters = source.getFilter();
+
+            // Currently don't support noLocal on a Queue
+            if (findFilter(filters, NO_LOCAL_FILTER_IDS) != null) {
+               markAsInvalid("Broker did not return the NoLocal Filter on Attach");
+            }
+         }
+      });
+
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      session.createReceiver(getQueueName(), null, true);
+
+      connection.getStateInspector().assertValid();
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testCreateQueueReceiverWithJMSSelector() throws Exception {
+      AmqpClient client = createAmqpClient();
+
+      client.setValidator(new AmqpValidator() {
+
+         @SuppressWarnings("unchecked")
+         @Override
+         public void inspectOpenedResource(Receiver receiver) {
+
+            if (receiver.getRemoteSource() == null) {
+               markAsInvalid("Link opened with null source.");
+            }
+
+            Source source = (Source) receiver.getRemoteSource();
+            Map<Symbol, Object> filters = source.getFilter();
+
+            if (findFilter(filters, JMS_SELECTOR_FILTER_IDS) == null) {
+               markAsInvalid("Broker did not return the JMS Filter on Attach");
+            }
+         }
+      });
+
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      session.createReceiver(getQueueName(), "JMSPriority > 8");
+
+      connection.getStateInspector().assertValid();
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testInvalidFilter() throws Exception {
+      AmqpClient client = createAmqpClient();
+
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      try {
+         session.createReceiver(getQueueName(), "null = 'f''", true);
+         fail("should throw exception");
+      } catch (Exception e) {
+         assertTrue(e.getCause() instanceof JMSException);
+      }
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
    public void testSenderSettlementModeSettledIsHonored() throws Exception {
       doTestSenderSettlementModeIsHonored(SenderSettleMode.SETTLED);
    }
@@ -96,4 +234,164 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
       receiver.close();
       connection.close();
    }
+
+   @Test(timeout = 60000)
+   public void testClientIdIsSetInSubscriptionList() throws Exception {
+      server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("mytopic"), RoutingType.ANYCAST));
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      connection.setContainerId("testClient");
+      connection.connect();
+
+      try {
+         AmqpSession session = connection.createSession();
+
+         Source source = new Source();
+         source.setDurable(TerminusDurability.UNSETTLED_STATE);
+         source.setCapabilities(Symbol.getSymbol("topic"));
+         source.setAddress("mytopic");
+         session.createReceiver(source, "testSub");
+
+         SimpleString fo = new SimpleString("testClient.testSub:mytopic");
+         assertNotNull(server.locateQueue(fo));
+
+      } catch (Exception e) {
+         e.printStackTrace();
+      } finally {
+         connection.close();
+      }
+   }
+
+   @Test(timeout = 60000)
+   public void testLinkDetachSentWhenQueueDeleted() throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+
+      try {
+         AmqpSession session = connection.createSession();
+         AmqpReceiver receiver = session.createReceiver(getQueueName());
+
+         server.destroyQueue(new SimpleString(getQueueName()), null, false, true);
+
+         assertTrue("Receiver should have closed", Wait.waitFor(receiver::isClosed));
+      } finally {
+         connection.close();
+      }
+   }
+
+   @Test(timeout = 60000)
+   public void testLinkDetatchErrorIsCorrectWhenQueueDoesNotExists() throws Exception {
+      AddressSettings value = new AddressSettings();
+      value.setAutoCreateQueues(false);
+      value.setAutoCreateAddresses(false);
+      server.getAddressSettingsRepository().addMatch("AnAddressThatDoesNotExist", value);
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+
+      try {
+         AmqpSession session = connection.createSession();
+
+         Exception expectedException = null;
+         try {
+            session.createSender("AnAddressThatDoesNotExist");
+            fail("Creating a sender here on an address that doesn't exist should fail");
+         } catch (Exception e) {
+            expectedException = e;
+         }
+
+         assertNotNull(expectedException);
+         assertTrue(expectedException.getMessage().contains("amqp:not-found"));
+         assertTrue(expectedException.getMessage().contains("target address does not exist"));
+      } finally {
+         connection.close();
+      }
+   }
+
+   @Test(timeout = 60000)
+   public void testUnsupportedFiltersAreNotListedAsSupported() throws Exception {
+      AmqpClient client = createAmqpClient();
+
+      client.setValidator(new AmqpValidator() {
+
+         @SuppressWarnings("unchecked")
+         @Override
+         public void inspectOpenedResource(Receiver receiver) {
+
+            if (receiver.getRemoteSource() == null) {
+               markAsInvalid("Link opened with null source.");
+            }
+
+            Source source = (Source) receiver.getRemoteSource();
+            Map<Symbol, Object> filters = source.getFilter();
+
+            if (findFilter(filters, AmqpUnknownFilterType.UNKNOWN_FILTER_IDS) != null) {
+               markAsInvalid("Broker should not return unsupported filter on attach.");
+            }
+         }
+      });
+
+      Map<Symbol, DescribedType> filters = new HashMap<>();
+      filters.put(AmqpUnknownFilterType.UNKNOWN_FILTER_NAME, AmqpUnknownFilterType.UNKNOWN_FILTER);
+
+      Source source = new Source();
+      source.setAddress(getQueueName());
+      source.setFilter(filters);
+      source.setDurable(TerminusDurability.NONE);
+      source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      assertEquals(0, server.getTotalConsumerCount());
+
+      session.createReceiver(source);
+
+      assertEquals(1, server.getTotalConsumerCount());
+
+      connection.getStateInspector().assertValid();
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testReceiverCloseSendsRemoteClose() throws Exception {
+      AmqpClient client = createAmqpClient();
+      assertNotNull(client);
+
+      final AtomicBoolean closed = new AtomicBoolean();
+
+      client.setValidator(new AmqpValidator() {
+
+         @Override
+         public void inspectClosedResource(Session session) {
+            IntegrationTestLogger.LOGGER.info("Session closed: " + session.getContext());
+         }
+
+         @Override
+         public void inspectDetachedResource(Receiver receiver) {
+            markAsInvalid("Broker should not detach receiver linked to closed session.");
+         }
+
+         @Override
+         public void inspectClosedResource(Receiver receiver) {
+            IntegrationTestLogger.LOGGER.info("Receiver closed: " + receiver.getContext());
+            closed.set(true);
+         }
+      });
+
+      AmqpConnection connection = addConnection(client.connect());
+      assertNotNull(connection);
+      AmqpSession session = connection.createSession();
+      assertNotNull(session);
+      AmqpReceiver receiver = session.createReceiver(getQueueName());
+      assertNotNull(receiver);
+
+      receiver.close();
+
+      assertTrue("Did not process remote close as expected", closed.get());
+      connection.getStateInspector().assertValid();
+
+      connection.close();
+   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java
index 748f10a..6459e76 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java
@@ -104,7 +104,7 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
       }
    }
 
-   @Test
+   @Test(timeout = 60000)
    public void testScheduleWithDelay() throws Exception {
       AmqpClient client = createAmqpClient();
       AmqpConnection connection = addConnection(client.connect());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java
index 8e41d71..f99fc14 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java
@@ -17,21 +17,9 @@
 package org.apache.activemq.artemis.tests.integration.amqp;
 
 import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.config.Configuration;
-import org.apache.activemq.artemis.core.security.Role;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.impl.AddressInfo;
-import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
-import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
-import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
-import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
 import org.apache.activemq.transport.amqp.client.AmqpMessage;
@@ -39,44 +27,26 @@ import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
 import org.apache.activemq.transport.amqp.client.AmqpValidator;
 import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Sender;
 import org.junit.Test;
 
 public class AmqpSecurityTest extends AmqpClientTestSupport {
 
-   private String user1 = "user1";
-   private String password1 = "password1";
-
    @Override
-   protected ActiveMQServer createServer() throws Exception {
-      ActiveMQServer server = createServer(true, true);
-      ActiveMQJAASSecurityManager securityManager = (ActiveMQJAASSecurityManager) server.getSecurityManager();
-      securityManager.getConfiguration().addUser("foo", "bar");
-      securityManager.getConfiguration().addRole("foo", "none");
-      securityManager.getConfiguration().addUser(user1, password1);
-      securityManager.getConfiguration().addRole(user1, "none");
-      HierarchicalRepository<Set<Role>> securityRepository = server.getSecurityRepository();
-      HashSet<Role> value = new HashSet<>();
-      value.add(new Role("none", false, true, true, true, true, true, true, true));
-      securityRepository.addMatch(getQueueName(), value);
-
-      serverManager = new JMSServerManagerImpl(server);
-      Configuration serverConfig = server.getConfiguration();
-      serverConfig.getAddressesSettings().put("jms.queue.#", new AddressSettings().setAutoCreateJmsQueues(true).setDeadLetterAddress(new SimpleString("jms.queue.ActiveMQ.DLQ")));
-      serverConfig.setSecurityEnabled(true);
-      serverManager.start();
-      server.start();
-      return server;
+   protected boolean isSecurityEnabled() {
+      return true;
    }
 
    @Test(timeout = 60000)
    public void testSaslAuthWithInvalidCredentials() throws Exception {
       AmqpConnection connection = null;
-      AmqpClient client = createAmqpClient("foo", "foo");
+      AmqpClient client = createAmqpClient(fullUser, guestUser);
 
       try {
          connection = client.connect();
-         fail("Should authenticate even with authzid set");
+         fail("Should not authenticate when invalid credentials provided");
       } catch (Exception ex) {
+         // Expected
       } finally {
          if (connection != null) {
             connection.close();
@@ -87,8 +57,8 @@ public class AmqpSecurityTest extends AmqpClientTestSupport {
    @Test(timeout = 60000)
    public void testSaslAuthWithAuthzid() throws Exception {
       AmqpConnection connection = null;
-      AmqpClient client = createAmqpClient("foo", "bar");
-      client.setAuthzid("foo");
+      AmqpClient client = createAmqpClient(guestUser, guestPass);
+      client.setAuthzid(guestUser);
 
       try {
          connection = client.connect();
@@ -104,7 +74,7 @@ public class AmqpSecurityTest extends AmqpClientTestSupport {
    @Test(timeout = 60000)
    public void testSaslAuthWithoutAuthzid() throws Exception {
       AmqpConnection connection = null;
-      AmqpClient client = createAmqpClient("foo", "bar");
+      AmqpClient client = createAmqpClient(guestUser, guestPass);
 
       try {
          connection = client.connect();
@@ -119,20 +89,22 @@ public class AmqpSecurityTest extends AmqpClientTestSupport {
 
    @Test(timeout = 60000)
    public void testSendAndRejected() throws Exception {
-      AmqpConnection connection = null;
-      AmqpClient client = createAmqpClient("foo", "bar");
       CountDownLatch latch = new CountDownLatch(1);
+
+      AmqpClient client = createAmqpClient(guestUser, guestPass);
       client.setValidator(new AmqpValidator() {
+
          @Override
-         public void inspectDeliveryUpdate(Delivery delivery) {
-            super.inspectDeliveryUpdate(delivery);
+         public void inspectDeliveryUpdate(Sender sender, Delivery delivery) {
             if (!delivery.remotelySettled()) {
                markAsInvalid("delivery is not remotely settled");
             }
+
             latch.countDown();
          }
       });
-      connection = addConnection(client.connect());
+
+      AmqpConnection connection = addConnection(client.connect());
       AmqpSession session = connection.createSession();
 
       AmqpSender sender = session.createSender(getQueueName());
@@ -145,8 +117,8 @@ public class AmqpSecurityTest extends AmqpClientTestSupport {
       try {
          sender.send(message);
       } catch (IOException e) {
-         //
       }
+
       assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
       connection.getStateInspector().assertValid();
       connection.close();
@@ -154,11 +126,9 @@ public class AmqpSecurityTest extends AmqpClientTestSupport {
 
    @Test(timeout = 60000)
    public void testSendMessageFailsOnAnonymousRelayWhenNotAuthorizedToSendToAddress() throws Exception {
-      server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getQueueName()), RoutingType.ANYCAST));
-      server.createQueue(new SimpleString(getQueueName()), RoutingType.ANYCAST, new SimpleString(getQueueName()), null, true, false);
-
-      AmqpClient client = createAmqpClient(user1, password1);
+      AmqpClient client = createAmqpClient(guestUser, guestPass);
       AmqpConnection connection = client.connect();
+
       try {
          AmqpSession session = connection.createSession();
 
@@ -181,5 +151,4 @@ public class AmqpSecurityTest extends AmqpClientTestSupport {
          connection.close();
       }
    }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
index 9cf256a..0cae79f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
@@ -16,28 +16,23 @@
  */
 package org.apache.activemq.artemis.tests.integration.amqp;
 
-import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
-import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS;
 import static org.apache.activemq.transport.amqp.AmqpSupport.contains;
-import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
 
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
-import javax.jms.JMSException;
+import javax.jms.Topic;
 
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
-import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
 import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
@@ -48,8 +43,6 @@ import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
 import org.apache.activemq.transport.amqp.client.AmqpValidator;
 import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.messaging.Source;
-import org.apache.qpid.proton.engine.Receiver;
 import org.apache.qpid.proton.engine.Sender;
 import org.jgroups.util.UUID;
 import org.junit.Test;
@@ -63,19 +56,14 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
 
    protected static final Logger LOG = LoggerFactory.getLogger(AmqpSendReceiveTest.class);
 
-   @Test(timeout = 60000)
-   public void testCreateQueueReceiver() throws Exception {
-      AmqpClient client = createAmqpClient();
-      AmqpConnection connection = addConnection(client.connect());
-      AmqpSession session = connection.createSession();
-
-      AmqpReceiver receiver = session.createReceiver(getQueueName());
-
-      Queue queue = getProxyToQueue(getQueueName());
-      assertNotNull(queue);
+   @Override
+   protected boolean isAutoCreateQueues() {
+      return false;
+   }
 
-      receiver.close();
-      connection.close();
+   @Override
+   protected boolean isAutoCreateAddresses() {
+      return false;
    }
 
    @Test(timeout = 60000)
@@ -103,90 +91,6 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
       assertEquals(0, queue.getMessageCount());
    }
 
-
-   @Test(timeout = 60000)
-   public void testCreateQueueReceiverWithJMSSelector() throws Exception {
-      AmqpClient client = createAmqpClient();
-
-      client.setValidator(new AmqpValidator() {
-
-         @SuppressWarnings("unchecked")
-         @Override
-         public void inspectOpenedResource(Receiver receiver) {
-
-            if (receiver.getRemoteSource() == null) {
-               markAsInvalid("Link opened with null source.");
-            }
-
-            Source source = (Source) receiver.getRemoteSource();
-            Map<Symbol, Object> filters = source.getFilter();
-
-            if (findFilter(filters, JMS_SELECTOR_FILTER_IDS) == null) {
-               markAsInvalid("Broker did not return the JMS Filter on Attach");
-            }
-         }
-      });
-
-      AmqpConnection connection = addConnection(client.connect());
-      AmqpSession session = connection.createSession();
-
-      session.createReceiver(getQueueName(), "JMSPriority > 8");
-
-      connection.getStateInspector().assertValid();
-      connection.close();
-   }
-
-   @Test(timeout = 60000)
-   public void testCreateQueueReceiverWithNoLocalSet() throws Exception {
-      AmqpClient client = createAmqpClient();
-
-      client.setValidator(new AmqpValidator() {
-
-         @SuppressWarnings("unchecked")
-         @Override
-         public void inspectOpenedResource(Receiver receiver) {
-
-            if (receiver.getRemoteSource() == null) {
-               markAsInvalid("Link opened with null source.");
-            }
-
-            Source source = (Source) receiver.getRemoteSource();
-            Map<Symbol, Object> filters = source.getFilter();
-
-            // Currently don't support noLocal on a Queue
-            if (findFilter(filters, NO_LOCAL_FILTER_IDS) != null) {
-               markAsInvalid("Broker did not return the NoLocal Filter on Attach");
-            }
-         }
-      });
-
-      AmqpConnection connection = addConnection(client.connect());
-      AmqpSession session = connection.createSession();
-
-      session.createReceiver(getQueueName(), null, true);
-
-      connection.getStateInspector().assertValid();
-      connection.close();
-   }
-
-   @Test(timeout = 60000)
-   public void testInvalidFilter() throws Exception {
-      AmqpClient client = createAmqpClient();
-
-      AmqpConnection connection = addConnection(client.connect());
-      AmqpSession session = connection.createSession();
-
-      try {
-         session.createReceiver(getQueueName(), "null = 'f''", true);
-         fail("should throw exception");
-      } catch (Exception e) {
-         assertTrue(e.getCause() instanceof JMSException);
-         //passed
-      }
-
-      connection.close();
-   }
-
    @Test(timeout = 60000)
    public void testQueueReceiverReadMessage() throws Exception {
       sendMessages(getQueueName(), 1);
@@ -210,108 +114,6 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
    }
 
    @Test(timeout = 60000)
-   public void testQueueReceiverReadMessageWithDivert() throws Exception {
-      final String forwardingAddress = getQueueName() + "Divert";
-      final SimpleString simpleForwardingAddress = SimpleString.toSimpleString(forwardingAddress);
-      server.createQueue(simpleForwardingAddress, RoutingType.ANYCAST, simpleForwardingAddress, null, true, false);
-      server.getActiveMQServerControl().createDivert("name", "routingName", getQueueName(), forwardingAddress, true, null, null, DivertConfigurationRoutingType.ANYCAST.toString());
-      sendMessages(getQueueName(), 1);
-
-      AmqpClient client = createAmqpClient();
-      AmqpConnection connection = addConnection(client.connect());
-      AmqpSession session = connection.createSession();
-
-      AmqpReceiver receiver = session.createReceiver(forwardingAddress);
-
-      Queue queueView = getProxyToQueue(forwardingAddress);
-      assertEquals(1, queueView.getMessageCount());
-
-      receiver.flow(1);
-      assertNotNull(receiver.receive(5, TimeUnit.SECONDS));
-      receiver.close();
-
-      assertEquals(1, queueView.getMessageCount());
-
-      connection.close();
-   }
-
-   @Test(timeout = 60000)
-   public void testAnycastMessageRoutingExclusivityUsingPrefix() throws Exception {
-      final String addressA = "addressA";
-      final String queueA = "queueA";
-      final String queueB = "queueB";
-      final String queueC = "queueC";
-
-      ActiveMQServerControl serverControl = server.getActiveMQServerControl();
-      serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
-      serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
-      serverControl.createQueue(addressA, queueB, RoutingType.ANYCAST.toString());
-      serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
-
-      sendMessages("anycast://" + addressA, 1);
-
-      assertEquals(1, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
-      assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount());
-   }
-
-   @Test(timeout = 60000)
-   public void testAnycastMessageRoutingExclusivityUsingProperty() throws Exception {
-      final String addressA = "addressA";
-      final String queueA = "queueA";
-      final String queueB = "queueB";
-      final String queueC = "queueC";
-
-      ActiveMQServerControl serverControl = server.getActiveMQServerControl();
-      serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
-      serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
-      serverControl.createQueue(addressA, queueB, RoutingType.ANYCAST.toString());
-      serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
-
-      sendMessages(addressA, 1, RoutingType.ANYCAST);
-
-      assertEquals(1, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
-      assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount());
-   }
-
-   @Test
-   public void testMulticastMessageRoutingExclusivityUsingPrefix() throws Exception {
-      final String addressA = "addressA";
-      final String queueA = "queueA";
-      final String queueB = "queueB";
-      final String queueC = "queueC";
-
-      ActiveMQServerControl serverControl = server.getActiveMQServerControl();
-      serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
-      serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
-      serverControl.createQueue(addressA, queueB, RoutingType.MULTICAST.toString());
-      serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
-
-      sendMessages("multicast://" + addressA, 1);
-
-      assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount());
-      assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
-   }
-
-   @Test
-   public void testMulticastMessageRoutingExclusivityUsingProperty() throws Exception {
-      final String addressA = "addressA";
-      final String queueA = "queueA";
-      final String queueB = "queueB";
-      final String queueC = "queueC";
-
-      ActiveMQServerControl serverControl = server.getActiveMQServerControl();
-      serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
-      serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
-      serverControl.createQueue(addressA, queueB, RoutingType.MULTICAST.toString());
-      serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
-
-      sendMessages(addressA, 1, RoutingType.MULTICAST);
-
-      assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount());
-      assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
-   }
-
-   @Test(timeout = 60000)
    public void testMessageDurableFalse() throws Exception {
       sendMessages(getQueueName(), 1, false);
 
@@ -870,7 +672,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
       message1.setMessageId("ID:Message:1");
       sender.send(message1);
 
-      assertEquals(1, queue.getMessageCount());
+      assertTrue("Message did not arrive", Wait.waitFor(() -> queue.getMessageCount() == 1));
       receiver1.flow(1);
       message1 = receiver1.receive(50, TimeUnit.SECONDS);
       assertNotNull("Should have read a message", message1);
@@ -884,7 +686,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
       message2.setMessageId("ID:Message:2");
       sender.send(message2);
 
-      assertEquals(1, queue.getMessageCount());
+      assertTrue("Message did not arrive", Wait.waitFor(() -> queue.getMessageCount() == 1));
       receiver1.flow(1);
       message2 = receiver1.receive(50, TimeUnit.SECONDS);
       assertNotNull("Should have read a message", message2);
@@ -1018,7 +820,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
       connection.close();
    }
 
-   @Test
+   @Test(timeout = 60000)
    public void testDeliveryDelayOfferedWhenRequested() throws Exception {
       AmqpClient client = createAmqpClient();
       client.setValidator(new AmqpValidator() {
@@ -1036,7 +838,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
       AmqpConnection connection = addConnection(client.connect());
       AmqpSession session = connection.createSession();
 
-      AmqpSender sender = session.createSender("queue://" + getQueueName(), new Symbol[] {AmqpSupport.DELAYED_DELIVERY});
+      AmqpSender sender = session.createSender(getQueueName(), new Symbol[] {AmqpSupport.DELAYED_DELIVERY});
       assertNotNull(sender);
 
       connection.getStateInspector().assertValid();
@@ -1100,45 +902,119 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
       connection.close();
    }
 
-   public void sendMessages(String destinationName, int count) throws Exception {
-      sendMessages(destinationName, count, null);
+   @Test(timeout = 60000)
+   public void testLinkDetatchErrorIsCorrectWhenQueueDoesNotExists() throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      Exception expectedException = null;
+      try {
+         session.createSender("AnAddressThatDoesNotExist");
+         fail("Creating a sender here on an address that doesn't exist should fail");
+      } catch (Exception e) {
+         expectedException = e;
+      }
+
+      assertNotNull(expectedException);
+      assertTrue(expectedException.getMessage().contains("amqp:not-found"));
+      assertTrue(expectedException.getMessage().contains("target address does not exist"));
+
+      connection.close();
    }
 
-   public void sendMessages(String destinationName, int count, RoutingType routingType) throws Exception {
+   @Test(timeout = 60000)
+   public void testSendingAndReceivingToQueueWithDifferentAddressAndQueueName() throws Exception {
+      String queueName = "TestQueueName";
+      String address = "TestAddress";
+      server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(address), RoutingType.ANYCAST));
+      server.createQueue(new SimpleString(address), RoutingType.ANYCAST, new SimpleString(queueName), null, true, false);
+
       AmqpClient client = createAmqpClient();
       AmqpConnection connection = addConnection(client.connect());
+
       try {
          AmqpSession session = connection.createSession();
-         AmqpSender sender = session.createSender(destinationName);
+         AmqpSender sender = session.createSender(address);
+         AmqpReceiver receiver = session.createReceiver(address);
+         receiver.flow(1);
 
-         for (int i = 0; i < count; ++i) {
-            AmqpMessage message = new AmqpMessage();
-            message.setMessageId("MessageID:" + i);
-            if (routingType != null) {
-               message.setMessageAnnotation(AMQPMessageSupport.ROUTING_TYPE.toString(), routingType.getType());
-            }
-            sender.send(message);
-         }
+         AmqpMessage message = new AmqpMessage();
+         message.setText("TestPayload");
+         sender.send(message);
+
+         AmqpMessage receivedMessage = receiver.receive(5000, TimeUnit.MILLISECONDS);
+         assertNotNull(receivedMessage);
       } finally {
          connection.close();
       }
    }
 
-   public void sendMessages(String destinationName, int count, boolean durable) throws Exception {
+   @Test(timeout = 60000)
+   public void testSendReceiveLotsOfDurableMessagesOnQueue() throws Exception {
+      doTestSendReceiveLotsOfDurableMessages(Queue.class);
+   }
+
+   @Test(timeout = 60000)
+   public void testSendReceiveLotsOfDurableMessagesOnTopic() throws Exception {
+      doTestSendReceiveLotsOfDurableMessages(Topic.class);
+   }
+
+   private void doTestSendReceiveLotsOfDurableMessages(Class<?> destType) throws Exception {
+      final int MSG_COUNT = 1000;
+
       AmqpClient client = createAmqpClient();
+
       AmqpConnection connection = addConnection(client.connect());
-      try {
-         AmqpSession session = connection.createSession();
-         AmqpSender sender = session.createSender(destinationName);
+      AmqpSession session = connection.createSession();
+
+      final CountDownLatch done = new CountDownLatch(MSG_COUNT);
+      final AtomicBoolean error = new AtomicBoolean(false);
+      final ExecutorService executor = Executors.newSingleThreadExecutor();
+
+      final String address;
+      if (Queue.class.equals(destType)) {
+         address = getQueueName();
+      } else {
+         address = getTopicName();
+      }
+
+      final AmqpReceiver receiver = session.createReceiver(address);
+      receiver.flow(MSG_COUNT);
+
+      AmqpSender sender = session.createSender(address);
 
-         for (int i = 0; i < count; ++i) {
-            AmqpMessage message = new AmqpMessage();
-            message.setMessageId("MessageID:" + i);
-            message.setDurable(durable);
-            sender.send(message);
+      Queue queueView = getProxyToQueue(address);
+
+      executor.execute(new Runnable() {
+
+         @Override
+         public void run() {
+            for (int i = 0; i < MSG_COUNT; i++) {
+               try {
+                  AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+                  received.accept();
+                  done.countDown();
+               } catch (Exception ex) {
+                  LOG.info("Caught error: {}", ex.getClass().getSimpleName());
+                  error.set(true);
+               }
+            }
          }
-      } finally {
-         connection.close();
+      });
+
+      for (int i = 0; i < MSG_COUNT; i++) {
+         AmqpMessage message = new AmqpMessage();
+         message.setMessageId("msg" + i);
+         sender.send(message);
       }
+
+      assertTrue("did not read all messages, waiting on: " + done.getCount(), done.await(10, TimeUnit.SECONDS));
+      assertFalse("should not be any errors on receive", error.get());
+      assertTrue("Should be no inflight messages.", Wait.waitFor(() -> queueView.getDeliveringCount() == 0));
+
+      sender.close();
+      receiver.close();
+      connection.close();
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java
index 7b8cbef..8c95064 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java
@@ -16,14 +16,22 @@
  */
 package org.apache.activemq.artemis.tests.integration.amqp;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
 import org.apache.activemq.transport.amqp.client.AmqpMessage;
 import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.activemq.transport.amqp.client.AmqpValidator;
 import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
 import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Sender;
 import org.junit.Test;
 
 /**
@@ -101,4 +109,74 @@ public class AmqpSenderTest extends AmqpClientTestSupport {
 
       connection.close();
    }
+
+   @Test(timeout = 60000)
+   public void testUnsettledSender() throws Exception {
+      final int MSG_COUNT = 1000;
+
+      final CountDownLatch settled = new CountDownLatch(MSG_COUNT);
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+
+      connection.setStateInspector(new AmqpValidator() {
+
+         @Override
+         public void inspectDeliveryUpdate(Sender sender, Delivery delivery) {
+            if (delivery.remotelySettled()) {
+               IntegrationTestLogger.LOGGER.trace("Remote settled message for sender: " + sender.getName());
+               settled.countDown();
+            }
+         }
+      });
+
+      AmqpSession session = connection.createSession();
+      AmqpSender sender = session.createSender(getQueueName(), false);
+
+      for (int i = 1; i <= MSG_COUNT; ++i) {
+         AmqpMessage message = new AmqpMessage();
+         message.setText("Test-Message: " + i);
+         sender.send(message);
+
+         if (i % 1000 == 0) {
+            IntegrationTestLogger.LOGGER.info("Sent message: " + i);
+         }
+      }
+
+      Queue queueView = getProxyToQueue(getQueueName());
+      assertTrue("All messages should arrive", Wait.waitFor(() -> queueView.getMessageCount() == MSG_COUNT));
+
+      sender.close();
+
+      assertTrue("Remote should have settled all deliveries", settled.await(5, TimeUnit.MINUTES));
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testPresettledSender() throws Exception {
+      final int MSG_COUNT = 1000;
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpSender sender = session.createSender(getQueueName(), true);
+
+      for (int i = 1; i <= MSG_COUNT; ++i) {
+         AmqpMessage message = new AmqpMessage();
+         message.setText("Test-Message: " + i);
+         sender.send(message);
+
+         if (i % 1000 == 0) {
+            IntegrationTestLogger.LOGGER.info("Sent message: " + i);
+         }
+      }
+
+      Queue queueView = getProxyToQueue(getQueueName());
+      assertTrue("All messages should arrive", Wait.waitFor(() -> queueView.getMessageCount() == MSG_COUNT));
+
+      sender.close();
+      connection.close();
+   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSessionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSessionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSessionTest.java
new file mode 100644
index 0000000..0048be5
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSessionTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.activemq.transport.amqp.client.AmqpValidator;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Session;
+import org.junit.Test;
+
+public class AmqpSessionTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 60000)
+   public void testCreateSession() throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+      assertNotNull(session);
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testSessionClosedDoesNotGetReceiverDetachFromRemote() throws Exception {
+      AmqpClient client = createAmqpClient();
+      assertNotNull(client);
+
+      client.setValidator(new AmqpValidator() {
+
+         @Override
+         public void inspectClosedResource(Session session) {
+            IntegrationTestLogger.LOGGER.info("Session closed: " + session.getContext());
+         }
+
+         @Override
+         public void inspectDetachedResource(Receiver receiver) {
+            markAsInvalid("Broker should not detach receiver linked to closed session.");
+         }
+
+         @Override
+         public void inspectClosedResource(Receiver receiver) {
+            markAsInvalid("Broker should not close receiver linked to closed session.");
+         }
+      });
+
+      AmqpConnection connection = addConnection(client.connect());
+      assertNotNull(connection);
+      AmqpSession session = connection.createSession();
+      assertNotNull(session);
+      AmqpReceiver receiver = session.createReceiver(getQueueName());
+      assertNotNull(receiver);
+
+      session.close();
+
+      connection.getStateInspector().assertValid();
+      connection.close();
+   }
+}


Mime
View raw message