From commits-return-57958-archive-asf-public=cust-asf.ponee.io@activemq.apache.org Tue Dec 3 16:11:21 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id 37ABC180629 for ; Tue, 3 Dec 2019 17:11:21 +0100 (CET) Received: (qmail 17567 invoked by uid 500); 3 Dec 2019 16:11:20 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 17558 invoked by uid 99); 3 Dec 2019 16:11:20 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Dec 2019 16:11:20 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 5EA8B8D811; Tue, 3 Dec 2019 16:11:20 +0000 (UTC) Date: Tue, 03 Dec 2019 16:11:20 +0000 To: "commits@activemq.apache.org" Subject: [activemq-artemis] branch master updated: ARTEMIS-2494: [AMQP] Allow Modified disposition to be used signal address full to a sending peer MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <157538948008.18723.13666276572801240464@gitbox.apache.org> From: clebertsuconic@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: activemq-artemis X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 92f87feab99acdffa1d2fdef18374d620f92b457 X-Git-Newrev: 7bd710520d8faf4393a86dd4cc0f09fa627000f8 X-Git-Rev: 7bd710520d8faf4393a86dd4cc0f09fa627000f8 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. 
clebertsuconic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git The following commit(s) were added to refs/heads/master by this push: new 7bd7105 ARTEMIS-2494: [AMQP] Allow Modified disposition to be used signal address full to a sending peer new e86df5a This closes #2847 7bd7105 is described below commit 7bd710520d8faf4393a86dd4cc0f09fa627000f8 Author: Keith Wall AuthorDate: Fri Sep 20 20:39:10 2019 +0100 ARTEMIS-2494: [AMQP] Allow Modified disposition to be used signal address full to a sending peer --- .../protocol/amqp/broker/AMQPSessionCallback.java | 7 +- .../amqp/broker/ProtonProtocolManager.java | 20 +++ .../artemis/protocol/amqp/proton/AmqpSupport.java | 3 + .../amqp/proton/ProtonServerReceiverContext.java | 80 +++++++-- .../proton/ProtonServerReceiverContextTest.java | 111 ++++++++++++ .../activemq/transport/amqp/client/AmqpSender.java | 19 ++- .../transport/amqp/client/AmqpSession.java | 28 ++- .../integration/amqp/AmqpFlowControlFailTest.java | 188 +++++++++++++++------ 8 files changed, 378 insertions(+), 78 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 4b2b669..a65361d 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -440,6 +440,7 @@ public class AMQPSessionCallback implements SessionCallback { // Anonymous relay must set a To value address = message.getAddressSimpleString(); if (address == null) { + // Errors are not currently handled as required by AMQP 1.0 anonterm-v1.0 rejectMessage(delivery, Symbol.valueOf("failed"), "Missing 
'to' field for message sent to an anonymous producer"); return; } @@ -457,14 +458,14 @@ public class AMQPSessionCallback implements SessionCallback { PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddressSimpleString()); if (store != null && store.isRejectingMessages()) { // We drop pre-settled messages (and abort any associated Tx) + String amqpAddress = delivery.getLink().getTarget().getAddress(); + ActiveMQException e = new ActiveMQAMQPResourceLimitExceededException("Address is full: " + amqpAddress); if (delivery.remotelySettled()) { if (transaction != null) { - String amqpAddress = delivery.getLink().getTarget().getAddress(); - ActiveMQException e = new ActiveMQAMQPResourceLimitExceededException("Address is full: " + amqpAddress); transaction.markAsRollbackOnly(e); } } else { - rejectMessage(delivery, AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address is full: " + address); + throw e; } } else { serverSend(context, transaction, message, delivery, receiver, routingContext); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java index 5b9aa38..bab27d1 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java @@ -69,6 +69,8 @@ public class ProtonProtocolManager extends AbstractProtocolManager { - - condition.setDescription(e.getMessage()); - rejected.setError(condition); - - delivery.disposition(rejected); + delivery.disposition(deliveryState); delivery.settle(); flow(); connection.flush(); @@ -326,6 +325,50 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements } } + 
private DeliveryState determineDeliveryState(final Source source, final boolean useModified, final Exception e) { + Outcome defaultOutcome = getEffectiveDefaultOutcome(source); + + if (isAddressFull(e) && useModified && + (outcomeSupported(source, Modified.DESCRIPTOR_SYMBOL) || defaultOutcome instanceof Modified)) { + Modified modified = new Modified(); + modified.setDeliveryFailed(true); + return modified; + } else { + if (outcomeSupported(source, Rejected.DESCRIPTOR_SYMBOL) || defaultOutcome instanceof Rejected) { + return createRejected(e); + } else if (source.getDefaultOutcome() instanceof DeliveryState) { + return ((DeliveryState) source.getDefaultOutcome()); + } else { + // The AMQP specification requires that Accepted is returned for this case. However there exist + // implementations that set neither outcomes/default-outcome but use/expect for full range of outcomes. + // To maintain compatibility with these implementations, we maintain previous behaviour. + return createRejected(e); + } + } + } + + private boolean isAddressFull(final Exception e) { + return e instanceof ActiveMQException && ActiveMQExceptionType.ADDRESS_FULL.equals(((ActiveMQException) e).getType()); + } + + private Rejected createRejected(final Exception e) { + ErrorCondition condition = new ErrorCondition(); + + // Set condition + if (e instanceof ActiveMQSecurityException) { + condition.setCondition(AmqpError.UNAUTHORIZED_ACCESS); + } else if (isAddressFull(e)) { + condition.setCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED); + } else { + condition.setCondition(Symbol.valueOf("failed")); + } + condition.setDescription(e.getMessage()); + + Rejected rejected = new Rejected(); + rejected.setError(condition); + return rejected; + } + @Override public void close(boolean remoteLinkClose) throws ActiveMQAMQPException { protonSession.removeReceiver(receiver); @@ -375,4 +418,15 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements public boolean isDraining() { return 
receiver.draining(); } + + private boolean outcomeSupported(final Source source, final Symbol outcome) { + if (source != null && source.getOutcomes() != null) { + return Arrays.asList(( source).getOutcomes()).contains(outcome); + } + return false; + } + + private Outcome getEffectiveDefaultOutcome(final Source source) { + return (source.getOutcomes() == null || source.getOutcomes().length == 0) ? source.getDefaultOutcome() : null; + } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java index a157ef1..571ca92 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java @@ -16,16 +16,43 @@ */ package org.apache.activemq.artemis.protocol.amqp.proton; +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import java.util.List; + +import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.SimpleString; +import 
org.apache.activemq.artemis.core.server.RoutingContext; +import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; +import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; + +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.Accepted; +import org.apache.qpid.proton.amqp.messaging.Modified; +import org.apache.qpid.proton.amqp.messaging.Outcome; +import org.apache.qpid.proton.amqp.messaging.Rejected; +import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.amqp.transport.DeliveryState; +import org.apache.qpid.proton.codec.ReadableBuffer; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Receiver; import org.junit.Test; +import org.mockito.stubbing.Answer; public class ProtonServerReceiverContextTest { @@ -39,6 +66,44 @@ public class ProtonServerReceiverContextTest { doOnMessageWithAbortedDeliveryTestImpl(true); } + @Test + public void addressFull_SourceSupportsModified() throws Exception { + doOnMessageWithDeliveryException(asList(Rejected.DESCRIPTOR_SYMBOL, + Accepted.DESCRIPTOR_SYMBOL, + Modified.DESCRIPTOR_SYMBOL), + null, new ActiveMQAddressFullException(), + Modified.class); + } + + @Test + public void addressFull_SourceDoesNotSupportModified() throws Exception { + doOnMessageWithDeliveryException(asList(Rejected.DESCRIPTOR_SYMBOL, + Accepted.DESCRIPTOR_SYMBOL), + null, new ActiveMQAddressFullException(), + Rejected.class); + } + + @Test + public void otherFailure_SourceSupportsRejects() throws Exception { + doOnMessageWithDeliveryException(asList(Rejected.DESCRIPTOR_SYMBOL, + Accepted.DESCRIPTOR_SYMBOL, + Modified.DESCRIPTOR_SYMBOL), + null, new ActiveMQException(), + Rejected.class); + } + + @Test + public void otherFailure_SourceDoesNotSupportReject() throws Exception { + 
doOnMessageWithDeliveryException(singletonList(Accepted.DESCRIPTOR_SYMBOL), + Accepted.getInstance(), new ActiveMQException(), + Accepted.class); + // violates AMQP specification - see explanation ProtonServerReceiverContext.determineDeliveryState + doOnMessageWithDeliveryException(singletonList(Accepted.DESCRIPTOR_SYMBOL), + null, + new ActiveMQException(), + Rejected.class); + } + private void doOnMessageWithAbortedDeliveryTestImpl(boolean drain) throws ActiveMQAMQPException { Receiver mockReceiver = mock(Receiver.class); AMQPConnectionContext mockConnContext = mock(AMQPConnectionContext.class); @@ -46,6 +111,8 @@ public class ProtonServerReceiverContextTest { when(mockConnContext.getAmqpCredits()).thenReturn(100); when(mockConnContext.getAmqpLowCredits()).thenReturn(30); + when(mockConnContext.getProtocolManager()).thenReturn(mock(ProtonProtocolManager.class)); + ProtonServerReceiverContext rc = new ProtonServerReceiverContext(null, mockConnContext, null, mockReceiver); Delivery mockDelivery = mock(Delivery.class); @@ -72,4 +139,48 @@ public class ProtonServerReceiverContextTest { verifyNoMoreInteractions(mockReceiver); } + private void doOnMessageWithDeliveryException(List<Symbol> sourceSymbols, + Outcome defaultOutcome, Exception deliveryException, + Class<? extends DeliveryState> expectedDeliveryState) throws Exception { + AMQPConnectionContext mockConnContext = mock(AMQPConnectionContext.class); + doAnswer((Answer<Void>) invocation -> { + Runnable runnable = invocation.getArgument(0); + runnable.run(); + return null; + }).when(mockConnContext).runLater(any(Runnable.class)); + ProtonProtocolManager mockProtocolManager = mock(ProtonProtocolManager.class); + when(mockProtocolManager.isUseModifiedForTransientDeliveryErrors()).thenReturn(true); + when(mockConnContext.getProtocolManager()).thenReturn(mockProtocolManager); + + + AMQPSessionCallback mockSession = mock(AMQPSessionCallback.class); + + Receiver mockReceiver = mock(Receiver.class); + ProtonServerReceiverContext rc = new 
ProtonServerReceiverContext(mockSession, mockConnContext, null, mockReceiver); + + Delivery mockDelivery = mock(Delivery.class); + when(mockDelivery.getLink()).thenReturn(mockReceiver); + + when(mockReceiver.current()).thenReturn(mockDelivery); + Source source = new Source(); + source.setOutcomes(sourceSymbols.toArray(new Symbol[]{})); + source.setDefaultOutcome(defaultOutcome); + when(mockReceiver.getSource()).thenReturn(source); + + doThrow(deliveryException).when(mockSession) + .serverSend(eq(rc), + nullable(Transaction.class), + eq(mockReceiver), + eq(mockDelivery), + nullable(SimpleString.class), + anyInt(), + nullable(ReadableBuffer.class), + any(RoutingContext.class)); + + rc.onMessage(mockDelivery); + + verify(mockDelivery, times(1)).settle(); + verify(mockDelivery, times(1)).disposition(any(expectedDeliveryState)); + } + } diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java index 703d489..eed7cf2 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java @@ -59,6 +59,7 @@ public class AmqpSender extends AmqpAbstractResource { private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {}; public static final long DEFAULT_SEND_TIMEOUT = 15000; + public static final Symbol[] DEFAULT_OUTCOMES = {Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL}; private final AmqpTransferTagGenerator tagGenerator = new AmqpTransferTagGenerator(true); private final AtomicBoolean closed = new AtomicBoolean(); @@ -70,6 +71,7 @@ public class AmqpSender extends AmqpAbstractResource { private final Target userSpecifiedTarget; private final SenderSettleMode userSpecifiedSenderSettlementMode; private final ReceiverSettleMode userSpecifiedReceiverSettlementMode; + 
private final Symbol[] outcomes; private boolean presettle; private long sendTimeout = DEFAULT_SEND_TIMEOUT; @@ -92,7 +94,7 @@ public class AmqpSender extends AmqpAbstractResource { * The unique ID assigned to this sender. */ public AmqpSender(AmqpSession session, String address, String senderId) { - this(session, address, senderId, null, null); + this(session, address, senderId, null, null, DEFAULT_OUTCOMES); } /** @@ -108,8 +110,15 @@ public class AmqpSender extends AmqpAbstractResource { * The {@link SenderSettleMode} to use on open. * @param receiverMode * The {@link ReceiverSettleMode} to use on open. + * @param outcomes + * The outcomes to use on open */ - public AmqpSender(AmqpSession session, String address, String senderId, SenderSettleMode senderMode, ReceiverSettleMode receiverMode) { + public AmqpSender(AmqpSession session, + String address, + String senderId, + SenderSettleMode senderMode, + ReceiverSettleMode receiverMode, + Symbol[] outcomes) { if (address != null && address.isEmpty()) { throw new IllegalArgumentException("Address cannot be empty."); @@ -121,6 +130,7 @@ public class AmqpSender extends AmqpAbstractResource { this.userSpecifiedTarget = null; this.userSpecifiedSenderSettlementMode = senderMode; this.userSpecifiedReceiverSettlementMode = receiverMode; + this.outcomes = outcomes; } /** @@ -145,6 +155,7 @@ public class AmqpSender extends AmqpAbstractResource { this.userSpecifiedTarget = target; this.userSpecifiedSenderSettlementMode = null; this.userSpecifiedReceiverSettlementMode = null; + outcomes = DEFAULT_OUTCOMES; } /** @@ -311,11 +322,9 @@ public class AmqpSender extends AmqpAbstractResource { @Override protected void doOpen() { - - Symbol[] outcomes = new Symbol[] {Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL}; Source source = new Source(); source.setAddress(senderId); - source.setOutcomes(outcomes); + source.setOutcomes(this.outcomes); Target target = userSpecifiedTarget; if (target == null) { diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java index 53d45e3..ff23e67 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java @@ -189,10 +189,34 @@ public class AmqpSession extends AmqpAbstractResource { * * @throws Exception if an error occurs while creating the sender. */ - public AmqpSender createSender(final String address, final SenderSettleMode senderMode, ReceiverSettleMode receiverMode) throws Exception { + public AmqpSender createSender(final String address, + final SenderSettleMode senderMode, + ReceiverSettleMode receiverMode) throws Exception { + return createSender(address, senderMode, receiverMode, AmqpSender.DEFAULT_OUTCOMES); + } + + /** + * Create a sender instance using the given address + * + * @param address + * the address to which the sender will produce its messages. + * @param senderSettlementMode + * controls the settlement mode used by the created Sender + * @param receiverSettlementMode + * controls the desired settlement mode used by the remote Receiver + * @param outcomes + * specifies the outcomes supported by the sender + * + * @return a newly created sender that is ready for use. + * + * @throws Exception if an error occurs while creating the sender. 
+ */ + public AmqpSender createSender(final String address, + final SenderSettleMode senderMode, + ReceiverSettleMode receiverMode, final Symbol[] outcomes) throws Exception { checkClosed(); - final AmqpSender sender = new AmqpSender(AmqpSession.this, address, getNextSenderId(), senderMode, receiverMode); + final AmqpSender sender = new AmqpSender(AmqpSession.this, address, getNextSenderId(), senderMode, receiverMode, outcomes); final ClientFuture request = new ClientFuture(); connection.getScheduler().execute(new Runnable() { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java index c6119a1..42fb5f3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java @@ -27,70 +27,148 @@ import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; -public class AmqpFlowControlFailTest extends JMSClientTestSupport { +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.Accepted; +import org.apache.qpid.proton.amqp.messaging.Modified; +import org.apache.qpid.proton.amqp.messaging.Rejected; + +@RunWith(Enclosed.class) +public class AmqpFlowControlFailTest { + + @RunWith(Parameterized.class) + public static class AmqpFlowControlFailDispositionTests extends JMSClientTestSupport { + + 
@Parameterized.Parameter() + public boolean useModified; + + @Parameterized.Parameter(1) + public Symbol[] outcomes; + + @Parameterized.Parameter(2) + public String expectedMessage; + + + @Parameterized.Parameters(name = "useModified={0}") + public static Collection<Object[]> parameters() { + return Arrays.asList(new Object[][] { + {true, new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL}, "failure at remote"}, + {true, new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL}, "[condition = amqp:resource-limit-exceeded]"}, + {false, new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL}, "[condition = amqp:resource-limit-exceeded]"}, + {false, new Symbol[]{}, "[condition = amqp:resource-limit-exceeded]"} + }); + } + + @Override + protected void configureAddressPolicy(ActiveMQServer server) { + AmqpFlowControlFailTest.configureAddressPolicy(server); + } + + @Override + protected void configureAMQPAcceptorParameters(Map<String, Object> params) { + params.put("amqpUseModifiedForTransientDeliveryErrors", useModified); + } - @Override - protected void configureAddressPolicy(ActiveMQServer server) { - // For BLOCK tests - AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch("#"); - addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL); - addressSettings.setMaxSizeBytes(1000); - // addressSettings.setMaxSizeBytesRejectThreshold(MAX_SIZE_BYTES_REJECT_THRESHOLD); - server.getAddressSettingsRepository().addMatch("#", addressSettings); - } - @Test(timeout = 60000) - public void testMesagesNotSent() throws Exception { - AmqpClient client = createAmqpClient(getBrokerAmqpConnectionURI()); - AmqpConnection connection = addConnection(client.connect()); - int messagesSent = 0; - try { - AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getQueueName()); - boolean rejected = false; - for (int i = 0; i < 1000; i++) { - 
final AmqpMessage message = new AmqpMessage(); - byte[] payload = new byte[10]; - message.setBytes(payload); - try { - sender.send(message); - messagesSent++; - System.out.println("message = " + message); - } catch (IOException e) { - rejected = true; + @Test(timeout = 60000) + public void testAddressFullDisposition() throws Exception { + AmqpClient client = createAmqpClient(getBrokerAmqpConnectionURI()); + AmqpConnection connection = addConnection(client.connect()); + try { + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(getQueueName(), null, null, outcomes); + boolean rejected = false; + for (int i = 0; i < 1000; i++) { + final AmqpMessage message = new AmqpMessage(); + byte[] payload = new byte[10]; + message.setBytes(payload); + try { + sender.send(message); + } catch (IOException e) { + rejected = true; + assertTrue(String.format("Unexpected message expected %s to contain %s", e.getMessage(), expectedMessage), + e.getMessage().contains(expectedMessage)); + } } + + assertTrue("Expected messages to be refused by broker", rejected); + } finally { + connection.close(); } - assertTrue(rejected); - rejected = false; - assertEquals(0, sender.getSender().getCredit()); - AmqpSession session2 = connection.createSession(); - AmqpReceiver receiver = session2.createReceiver(getQueueName()); - receiver.flow(messagesSent); - for (int i = 0; i < messagesSent; i++) { - AmqpMessage receive = receiver.receive(); - receive.accept(); - } - receiver.close(); - session2.close(); - - Wait.assertEquals(1000, sender.getSender()::getCredit); - for (int i = 0; i < 1000; i++) { - final AmqpMessage message = new AmqpMessage(); - byte[] payload = new byte[100]; - message.setBytes(payload); - try { - sender.send(message); - } catch (IOException e) { - rejected = true; + } + } + + public static class AmqpFlowControlFailOrdinaryTests extends JMSClientTestSupport { + + @Override + protected void configureAddressPolicy(ActiveMQServer server) { + 
AmqpFlowControlFailTest.configureAddressPolicy(server); + } + + @Test(timeout = 60000) + public void testMesagesNotSent() throws Exception { + AmqpClient client = createAmqpClient(getBrokerAmqpConnectionURI()); + AmqpConnection connection = addConnection(client.connect()); + int messagesSent = 0; + try { + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(getQueueName()); + boolean rejected = false; + for (int i = 0; i < 1000; i++) { + final AmqpMessage message = new AmqpMessage(); + byte[] payload = new byte[10]; + message.setBytes(payload); + try { + sender.send(message); + messagesSent++; + } catch (IOException e) { + rejected = true; + } + } + assertTrue(rejected); + rejected = false; + assertEquals(0, sender.getSender().getCredit()); + AmqpSession session2 = connection.createSession(); + AmqpReceiver receiver = session2.createReceiver(getQueueName()); + receiver.flow(messagesSent); + for (int i = 0; i < messagesSent; i++) { + AmqpMessage receive = receiver.receive(); + receive.accept(); } + receiver.close(); + session2.close(); + + Wait.assertEquals(1000, sender.getSender()::getCredit); + for (int i = 0; i < 1000; i++) { + final AmqpMessage message = new AmqpMessage(); + byte[] payload = new byte[100]; + message.setBytes(payload); + try { + sender.send(message); + } catch (IOException e) { + rejected = true; + } + } + assertTrue(rejected); + assertEquals(0, sender.getSender().getCredit()); + } finally { + connection.close(); } - assertTrue(rejected); - assertEquals(0, sender.getSender().getCredit()); - } finally { - connection.close(); } } + + private static void configureAddressPolicy(final ActiveMQServer server) { + AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch("#"); + addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL); + addressSettings.setMaxSizeBytes(1000); + server.getAddressSettingsRepository().addMatch("#", addressSettings); + } }