qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From oru...@apache.org
Subject [05/11] qpid-broker-j git commit: QPID-8062: [Broker-J][AMQP 1.0] Move responsibility to create outcome for received message from ReceivingDestination into a caller of ReceivingDestination#send()
Date Mon, 29 Jan 2018 14:27:13 GMT
QPID-8062: [Broker-J][AMQP 1.0] Move responsibility to create outcome for received message
from ReceivingDestination into a caller of ReceivingDestination#send()

(cherry picked from commit b5a0d32806ed5d07853adc287537d53cd3a7791f)


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/2495d194
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/2495d194
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/2495d194

Branch: refs/heads/7.0.x
Commit: 2495d194989110d383df7d2a44a92e90cb8c48bf
Parents: b4e746a
Author: Alex Rudyy <orudyy@apache.org>
Authored: Thu Jan 18 12:03:42 2018 +0000
Committer: Alex Rudyy <orudyy@apache.org>
Committed: Mon Jan 29 13:57:40 2018 +0000

----------------------------------------------------------------------
 .../v1_0/AnonymousRelayDestination.java         | 62 +++-----------------
 .../protocol/v1_0/NodeReceivingDestination.java | 62 +++-----------------
 .../protocol/v1_0/ReceivingDestination.java     | 15 +----
 .../v1_0/StandardReceivingLinkEndpoint.java     | 40 ++++++++++---
 .../v1_0/UnroutableMessageException.java        | 39 ++++++++++++
 .../anonymousrelay/AnonymousRelayTest.java      | 40 +++++++++++--
 6 files changed, 126 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2495d194/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
index 806469b..00c0588 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
@@ -22,7 +22,6 @@ package org.apache.qpid.server.protocol.v1_0;/*
 import static org.apache.qpid.server.protocol.v1_0.Session_1_0.DELAYED_DELIVERY;
 
 import java.util.Arrays;
-import java.util.Collections;
 
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.messages.ExchangeMessages;
@@ -30,17 +29,10 @@ import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.DestinationAddress;
 import org.apache.qpid.server.model.NamedAddressSpace;
-import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
-import org.apache.qpid.server.protocol.v1_0.type.Binary;
-import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.security.SecurityToken;
-import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 
 public class AnonymousRelayDestination implements ReceivingDestination
@@ -68,18 +60,9 @@ public class AnonymousRelayDestination implements ReceivingDestination
     }
 
     @Override
-    public Outcome[] getOutcomes()
-    {
-        return new Outcome[0];
-    }
-
-    @Override
-    public Outcome send(final ServerMessage<?> message,
-                        final ServerTransaction txn,
-                        final SecurityToken securityToken,
-                        final boolean rejectedOutcomeSupportedBySource,
-                        final boolean deliverySettled,
-                        final Binary deliveryTag) throws AmqpErrorException
+    public void send(final ServerMessage<?> message,
+                     final ServerTransaction txn,
+                     final SecurityToken securityToken) throws UnroutableMessageException
     {
         final ReceivingDestination destination;
         final String routingAddress = message.getTo();
@@ -98,53 +81,24 @@ public class AnonymousRelayDestination implements ReceivingDestination
             destination = null;
         }
 
-        final Outcome outcome;
         if (destination == null)
         {
             if (_discardUnroutable)
             {
                 _eventLogger.message(ExchangeMessages.DISCARDMSG("", routingAddress));
-                outcome = new Accepted();
             }
             else
             {
-                final Error notFoundError = new Error(AmqpError.NOT_FOUND,
-                                                      String.format("Unknown destination '%s'", routingAddress));
-                notFoundError.setInfo(Collections.singletonMap(DELIVERY_TAG, deliveryTag));
-
-                // If the source of the link does not support the rejected outcome,
-                // or the message has already been settled by the sender,
-                // then the routing node MUST detach the link with an error.
-                // AMQP-140: When pre-settled messages are being sent within a transaction,
-                // then the behaviour defined for transactions should take precedence
-                // (essentially marking the transaction as rollback only).
-                if (!rejectedOutcomeSupportedBySource || (deliverySettled && !(txn instanceof LocalTransaction)))
-                {
-                    throw new AmqpErrorException(notFoundError);
-                }
-                else
-                {
-                    if (deliverySettled && txn instanceof LocalTransaction)
-                    {
-                        ((LocalTransaction) txn).setRollbackOnly();
-                    }
-
-                    Rejected rejected = new Rejected();
-                    rejected.setError(notFoundError);
-                    outcome = rejected;
-                }
+                throw new UnroutableMessageException(AmqpError.NOT_FOUND,
+                                                     String.format("Unknown destination '%s'", routingAddress));
             }
         }
         else
         {
-            outcome = destination.send(message,
-                                       txn,
-                                       securityToken,
-                                       rejectedOutcomeSupportedBySource,
-                                       deliverySettled,
-                                       deliveryTag);
+            destination.send(message,
+                             txn,
+                             securityToken);
         }
-        return outcome;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2495d194/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
index 6d5859f..ee3839c 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
@@ -34,25 +34,15 @@ import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.DestinationAddress;
 import org.apache.qpid.server.model.Exchange;
-import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
-import org.apache.qpid.server.protocol.v1_0.type.Binary;
-import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.security.SecurityToken;
-import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 
 public class NodeReceivingDestination implements ReceivingDestination
 {
-    private static final Accepted ACCEPTED = new Accepted();
-    private static final Rejected REJECTED = new Rejected();
-    private static final Outcome[] OUTCOMES = { ACCEPTED, REJECTED};
     private final boolean _discardUnroutable;
     private final EventLogger _eventLogger;
 
@@ -90,18 +80,9 @@ public class NodeReceivingDestination implements ReceivingDestination
     }
 
     @Override
-    public Outcome[] getOutcomes()
-    {
-        return OUTCOMES;
-    }
-
-    @Override
-    public Outcome send(final ServerMessage<?> message,
-                        final ServerTransaction txn,
-                        final SecurityToken securityToken,
-                        final boolean rejectedOutcomeSupportedBySource,
-                        final boolean deliverySettled,
-                        final Binary deliveryTag) throws AmqpErrorException
+    public void send(final ServerMessage<?> message,
+                     final ServerTransaction txn,
+                     final SecurityToken securityToken) throws UnroutableMessageException
     {
         final String routingAddress = "".equals(_routingAddress) ? getRoutingAddress(message) : _routingAddress;
         _destination.authorisePublish(securityToken, Collections.singletonMap("routingKey", routingAddress));
@@ -136,10 +117,10 @@ public class NodeReceivingDestination implements ReceivingDestination
         {
             if (!_discardUnroutable)
             {
-                final Error error;
+                final String errorMessage;
+                final AmqpError errorCode;
                 if (result.isRejected())
                 {
-                    AmqpError errorCode;
                     if (result.containsReject(RejectType.LIMIT_EXCEEDED))
                     {
                         errorCode = AmqpError.RESOURCE_LIMIT_EXCEEDED;
@@ -152,37 +133,20 @@ public class NodeReceivingDestination implements ReceivingDestination
                     {
                         errorCode = AmqpError.ILLEGAL_STATE;
                     }
-                    error = new Error(errorCode, result.getRejectReason());
-                }
-                else
-                {
-                    error = new Error(AmqpError.NOT_FOUND,
-                                      String.format("Unknown destination '%s'", routingAddress));
-                }
-                error.setInfo(Collections.singletonMap(DELIVERY_TAG, deliveryTag));
-
-                if (!rejectedOutcomeSupportedBySource || (deliverySettled && !(txn instanceof LocalTransaction)))
-                {
-                    throw new AmqpErrorException(error);
+                    errorMessage = result.getRejectReason();
                 }
                 else
                 {
-                    if (deliverySettled && txn instanceof LocalTransaction)
-                    {
-                        ((LocalTransaction) txn).setRollbackOnly();
-                    }
-
-                    Rejected rejected = new Rejected();
-                    rejected.setError(error);
-                    return rejected;
+                    errorCode = AmqpError.NOT_FOUND;
+                    errorMessage = String.format("Unknown destination '%s'", routingAddress);
                 }
+                throw new UnroutableMessageException(errorCode, errorMessage);
             }
             else
             {
                 _eventLogger.message(ExchangeMessages.DISCARDMSG(_destination.getName(), routingAddress));
             }
         }
-        return ACCEPTED;
     }
 
     private String getRoutingAddress(final ServerMessage<?> message)
@@ -199,14 +163,6 @@ public class NodeReceivingDestination implements ReceivingDestination
         return initialRoutingAddress;
     }
 
-    private Outcome createdRejectedOutcome(AmqpError errorCode, String errorMessage)
-    {
-        Rejected rejected = new Rejected();
-        final Error notFoundError = new Error(errorCode, errorMessage);
-        rejected.setError(notFoundError);
-        return rejected;
-    }
-
     @Override
     public String getAddress()
     {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2495d194/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
index 5abd825..5c746be 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
@@ -22,29 +22,20 @@ package org.apache.qpid.server.protocol.v1_0;
 
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
-import org.apache.qpid.server.protocol.v1_0.type.Binary;
-import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.security.SecurityToken;
 import org.apache.qpid.server.txn.ServerTransaction;
 
 public interface ReceivingDestination
 {
-    Symbol DELIVERY_TAG = Symbol.valueOf("delivery-tag");
     Symbol REJECT_UNROUTABLE = Symbol.valueOf("REJECT_UNROUTABLE");
     Symbol DISCARD_UNROUTABLE = Symbol.valueOf("DISCARD_UNROUTABLE");
 
     Symbol[] getCapabilities();
 
-    Outcome[] getOutcomes();
-
-    Outcome send(final ServerMessage<?> message,
-                 final ServerTransaction txn,
-                 final SecurityToken securityToken,
-                 final boolean rejectedOutcomeSupportedBySource,
-                 final boolean deliverySettled,
-                 final Binary deliveryTag) throws AmqpErrorException;
+    void send(final ServerMessage<?> message,
+              final ServerTransaction txn,
+              final SecurityToken securityToken) throws UnroutableMessageException;
 
     int getCredit();
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2495d194/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index a565f7f..aebff2e 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -54,6 +54,7 @@ import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
@@ -77,6 +78,8 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
         implements AsyncAutoCommitTransaction.FutureRecorder
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(StandardReceivingLinkEndpoint.class);
+    private static final Symbol DELIVERY_TAG = Symbol.valueOf("delivery-tag");
+    private static final Accepted ACCEPTED = new Accepted();
     private static final String LINK = "link";
 
     private volatile ReceivingDestination _receivingDestination;
@@ -267,16 +270,37 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
                     {
                         try
                         {
-                            outcome = getReceivingDestination().send(serverMessage,
-                                                                     transaction,
-                                                                     session.getSecurityToken(),
-                                                                     _rejectedOutcomeSupportedBySource,
-                                                                     delivery.isSettled(),
-                                                                     delivery.getDeliveryTag());
+                            getReceivingDestination().send(serverMessage,
+                                                           transaction,
+                                                           session.getSecurityToken());
+                            outcome = ACCEPTED;
                         }
-                        catch (AmqpErrorException e)
+                        catch (UnroutableMessageException e)
                         {
-                            return e.getError();
+                            final Error error = new Error();
+                            error.setCondition(e.getErrorCondition());
+                            error.setDescription(e.getMessage());
+                            String targetAddress = getTarget().getAddress();
+                            if (targetAddress == null || "".equals(targetAddress.trim()))
+                            {
+                                error.setInfo(Collections.singletonMap(DELIVERY_TAG, delivery.getDeliveryTag()));
+                            }
+                            if (!_rejectedOutcomeSupportedBySource ||
+                                (delivery.isSettled() && !(transaction instanceof LocalTransaction)))
+                            {
+                                return error;
+                            }
+                            else
+                            {
+                                if (delivery.isSettled() && transaction instanceof LocalTransaction)
+                                {
+                                    ((LocalTransaction) transaction).setRollbackOnly();
+                                }
+
+                                Rejected rejected = new Rejected();
+                                rejected.setError(error);
+                                outcome = rejected;
+                            }
                         }
                     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2495d194/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/UnroutableMessageException.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/UnroutableMessageException.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/UnroutableMessageException.java
new file mode 100644
index 0000000..378b86c
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/UnroutableMessageException.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.server.protocol.v1_0.type.ErrorCondition;
+
+public class UnroutableMessageException extends Exception
+{
+    private final ErrorCondition _errorCondition;
+
+    public UnroutableMessageException(final ErrorCondition errorCondition, final String message)
+    {
+        super(message);
+        _errorCondition = errorCondition;
+    }
+
+    public ErrorCondition getErrorCondition()
+    {
+        return _errorCondition;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2495d194/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousrelay/AnonymousRelayTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousrelay/AnonymousRelayTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousrelay/AnonymousRelayTest.java
index c3c69d1..4fc18dd 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousrelay/AnonymousRelayTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousrelay/AnonymousRelayTest.java
@@ -70,8 +70,10 @@ import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 public class AnonymousRelayTest extends BrokerAdminUsingTestBase
 {
     private static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
+    private static final Symbol DELIVERY_TAG = Symbol.valueOf("delivery-tag");
     private static final String TEST_MESSAGE_CONTENT = "test";
     private InetSocketAddress _brokerAddress;
+    private Binary _deliveryTag;
 
     @Before
     public void setUp()
@@ -79,6 +81,7 @@ public class AnonymousRelayTest extends BrokerAdminUsingTestBase
         final BrokerAdmin brokerAdmin = getBrokerAdmin();
         brokerAdmin.createQueue(BrokerAdmin.TEST_QUEUE_NAME);
         _brokerAddress = brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        _deliveryTag = new Binary("testTag".getBytes(StandardCharsets.UTF_8));
     }
 
     @SpecificationTest(section = "Using the Anonymous Terminus for Message Routing. 2.2. Sending A Message",
@@ -101,6 +104,7 @@ public class AnonymousRelayTest extends BrokerAdminUsingTestBase
 
                        .transferPayload(generateMessagePayloadToDestination(BrokerAdmin.TEST_QUEUE_NAME))
                        .transferSettled(Boolean.TRUE)
+                       .transferDeliveryTag(_deliveryTag)
                        .transfer()
                        .sync();
 
@@ -116,7 +120,9 @@ public class AnonymousRelayTest extends BrokerAdminUsingTestBase
                           + " if the address cannot be resolved to a node). In this case the routing node"
                           + " MUST communicate the error back to the sender of the message."
                           + " [...] the message has already been settled by the sender,"
-                          + " then the routing node MUST detach the link with an error.")
+                          + " then the routing node MUST detach the link with an error."
+                          + " [...] the info field of error MUST contain an entry with symbolic key delivery-tag"
+                          + " and binary value of the delivery-tag of the message which caused the failure.")
     @Test
     public void transferPreSettledToUnknownDestination() throws Exception
     {
@@ -133,12 +139,15 @@ public class AnonymousRelayTest extends BrokerAdminUsingTestBase
 
                        .transferPayload(generateMessagePayloadToDestination("Unknown"))
                        .transferSettled(Boolean.TRUE)
+                       .transferDeliveryTag(_deliveryTag)
                        .transfer();
 
             Detach detach = interaction.consumeResponse().getLatestResponse(Detach.class);
             Error error = detach.getError();
             assertThat(error, is(notNullValue()));
             assertThat(error.getCondition(), is(equalTo(AmqpError.NOT_FOUND)));
+            assertThat(error.getInfo(), is(notNullValue()));
+            assertThat(error.getInfo().get(DELIVERY_TAG), is(equalTo(_deliveryTag)));
         }
     }
 
@@ -150,7 +159,9 @@ public class AnonymousRelayTest extends BrokerAdminUsingTestBase
                           + " MUST communicate the error back to the sender of the message."
                           + " If the source of the link supports the rejected outcome,"
                           + " and the message has not already been settled by the sender, then the routing node"
-                          + " MUST reject the message.")
+                          + " MUST reject the message."
+                          + " [...] the info field of error MUST contain an entry with symbolic key delivery-tag"
+                          + " and binary value of the delivery-tag of the message which caused the failure.")
     @Test
     public void transferUnsettledToUnknownDestinationWhenRejectedOutcomeSupportedBySource() throws Exception
     {
@@ -167,6 +178,7 @@ public class AnonymousRelayTest extends BrokerAdminUsingTestBase
                        .consumeResponse(Flow.class)
 
                        .transferPayload(generateMessagePayloadToDestination("Unknown"))
+                       .transferDeliveryTag(_deliveryTag)
                        .transfer()
                        .consumeResponse();
 
@@ -181,6 +193,8 @@ public class AnonymousRelayTest extends BrokerAdminUsingTestBase
             Error error = rejected.getError();
             assertThat(error, is(notNullValue()));
             assertThat(error.getCondition(), is(equalTo(AmqpError.NOT_FOUND)));
+            assertThat(error.getInfo(), is(notNullValue()));
+            assertThat(error.getInfo().get(DELIVERY_TAG), is(equalTo(_deliveryTag)));
         }
     }
 
@@ -192,7 +206,9 @@ public class AnonymousRelayTest extends BrokerAdminUsingTestBase
                           + " MUST communicate the error back to the sender of the message."
                           + " [...]"
                           + " If the source of the link does not support the rejected outcome,"
-                          + " [...] then the routing node MUST detach the link with an error.")
+                          + " [...] then the routing node MUST detach the link with an error."
+                          + " [...] the info field of error MUST contain an entry with symbolic key delivery-tag"
+                          + " and binary value of the delivery-tag of the message which caused the failure.")
     @Test
     public void transferUnsettledToUnknownDestinationWhenRejectedOutcomeNotSupportedBySource() throws Exception
     {
@@ -209,12 +225,15 @@ public class AnonymousRelayTest extends BrokerAdminUsingTestBase
                        .consumeResponse(Flow.class)
 
                        .transferPayload(generateMessagePayloadToDestination("Unknown"))
+                       .transferDeliveryTag(_deliveryTag)
                        .transfer();
 
             Detach detach = interaction.consumeResponse().getLatestResponse(Detach.class);
             Error error = detach.getError();
             assertThat(error, is(notNullValue()));
             assertThat(error.getCondition(), is(equalTo(AmqpError.NOT_FOUND)));
+            assertThat(error.getInfo(), is(notNullValue()));
+            assertThat(error.getInfo().get(DELIVERY_TAG), is(equalTo(_deliveryTag)));
         }
     }
 
@@ -243,6 +262,7 @@ public class AnonymousRelayTest extends BrokerAdminUsingTestBase
 
                        .transferHandle(linkHandle)
                        .transferPayload(generateMessagePayloadToDestination(BrokerAdmin.TEST_QUEUE_NAME))
+                       .transferDeliveryTag(_deliveryTag)
                        .transferTransactionalState(txnState.getCurrentTransactionId())
                        .transferSettled(Boolean.TRUE)
                        .transfer()
@@ -263,9 +283,12 @@ public class AnonymousRelayTest extends BrokerAdminUsingTestBase
                           + " [...]"
                           + " <Not in spec yet>"
                           + " AMQP-140"
+                          + " If a message cannot be routed to the destination implied in the \"to:\" field,"
+                          + " and the source does not allow for the rejected outcome"
+                          + " [...] when messages are being sent within a transaction and have been sent pre-settled."
                           + " In this case the behaviour defined for transactions (of essentially marking"
                           + " the transaction as rollback only) should take precedence. "
-                          + ""
+                            + ""
                           + " AMQP spec 4.3 Discharging a Transaction"
                           + " If the coordinator is unable to complete the discharge, the coordinator MUST convey"
                           + " the error to the controller as a transaction-error. If the source for the link to"
@@ -285,7 +308,7 @@ public class AnonymousRelayTest extends BrokerAdminUsingTestBase
             interaction.begin()
                        .consumeResponse(Begin.class)
 
-                       // attaching coordinator link with supported outcomes Accept and Reject
+                       // attaching coordinator link with supported outcomes Accepted and Rejected
                        .txnAttachCoordinatorLink(txnState)
                        .txnDeclare(txnState)
 
@@ -297,6 +320,7 @@ public class AnonymousRelayTest extends BrokerAdminUsingTestBase
 
                        .transferHandle(linkHandle)
                        .transferPayload(generateMessagePayloadToDestination("Unknown"))
+                       .transferDeliveryTag(_deliveryTag)
                        .transferTransactionalState(txnState.getCurrentTransactionId())
                        .transferSettled(Boolean.TRUE)
                        .transferDeliveryId(UnsignedInteger.valueOf(1))
@@ -351,6 +375,11 @@ public class AnonymousRelayTest extends BrokerAdminUsingTestBase
                           + " [...]"
                           + " <Not in spec yet>"
                           + " AMQP-140"
+                          + " If a message cannot be routed to the destination implied in the \"to:\" field,"
+                          + " and the source does not allow for the rejected outcome"
+                          + " [...] when messages are being sent within a transaction and have been sent pre-settled."
+                          + " In this case the behaviour defined for transactions (of essentially marking"
+                          + " the transaction as rollback only) should take precedence. "
                           + ""
                           + " AMQP spec 4.3 Discharging a Transaction"
                           + " If the coordinator is unable to complete the discharge, the coordinator MUST convey"
@@ -394,6 +423,7 @@ public class AnonymousRelayTest extends BrokerAdminUsingTestBase
 
                        .transferHandle(linkHandle)
                        .transferPayload(generateMessagePayloadToDestination("Unknown"))
+                       .transferDeliveryTag(_deliveryTag)
                        .transferTransactionalState(txnState.getCurrentTransactionId())
                        .transferSettled(Boolean.TRUE)
                        .transfer();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message