This is an automated email from the ASF dual-hosted git repository. orudyy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git commit 2909380370a86dc0deb66bedae0245cfcb8274fb Author: Alex Rudyy AuthorDate: Thu Jun 27 16:46:27 2019 +0100 QPID-8323: [Broker-J] Make sure that the same delivery tags can be used by different links on the same session --- .../v1_0/AbstractReceivingLinkEndpoint.java | 2 +- .../server/protocol/v1_0/SendingLinkEndpoint.java | 6 +- .../qpid/server/protocol/v1_0/Session_1_0.java | 44 ++++++-- .../v1_0/StandardReceivingLinkEndpoint.java | 5 + .../protocol/v1_0/delivery/DeliveryRegistry.java | 2 +- .../v1_0/delivery/DeliveryRegistryImpl.java | 12 +- .../protocol/v1_0/delivery/UnsettledDelivery.java | 24 ++++ .../v1_0/delivery/DeliveryRegistryImplTest.java | 123 +++++++++++++++++++++ .../v1_0/delivery/UnsettledDeliveryTest.java | 99 +++++++++++++++++ .../protocol/v1_0/messaging/TransferTest.java | 62 ++++++++++- 10 files changed, 353 insertions(+), 26 deletions(-) diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java index 3b922e4..f346dbe 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java @@ -341,7 +341,7 @@ public abstract class AbstractReceivingLinkEndpoint extend if (outcomeUpdate || settled) { - getSession().updateDisposition(getRole(), unsettledKeys, state, settled); + getSession().updateDisposition(this, deliveryTags, state, settled); } if (settled) diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java index 2fb7421..5081d14 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java @@ -24,7 +24,6 @@ package org.apache.qpid.server.protocol.v1_0; import java.security.AccessControlException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; @@ -50,10 +49,7 @@ import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.model.NamedAddressSpace; import org.apache.qpid.server.model.NotFoundException; import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.protocol.LinkModel; import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; -import org.apache.qpid.server.protocol.v1_0.type.BaseSource; -import org.apache.qpid.server.protocol.v1_0.type.BaseTarget; import org.apache.qpid.server.protocol.v1_0.type.Binary; import org.apache.qpid.server.protocol.v1_0.type.DeliveryState; import org.apache.qpid.server.protocol.v1_0.type.Outcome; @@ -648,7 +644,7 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint { if (settled && (_unsettled.remove(deliveryTag) != null)) { - getSession().updateDisposition(getRole(), deliveryTag, state, settled); + getSession().updateDisposition(this, deliveryTag, state, settled); } } diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index cd8eb83..931cf95 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -248,32 +248,54 @@ public class Session_1_0 extends AbstractAMQPSession linkEndpoint, final Binary deliveryTag, final DeliveryState state, final boolean settled) { - final DeliveryRegistry deliveryRegistry = role == Role.RECEIVER ? _incomingDeliveryRegistry : _outgoingDeliveryRegistry; - UnsignedInteger deliveryId = deliveryRegistry.getDeliveryIdByTag(deliveryTag); + final UnsignedInteger deliveryId = getDeliveryId(deliveryTag, linkEndpoint); + updateDisposition(linkEndpoint.getRole(), deliveryId, deliveryId, state, settled); + } + + private UnsignedInteger getDeliveryId(final DeliveryRegistry deliveryRegistry, + final Binary deliveryTag, + final LinkEndpoint linkEndpoint) + { + final UnsignedInteger deliveryId = deliveryRegistry.getDeliveryId(deliveryTag, linkEndpoint); if (deliveryId == null) { throw new ConnectionScopedRuntimeException(String.format( "Delivery with tag '%s' is not found in unsettled deliveries", deliveryTag)); } - updateDisposition(role, deliveryId, deliveryId, state, settled); + return deliveryId; } - void updateDisposition(final Role role, + private SortedSet getDeliveryIds(final Set deliveryTags, final LinkEndpoint linkEndpoint) + { + final DeliveryRegistry deliveryRegistry = getDeliveryRegistry(linkEndpoint.getRole()); + return deliveryTags.stream() + .map(deliveryTag -> getDeliveryId(deliveryRegistry, deliveryTag, linkEndpoint)) + .collect(Collectors.toCollection(TreeSet::new)); + } + + private UnsignedInteger getDeliveryId(final Binary deliveryTag, final LinkEndpoint linkEndpoint) + { + final DeliveryRegistry deliveryRegistry = getDeliveryRegistry(linkEndpoint.getRole()); + return getDeliveryId(deliveryRegistry, deliveryTag, linkEndpoint); + } + + private DeliveryRegistry getDeliveryRegistry(final Role role) + { + return role == Role.RECEIVER ? 
getIncomingDeliveryRegistry() : getOutgoingDeliveryRegistry(); + } + + void updateDisposition(final LinkEndpoint linkEndpoint, final Set deliveryTags, final DeliveryState state, final boolean settled) { - final DeliveryRegistry deliveryRegistry = role == Role.RECEIVER ? _incomingDeliveryRegistry : _outgoingDeliveryRegistry; - SortedSet deliveryIds = deliveryTags.stream() - .map(deliveryRegistry::getDeliveryIdByTag) - .collect(Collectors.toCollection(TreeSet::new)); - - final Iterator iterator = deliveryIds.iterator(); + final Role role = linkEndpoint.getRole(); + final Iterator iterator = getDeliveryIds(deliveryTags, linkEndpoint).iterator(); if (iterator.hasNext()) { UnsignedInteger begin = iterator.next(); diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java index 9be4dcc..bc7d4b0 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java @@ -32,7 +32,10 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.stream.Collectors; import com.google.common.util.concurrent.ListenableFuture; import org.slf4j.Logger; @@ -46,6 +49,7 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.PublishingLink; import org.apache.qpid.server.plugin.MessageFormat; import org.apache.qpid.server.protocol.MessageFormatRegistry; +import org.apache.qpid.server.protocol.v1_0.delivery.DeliveryRegistry; import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; import 
org.apache.qpid.server.protocol.v1_0.type.AmqpErrorRuntimeException; import org.apache.qpid.server.protocol.v1_0.type.Binary; @@ -67,6 +71,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Attach; import org.apache.qpid.server.protocol.v1_0.type.transport.Detach; import org.apache.qpid.server.protocol.v1_0.type.transport.Error; import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode; +import org.apache.qpid.server.protocol.v1_0.type.transport.Role; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; import org.apache.qpid.server.txn.AsyncCommand; import org.apache.qpid.server.txn.AutoCommitTransaction; diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistry.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistry.java index a018824..3127653 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistry.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistry.java @@ -30,6 +30,6 @@ public interface DeliveryRegistry void removeDelivery(UnsignedInteger deliveryId); UnsettledDelivery getDelivery(UnsignedInteger deliveryId); void removeDeliveriesForLinkEndpoint(LinkEndpoint linkEndpoint); - UnsignedInteger getDeliveryIdByTag(Binary deliveryTag); + UnsignedInteger getDeliveryId(Binary deliveryTag, LinkEndpoint linkEndpoint); int size(); } diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImpl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImpl.java index bf7fe56..3e264fc 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImpl.java +++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImpl.java @@ -31,13 +31,13 @@ import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; public class DeliveryRegistryImpl implements DeliveryRegistry { private final Map<UnsignedInteger, UnsettledDelivery> _deliveries = new ConcurrentHashMap<>(); - private final Map<Binary, UnsignedInteger> _deliveryIds = new ConcurrentHashMap<>(); + private final Map<UnsettledDelivery, UnsignedInteger> _deliveryIds = new ConcurrentHashMap<>(); @Override public void addDelivery(final UnsignedInteger deliveryId, final UnsettledDelivery unsettledDelivery) { _deliveries.put(deliveryId, unsettledDelivery); - _deliveryIds.put(unsettledDelivery.getDeliveryTag(), deliveryId); + _deliveryIds.put(unsettledDelivery, deliveryId); } @Override @@ -46,7 +46,7 @@ public class DeliveryRegistryImpl implements DeliveryRegistry UnsettledDelivery unsettledDelivery = _deliveries.remove(deliveryId); if (unsettledDelivery != null) { - _deliveryIds.remove(unsettledDelivery.getDeliveryTag()); + _deliveryIds.remove(unsettledDelivery); } } @@ -66,15 +66,15 @@ public class DeliveryRegistryImpl implements DeliveryRegistry if (unsettledDelivery.getLinkEndpoint() == linkEndpoint) { iterator.remove(); - _deliveryIds.remove(unsettledDelivery.getDeliveryTag()); + _deliveryIds.remove(unsettledDelivery); } } } @Override - public UnsignedInteger getDeliveryIdByTag(final Binary deliveryTag) + public UnsignedInteger getDeliveryId(final Binary deliveryTag, final LinkEndpoint linkEndpoint) { - return _deliveryIds.get(deliveryTag); + return _deliveryIds.get(new UnsettledDelivery(deliveryTag, linkEndpoint)); } @Override diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDelivery.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDelivery.java index 48d2ab1..d581142 100644 --- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDelivery.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDelivery.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.protocol.v1_0.delivery; +import java.util.Objects; + import org.apache.qpid.server.protocol.v1_0.LinkEndpoint; import org.apache.qpid.server.protocol.v1_0.type.Binary; @@ -43,4 +45,26 @@ public class UnsettledDelivery { return _linkEndpoint; } + + @Override + public boolean equals(final Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + final UnsettledDelivery that = (UnsettledDelivery) o; + return Objects.equals(_deliveryTag, that._deliveryTag) && + Objects.equals(_linkEndpoint, that._linkEndpoint); + } + + @Override + public int hashCode() + { + return Objects.hash(_deliveryTag, _linkEndpoint); + } } diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImplTest.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImplTest.java new file mode 100644 index 0000000..54e7fb4 --- /dev/null +++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImplTest.java @@ -0,0 +1,123 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v1_0.delivery; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.qpid.server.protocol.v1_0.LinkEndpoint; +import org.apache.qpid.server.protocol.v1_0.type.Binary; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; +import org.apache.qpid.test.utils.UnitTestBase; + +public class DeliveryRegistryImplTest extends UnitTestBase +{ + private static final UnsignedInteger DELIVERY_ID = UnsignedInteger.ZERO; + private static final UnsignedInteger DELIVERY_ID_2 = UnsignedInteger.ONE; + private static final Binary DELIVERY_TAG = new Binary(new byte[]{(byte) 32, (byte) 33}); + private static final Binary DELIVERY_TAG_2 = new Binary(new byte[]{(byte) 32}); + + private DeliveryRegistryImpl _registry; + private UnsettledDelivery _unsettledDelivery; + + @Before + public void setUp() + { + _registry = new DeliveryRegistryImpl(); + _unsettledDelivery = new UnsettledDelivery(DELIVERY_TAG, mock(LinkEndpoint.class)); + } + + @Test + public void addDelivery() + { + assertThat(_registry.size(), is(equalTo(0))); + + _registry.addDelivery(DELIVERY_ID, _unsettledDelivery); + + assertThat(_registry.size(), is(equalTo(1))); + } + + @Test + public void removeDelivery() + { + _registry.addDelivery(DELIVERY_ID, _unsettledDelivery); + assertThat(_registry.size(), 
is(equalTo(1))); + _registry.removeDelivery(DELIVERY_ID); + assertThat(_registry.size(), is(equalTo(0))); + assertThat(_registry.getDelivery(UnsignedInteger.ZERO), is(nullValue())); + } + + @Test + public void getDelivery() + { + _registry.addDelivery(DELIVERY_ID, _unsettledDelivery); + + assertThat(_registry.size(), is(equalTo(1))); + final UnsettledDelivery expected = + new UnsettledDelivery(_unsettledDelivery.getDeliveryTag(), _unsettledDelivery.getLinkEndpoint()); + assertThat(_registry.getDelivery(UnsignedInteger.ZERO), is(equalTo(expected))); + } + + @Test + public void removeDeliveriesForLinkEndpoint() + { + _registry.addDelivery(DELIVERY_ID, _unsettledDelivery); + _registry.addDelivery(DELIVERY_ID_2, new UnsettledDelivery(DELIVERY_TAG_2, _unsettledDelivery.getLinkEndpoint())); + _registry.addDelivery(UnsignedInteger.valueOf(2), new UnsettledDelivery(DELIVERY_TAG, mock(LinkEndpoint.class))); + + assertThat(_registry.size(), is(equalTo(3))); + + _registry.removeDeliveriesForLinkEndpoint(_unsettledDelivery.getLinkEndpoint()); + + assertThat(_registry.size(), is(equalTo(1))); + } + + @Test + public void getDeliveryId() + { + _registry.addDelivery(DELIVERY_ID, _unsettledDelivery); + _registry.addDelivery(DELIVERY_ID_2, new UnsettledDelivery(DELIVERY_TAG, mock(LinkEndpoint.class))); + + final UnsignedInteger deliveryId = _registry.getDeliveryId(DELIVERY_TAG, _unsettledDelivery.getLinkEndpoint()); + + assertThat(deliveryId, is(equalTo(DELIVERY_ID))); + } + + @Test + public void size() + { + assertThat(_registry.size(), is(equalTo(0))); + + _registry.addDelivery(DELIVERY_ID, _unsettledDelivery); + + assertThat(_registry.size(), is(equalTo(1))); + + _registry.removeDelivery(DELIVERY_ID); + + assertThat(_registry.size(), is(equalTo(0))); + } +} diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDeliveryTest.java 
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDeliveryTest.java new file mode 100644 index 0000000..51195c6 --- /dev/null +++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDeliveryTest.java @@ -0,0 +1,99 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.qpid.server.protocol.v1_0.delivery; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; + +import java.util.Objects; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.qpid.server.protocol.v1_0.LinkEndpoint; +import org.apache.qpid.server.protocol.v1_0.type.Binary; +import org.apache.qpid.test.utils.UnitTestBase; + +public class UnsettledDeliveryTest extends UnitTestBase +{ + + private static final byte[] DATA = new byte[]{(byte) 32, (byte) 33, (byte) 34}; + private Binary _deliveryTag; + private LinkEndpoint _linkEndpoint; + private UnsettledDelivery _unsettledDelivery; + + @Before + public void setUp() + { + _deliveryTag = new Binary(DATA); + _linkEndpoint = mock(LinkEndpoint.class); + _unsettledDelivery = new UnsettledDelivery(_deliveryTag, _linkEndpoint); + } + + @Test + public void testGetDeliveryTag() + { + assertThat(_unsettledDelivery.getDeliveryTag(), is(equalTo(_deliveryTag))); + } + + @Test + public void testGetLinkEndpoint() + { + assertThat(_unsettledDelivery.getLinkEndpoint(), is(equalTo(_linkEndpoint))); + } + + @Test + public void testEqualsToNewUnsettledDeliveryWithTheSameFields() + { + assertThat(_unsettledDelivery.equals(new UnsettledDelivery(_deliveryTag, _linkEndpoint)), is(equalTo(true))); + } + + @Test + public void testEqualsToNewUnsettledDeliveryWithEqualsFields() + { + assertThat(_unsettledDelivery.equals(new UnsettledDelivery(new Binary(DATA), _linkEndpoint)), + is(equalTo(true))); + } + + @Test + public void testNotEqualsWhenDeliveryTagIsDifferent() + { + assertThat(_unsettledDelivery.equals(new UnsettledDelivery(new Binary(new byte[]{(byte) 32, (byte) 33}), + _linkEndpoint)), is(equalTo(false))); + } + + @Test + public void testNotEqualsWhenLinkEndpointIsDifferent() + { + final LinkEndpoint linkEndpoint = mock(LinkEndpoint.class); + 
assertThat(_unsettledDelivery.equals(new UnsettledDelivery(new Binary(new byte[]{(byte) 32, (byte) 33}), + linkEndpoint)), is(equalTo(false))); + } + + @Test + public void testHashCode() + { + int expected = Objects.hash(_deliveryTag, _linkEndpoint); + assertThat(_unsettledDelivery.hashCode(), is(equalTo(expected))); + } +} diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java index 7d1b1b6..da7f82a 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java @@ -21,6 +21,7 @@ package org.apache.qpid.tests.protocol.v1_0.messaging; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.hamcrest.CoreMatchers.anyOf; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; @@ -31,8 +32,8 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.isOneOf; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.fail; -import static org.junit.Assume.assumeFalse; import static org.junit.Assume.assumeThat; import java.net.InetSocketAddress; @@ -79,7 +80,6 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode; import org.apache.qpid.server.protocol.v1_0.type.transport.Role; import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode; import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer; -import org.apache.qpid.server.util.SystemUtils; import org.apache.qpid.tests.protocol.Response; import 
org.apache.qpid.tests.protocol.SpecificationTest; import org.apache.qpid.tests.protocol.v1_0.FrameTransport; @@ -196,6 +196,64 @@ public class TransferTest extends BrokerAdminUsingTestBase } @Test + @SpecificationTest(section = "2.6.12 Transferring A Message", + description = "The delivery-tag MUST be unique amongst all deliveries" + + " that could be considered unsettled by either end of the link.") + public void transferMessagesWithTheSameDeliveryTagOnSeparateLinksBelongingToTheSameSession() throws Exception + { + try (final FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final UnsignedInteger link1Handle = UnsignedInteger.ONE; + final UnsignedInteger link2Handle = UnsignedInteger.valueOf(2); + final Binary deliveryTag = new Binary("deliveryTag".getBytes(StandardCharsets.UTF_8)); + final Interaction interaction = transport.newInteraction(); + interaction.negotiateProtocol().consumeResponse() + .open().consumeResponse(Open.class) + .begin().consumeResponse(Begin.class) + + .attachName("test1") + .attachRole(Role.SENDER) + .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME) + .attachSndSettleMode(SenderSettleMode.UNSETTLED) + .attachRcvSettleMode(ReceiverSettleMode.FIRST) + .attachHandle(link1Handle) + .attach().consumeResponse(Attach.class) + .consumeResponse(Flow.class) + + .attachName("test2") + .attachHandle(link2Handle) + .attach().consumeResponse(Attach.class) + .consumeResponse(Flow.class) + + .transferHandle(link1Handle) + .transferPayloadData("testData") + .transferDeliveryTag(deliveryTag) + .transferDeliveryId(UnsignedInteger.ZERO) + .transfer() + .transferHandle(link2Handle) + .transferDeliveryId(UnsignedInteger.ONE) + .transferPayloadData("testData2") + .transferDeliveryTag(deliveryTag) + .transfer(); + + final Disposition disposition1 = interaction.consumeResponse().getLatestResponse(Disposition.class); + final UnsignedInteger first = disposition1.getFirst(); + final UnsignedInteger last = disposition1.getLast(); + + 
assertThat(first, anyOf(is(UnsignedInteger.ZERO), is(UnsignedInteger.ONE))); + assertThat(last, anyOf(nullValue(), is(UnsignedInteger.ZERO), is(UnsignedInteger.ONE))); + + if (last == null || first.equals(last)) + { + final Disposition disposition2 = interaction.consumeResponse().getLatestResponse(Disposition.class); + assertThat(disposition2.getFirst(), anyOf(is(UnsignedInteger.ZERO), is(UnsignedInteger.ONE))); + assertThat(disposition2.getLast(), anyOf(nullValue(), is(UnsignedInteger.ZERO), is(UnsignedInteger.ONE))); + assertThat(disposition2.getFirst(), is(not(equalTo(first)))); + } + } + } + + @Test @SpecificationTest(section = "2.7.5", description = "If first, this indicates that the receiver MUST settle the delivery once it has arrived without waiting for the sender to settle first") public void transferReceiverSettleModeFirst() throws Exception --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org