activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject [7/7] activemq-artemis git commit: ARTEMIS-1123 Major AMQP Test Suite refactoring
Date Fri, 28 Apr 2017 09:17:04 GMT
ARTEMIS-1123 Major AMQP Test Suite refactoring

Refactor the AMQP test suite grouping tests into more logical unit
tests and adding additional coverage in many areas.  Adds some negative
validation tests to cover features that were only partially tested.

Brings in tests from ActiveMQ 5.x that were not yet ported to Artemis
to increase coverage and test scenarios previously seen to have issues
in the 5.x broker.

Improve tests that were failing sporadically due to not waiting for
broker stats to be updated after async calls were made.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/bad6acb5
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/bad6acb5
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/bad6acb5

Branch: refs/heads/master
Commit: bad6acb58f8aed7845276aae119d72dbcb6d6b79
Parents: 60036c9
Author: Timothy Bish <tabish121@gmail.com>
Authored: Wed Apr 26 18:36:32 2017 -0400
Committer: Martyn Taylor <mtaylor@redhat.com>
Committed: Fri Apr 28 10:16:21 2017 +0100

----------------------------------------------------------------------
 .../amqp/client/AmqpAbstractResource.java       |   48 +-
 .../transport/amqp/client/AmqpReceiver.java     |  154 +-
 .../transport/amqp/client/AmqpSender.java       |   61 +-
 .../transport/amqp/client/AmqpValidator.java    |    8 +-
 .../integration/amqp/AmqpClientTestSupport.java |  243 ++-
 .../amqp/AmqpDescribedTypePayloadTest.java      |   11 +-
 .../amqp/AmqpDurableReceiverTest.java           |   17 +-
 .../amqp/AmqpExpiredMessageTest.java            |   67 +-
 .../integration/amqp/AmqpFlowControlTest.java   |  284 +++
 .../amqp/AmqpFullyQualifiedNameTest.java        |  221 +++
 .../amqp/AmqpInboundConnectionTest.java         |   25 +-
 .../integration/amqp/AmqpManagementTest.java    |    2 +-
 .../integration/amqp/AmqpMaxFrameSizeTest.java  |   93 +
 .../amqp/AmqpMessageDivertsTest.java            |   59 +
 .../amqp/AmqpMessageRoutingTest.java            |  162 ++
 .../amqp/AmqpOutboundConnectionTest.java        |   70 +
 .../amqp/AmqpPresettledReceiverTest.java        |   22 +-
 .../amqp/AmqpProtocolHeaderHandlingTest.java    |  186 ++
 .../amqp/AmqpReceiverDispositionTest.java       |   19 -
 .../integration/amqp/AmqpReceiverDrainTest.java |  145 +-
 .../integration/amqp/AmqpReceiverTest.java      |  298 +++
 .../amqp/AmqpScheduledMessageTest.java          |    2 +-
 .../integration/amqp/AmqpSecurityTest.java      |   69 +-
 .../integration/amqp/AmqpSendReceiveTest.java   |  346 ++--
 .../tests/integration/amqp/AmqpSenderTest.java  |   78 +
 .../tests/integration/amqp/AmqpSessionTest.java |   75 +
 .../tests/integration/amqp/AmqpTestSupport.java |    4 +-
 .../integration/amqp/AmqpTransactionTest.java   |   21 +-
 .../amqp/BrokerDefinedAnycastConsumerTest.java  |   31 +-
 .../BrokerDefinedMulticastConsumerTest.java     |   21 +-
 .../amqp/ClientDefinedAnycastConsumerTest.java  |    9 +-
 .../amqp/ClientDefinedMultiConsumerTest.java    |   34 +-
 .../integration/amqp/JMSClientTestSupport.java  |  151 ++
 .../integration/amqp/JMSConnectionTest.java     |  118 ++
 .../amqp/JMSConnectionWithSecurityTest.java     |  157 ++
 .../amqp/JMSDurableConsumerTest.java            |  202 ++
 .../amqp/JMSMessageConsumerTest.java            |  500 +++++
 .../integration/amqp/JMSMessageGroupsTest.java  |  102 +
 .../amqp/JMSMessageProducerTest.java            |  221 +++
 .../integration/amqp/JMSMessageTypesTest.java   |  394 ++++
 .../integration/amqp/JMSQueueBrowserTest.java   |  296 +++
 .../amqp/JMSTemporaryDestinationTest.java       |  137 ++
 .../integration/amqp/JMSTopicConsumerTest.java  |  244 +++
 .../integration/amqp/JMSTransactionTest.java    |  216 +++
 .../amqp/ProtonFullQualifiedNameTest.java       |  237 ---
 .../amqp/ProtonMaxFrameSizeTest.java            |   96 -
 .../integration/amqp/ProtonPubSubTest.java      |  271 ---
 .../tests/integration/amqp/ProtonTest.java      | 1788 ------------------
 .../tests/integration/amqp/ProtonTestBase.java  |   91 -
 .../integration/amqp/ProtonTestForHeader.java   |  215 ---
 .../amqp/SendingAndReceivingTest.java           |  224 ---
 51 files changed, 4996 insertions(+), 3549 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
index 691c11f..7328804 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -28,9 +28,8 @@ import org.slf4j.LoggerFactory;
 /**
  * Abstract base for all AmqpResource implementations to extend.
  *
- * This abstract class wraps up the basic state management bits so that the concrete
- * object don't have to reproduce it.  Provides hooks for the subclasses to initialize
- * and shutdown.
+ * This abstract class wraps up the basic state management bits so that the concrete object
+ * don't have to reproduce it. Provides hooks for the subclasses to initialize and shutdown.
  */
 public abstract class AmqpAbstractResource<E extends Endpoint> implements AmqpResource {
 
@@ -243,7 +242,6 @@ public abstract class AmqpAbstractResource<E extends Endpoint> implements AmqpRe
 
    @Override
    public void processDeliveryUpdates(AmqpConnection connection, Delivery delivery) throws IOException {
-      doDeliveryUpdate(delivery);
    }
 
    @Override
@@ -251,18 +249,17 @@ public abstract class AmqpAbstractResource<E extends Endpoint> implements AmqpRe
    }
 
    /**
-    * Perform the open operation on the managed endpoint.  A subclass may
-    * override this method to provide additional open actions or configuration
-    * updates.
+    * Perform the open operation on the managed endpoint. A subclass may override this method to
+    * provide additional open actions or configuration updates.
     */
    protected void doOpen() {
       getEndpoint().open();
    }
 
    /**
-    * Perform the close operation on the managed endpoint.  A subclass may
-    * override this method to provide additional close actions or alter the
-    * standard close path such as endpoint detach etc.
+    * Perform the close operation on the managed endpoint. A subclass may override this method
+    * to provide additional close actions or alter the standard close path such as endpoint
+    * detach etc.
     */
    protected void doClose() {
       getEndpoint().close();
@@ -271,17 +268,16 @@ public abstract class AmqpAbstractResource<E extends Endpoint> implements AmqpRe
    /**
     * Perform the detach operation on the managed endpoint.
     *
-    * By default this method throws an UnsupportedOperationException, a subclass
-    * must implement this and do a detach if its resource supports that.
+    * By default this method throws an UnsupportedOperationException, a subclass must implement
+    * this and do a detach if its resource supports that.
     */
    protected void doDetach() {
       throw new UnsupportedOperationException("Endpoint cannot be detached.");
    }
 
    /**
-    * Complete the open operation on the managed endpoint. A subclass may
-    * override this method to provide additional verification actions or configuration
-    * updates.
+    * Complete the open operation on the managed endpoint. A subclass may override this method
+    * to provide additional verification actions or configuration updates.
     */
    protected void doOpenCompletion() {
       LOG.debug("{} is now open: ", this);
@@ -289,15 +285,14 @@ public abstract class AmqpAbstractResource<E extends Endpoint> implements AmqpRe
    }
 
    /**
-    * When aborting the open operation, and there isn't an error condition,
-    * provided by the peer, the returned exception will be used instead.
-    * A subclass may override this method to provide alternative behaviour.
+    * When aborting the open operation, and there isn't an error condition, provided by the
+    * peer, the returned exception will be used instead. A subclass may override this method to
+    * provide alternative behaviour.
     */
    protected Exception getOpenAbortException() {
       return new IOException("Open failed unexpectedly.");
    }
 
-   // TODO - Fina a more generic way to do this.
    protected abstract void doOpenInspection();
 
    protected abstract void doClosedInspection();
@@ -305,18 +300,7 @@ public abstract class AmqpAbstractResource<E extends Endpoint> implements AmqpRe
    protected void doDetachedInspection() {
    }
 
-   protected void doDeliveryUpdate(Delivery delivery) {
-      AmqpValidator validator = getStateInspector();
-      if (validator != null) {
-         try {
-            validator.inspectDeliveryUpdate(delivery);
-         } catch (Throwable error) {
-            validator.markAsInvalid(error.getMessage());
-         }
-      }
-   }
-
-   //----- Private implementation utility methods ---------------------------//
+   // ----- Private implementation utility methods ---------------------------//
 
    private boolean isAwaitingOpen() {
       return this.openRequest != null;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
index 9de2fce..8653cff 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
@@ -128,9 +128,12 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
    /**
     * Create a new receiver instance.
     *
-    * @param session    The parent session that created the receiver.
-    * @param source     The Source instance to use instead of creating and configuring one.
-    * @param receiverId The unique ID assigned to this receiver.
+    * @param session
+    *        The parent session that created the receiver.
+    * @param source
+    *        The Source instance to use instead of creating and configuring one.
+    * @param receiverId
+    *        The unique ID assigned to this receiver.
     */
    public AmqpReceiver(AmqpSession session, Source source, String receiverId) {
 
@@ -147,10 +150,11 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
    }
 
    /**
-    * Close the receiver, a closed receiver will throw exceptions if any further send
-    * calls are made.
+    * Close the receiver, a closed receiver will throw exceptions if any further send calls are
+    * made.
     *
-    * @throws IOException if an error occurs while closing the receiver.
+    * @throws IOException
+    *         if an error occurs while closing the receiver.
     */
    public void close() throws IOException {
       if (closed.compareAndSet(false, true)) {
@@ -170,10 +174,11 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
    }
 
    /**
-    * Detach the receiver, a closed receiver will throw exceptions if any further send
-    * calls are made.
+    * Detach the receiver, a closed receiver will throw exceptions if any further send calls are
+    * made.
     *
-    * @throws IOException if an error occurs while closing the receiver.
+    * @throws IOException
+    *         if an error occurs while closing the receiver.
     */
    public void detach() throws IOException {
       if (closed.compareAndSet(false, true)) {
@@ -207,11 +212,12 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
    }
 
    /**
-    * Attempts to wait on a message to be delivered to this receiver.  The receive
-    * call will wait indefinitely for a message to be delivered.
+    * Attempts to wait on a message to be delivered to this receiver. The receive call will wait
+    * indefinitely for a message to be delivered.
     *
     * @return a newly received message sent to this receiver.
-    * @throws Exception if an error occurs during the receive attempt.
+    * @throws Exception
+    *         if an error occurs during the receive attempt.
     */
    public AmqpMessage receive() throws Exception {
       checkClosed();
@@ -219,13 +225,16 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
    }
 
    /**
-    * Attempts to receive a message sent to this receiver, waiting for the given
-    * timeout value before giving up and returning null.
+    * Attempts to receive a message sent to this receiver, waiting for the given timeout value
+    * before giving up and returning null.
     *
-    * @param timeout the time to wait for a new message to arrive.
-    * @param unit    the unit of time that the timeout value represents.
+    * @param timeout
+    *        the time to wait for a new message to arrive.
+    * @param unit
+    *        the unit of time that the timeout value represents.
     * @return a newly received message or null if the time to wait period expires.
-    * @throws Exception if an error occurs during the receive attempt.
+    * @throws Exception
+    *         if an error occurs during the receive attempt.
     */
    public AmqpMessage receive(long timeout, TimeUnit unit) throws Exception {
       checkClosed();
@@ -233,11 +242,12 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
    }
 
    /**
-    * If a message is already available in this receiver's prefetch buffer then
-    * it is returned immediately otherwise this methods return null without waiting.
+    * If a message is already available in this receiver's prefetch buffer then it is returned
+    * immediately otherwise this methods return null without waiting.
     *
     * @return a newly received message or null if there is no currently available message.
-    * @throws Exception if an error occurs during the receive attempt.
+    * @throws Exception
+    *         if an error occurs during the receive attempt.
     */
    public AmqpMessage receiveNoWait() throws Exception {
       checkClosed();
@@ -248,7 +258,8 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
     * Request a remote peer send a Message to this client waiting until one arrives.
     *
     * @return the pulled AmqpMessage or null if none was pulled from the remote.
-    * @throws IOException if an error occurs
+    * @throws IOException
+    *         if an error occurs
     */
    public AmqpMessage pull() throws IOException {
       return pull(-1, TimeUnit.MILLISECONDS);
@@ -258,7 +269,8 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
     * Request a remote peer send a Message to this client using an immediate drain request.
     *
     * @return the pulled AmqpMessage or null if none was pulled from the remote.
-    * @throws IOException if an error occurs
+    * @throws IOException
+    *         if an error occurs
     */
    public AmqpMessage pullImmediate() throws IOException {
       return pull(0, TimeUnit.MILLISECONDS);
@@ -273,10 +285,13 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
     *
     * The timeout value when positive is given in milliseconds.
     *
-    * @param timeout the amount of time to tell the remote peer to keep this pull request valid.
-    * @param unit    the unit of measure that the timeout represents.
+    * @param timeout
+    *        the amount of time to tell the remote peer to keep this pull request valid.
+    * @param unit
+    *        the unit of measure that the timeout represents.
     * @return the pulled AmqpMessage or null if none was pulled from the remote.
-    * @throws IOException if an error occurs
+    * @throws IOException
+    *         if an error occurs
     */
    public AmqpMessage pull(final long timeout, final TimeUnit unit) throws IOException {
       checkClosed();
@@ -342,8 +357,10 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
    /**
     * Controls the amount of credit given to the receiver link.
     *
-    * @param credit the amount of credit to grant.
-    * @throws IOException if an error occurs while sending the flow.
+    * @param credit
+    *        the amount of credit to grant.
+    * @throws IOException
+    *         if an error occurs while sending the flow.
     */
    public void flow(final int credit) throws IOException {
       checkClosed();
@@ -369,8 +386,10 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
    /**
     * Attempts to drain a given amount of credit from the link.
     *
-    * @param credit the amount of credit to drain.
-    * @throws IOException if an error occurs while sending the drain.
+    * @param credit
+    *        the amount of credit to drain.
+    * @throws IOException
+    *         if an error occurs while sending the drain.
     */
    public void drain(final int credit) throws IOException {
       checkClosed();
@@ -396,7 +415,8 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
    /**
     * Stops the receiver, using all link credit and waiting for in-flight messages to arrive.
     *
-    * @throws IOException if an error occurs while sending the drain.
+    * @throws IOException
+    *         if an error occurs while sending the drain.
     */
    public void stop() throws IOException {
       checkClosed();
@@ -419,12 +439,14 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
    }
 
    /**
-    * Accepts a message that was dispatched under the given Delivery instance and settles the delivery.
+    * Accepts a message that was dispatched under the given Delivery instance and settles the
+    * delivery.
     *
     * @param delivery
     *        the Delivery instance to accept.
     *
-    * @throws IOException if an error occurs while sending the accept.
+    * @throws IOException
+    *         if an error occurs while sending the accept.
     */
    public void accept(Delivery delivery) throws IOException {
       accept(delivery, this.session, true);
@@ -438,25 +460,28 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
     * @param settle
     *        true if the receiver should settle the delivery or just send the disposition.
     *
-    * @throws IOException if an error occurs while sending the accept.
+    * @throws IOException
+    *         if an error occurs while sending the accept.
     */
    public void accept(Delivery delivery, boolean settle) throws IOException {
       accept(delivery, this.session, settle);
    }
 
    /**
-    * Accepts a message that was dispatched under the given Delivery instance and settles the delivery.
+    * Accepts a message that was dispatched under the given Delivery instance and settles the
+    * delivery.
     *
     * This method allows for the session that is used in the accept to be specified by the
-    * caller.  This allows for an accepted message to be involved in a transaction that is
-    * being managed by some other session other than the one that created this receiver.
+    * caller. This allows for an accepted message to be involved in a transaction that is being
+    * managed by some other session other than the one that created this receiver.
     *
     * @param delivery
     *        the Delivery instance to accept.
     * @param session
     *        the session under which the message is being accepted.
     *
-    * @throws IOException if an error occurs while sending the accept.
+    * @throws IOException
+    *         if an error occurs while sending the accept.
     */
    public void accept(final Delivery delivery, final AmqpSession session) throws IOException {
       accept(delivery, session, true);
@@ -466,8 +491,8 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
     * Accepts a message that was dispatched under the given Delivery instance.
     *
     * This method allows for the session that is used in the accept to be specified by the
-    * caller.  This allows for an accepted message to be involved in a transaction that is
-    * being managed by some other session other than the one that created this receiver.
+    * caller. This allows for an accepted message to be involved in a transaction that is being
+    * managed by some other session other than the one that created this receiver.
     *
     * @param delivery
     *        the Delivery instance to accept.
@@ -476,7 +501,8 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
     * @param settle
     *        true if the receiver should settle the delivery or just send the disposition.
     *
-    * @throws IOException if an error occurs while sending the accept.
+    * @throws IOException
+    *         if an error occurs while sending the accept.
     */
    public void accept(final Delivery delivery, final AmqpSession session, final boolean settle) throws IOException {
       checkClosed();
@@ -532,14 +558,16 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
    /**
     * Mark a message that was dispatched under the given Delivery instance as Modified.
     *
-    * @param delivery the Delivery instance to mark modified.
-    * @param deliveryFailed indicates that the delivery failed for some reason.
-    * @param undeliverableHere marks the delivery as not being able to be process by link it was sent to.
-    * @throws IOException if an error occurs while sending the reject.
+    * @param delivery
+    *        the Delivery instance to mark modified.
+    * @param deliveryFailed
+    *        indicates that the delivery failed for some reason.
+    * @param undeliverableHere
+    *        marks the delivery as not being able to be process by link it was sent to.
+    * @throws IOException
+    *         if an error occurs while sending the reject.
     */
-   public void modified(final Delivery delivery,
-                        final Boolean deliveryFailed,
-                        final Boolean undeliverableHere) throws IOException {
+   public void modified(final Delivery delivery, final Boolean deliveryFailed, final Boolean undeliverableHere) throws IOException {
       checkClosed();
 
       if (delivery == null) {
@@ -574,8 +602,10 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
    /**
     * Release a message that was dispatched under the given Delivery instance.
     *
-    * @param delivery the Delivery instance to release.
-    * @throws IOException if an error occurs while sending the release.
+    * @param delivery
+    *        the Delivery instance to release.
+    * @throws IOException
+    *         if an error occurs while sending the release.
     */
    public void release(final Delivery delivery) throws IOException {
       checkClosed();
@@ -609,8 +639,10 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
    /**
     * Reject a message that was dispatched under the given Delivery instance.
     *
-    * @param delivery the Delivery instance to reject.
-    * @throws IOException if an error occurs while sending the release.
+    * @param delivery
+    *        the Delivery instance to reject.
+    * @throws IOException
+    *         if an error occurs while sending the release.
     */
    public void reject(final Delivery delivery) throws IOException {
       checkClosed();
@@ -648,7 +680,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
       return new UnmodifiableReceiver(getEndpoint());
    }
 
-   //----- Receiver configuration properties --------------------------------//
+   // ----- Receiver configuration properties --------------------------------//
 
    public boolean isPresettle() {
       return presettle;
@@ -690,7 +722,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
       return session.getConnection().getDrainTimeout();
    }
 
-   //----- Internal implementation ------------------------------------------//
+   // ----- Internal implementation ------------------------------------------//
 
    @Override
    protected void doOpen() {
@@ -802,7 +834,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
 
    protected void configureSource(Source source) {
       Map<Symbol, DescribedType> filters = new HashMap<>();
-      Symbol[] outcomes = new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL};
+      Symbol[] outcomes = new Symbol[] {Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL};
 
       if (getSubscriptionName() != null && !getSubscriptionName().isEmpty()) {
          source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
@@ -868,6 +900,8 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
    }
 
    private void processDelivery(Delivery incoming) throws Exception {
+      doDeliveryInspection(incoming);
+
       Message message = null;
       try {
          message = decodeIncomingMessage(incoming);
@@ -890,6 +924,14 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
       }
    }
 
+   private void doDeliveryInspection(Delivery delivery) {
+      try {
+         getStateInspector().inspectDelivery(getReceiver(), delivery);
+      } catch (Throwable error) {
+         getStateInspector().markAsInvalid(error.getMessage());
+      }
+   }
+
    @Override
    public void processFlowUpdates(AmqpConnection connection) throws IOException {
       if (pullRequest != null || stopRequest != null) {
@@ -1013,7 +1055,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
       }
    }
 
-   //----- Internal Transaction state callbacks -----------------------------//
+   // ----- Internal Transaction state callbacks -----------------------------//
 
    void preCommit() {
    }
@@ -1027,7 +1069,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
    void postRollback() {
    }
 
-   //----- Inner classes used in message pull operations --------------------//
+   // ----- Inner classes used in message pull operations --------------------//
 
    protected static final class ScheduledRequest implements AsyncResult {
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
index c9bc0d6..03bd28e 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
@@ -56,7 +56,7 @@ import org.slf4j.LoggerFactory;
 public class AmqpSender extends AmqpAbstractResource<Sender> {
 
    private static final Logger LOG = LoggerFactory.getLogger(AmqpSender.class);
-   private static final byte[] EMPTY_BYTE_ARRAY = new byte[]{};
+   private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
 
    public static final long DEFAULT_SEND_TIMEOUT = 15000;
 
@@ -126,9 +126,12 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
    /**
     * Create a new sender instance using the given Target when creating the link.
     *
-    * @param session  The parent session that created the session.
-    * @param target   The target that this sender produces to.
-    * @param senderId The unique ID assigned to this sender.
+    * @param session
+    *        The parent session that created the session.
+    * @param target
+    *        The target that this sender produces to.
+    * @param senderId
+    *        The unique ID assigned to this sender.
     */
    public AmqpSender(AmqpSession session, Target target, String senderId) {
 
@@ -147,8 +150,10 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
    /**
     * Sends the given message to this senders assigned address.
     *
-    * @param message the message to send.
-    * @throws IOException if an error occurs during the send.
+    * @param message
+    *        the message to send.
+    * @throws IOException
+    *         if an error occurs during the send.
     */
    public void send(final AmqpMessage message) throws IOException {
       checkClosed();
@@ -156,11 +161,15 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
    }
 
    /**
-    * Sends the given message to this senders assigned address using the supplied transaction ID.
+    * Sends the given message to this senders assigned address using the supplied transaction
+    * ID.
     *
-    * @param message the message to send.
-    * @param txId    the transaction ID to assign the outgoing send.
-    * @throws IOException if an error occurs during the send.
+    * @param message
+    *        the message to send.
+    * @param txId
+    *        the transaction ID to assign the outgoing send.
+    * @throws IOException
+    *         if an error occurs during the send.
     */
    public void send(final AmqpMessage message, final AmqpTransactionId txId) throws IOException {
       checkClosed();
@@ -188,10 +197,11 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
    }
 
    /**
-    * Close the sender, a closed sender will throw exceptions if any further send
-    * calls are made.
+    * Close the sender, a closed sender will throw exceptions if any further send calls are
+    * made.
     *
-    * @throws IOException if an error occurs while closing the sender.
+    * @throws IOException
+    *         if an error occurs while closing the sender.
     */
    public void close() throws IOException {
       if (closed.compareAndSet(false, true)) {
@@ -231,7 +241,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
       return address;
    }
 
-   //----- Sender configuration ---------------------------------------------//
+   // ----- Sender configuration ---------------------------------------------//
 
    /**
     * @return will messages be settle on send.
@@ -243,7 +253,8 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
    /**
     * Configure is sent messages are marked as settled on send, defaults to false.
     *
-    * @param presettle configure if this sender will presettle all sent messages.
+    * @param presettle
+    *        configure if this sender will presettle all sent messages.
     */
    public void setPresettle(boolean presettle) {
       this.presettle = presettle;
@@ -259,13 +270,13 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
    /**
     * Sets the amount of time the sender will block on a send before failing.
     *
-    * @param sendTimeout time in milliseconds to wait.
+    * @param sendTimeout
+    *        time in milliseconds to wait.
     */
    public void setSendTimeout(long sendTimeout) {
       this.sendTimeout = sendTimeout;
    }
 
-
    public void setDesiredCapabilities(Symbol[] desiredCapabilities) {
       if (getEndpoint() != null) {
          throw new IllegalStateException("Endpoint already established");
@@ -290,7 +301,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
       this.properties = properties;
    }
 
-   //----- Private Sender implementation ------------------------------------//
+   // ----- Private Sender implementation ------------------------------------//
 
    private void checkClosed() {
       if (isClosed()) {
@@ -301,7 +312,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
    @Override
    protected void doOpen() {
 
-      Symbol[] outcomes = new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL};
+      Symbol[] outcomes = new Symbol[] {Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL};
       Source source = new Source();
       source.setAddress(senderId);
       source.setOutcomes(outcomes);
@@ -384,6 +395,14 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
       }
    }
 
+   protected void doDeliveryUpdateInspection(Delivery delivery) {
+      try {
+         getStateInspector().inspectDeliveryUpdate(getSender(), delivery);
+      } catch (Throwable error) {
+         getStateInspector().markAsInvalid(error.getMessage());
+      }
+   }
+
    @Override
    protected Exception getOpenAbortException() {
       // Verify the attach response contained a non-null target
@@ -470,6 +489,8 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
             continue;
          }
 
+         doDeliveryUpdateInspection(delivery);
+
          Outcome outcome = null;
          if (state instanceof TransactionalState) {
             LOG.trace("State of delivery is Transactional, retrieving outcome: {}", state);
@@ -516,8 +537,6 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
          tagGenerator.returnTag(delivery.getTag());
          delivery.settle();
          toRemove.add(delivery);
-
-         doDeliveryUpdate(delivery);
       }
 
       pending.removeAll(toRemove);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java
index 236dc60..291c690 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -72,7 +72,11 @@ public class AmqpValidator {
 
    }
 
-   public void inspectDeliveryUpdate(Delivery delivery) {
+   public void inspectDelivery(Receiver receiver, Delivery delivery) {
+
+   }
+
+   public void inspectDeliveryUpdate(Sender sender, Delivery delivery) {
 
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
index 02b1b99..b8d3b1c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
@@ -16,19 +16,28 @@
  */
 package org.apache.activemq.artemis.tests.integration.amqp;
 
+import java.net.URI;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 
+import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
+
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
-import org.apache.activemq.artemis.core.config.Configuration;
-import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
-import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.core.security.Role;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
-import org.apache.activemq.artemis.jms.server.JMSServerManager;
-import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
 import org.apache.activemq.transport.amqp.client.AmqpMessage;
@@ -44,16 +53,26 @@ import org.junit.Before;
  */
 public class AmqpClientTestSupport extends AmqpTestSupport {
 
-   protected static Symbol SHARED = Symbol.getSymbol("shared");
-   protected static Symbol GLOBAL = Symbol.getSymbol("global");
+   protected static final Symbol SHARED = Symbol.getSymbol("shared");
+   protected static final Symbol GLOBAL = Symbol.getSymbol("global");
+
+   protected static final String BROKER_NAME = "localhost";
+
+   protected String guestUser = "guest";
+   protected String guestPass = "guest";
+
+   protected String fullUser = "user";
+   protected String fullPass = "pass";
 
-   protected JMSServerManager serverManager;
    protected ActiveMQServer server;
 
+   protected MBeanServer mBeanServer = MBeanServerFactory.createMBeanServer();
+
    @Before
    @Override
    public void setUp() throws Exception {
       super.setUp();
+
       server = createServer();
    }
 
@@ -69,18 +88,13 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
       }
       connections.clear();
 
-      if (serverManager != null) {
-         try {
-            serverManager.stop();
-         } catch (Throwable ignored) {
-            ignored.printStackTrace();
+      try {
+         if (server != null) {
+            server.stop();
          }
-         serverManager = null;
+      } finally {
+         super.tearDown();
       }
-
-      server.stop();
-
-      super.tearDown();
    }
 
    protected boolean isAutoCreateQueues() {
@@ -91,6 +105,10 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
       return true;
    }
 
+   protected boolean isSecurityEnabled() {
+      return false;
+   }
+
    protected String getDeadLetterAddress() {
       return "ActiveMQ.DLQ";
    }
@@ -99,48 +117,129 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
       return 10;
    }
 
+   public URI getBrokerOpenWireConnectionURI() {
+      return getBrokerAmqpConnectionURI();
+   }
+
    protected ActiveMQServer createServer() throws Exception {
-      ActiveMQServer server = createServer(true, true);
-      serverManager = new JMSServerManagerImpl(server);
-      Configuration serverConfig = server.getConfiguration();
-
-      // Address 1
-      CoreAddressConfiguration address = new CoreAddressConfiguration();
-      address.setName(getQueueName()).getRoutingTypes().add(RoutingType.ANYCAST);
-      CoreQueueConfiguration queueConfig = new CoreQueueConfiguration();
-      queueConfig.setName(getQueueName()).setAddress(getQueueName()).setRoutingType(RoutingType.ANYCAST);
-      address.getQueueConfigurations().add(queueConfig);
-      serverConfig.addAddressConfiguration(address);
-
-      // Address 1....N
-      for (int i = 0; i < getPrecreatedQueueSize(); ++i) {
-         CoreAddressConfiguration address2 = new CoreAddressConfiguration();
-         address2.setName(getQueueName(i)).getRoutingTypes().add(RoutingType.ANYCAST);
-         CoreQueueConfiguration queueConfig2 = new CoreQueueConfiguration();
-         queueConfig2.setName(getQueueName(i)).setAddress(getQueueName(i)).setRoutingType(RoutingType.ANYCAST);
-         address2.getQueueConfigurations().add(queueConfig2);
-         serverConfig.addAddressConfiguration(address2);
-      }
+      return createServer(AMQP_PORT);
+   }
+
+   protected ActiveMQServer createServer(int port) throws Exception {
+
+      final ActiveMQServer server = this.createServer(true, true);
+
+      server.getConfiguration().getAcceptorConfigurations().clear();
+      server.getConfiguration().getAcceptorConfigurations().add(addAcceptorConfiguration(server, port));
+      server.getConfiguration().setName(BROKER_NAME);
+      server.getConfiguration().setJournalDirectory(server.getConfiguration().getJournalDirectory() + port);
+      server.getConfiguration().setBindingsDirectory(server.getConfiguration().getBindingsDirectory() + port);
+      server.getConfiguration().setPagingDirectory(server.getConfiguration().getPagingDirectory() + port);
+      server.getConfiguration().setJMXManagementEnabled(true);
+      server.getConfiguration().setMessageExpiryScanPeriod(5000);
+      server.setMBeanServer(mBeanServer);
+
+      // Add any additional Acceptors needed for tests
+      addAdditionalAcceptors(server);
 
       // Address configuration
+      configureAddressPolicy(server);
+
+      // Add optional security for tests that need it
+      configureBrokerSecurity(server);
+
+      server.start();
+
+      // Prepare all addresses and queues for client tests.
+      createAddressAndQueues(server);
+
+      return server;
+   }
+
+   protected TransportConfiguration addAcceptorConfiguration(ActiveMQServer server, int port) {
+      HashMap<String, Object> params = new HashMap<>();
+      params.put(TransportConstants.PORT_PROP_NAME, String.valueOf(port));
+      params.put(TransportConstants.PROTOCOLS_PROP_NAME, getConfiguredProtocols());
+
+      HashMap<String, Object> amqpParams = new HashMap<>();
+      configureAMQPAcceptorParameters(amqpParams);
+
+      return new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, "netty-acceptor", amqpParams);
+   }
+
+   protected String getConfiguredProtocols() {
+      return "AMQP,OPENWIRE";
+   }
+
+   protected void configureAddressPolicy(ActiveMQServer server) {
+      // Address configuration
       AddressSettings addressSettings = new AddressSettings();
 
+      addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
       addressSettings.setAutoCreateQueues(isAutoCreateQueues());
       addressSettings.setAutoCreateAddresses(isAutoCreateQueues());
-      addressSettings.setDeadLetterAddress(new SimpleString(getDeadLetterAddress()));
+      addressSettings.setDeadLetterAddress(SimpleString.toSimpleString(getDeadLetterAddress()));
+      addressSettings.setExpiryAddress(SimpleString.toSimpleString(getDeadLetterAddress()));
 
-      serverConfig.getAddressesSettings().put("#", addressSettings);
-      serverConfig.setSecurityEnabled(false);
-      Set<TransportConfiguration> acceptors = serverConfig.getAcceptorConfigurations();
+      server.getConfiguration().getAddressesSettings().put("#", addressSettings);
+      Set<TransportConfiguration> acceptors = server.getConfiguration().getAcceptorConfigurations();
       for (TransportConfiguration tc : acceptors) {
-         if (tc.getName().equals("netty")) {
+         if (tc.getName().equals("netty-acceptor")) {
             tc.getExtraParams().put("anycastPrefix", "anycast://");
             tc.getExtraParams().put("multicastPrefix", "multicast://");
          }
       }
-      serverManager.start();
-      server.start();
-      return server;
+   }
+
+   protected void createAddressAndQueues(ActiveMQServer server) throws Exception {
+      // Default Queue
+      server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getQueueName()), RoutingType.ANYCAST));
+      server.createQueue(SimpleString.toSimpleString(getQueueName()), RoutingType.ANYCAST, SimpleString.toSimpleString(getQueueName()), null, true, false, -1, false, true);
+
+      // Default DLQ
+      server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getDeadLetterAddress()), RoutingType.ANYCAST));
+      server.createQueue(SimpleString.toSimpleString(getDeadLetterAddress()), RoutingType.ANYCAST, SimpleString.toSimpleString(getDeadLetterAddress()), null, true, false, -1, false, true);
+
+      // Default Topic
+      server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getTopicName()), RoutingType.MULTICAST));
+      server.createQueue(SimpleString.toSimpleString(getTopicName()), RoutingType.MULTICAST, SimpleString.toSimpleString(getTopicName()), null, true, false, -1, false, true);
+
+      // Additional Test Queues
+      for (int i = 0; i < getPrecreatedQueueSize(); ++i) {
+         server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getQueueName(i)), RoutingType.ANYCAST));
+         server.createQueue(SimpleString.toSimpleString(getQueueName(i)), RoutingType.ANYCAST, SimpleString.toSimpleString(getQueueName(i)), null, true, false, -1, false, true);
+      }
+   }
+
+   protected void addAdditionalAcceptors(ActiveMQServer server) throws Exception {
+      // None by default
+   }
+
+   protected void configureBrokerSecurity(ActiveMQServer server) {
+      if (isSecurityEnabled()) {
+         ActiveMQJAASSecurityManager securityManager = (ActiveMQJAASSecurityManager) server.getSecurityManager();
+
+         // User additions
+         securityManager.getConfiguration().addUser(guestUser, guestPass);
+         securityManager.getConfiguration().addRole(guestUser, "guest");
+         securityManager.getConfiguration().addUser(fullUser, fullPass);
+         securityManager.getConfiguration().addRole(fullUser, "full");
+
+         // Configure roles
+         HierarchicalRepository<Set<Role>> securityRepository = server.getSecurityRepository();
+         HashSet<Role> value = new HashSet<>();
+         value.add(new Role("guest", false, true, true, true, true, true, true, true));
+         value.add(new Role("full", true, true, true, true, true, true, true, true));
+         securityRepository.addMatch(getQueueName(), value);
+
+         server.getConfiguration().setSecurityEnabled(true);
+      } else {
+         server.getConfiguration().setSecurityEnabled(false);
+      }
+   }
+
+   protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
+      // None by default
    }
 
    public Queue getProxyToQueue(String queueName) {
@@ -151,6 +250,10 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
       return getName();
    }
 
+   public String getTopicName() {
+      return getName() + "-Topic";
+   }
+
    public String getQueueName() {
       return getName();
    }
@@ -166,17 +269,45 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
       this.useSSL = useSSL;
    }
 
-   protected void sendMessages(int numMessages, String address) throws Exception {
+   protected void sendMessages(String destinationName, int count) throws Exception {
+      sendMessages(destinationName, count, null);
+   }
+
+   protected void sendMessages(String destinationName, int count, RoutingType routingType) throws Exception {
       AmqpClient client = createAmqpClient();
       AmqpConnection connection = addConnection(client.connect());
-      AmqpSession session = connection.createSession();
-      AmqpSender sender = session.createSender(address);
-      for (int i = 0; i < numMessages; i++) {
-         AmqpMessage message = new AmqpMessage();
-         message.setText("message-" +  i);
-         sender.send(message);
+      try {
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(destinationName);
+
+         for (int i = 0; i < count; ++i) {
+            AmqpMessage message = new AmqpMessage();
+            message.setMessageId("MessageID:" + i);
+            if (routingType != null) {
+               message.setMessageAnnotation(AMQPMessageSupport.ROUTING_TYPE.toString(), routingType.getType());
+            }
+            sender.send(message);
+         }
+      } finally {
+         connection.close();
+      }
+   }
+
+   protected void sendMessages(String destinationName, int count, boolean durable) throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      try {
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(destinationName);
+
+         for (int i = 0; i < count; ++i) {
+            AmqpMessage message = new AmqpMessage();
+            message.setMessageId("MessageID:" + i);
+            message.setDurable(durable);
+            sender.send(message);
+         }
+      } finally {
+         connection.close();
       }
-      sender.close();
-      connection.connect();
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDescribedTypePayloadTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDescribedTypePayloadTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDescribedTypePayloadTest.java
index 4b9213b..e8c4432 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDescribedTypePayloadTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDescribedTypePayloadTest.java
@@ -29,6 +29,7 @@ import javax.jms.Session;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
 import org.apache.activemq.transport.amqp.client.AmqpMessage;
@@ -43,7 +44,7 @@ import org.junit.Test;
  * Test that the broker can pass through an AMQP message with a described type in the message
  * body regardless of transformer in use.
  */
-public class AmqpDescribedTypePayloadTest extends AmqpClientTestSupport {
+public class AmqpDescribedTypePayloadTest extends JMSClientTestSupport {
 
    @Test(timeout = 60000)
    public void testSendMessageWithDescribedTypeInBody() throws Exception {
@@ -87,7 +88,7 @@ public class AmqpDescribedTypePayloadTest extends AmqpClientTestSupport {
       Queue queue = getProxyToQueue(getQueueName());
       assertEquals(1, queue.getMessageCount());
 
-      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(getBrokerOpenWireConnectionURI());
       Connection jmsConnection = factory.createConnection();
       try {
          Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -118,10 +119,10 @@ public class AmqpDescribedTypePayloadTest extends AmqpClientTestSupport {
       sender.close();
 
       Queue queue = getProxyToQueue(getQueueName());
-      assertEquals(1, queue.getMessageCount());
+      assertTrue("Message did not arrive", Wait.waitFor(() -> queue.getMessageCount() == 1));
 
-      // Receive and resend with OpenWire JMS client
-      JmsConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:61616");
+      // Receive and resend with Qpid JMS client
+      JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI());
       Connection jmsConnection = factory.createConnection();
       try {
          Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java
index 19d4847..d659306 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java
@@ -24,8 +24,6 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.postoffice.Binding;
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
 import org.apache.activemq.transport.amqp.client.AmqpFrameValidator;
@@ -53,13 +51,6 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
 
    private final String SELECTOR_STRING = "color = red";
 
-   @Override
-   public void setUp() throws Exception {
-      super.setUp();
-      server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getTopicName()), RoutingType.MULTICAST));
-      server.createQueue(new SimpleString(getTopicName()), RoutingType.MULTICAST, new SimpleString(getTopicName()), null, true, false);
-   }
-
    @Test(timeout = 60000)
    public void testCreateDurableReceiver() throws Exception {
 
@@ -365,15 +356,11 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
       return null;
    }
 
-   public String getContainerID() {
+   private String getContainerID() {
       return "myContainerID";
    }
 
-   public String getSubscriptionName() {
+   private String getSubscriptionName() {
       return "mySubscription";
    }
-
-   public String getTopicName() {
-      return "myTopic";
-   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
index 440de12..42bc160 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
@@ -18,13 +18,16 @@ package org.apache.activemq.artemis.tests.integration.amqp;
 
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
 import org.apache.activemq.transport.amqp.client.AmqpMessage;
 import org.apache.activemq.transport.amqp.client.AmqpReceiver;
 import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
@@ -55,7 +58,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
       AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS);
       assertNull(received);
 
-      assertEquals(1, queueView.getMessagesExpired());
+      assertTrue("Message should have expired", Wait.waitFor(() -> queueView.getMessagesExpired() == 1));
 
       connection.close();
    }
@@ -119,7 +122,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
       AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS);
       assertNull(received);
 
-      assertEquals(1, queueView.getMessagesExpired());
+      assertTrue("Message should have expired", Wait.waitFor(() -> queueView.getMessagesExpired() == 1));
 
       connection.close();
    }
@@ -154,7 +157,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
       AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS);
       assertNull(received);
 
-      assertEquals(1, queueView.getMessagesExpired());
+      assertTrue("Message should have expired", Wait.waitFor(() -> queueView.getMessagesExpired() == 1));
 
       connection.close();
    }
@@ -253,8 +256,64 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
       AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS);
       assertNull(received);
 
-      assertEquals(1, queueView.getMessagesExpired());
+      assertTrue("Message should have expired", Wait.waitFor(() -> queueView.getMessagesExpired() == 1));
 
       connection.close();
    }
+
+   @Test(timeout = 60000)
+   public void testExpiredMessageLandsInDLQ() throws Throwable {
+      internalSendExpiry(false);
+   }
+
+   @Test(timeout = 60000)
+   public void testExpiredMessageLandsInDLQAndExistsAfterRestart() throws Throwable {
+      internalSendExpiry(true);
+   }
+
+   public void internalSendExpiry(boolean restartServer) throws Throwable {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = client.connect();
+
+      try {
+
+         // Normal Session which won't create a TXN itself
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(getQueueName());
+
+         AmqpMessage message = new AmqpMessage();
+         message.setDurable(true);
+         message.setText("Test-Message");
+         message.setDeliveryAnnotation("shouldDisappear", 1);
+         message.setAbsoluteExpiryTime(System.currentTimeMillis() + 1000);
+         sender.send(message);
+
+         org.apache.activemq.artemis.core.server.Queue dlq = server.locateQueue(SimpleString.toSimpleString(getDeadLetterAddress()));
+
+         assertTrue("Message not movied to DLQ", Wait.waitFor(() -> dlq.getMessageCount() > 0, 5000, 500));
+
+         connection.close();
+
+         if (restartServer) {
+            server.stop();
+            server.start();
+         }
+
+         connection = client.connect();
+         session = connection.createSession();
+
+         // Read all messages from the Queue
+         AmqpReceiver receiver = session.createReceiver(getDeadLetterAddress());
+         receiver.flow(20);
+
+         message = receiver.receive(5, TimeUnit.SECONDS);
+         Assert.assertNotNull(message);
+         Assert.assertEquals(getQueueName(), message.getMessageAnnotation(org.apache.activemq.artemis.api.core.Message.HDR_ORIGINAL_ADDRESS.toString()));
+         Assert.assertNull(message.getDeliveryAnnotation("shouldDisappear"));
+         Assert.assertNull(receiver.receiveNoWait());
+
+      } finally {
+         connection.close();
+      }
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlTest.java
new file mode 100644
index 0000000..03db6d7
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlTest.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageProducer;
+import javax.jms.ResourceAllocationException;
+import javax.jms.Session;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AmqpFlowControlTest extends JMSClientTestSupport {
+
+   private static final long MAX_SIZE_BYTES = 1 * 1024 * 1024;
+   private static final long MAX_SIZE_BYTES_REJECT_THRESHOLD = 2 * 1024 * 1024;
+
+   private String singleCreditAcceptorURI = new String("tcp://localhost:" + (AMQP_PORT + 8));
+   private int messagesSent;
+
+   /**
+    * Runs the inherited set-up and then resets the counter that records how
+    * many messages the last sendUntilFull call managed to send.
+    */
+   @Override
+   public void setUp() throws Exception {
+      super.setUp();
+      this.messagesSent = 0;
+   }
+
+   @Override
+   protected void addAdditionalAcceptors(ActiveMQServer server) throws Exception {
+      server.getConfiguration().addAcceptorConfiguration("flow", singleCreditAcceptorURI + "?protocols=AMQP;useEpoll=false;amqpCredits=1;amqpMinCredits=1");
+   }
+
+   @Override
+   protected void configureAddressPolicy(ActiveMQServer server) {
+      // For BLOCK tests
+      AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch("#");
+      addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
+      addressSettings.setMaxSizeBytes(MAX_SIZE_BYTES);
+      addressSettings.setMaxSizeBytesRejectThreshold(MAX_SIZE_BYTES_REJECT_THRESHOLD);
+      server.getAddressSettingsRepository().addMatch("#", addressSettings);
+   }
+
+   /**
+    * Attaches a sender through the single-credit acceptor and checks that the
+    * link reports exactly one credit.
+    */
+   @Test(timeout = 60000)
+   public void testCreditsAreAllocatedOnceOnLinkCreated() throws Exception {
+      final URI flowAcceptor = new URI(singleCreditAcceptorURI);
+      final AmqpConnection connection = addConnection(createAmqpClient(flowAcceptor).connect());
+
+      try {
+         final AmqpSender sender = connection.createSession().createSender(getQueueName());
+         final int grantedCredit = sender.getSender().getCredit();
+         assertEquals("Should only be issued one credit", 1, grantedCredit);
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Sends over the single-credit acceptor until the address is full and
+    * verifies that no further credit was granted and that the address size
+    * sits between the block limit and the reject threshold.
+    */
+   @Test(timeout = 60000)
+   public void testCreditsAreNotAllocatedWhenAddressIsFull() throws Exception {
+      AmqpClient client = createAmqpClient(new URI(singleCreditAcceptorURI));
+      AmqpConnection connection = addConnection(client.connect());
+
+      try {
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(getQueueName());
+
+         // Use blocking send to ensure buffered messages do not interfere with credit.
+         sender.setSendTimeout(-1);
+         sendUntilFull(sender);
+
+         // This should be -1. A single message is buffered in the client, and 0 credit has been allocated.
+         // assertEquals (rather than assertTrue on ==) reports the observed credit when it fails.
+         assertEquals("Unexpected sender credit once the address is full", -1, sender.getSender().getCredit());
+
+         long addressSize = server.getPagingManager().getPageStore(new SimpleString(getQueueName())).getAddressSize();
+         assertTrue("Address size " + addressSize + " is outside [" + MAX_SIZE_BYTES + ", " + MAX_SIZE_BYTES_REJECT_THRESHOLD + "]",
+                    addressSize >= MAX_SIZE_BYTES && addressSize <= MAX_SIZE_BYTES_REJECT_THRESHOLD);
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Fills the queue's address and verifies that a JMS producer created
+    * beforehand is rejected with a ResourceAllocationException, and that the
+    * address size has reached the reject threshold.
+    */
+   @Test(timeout = 60000)
+   public void testAddressIsBlockedForOtherProdudersWhenFull() throws Exception {
+      Connection connection = createConnection();
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Destination d = session.createQueue(getQueueName());
+         MessageProducer p = session.createProducer(d);
+
+         fillAddress(getQueueName());
+
+         ResourceAllocationException rejection = null;
+         try {
+            p.send(session.createBytesMessage());
+         } catch (ResourceAllocationException rae) {
+            rejection = rae;
+         }
+
+         // Fail with a clear message if the send was accepted instead of dereferencing null below.
+         assertNotNull("Send to a full address should have been rejected", rejection);
+         assertTrue(rejection.getMessage().contains("resource-limit-exceeded"));
+
+         long addressSize = server.getPagingManager().getPageStore(new SimpleString(getQueueName())).getAddressSize();
+         assertTrue(addressSize >= MAX_SIZE_BYTES_REJECT_THRESHOLD);
+      } finally {
+         // Close the JMS connection explicitly, as the other tests here do.
+         connection.close();
+      }
+   }
+
+   /**
+    * Fills the queue's address, attaches a new sender and checks it holds no
+    * credit, then consumes all but one of the messages that were sent and
+    * checks the sender's credit is greater than or equal to zero.
+    */
+   @Test(timeout = 60000)
+   public void testCreditsAreRefreshedWhenAddressIsUnblocked() throws Exception {
+      fillAddress(getQueueName());
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+
+      try {
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(getQueueName());
+
+         // Wait for a potential flow frame.
+         Thread.sleep(500);
+         assertEquals(0, sender.getSender().getCredit());
+
+         // Empty Address except for 1 message used later.
+         AmqpReceiver receiver = session.createReceiver(getQueueName());
+         receiver.flow(100);
+
+         final int toDrain = messagesSent - 1;
+         for (int i = 0; i < toDrain; i++) {
+            AmqpMessage m = receiver.receive(5000, TimeUnit.MILLISECONDS);
+            // Presumably receive() returns null on timeout; fail clearly rather than with an NPE on accept().
+            assertNotNull("No message received for index " + i + " of " + toDrain, m);
+            m.accept();
+         }
+
+         // Wait for address to unblock and flow frame to arrive
+         Thread.sleep(500);
+
+         assertTrue(sender.getSender().getCredit() >= 0);
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Fills the queue's address first, then attaches a fresh sender and checks
+    * that it still holds zero credit after a one second pause.
+    */
+   @Test(timeout = 60000)
+   public void testNewLinkAttachAreNotAllocatedCreditsWhenAddressIsBlocked() throws Exception {
+      fillAddress(getQueueName());
+
+      final AmqpConnection connection = addConnection(createAmqpClient().connect());
+
+      try {
+         final AmqpSender lateSender = connection.createSession().createSender(getQueueName());
+
+         // Wait for a potential flow frame.
+         Thread.sleep(1000);
+
+         final int creditAfterWait = lateSender.getSender().getCredit();
+         assertEquals(0, creditAfterWait);
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Attaches a pre-settled sender, fills the address, then sends one more
+    * message inside a transaction and expects the begin/send/commit sequence
+    * to fail with a resource-limit-exceeded "Address is full" error.
+    */
+   @Test(timeout = 60000)
+   public void testTxIsRolledBackOnRejectedPreSettledMessage() throws Throwable {
+
+      // Create the link attach before filling the address to ensure the link is allocated credit.
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+
+      AmqpSession session = connection.createSession();
+      AmqpSender sender = session.createSender(getQueueName());
+      sender.setPresettle(true);
+
+      fillAddress(getQueueName());
+
+      final AmqpMessage oversized = new AmqpMessage();
+      oversized.setBytes(new byte[50 * 1024]);
+
+      Exception failure = null;
+      try {
+         session.begin();
+         sender.send(oversized);
+         session.commit();
+      } catch (Exception e) {
+         failure = e;
+      } finally {
+         connection.close();
+      }
+
+      assertNotNull(failure);
+
+      final String reason = failure.getMessage();
+      assertTrue(reason.contains("resource-limit-exceeded"));
+      assertTrue(reason.contains("Address is full: " + getQueueName()));
+   }
+
+   /*
+    * Fills an address.  Careful when using this method.  Only use when rejected messages are switched on.
+    */
+   private void fillAddress(String address) throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      Exception exception = null;
+      try {
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(address);
+         sendUntilFull(sender);
+      } catch (Exception e) {
+         exception = e;
+      } finally {
+         connection.close();
+      }
+
+      // Should receive a rejected error
+      assertNotNull(exception);
+      assertTrue(exception.getMessage().contains("amqp:resource-limit-exceeded"));
+   }
+
+   /**
+    * Sends up to 50 messages of 50KB each on a background thread and waits at
+    * most one second for that thread to finish. The number of messages sent
+    * so far is stored in {@code messagesSent}; any exception recorded by the
+    * sending thread is rethrown to the caller.
+    *
+    * @param sender the sender used for every message
+    * @throws Exception the failure raised by the sending thread, if one was recorded
+    */
+   private void sendUntilFull(final AmqpSender sender) throws Exception {
+      final AmqpMessage message = new AmqpMessage();
+      byte[] payload = new byte[50 * 1024];
+      message.setBytes(payload);
+
+      final int maxMessages = 50;
+      final AtomicInteger sentMessages = new AtomicInteger(0);
+      final Exception[] errors = new Exception[1];
+      final CountDownLatch finished = new CountDownLatch(1);
+
+      Runnable sendMessages = new Runnable() {
+         @Override
+         public void run() {
+            try {
+               for (int i = 0; i < maxMessages; i++) {
+                  sender.send(message);
+                  sentMessages.getAndIncrement();
+               }
+            } catch (Exception e) {
+               // Record every failure, not only IOException, so nothing is lost with the thread.
+               errors[0] = e;
+            } finally {
+               // Release the waiter on failure too: it then returns at once instead of
+               // waiting out the timeout, and the latch publishes the write to errors[0].
+               finished.countDown();
+            }
+         }
+      };
+
+      Thread t = new Thread(sendMessages);
+
+      try {
+         t.start();
+
+         // A timeout here means the sending thread has neither finished nor failed yet.
+         finished.await(1, TimeUnit.SECONDS);
+
+         messagesSent = sentMessages.get();
+         if (errors[0] != null) {
+            throw errors[0];
+         }
+      } finally {
+         t.interrupt();
+         t.join(1000);
+         Assert.assertFalse(t.isAlive());
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java
new file mode 100644
index 0000000..65b0f7f
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.HashMap;
+
+import javax.jms.Connection;
+import javax.jms.InvalidDestinationException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.Bindings;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.CompositeAddress;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AmqpFullyQualifiedNameTest extends JMSClientTestSupport {
+
+   private SimpleString anycastAddress = new SimpleString("address.anycast");
+   private SimpleString multicastAddress = new SimpleString("address.multicast");
+
+   private SimpleString anycastQ1 = new SimpleString("q1");
+   private SimpleString anycastQ2 = new SimpleString("q2");
+   private SimpleString anycastQ3 = new SimpleString("q3");
+
+   private ServerLocator locator;
+
+   /**
+    * Runs the inherited set-up and then creates the server locator that the
+    * tests use to open core client sessions.
+    */
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      this.locator = createNettyNonHALocator();
+   }
+
+   @Override
+   protected void addAdditionalAcceptors(ActiveMQServer server) throws Exception {
+      server.getConfiguration().addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, new HashMap<String, Object>(), "netty", new HashMap<String, Object>()));
+   }
+
+   /**
+    * There isn't much use of FQQN for topics, however we can test the query
+    * functionality: three consumers on one topic each receive the published
+    * message, then every binding on the address is queried both by its fully
+    * qualified name and by its plain queue name.
+    */
+   @Test(timeout = 60000)
+   public void testTopic() throws Exception {
+
+      Connection connection = createConnection(false);
+      try {
+         connection.setClientID("FQQNconn");
+         connection.start();
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Topic topic = session.createTopic(multicastAddress.toString());
+
+         MessageConsumer consumer1 = session.createConsumer(topic);
+         MessageConsumer consumer2 = session.createConsumer(topic);
+         MessageConsumer consumer3 = session.createConsumer(topic);
+
+         MessageProducer producer = session.createProducer(topic);
+
+         producer.send(session.createMessage());
+
+         //each consumer receives one
+         Message m = consumer1.receive(2000);
+         assertNotNull(m);
+         m = consumer2.receive(2000);
+         assertNotNull(m);
+         m = consumer3.receive(2000);
+         assertNotNull(m);
+
+         Bindings bindings = server.getPostOffice().getBindingsForAddress(multicastAddress);
+         for (Binding b : bindings.getBindings()) {
+            System.out.println("checking binidng " + b.getUniqueName() + " " + ((LocalQueueBinding)b).getQueue().getDeliveringMessages());
+            SimpleString qName = b.getUniqueName();
+            SimpleString fullName = CompositeAddress.toFullQN(multicastAddress, qName);
+            //do FQQN query
+            QueueQueryResult result = server.queueQuery(fullName);
+            assertTrue(result.isExists());
+            // Expected value goes first so a failure reports expected/actual the right way round.
+            assertEquals(fullName, result.getName());
+            //do qname query
+            result = server.queueQuery(qName);
+            assertTrue(result.isExists());
+            assertEquals(qName, result.getName());
+         }
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Creates three anycast queues on one address, sends three core messages
+    * to that address, consumes one message from each queue through its fully
+    * qualified name, and finally checks queue queries by FQQN and by plain
+    * queue name.
+    */
+   @Test
+   public void testQueue() throws Exception {
+      final SimpleString[] queueNames = new SimpleString[]{anycastQ1, anycastQ2, anycastQ3};
+      for (SimpleString queueName : queueNames) {
+         server.createQueue(anycastAddress, RoutingType.ANYCAST, queueName, null, true, false, -1, false, true);
+      }
+
+      Connection connection = createConnection();
+      try {
+         connection.start();
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         javax.jms.Queue fqqn1 = session.createQueue(CompositeAddress.toFullQN(anycastAddress, anycastQ1).toString());
+         javax.jms.Queue fqqn2 = session.createQueue(CompositeAddress.toFullQN(anycastAddress, anycastQ2).toString());
+         javax.jms.Queue fqqn3 = session.createQueue(CompositeAddress.toFullQN(anycastAddress, anycastQ3).toString());
+
+         // Send 3 messages to anycastAddress through the core client.
+         ClientSessionFactory sessionFactory = createSessionFactory(locator);
+         ClientSession coreSession = sessionFactory.createSession();
+         ClientProducer coreProducer = coreSession.createProducer(anycastAddress);
+         sendMessages(coreSession, coreProducer, 3);
+
+         MessageConsumer first = session.createConsumer(fqqn1);
+         MessageConsumer second = session.createConsumer(fqqn2);
+         MessageConsumer third = session.createConsumer(fqqn3);
+
+         // Each consumer receives one.
+         assertNotNull(first.receive(2000));
+         assertNotNull(second.receive(2000));
+         assertNotNull(third.receive(2000));
+
+         // Wait for each queue's message count to reach zero.
+         Queue serverQ1 = getProxyToQueue(anycastQ1.toString());
+         assertTrue("Message not consumed on Q1", Wait.waitFor(() -> serverQ1.getMessageCount() == 0));
+         Queue serverQ2 = getProxyToQueue(anycastQ2.toString());
+         assertTrue("Message not consumed on Q2", Wait.waitFor(() -> serverQ2.getMessageCount() == 0));
+         Queue serverQ3 = getProxyToQueue(anycastQ3.toString());
+         assertTrue("Message not consumed on Q3", Wait.waitFor(() -> serverQ3.getMessageCount() == 0));
+
+         connection.close();
+
+         // Queues are empty now.
+         for (SimpleString queueName : queueNames) {
+            // FQQN query
+            final SimpleString fullName = CompositeAddress.toFullQN(anycastAddress, queueName);
+            final QueueQueryResult byFullName = server.queueQuery(fullName);
+            assertTrue(byFullName.isExists());
+            assertEquals(anycastAddress, byFullName.getAddress());
+            assertEquals(CompositeAddress.toFullQN(anycastAddress, queueName), byFullName.getName());
+            assertEquals("Message not consumed", 0, byFullName.getMessageCount());
+
+            // Try the query again using the plain queue name.
+            QueueQueryResult byQueueName = server.queueQuery(queueName);
+            assertEquals(queueName, byQueueName.getName());
+         }
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Exercises the degenerate fully qualified names: "::queue" must be usable
+    * for consuming, while "queue::" and "::" must make consumer creation fail
+    * with an InvalidDestinationException.
+    */
+   @Test
+   public void testQueueSpecial() throws Exception {
+      server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastQ1, null, true, false, -1, false, true);
+
+      final SimpleString empty = new SimpleString("");
+
+      Connection connection = createConnection();
+      try {
+         connection.start();
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         // "::queue" is ok!
+         javax.jms.Queue queueOnly = session.createQueue(CompositeAddress.toFullQN(empty, anycastQ1).toString());
+
+         ClientSessionFactory sessionFactory = createSessionFactory(locator);
+         ClientSession coreSession = sessionFactory.createSession();
+         ClientProducer coreProducer = coreSession.createProducer(anycastAddress);
+         sendMessages(coreSession, coreProducer, 1);
+
+         System.out.println("create consumer: " + queueOnly);
+         MessageConsumer consumer = session.createConsumer(queueOnly);
+         assertNotNull(consumer.receive(2000));
+
+         // "queue::" has no queue part.
+         javax.jms.Queue addressOnly = session.createQueue(CompositeAddress.toFullQN(anycastQ1, empty).toString());
+         try {
+            session.createConsumer(addressOnly);
+            fail("should get exception");
+         } catch (InvalidDestinationException expected) {
+            // expected
+         }
+
+         // "::" has neither part.
+         javax.jms.Queue neither = session.createQueue(CompositeAddress.toFullQN(new SimpleString(""), new SimpleString("")).toString());
+         try {
+            session.createConsumer(neither);
+            fail("should get exception");
+         } catch (InvalidDestinationException expected) {
+            // expected
+         }
+      } finally {
+         connection.close();
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpInboundConnectionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpInboundConnectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpInboundConnectionTest.java
index 79bdf59..93ceeb3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpInboundConnectionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpInboundConnectionTest.java
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.artemis.utils.VersionLoader;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
@@ -53,7 +54,27 @@ public class AmqpInboundConnectionTest extends AmqpClientTestSupport {
    private static final String BROKER_NAME = "localhost";
    private static final String PRODUCT_NAME = "apache-activemq-artemis";
 
-   @Test
+   /**
+    * Removes and disconnects every connection known to the server's remoting
+    * service, then verifies the client connection reports itself closed with
+    * the CONNECTION_FORCED remote condition.
+    */
+   @Test(timeout = 60000)
+   public void testCloseIsSentOnConnectionClose() throws Exception {
+      final AmqpConnection amqpConnection = createAmqpClient().connect();
+
+      try {
+         for (RemotingConnection brokerSide : server.getRemotingService().getConnections()) {
+            server.getRemotingService().removeConnection(brokerSide);
+            brokerSide.disconnect(true);
+         }
+
+         // Wait for the client to see the close before asserting on it.
+         Wait.waitFor(amqpConnection::isClosed);
+
+         assertTrue(amqpConnection.isClosed());
+         assertEquals(AmqpSupport.CONNECTION_FORCED, amqpConnection.getConnection().getRemoteCondition().getCondition());
+      } finally {
+         amqpConnection.close();
+      }
+   }
+
+   @Test(timeout = 60000)
    public void testBrokerContainerId() throws Exception {
       AmqpClient client = createAmqpClient();
       assertNotNull(client);
@@ -77,7 +98,7 @@ public class AmqpInboundConnectionTest extends AmqpClientTestSupport {
       }
    }
 
-   @Test
+   @Test(timeout = 60000)
    public void testBrokerConnectionProperties() throws Exception {
       AmqpClient client = createAmqpClient();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpManagementTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpManagementTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpManagementTest.java
index c84a590..9b8ddfe 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpManagementTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpManagementTest.java
@@ -31,7 +31,7 @@ import org.junit.Test;
 
 public class AmqpManagementTest extends AmqpClientTestSupport {
 
-   @Test
+   @Test(timeout = 60000)
    public void testManagementQueryOverAMQP() throws Throwable {
       AmqpClient client = createAmqpClient();
       AmqpConnection connection = addConnection(client.connect());


Mime
View raw message