activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [activemq-artemis] branch master updated: ARTEMIS-2494: [AMQP] Allow Modified disposition to be used signal address full to a sending peer
Date Tue, 03 Dec 2019 16:11:20 GMT
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bd7105  ARTEMIS-2494: [AMQP] Allow Modified disposition to be used signal address
full to a sending peer
     new e86df5a  This closes #2847
7bd7105 is described below

commit 7bd710520d8faf4393a86dd4cc0f09fa627000f8
Author: Keith Wall <kwall@apache.org>
AuthorDate: Fri Sep 20 20:39:10 2019 +0100

    ARTEMIS-2494: [AMQP] Allow Modified disposition to be used signal address full to a sending
peer
---
 .../protocol/amqp/broker/AMQPSessionCallback.java  |   7 +-
 .../amqp/broker/ProtonProtocolManager.java         |  20 +++
 .../artemis/protocol/amqp/proton/AmqpSupport.java  |   3 +
 .../amqp/proton/ProtonServerReceiverContext.java   |  80 +++++++--
 .../proton/ProtonServerReceiverContextTest.java    | 111 ++++++++++++
 .../activemq/transport/amqp/client/AmqpSender.java |  19 ++-
 .../transport/amqp/client/AmqpSession.java         |  28 ++-
 .../integration/amqp/AmqpFlowControlFailTest.java  | 188 +++++++++++++++------
 8 files changed, 378 insertions(+), 78 deletions(-)

diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 4b2b669..a65361d 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -440,6 +440,7 @@ public class AMQPSessionCallback implements SessionCallback {
          // Anonymous relay must set a To value
          address = message.getAddressSimpleString();
          if (address == null) {
+            // Errors are not currently handled as required by AMQP 1.0 anonterm-v1.0
             rejectMessage(delivery, Symbol.valueOf("failed"), "Missing 'to' field for message
sent to an anonymous producer");
             return;
          }
@@ -457,14 +458,14 @@ public class AMQPSessionCallback implements SessionCallback {
          PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddressSimpleString());
          if (store != null && store.isRejectingMessages()) {
             // We drop pre-settled messages (and abort any associated Tx)
+            String amqpAddress = delivery.getLink().getTarget().getAddress();
+            ActiveMQException e = new ActiveMQAMQPResourceLimitExceededException("Address
is full: " + amqpAddress);
             if (delivery.remotelySettled()) {
                if (transaction != null) {
-                  String amqpAddress = delivery.getLink().getTarget().getAddress();
-                  ActiveMQException e = new ActiveMQAMQPResourceLimitExceededException("Address
is full: " + amqpAddress);
                   transaction.markAsRollbackOnly(e);
                }
             } else {
-               rejectMessage(delivery, AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address is full:
" + address);
+               throw e;
             }
          } else {
             serverSend(context, transaction, message, delivery, receiver, routingContext);
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
index 5b9aa38..bab27d1 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
@@ -69,6 +69,8 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
 
    private int amqpLowCredits = AmqpSupport.AMQP_LOW_CREDITS_DEFAULT;
 
+   private boolean amqpUseModifiedForTransientDeliveryErrors = AmqpSupport.AMQP_USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS;
+
    private int initialRemoteMaxFrameSize = 4 * 1024;
 
    private String[] saslMechanisms = MechanismFinder.getKnownMechanisms();
@@ -293,4 +295,22 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
       this.initialRemoteMaxFrameSize = initialRemoteMaxFrameSize;
    }
 
+   /**
+    * Returns true if transient delivery errors should be handled with a Modified disposition
+    * (if permitted by link)
+    */
+   public boolean isUseModifiedForTransientDeliveryErrors() {
+      return this.amqpUseModifiedForTransientDeliveryErrors;
+   }
+
+   /**
+    * Sets if transient delivery errors should be handled with a Modified disposition
+    * (if permitted by link)
+    */
+   public ProtonProtocolManager setAmqpUseModifiedForTransientDeliveryErrors(boolean amqpUseModifiedForTransientDeliveryErrors)
{
+      this.amqpUseModifiedForTransientDeliveryErrors = amqpUseModifiedForTransientDeliveryErrors;
+      return this;
+   }
+
+
 }
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
index e929406..6d19724 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
@@ -32,6 +32,9 @@ public class AmqpSupport {
    public static final int AMQP_CREDITS_DEFAULT = 1000;
    public static final int AMQP_LOW_CREDITS_DEFAULT = 300;
 
+   // Defaults for controlling the behaviour of AMQP dispositions
+   public static final boolean AMQP_USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS = false;
+
    // Identification values used to locating JMS selector types.
    public static final UnsignedLong JMS_SELECTOR_CODE = UnsignedLong.valueOf(0x0000468C00000004L);
    public static final Symbol JMS_SELECTOR_NAME = Symbol.valueOf("apache.org:selector-filter:string");
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 1446373..44b97cf 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -20,6 +20,8 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -40,10 +42,14 @@ import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
 import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Modified;
+import org.apache.qpid.proton.amqp.messaging.Outcome;
 import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.messaging.Source;
 import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.proton.amqp.transaction.TransactionalState;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
 import org.apache.qpid.proton.codec.ReadableBuffer;
@@ -77,6 +83,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
     * We set it as ran as the first one should always go through
     */
    protected final AtomicRunnable creditRunnable;
+   private final boolean useModified;
 
    /**
     * This Credit Runnable may be used in Mock tests to simulate the credit semantic here
@@ -125,6 +132,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
       this.amqpCredits = connection.getAmqpCredits();
       this.minCreditRefresh = connection.getAmqpLowCredits();
       this.creditRunnable = createCreditRunnable(amqpCredits, minCreditRefresh, receiver,
connection).setRan();
+      useModified = this.connection.getProtocolManager().isUseModifiedForTransientDeliveryErrors();
    }
 
    @Override
@@ -304,20 +312,11 @@ public class ProtonServerReceiverContext extends ProtonInitializable
implements
          sessionSPI.serverSend(this, tx, receiver, delivery, address, delivery.getMessageFormat(),
data, routingContext);
       } catch (Exception e) {
          log.warn(e.getMessage(), e);
-         Rejected rejected = new Rejected();
-         ErrorCondition condition = new ErrorCondition();
-
-         if (e instanceof ActiveMQSecurityException) {
-            condition.setCondition(AmqpError.UNAUTHORIZED_ACCESS);
-         } else {
-            condition.setCondition(Symbol.valueOf("failed"));
-         }
+         DeliveryState deliveryState = determineDeliveryState(((Source) receiver.getSource()),
+                                                              useModified,
+                                                              e);
          connection.runLater(() -> {
-
-            condition.setDescription(e.getMessage());
-            rejected.setError(condition);
-
-            delivery.disposition(rejected);
+            delivery.disposition(deliveryState);
             delivery.settle();
             flow();
             connection.flush();
@@ -326,6 +325,50 @@ public class ProtonServerReceiverContext extends ProtonInitializable
implements
       }
    }
 
+   private DeliveryState determineDeliveryState(final Source source, final boolean useModified,
final Exception e) {
+      Outcome defaultOutcome = getEffectiveDefaultOutcome(source);
+
+      if (isAddressFull(e) && useModified &&
+          (outcomeSupported(source, Modified.DESCRIPTOR_SYMBOL) || defaultOutcome instanceof
Modified)) {
+         Modified modified = new Modified();
+         modified.setDeliveryFailed(true);
+         return modified;
+      } else {
+         if (outcomeSupported(source, Rejected.DESCRIPTOR_SYMBOL) || defaultOutcome instanceof
Rejected) {
+            return createRejected(e);
+         } else if (source.getDefaultOutcome() instanceof DeliveryState) {
+            return ((DeliveryState) source.getDefaultOutcome());
+         } else {
+            // The AMQP specification requires that Accepted is returned for this case. However
there exist
+            // implementations that set neither outcomes/default-outcome but use/expect for
full range of outcomes.
+            // To maintain compatibility with these implementations, we maintain previous
behaviour.
+            return createRejected(e);
+         }
+      }
+   }
+
+   private boolean isAddressFull(final Exception e) {
+      return e instanceof ActiveMQException && ActiveMQExceptionType.ADDRESS_FULL.equals(((ActiveMQException)
e).getType());
+   }
+
+   private Rejected createRejected(final Exception e) {
+      ErrorCondition condition = new ErrorCondition();
+
+      // Set condition
+      if (e instanceof ActiveMQSecurityException) {
+         condition.setCondition(AmqpError.UNAUTHORIZED_ACCESS);
+      } else if (isAddressFull(e)) {
+         condition.setCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED);
+      } else {
+         condition.setCondition(Symbol.valueOf("failed"));
+      }
+      condition.setDescription(e.getMessage());
+
+      Rejected rejected = new Rejected();
+      rejected.setError(condition);
+      return rejected;
+   }
+
    @Override
    public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
       protonSession.removeReceiver(receiver);
@@ -375,4 +418,15 @@ public class ProtonServerReceiverContext extends ProtonInitializable
implements
    public boolean isDraining() {
       return receiver.draining();
    }
+
+   private boolean outcomeSupported(final Source source, final Symbol outcome) {
+      if (source != null && source.getOutcomes() != null) {
+         return Arrays.asList(( source).getOutcomes()).contains(outcome);
+      }
+      return false;
+   }
+
+   private Outcome getEffectiveDefaultOutcome(final Source source) {
+      return (source.getOutcomes() == null || source.getOutcomes().length == 0) ? source.getDefaultOutcome()
: null;
+   }
 }
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java
index a157ef1..571ca92 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java
@@ -16,16 +16,43 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.proton;
 
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
+import java.util.List;
+
+import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
+import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
+
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.Modified;
+import org.apache.qpid.proton.amqp.messaging.Outcome;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.codec.ReadableBuffer;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Receiver;
 import org.junit.Test;
+import org.mockito.stubbing.Answer;
 
 public class ProtonServerReceiverContextTest {
 
@@ -39,6 +66,44 @@ public class ProtonServerReceiverContextTest {
       doOnMessageWithAbortedDeliveryTestImpl(true);
    }
 
+   @Test
+   public void addressFull_SourceSupportsModified() throws Exception {
+      doOnMessageWithDeliveryException(asList(Rejected.DESCRIPTOR_SYMBOL,
+                                              Accepted.DESCRIPTOR_SYMBOL,
+                                              Modified.DESCRIPTOR_SYMBOL),
+                                       null, new ActiveMQAddressFullException(),
+                                       Modified.class);
+   }
+
+   @Test
+   public void addressFull_SourceDoesNotSupportModified() throws Exception {
+      doOnMessageWithDeliveryException(asList(Rejected.DESCRIPTOR_SYMBOL,
+                                              Accepted.DESCRIPTOR_SYMBOL),
+                                       null, new ActiveMQAddressFullException(),
+                                       Rejected.class);
+   }
+
+   @Test
+   public void otherFailure_SourceSupportsRejects() throws Exception {
+      doOnMessageWithDeliveryException(asList(Rejected.DESCRIPTOR_SYMBOL,
+                                              Accepted.DESCRIPTOR_SYMBOL,
+                                              Modified.DESCRIPTOR_SYMBOL),
+                                       null, new ActiveMQException(),
+                                       Rejected.class);
+   }
+
+   @Test
+   public void otherFailure_SourceDoesNotSupportReject() throws Exception {
+      doOnMessageWithDeliveryException(singletonList(Accepted.DESCRIPTOR_SYMBOL),
+                                       Accepted.getInstance(), new ActiveMQException(),
+                                       Accepted.class);
+      // violates AMQP specification - see explanation ProtonServerReceiverContext.determineDeliveryState
+      doOnMessageWithDeliveryException(singletonList(Accepted.DESCRIPTOR_SYMBOL),
+                                       null,
+                                       new ActiveMQException(),
+                                       Rejected.class);
+   }
+
    private void doOnMessageWithAbortedDeliveryTestImpl(boolean drain) throws ActiveMQAMQPException
{
       Receiver mockReceiver = mock(Receiver.class);
       AMQPConnectionContext mockConnContext = mock(AMQPConnectionContext.class);
@@ -46,6 +111,8 @@ public class ProtonServerReceiverContextTest {
       when(mockConnContext.getAmqpCredits()).thenReturn(100);
       when(mockConnContext.getAmqpLowCredits()).thenReturn(30);
 
+      when(mockConnContext.getProtocolManager()).thenReturn(mock(ProtonProtocolManager.class));
+
       ProtonServerReceiverContext rc = new ProtonServerReceiverContext(null, mockConnContext,
null, mockReceiver);
 
       Delivery mockDelivery = mock(Delivery.class);
@@ -72,4 +139,48 @@ public class ProtonServerReceiverContextTest {
       verifyNoMoreInteractions(mockReceiver);
    }
 
+   private void doOnMessageWithDeliveryException(List<Symbol> sourceSymbols,
+                                                 Outcome defaultOutcome, Exception deliveryException,
+                                                 Class<? extends DeliveryState> expectedDeliveryState)
throws Exception {
+      AMQPConnectionContext mockConnContext = mock(AMQPConnectionContext.class);
+      doAnswer((Answer<Void>) invocation -> {
+         Runnable runnable = invocation.getArgument(0);
+         runnable.run();
+         return null;
+      }).when(mockConnContext).runLater(any(Runnable.class));
+      ProtonProtocolManager mockProtocolManager = mock(ProtonProtocolManager.class);
+      when(mockProtocolManager.isUseModifiedForTransientDeliveryErrors()).thenReturn(true);
+      when(mockConnContext.getProtocolManager()).thenReturn(mockProtocolManager);
+
+
+      AMQPSessionCallback mockSession = mock(AMQPSessionCallback.class);
+
+      Receiver mockReceiver = mock(Receiver.class);
+      ProtonServerReceiverContext rc = new ProtonServerReceiverContext(mockSession, mockConnContext,
null, mockReceiver);
+
+      Delivery mockDelivery = mock(Delivery.class);
+      when(mockDelivery.getLink()).thenReturn(mockReceiver);
+
+      when(mockReceiver.current()).thenReturn(mockDelivery);
+      Source source = new Source();
+      source.setOutcomes(sourceSymbols.toArray(new Symbol[]{}));
+      source.setDefaultOutcome(defaultOutcome);
+      when(mockReceiver.getSource()).thenReturn(source);
+
+      doThrow(deliveryException).when(mockSession)
+                                .serverSend(eq(rc),
+                                            nullable(Transaction.class),
+                                            eq(mockReceiver),
+                                            eq(mockDelivery),
+                                            nullable(SimpleString.class),
+                                            anyInt(),
+                                            nullable(ReadableBuffer.class),
+                                            any(RoutingContext.class));
+
+      rc.onMessage(mockDelivery);
+
+      verify(mockDelivery, times(1)).settle();
+      verify(mockDelivery, times(1)).disposition(any(expectedDeliveryState));
+   }
+
 }
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
index 703d489..eed7cf2 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
@@ -59,6 +59,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
    private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
 
    public static final long DEFAULT_SEND_TIMEOUT = 15000;
+   public static final Symbol[] DEFAULT_OUTCOMES = {Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL};
 
    private final AmqpTransferTagGenerator tagGenerator = new AmqpTransferTagGenerator(true);
    private final AtomicBoolean closed = new AtomicBoolean();
@@ -70,6 +71,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
    private final Target userSpecifiedTarget;
    private final SenderSettleMode userSpecifiedSenderSettlementMode;
    private final ReceiverSettleMode userSpecifiedReceiverSettlementMode;
+   private final Symbol[] outcomes;
 
    private boolean presettle;
    private long sendTimeout = DEFAULT_SEND_TIMEOUT;
@@ -92,7 +94,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
     *        The unique ID assigned to this sender.
     */
    public AmqpSender(AmqpSession session, String address, String senderId) {
-      this(session, address, senderId, null, null);
+      this(session, address, senderId, null, null, DEFAULT_OUTCOMES);
    }
 
    /**
@@ -108,8 +110,15 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
     *        The {@link SenderSettleMode} to use on open.
     * @param receiverMode
     *        The {@link ReceiverSettleMode} to use on open.
+    * @param outcomes
+    *        The outcomes to use on open
     */
-   public AmqpSender(AmqpSession session, String address, String senderId, SenderSettleMode
senderMode, ReceiverSettleMode receiverMode) {
+   public AmqpSender(AmqpSession session,
+                     String address,
+                     String senderId,
+                     SenderSettleMode senderMode,
+                     ReceiverSettleMode receiverMode,
+                     Symbol[] outcomes) {
 
       if (address != null && address.isEmpty()) {
          throw new IllegalArgumentException("Address cannot be empty.");
@@ -121,6 +130,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
       this.userSpecifiedTarget = null;
       this.userSpecifiedSenderSettlementMode = senderMode;
       this.userSpecifiedReceiverSettlementMode = receiverMode;
+      this.outcomes = outcomes;
    }
 
    /**
@@ -145,6 +155,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
       this.userSpecifiedTarget = target;
       this.userSpecifiedSenderSettlementMode = null;
       this.userSpecifiedReceiverSettlementMode = null;
+      outcomes = DEFAULT_OUTCOMES;
    }
 
    /**
@@ -311,11 +322,9 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
 
    @Override
    protected void doOpen() {
-
-      Symbol[] outcomes = new Symbol[] {Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL};
       Source source = new Source();
       source.setAddress(senderId);
-      source.setOutcomes(outcomes);
+      source.setOutcomes(this.outcomes);
 
       Target target = userSpecifiedTarget;
       if (target == null) {
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
index 53d45e3..ff23e67 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -189,10 +189,34 @@ public class AmqpSession extends AmqpAbstractResource<Session>
{
     *
     * @throws Exception if an error occurs while creating the sender.
     */
-   public AmqpSender createSender(final String address, final SenderSettleMode senderMode,
ReceiverSettleMode receiverMode) throws Exception {
+   public AmqpSender createSender(final String address,
+                                  final SenderSettleMode senderMode,
+                                  ReceiverSettleMode receiverMode) throws Exception {
+      return createSender(address, senderMode, receiverMode, AmqpSender.DEFAULT_OUTCOMES);
+   }
+
+   /**
+    * Create a sender instance using the given address
+    *
+    * @param address
+    *        the address to which the sender will produce its messages.
+    * @param senderSettlementMode
+    *        controls the settlement mode used by the created Sender
+    * @param receiverSettlementMode
+    *        controls the desired settlement mode used by the remote Receiver
+    * @param outcomes
+    *        specifies the outcomes supported by the sender
+    *
+    * @return a newly created sender that is ready for use.
+    *
+    * @throws Exception if an error occurs while creating the sender.
+    */
+   public AmqpSender createSender(final String address,
+                                  final SenderSettleMode senderMode,
+                                  ReceiverSettleMode receiverMode, final Symbol[] outcomes)
throws Exception {
       checkClosed();
 
-      final AmqpSender sender = new AmqpSender(AmqpSession.this, address, getNextSenderId(),
senderMode, receiverMode);
+      final AmqpSender sender = new AmqpSender(AmqpSession.this, address, getNextSenderId(),
senderMode, receiverMode, outcomes);
       final ClientFuture request = new ClientFuture();
 
       connection.getScheduler().execute(new Runnable() {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java
index c6119a1..42fb5f3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java
@@ -27,70 +27,148 @@ import org.apache.activemq.transport.amqp.client.AmqpReceiver;
 import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
 import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
 
-public class AmqpFlowControlFailTest extends JMSClientTestSupport {
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.Modified;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+
+@RunWith(Enclosed.class)
+public class AmqpFlowControlFailTest {
+
+   @RunWith(Parameterized.class)
+   public static class AmqpFlowControlFailDispositionTests extends JMSClientTestSupport {
+
+      @Parameterized.Parameter()
+      public boolean useModified;
+
+      @Parameterized.Parameter(1)
+      public Symbol[] outcomes;
+
+      @Parameterized.Parameter(2)
+      public String expectedMessage;
+
+
+      @Parameterized.Parameters(name = "useModified={0}")
+      public static Collection<Object[]> parameters() {
+         return Arrays.asList(new Object[][] {
+               {true, new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL,
Modified.DESCRIPTOR_SYMBOL}, "failure at remote"},
+               {true, new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL},
"[condition = amqp:resource-limit-exceeded]"},
+               {false, new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL,
Modified.DESCRIPTOR_SYMBOL}, "[condition = amqp:resource-limit-exceeded]"},
+               {false, new Symbol[]{}, "[condition = amqp:resource-limit-exceeded]"}
+         });
+      }
+
+      @Override
+      protected void configureAddressPolicy(ActiveMQServer server) {
+         AmqpFlowControlFailTest.configureAddressPolicy(server);
+      }
+
+      @Override
+      protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
+         params.put("amqpUseModifiedForTransientDeliveryErrors", useModified);
+      }
 
-   @Override
-   protected void configureAddressPolicy(ActiveMQServer server) {
-      // For BLOCK tests
-      AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch("#");
-      addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
-      addressSettings.setMaxSizeBytes(1000);
-     // addressSettings.setMaxSizeBytesRejectThreshold(MAX_SIZE_BYTES_REJECT_THRESHOLD);
-      server.getAddressSettingsRepository().addMatch("#", addressSettings);
-   }
 
-   @Test(timeout = 60000)
-   public void testMesagesNotSent() throws Exception {
-      AmqpClient client = createAmqpClient(getBrokerAmqpConnectionURI());
-      AmqpConnection connection = addConnection(client.connect());
-      int messagesSent = 0;
-      try {
-         AmqpSession session = connection.createSession();
-         AmqpSender sender = session.createSender(getQueueName());
-         boolean rejected = false;
-         for (int i = 0; i < 1000; i++) {
-            final AmqpMessage message = new AmqpMessage();
-            byte[] payload = new byte[10];
-            message.setBytes(payload);
-            try {
-               sender.send(message);
-               messagesSent++;
-               System.out.println("message = " + message);
-            } catch (IOException e) {
-               rejected = true;
+      @Test(timeout = 60000)
+      public void testAddressFullDisposition() throws Exception {
+         AmqpClient client = createAmqpClient(getBrokerAmqpConnectionURI());
+         AmqpConnection connection = addConnection(client.connect());
+         try {
+            AmqpSession session = connection.createSession();
+            AmqpSender sender = session.createSender(getQueueName(), null, null, outcomes);
+            boolean rejected = false;
+            for (int i = 0; i < 1000; i++) {
+               final AmqpMessage message = new AmqpMessage();
+               byte[] payload = new byte[10];
+               message.setBytes(payload);
+               try {
+                  sender.send(message);
+               } catch (IOException e) {
+                  rejected = true;
+                  assertTrue(String.format("Unexpected message expected %s to contain %s",
e.getMessage(), expectedMessage),
+                             e.getMessage().contains(expectedMessage));
+               }
             }
+
+            assertTrue("Expected messages to be refused by broker", rejected);
+         } finally {
+            connection.close();
          }
-         assertTrue(rejected);
-         rejected = false;
-         assertEquals(0, sender.getSender().getCredit());
-         AmqpSession session2 = connection.createSession();
-         AmqpReceiver receiver = session2.createReceiver(getQueueName());
-         receiver.flow(messagesSent);
-         for (int i = 0; i < messagesSent; i++) {
-            AmqpMessage receive = receiver.receive();
-            receive.accept();
-         }
-         receiver.close();
-         session2.close();
-
-         Wait.assertEquals(1000, sender.getSender()::getCredit);
-         for (int i = 0; i < 1000; i++) {
-            final AmqpMessage message = new AmqpMessage();
-            byte[] payload = new byte[100];
-            message.setBytes(payload);
-            try {
-               sender.send(message);
-            } catch (IOException e) {
-               rejected = true;
+      }
+   }
+
+   public static class AmqpFlowControlFailOrdinaryTests extends JMSClientTestSupport {
+
+      @Override
+      protected void configureAddressPolicy(ActiveMQServer server) {
+         AmqpFlowControlFailTest.configureAddressPolicy(server);
+      }
+
+      @Test(timeout = 60000)
+      public void testMesagesNotSent() throws Exception {
+         AmqpClient client = createAmqpClient(getBrokerAmqpConnectionURI());
+         AmqpConnection connection = addConnection(client.connect());
+         int messagesSent = 0;
+         try {
+            AmqpSession session = connection.createSession();
+            AmqpSender sender = session.createSender(getQueueName());
+            boolean rejected = false;
+            for (int i = 0; i < 1000; i++) {
+               final AmqpMessage message = new AmqpMessage();
+               byte[] payload = new byte[10];
+               message.setBytes(payload);
+               try {
+                  sender.send(message);
+                  messagesSent++;
+               } catch (IOException e) {
+                  rejected = true;
+               }
+            }
+            assertTrue(rejected);
+            rejected = false;
+            assertEquals(0, sender.getSender().getCredit());
+            AmqpSession session2 = connection.createSession();
+            AmqpReceiver receiver = session2.createReceiver(getQueueName());
+            receiver.flow(messagesSent);
+            for (int i = 0; i < messagesSent; i++) {
+               AmqpMessage receive = receiver.receive();
+               receive.accept();
             }
+            receiver.close();
+            session2.close();
+
+            Wait.assertEquals(1000, sender.getSender()::getCredit);
+            for (int i = 0; i < 1000; i++) {
+               final AmqpMessage message = new AmqpMessage();
+               byte[] payload = new byte[100];
+               message.setBytes(payload);
+               try {
+                  sender.send(message);
+               } catch (IOException e) {
+                  rejected = true;
+               }
+            }
+            assertTrue(rejected);
+            assertEquals(0, sender.getSender().getCredit());
+         } finally {
+            connection.close();
          }
-         assertTrue(rejected);
-         assertEquals(0, sender.getSender().getCredit());
-      } finally {
-         connection.close();
       }
    }
+
+   private static void configureAddressPolicy(final ActiveMQServer server) {
+      AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch("#");
+      addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
+      addressSettings.setMaxSizeBytes(1000);
+      server.getAddressSettingsRepository().addMatch("#", addressSettings);
+   }
 }


Mime
View raw message