qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From oru...@apache.org
Subject [qpid-broker-j] 01/02: QPID-8323: [Broker-J] Make sure that the same delivery tags can be used by different links on the same session
Date Thu, 27 Jun 2019 16:22:47 GMT
This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit 2909380370a86dc0deb66bedae0245cfcb8274fb
Author: Alex Rudyy <orudyy@apache.org>
AuthorDate: Thu Jun 27 16:46:27 2019 +0100

    QPID-8323: [Broker-J] Make sure that the same delivery tags can be used by different links on the same session
---
 .../v1_0/AbstractReceivingLinkEndpoint.java        |   2 +-
 .../server/protocol/v1_0/SendingLinkEndpoint.java  |   6 +-
 .../qpid/server/protocol/v1_0/Session_1_0.java     |  44 ++++++--
 .../v1_0/StandardReceivingLinkEndpoint.java        |   5 +
 .../protocol/v1_0/delivery/DeliveryRegistry.java   |   2 +-
 .../v1_0/delivery/DeliveryRegistryImpl.java        |  12 +-
 .../protocol/v1_0/delivery/UnsettledDelivery.java  |  24 ++++
 .../v1_0/delivery/DeliveryRegistryImplTest.java    | 123 +++++++++++++++++++++
 .../v1_0/delivery/UnsettledDeliveryTest.java       |  99 +++++++++++++++++
 .../protocol/v1_0/messaging/TransferTest.java      |  62 ++++++++++-
 10 files changed, 353 insertions(+), 26 deletions(-)

diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
index 3b922e4..f346dbe 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
@@ -341,7 +341,7 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
 
             if (outcomeUpdate || settled)
             {
-                getSession().updateDisposition(getRole(), unsettledKeys, state, settled);
+                getSession().updateDisposition(this, deliveryTags, state, settled);
             }
 
             if (settled)
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index 2fb7421..5081d14 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -24,7 +24,6 @@ package org.apache.qpid.server.protocol.v1_0;
 import java.security.AccessControlException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -50,10 +49,7 @@ import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.model.NotFoundException;
 import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.protocol.LinkModel;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
-import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
-import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
@@ -648,7 +644,7 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
     {
         if (settled && (_unsettled.remove(deliveryTag) != null))
         {
-            getSession().updateDisposition(getRole(), deliveryTag, state, settled);
+            getSession().updateDisposition(this, deliveryTag, state, settled);
         }
     }
 
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index cd8eb83..931cf95 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -248,32 +248,54 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
         send(disposition);
     }
 
-    void updateDisposition(final Role role,
+    void updateDisposition(final LinkEndpoint<?,?> linkEndpoint,
                            final Binary deliveryTag,
                            final DeliveryState state,
                            final boolean settled)
     {
-        final DeliveryRegistry deliveryRegistry = role == Role.RECEIVER ? _incomingDeliveryRegistry : _outgoingDeliveryRegistry;
-        UnsignedInteger deliveryId = deliveryRegistry.getDeliveryIdByTag(deliveryTag);
+        final UnsignedInteger deliveryId = getDeliveryId(deliveryTag, linkEndpoint);
+        updateDisposition(linkEndpoint.getRole(), deliveryId, deliveryId, state, settled);
+    }
+
+    private UnsignedInteger getDeliveryId(final DeliveryRegistry deliveryRegistry,
+                                          final Binary deliveryTag,
+                                          final LinkEndpoint<?, ?> linkEndpoint)
+    {
+        final UnsignedInteger deliveryId = deliveryRegistry.getDeliveryId(deliveryTag, linkEndpoint);
         if (deliveryId == null)
         {
             throw new ConnectionScopedRuntimeException(String.format(
                     "Delivery with tag '%s' is not found in unsettled deliveries", deliveryTag));
         }
-        updateDisposition(role, deliveryId, deliveryId, state, settled);
+        return deliveryId;
     }
 
-    void updateDisposition(final Role role,
+    private SortedSet<UnsignedInteger> getDeliveryIds(final Set<Binary> deliveryTags, final LinkEndpoint<?, ?> linkEndpoint)
+    {
+        final DeliveryRegistry deliveryRegistry = getDeliveryRegistry(linkEndpoint.getRole());
+        return deliveryTags.stream()
+                           .map(deliveryTag -> getDeliveryId(deliveryRegistry, deliveryTag, linkEndpoint))
+                           .collect(Collectors.toCollection(TreeSet::new));
+    }
+
+    private UnsignedInteger getDeliveryId(final Binary deliveryTag, final LinkEndpoint<?, ?> linkEndpoint)
+    {
+        final DeliveryRegistry deliveryRegistry = getDeliveryRegistry(linkEndpoint.getRole());
+        return getDeliveryId(deliveryRegistry, deliveryTag, linkEndpoint);
+    }
+
+    private DeliveryRegistry getDeliveryRegistry(final Role role)
+    {
+        return role == Role.RECEIVER ? getIncomingDeliveryRegistry() : getOutgoingDeliveryRegistry();
+    }
+
+    void updateDisposition(final LinkEndpoint<?,?> linkEndpoint,
                            final Set<Binary> deliveryTags,
                            final DeliveryState state,
                            final boolean settled)
     {
-        final DeliveryRegistry deliveryRegistry = role == Role.RECEIVER ? _incomingDeliveryRegistry : _outgoingDeliveryRegistry;
-        SortedSet<UnsignedInteger> deliveryIds = deliveryTags.stream()
-                                                             .map(deliveryRegistry::getDeliveryIdByTag)
-                                                             .collect(Collectors.toCollection(TreeSet::new));
-
-        final Iterator<UnsignedInteger> iterator = deliveryIds.iterator();
+        final Role role = linkEndpoint.getRole();
+        final Iterator<UnsignedInteger> iterator = getDeliveryIds(deliveryTags, linkEndpoint).iterator();
         if (iterator.hasNext())
         {
             UnsignedInteger begin = iterator.next();
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index 9be4dcc..bc7d4b0 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -32,7 +32,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Collectors;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import org.slf4j.Logger;
@@ -46,6 +49,7 @@ import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.PublishingLink;
 import org.apache.qpid.server.plugin.MessageFormat;
 import org.apache.qpid.server.protocol.MessageFormatRegistry;
+import org.apache.qpid.server.protocol.v1_0.delivery.DeliveryRegistry;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorRuntimeException;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
@@ -67,6 +71,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
 import org.apache.qpid.server.txn.AsyncCommand;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistry.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistry.java
index a018824..3127653 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistry.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistry.java
@@ -30,6 +30,6 @@ public interface DeliveryRegistry
     void removeDelivery(UnsignedInteger deliveryId);
     UnsettledDelivery getDelivery(UnsignedInteger deliveryId);
     void removeDeliveriesForLinkEndpoint(LinkEndpoint<?, ?> linkEndpoint);
-    UnsignedInteger getDeliveryIdByTag(Binary deliveryTag);
+    UnsignedInteger getDeliveryId(Binary deliveryTag, LinkEndpoint<?, ?> linkEndpoint);
     int size();
 }
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImpl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImpl.java
index bf7fe56..3e264fc 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImpl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImpl.java
@@ -31,13 +31,13 @@ import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 public class DeliveryRegistryImpl implements DeliveryRegistry
 {
     private final Map<UnsignedInteger, UnsettledDelivery> _deliveries = new ConcurrentHashMap<>();
-    private final Map<Binary, UnsignedInteger> _deliveryIds = new ConcurrentHashMap<>();
+    private final Map<UnsettledDelivery, UnsignedInteger> _deliveryIds = new ConcurrentHashMap<>();
 
     @Override
     public void addDelivery(final UnsignedInteger deliveryId, final UnsettledDelivery unsettledDelivery)
     {
         _deliveries.put(deliveryId, unsettledDelivery);
-        _deliveryIds.put(unsettledDelivery.getDeliveryTag(), deliveryId);
+        _deliveryIds.put(unsettledDelivery, deliveryId);
     }
 
     @Override
@@ -46,7 +46,7 @@ public class DeliveryRegistryImpl implements DeliveryRegistry
         UnsettledDelivery unsettledDelivery = _deliveries.remove(deliveryId);
         if (unsettledDelivery != null)
         {
-            _deliveryIds.remove(unsettledDelivery.getDeliveryTag());
+            _deliveryIds.remove(unsettledDelivery);
         }
     }
 
@@ -66,15 +66,15 @@ public class DeliveryRegistryImpl implements DeliveryRegistry
             if (unsettledDelivery.getLinkEndpoint() == linkEndpoint)
             {
                 iterator.remove();
-                _deliveryIds.remove(unsettledDelivery.getDeliveryTag());
+                _deliveryIds.remove(unsettledDelivery);
             }
         }
     }
 
     @Override
-    public UnsignedInteger getDeliveryIdByTag(final Binary deliveryTag)
+    public UnsignedInteger getDeliveryId(final Binary deliveryTag, final LinkEndpoint<?, ?> linkEndpoint)
     {
-        return _deliveryIds.get(deliveryTag);
+        return _deliveryIds.get(new UnsettledDelivery(deliveryTag, linkEndpoint));
     }
 
     @Override
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDelivery.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDelivery.java
index 48d2ab1..d581142 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDelivery.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDelivery.java
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.protocol.v1_0.delivery;
 
+import java.util.Objects;
+
 import org.apache.qpid.server.protocol.v1_0.LinkEndpoint;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 
@@ -43,4 +45,26 @@ public class UnsettledDelivery
     {
         return _linkEndpoint;
     }
+
+    @Override
+    public boolean equals(final Object o)
+    {
+        if (this == o)
+        {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass())
+        {
+            return false;
+        }
+        final UnsettledDelivery that = (UnsettledDelivery) o;
+        return Objects.equals(_deliveryTag, that._deliveryTag) &&
+               Objects.equals(_linkEndpoint, that._linkEndpoint);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(_deliveryTag, _linkEndpoint);
+    }
 }
diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImplTest.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImplTest.java
new file mode 100644
index 0000000..54e7fb4
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImplTest.java
@@ -0,0 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.delivery;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v1_0.LinkEndpoint;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.test.utils.UnitTestBase;
+
+public class DeliveryRegistryImplTest extends UnitTestBase
+{
+    private static final UnsignedInteger DELIVERY_ID = UnsignedInteger.ZERO;
+    private static final UnsignedInteger DELIVERY_ID_2 = UnsignedInteger.ONE;
+    private static final Binary DELIVERY_TAG = new Binary(new byte[]{(byte) 32, (byte) 33});
+    private static final Binary DELIVERY_TAG_2 = new Binary(new byte[]{(byte) 32});
+
+    private DeliveryRegistryImpl _registry;
+    private UnsettledDelivery _unsettledDelivery;
+
+    @Before
+    public void setUp()
+    {
+        _registry = new DeliveryRegistryImpl();
+        _unsettledDelivery = new UnsettledDelivery(DELIVERY_TAG, mock(LinkEndpoint.class));
+    }
+
+    @Test
+    public void addDelivery()
+    {
+        assertThat(_registry.size(), is(equalTo(0)));
+
+        _registry.addDelivery(DELIVERY_ID, _unsettledDelivery);
+
+        assertThat(_registry.size(), is(equalTo(1)));
+    }
+
+    @Test
+    public void removeDelivery()
+    {
+        _registry.addDelivery(DELIVERY_ID, _unsettledDelivery);
+        assertThat(_registry.size(), is(equalTo(1)));
+        _registry.removeDelivery(DELIVERY_ID);
+        assertThat(_registry.size(), is(equalTo(0)));
+        assertThat(_registry.getDelivery(UnsignedInteger.ZERO), is(nullValue()));
+    }
+
+    @Test
+    public void getDelivery()
+    {
+        _registry.addDelivery(DELIVERY_ID, _unsettledDelivery);
+
+        assertThat(_registry.size(), is(equalTo(1)));
+        final UnsettledDelivery expected =
+                new UnsettledDelivery(_unsettledDelivery.getDeliveryTag(), _unsettledDelivery.getLinkEndpoint());
+        assertThat(_registry.getDelivery(UnsignedInteger.ZERO), is(equalTo(expected)));
+    }
+
+    @Test
+    public void removeDeliveriesForLinkEndpoint()
+    {
+        _registry.addDelivery(DELIVERY_ID, _unsettledDelivery);
+        _registry.addDelivery(DELIVERY_ID_2, new UnsettledDelivery(DELIVERY_TAG_2, _unsettledDelivery.getLinkEndpoint()));
+        _registry.addDelivery(UnsignedInteger.valueOf(2), new UnsettledDelivery(DELIVERY_TAG, mock(LinkEndpoint.class)));
+
+        assertThat(_registry.size(), is(equalTo(3)));
+
+        _registry.removeDeliveriesForLinkEndpoint(_unsettledDelivery.getLinkEndpoint());
+
+        assertThat(_registry.size(), is(equalTo(1)));
+    }
+
+    @Test
+    public void getDeliveryId()
+    {
+        _registry.addDelivery(DELIVERY_ID, _unsettledDelivery);
+        _registry.addDelivery(DELIVERY_ID_2, new UnsettledDelivery(DELIVERY_TAG, mock(LinkEndpoint.class)));
+
+        final UnsignedInteger deliveryId = _registry.getDeliveryId(DELIVERY_TAG, _unsettledDelivery.getLinkEndpoint());
+
+        assertThat(deliveryId, is(equalTo(DELIVERY_ID)));
+    }
+
+    @Test
+    public void size()
+    {
+        assertThat(_registry.size(), is(equalTo(0)));
+
+        _registry.addDelivery(DELIVERY_ID, _unsettledDelivery);
+
+        assertThat(_registry.size(), is(equalTo(1)));
+
+        _registry.removeDelivery(DELIVERY_ID);
+
+        assertThat(_registry.size(), is(equalTo(0)));
+    }
+}
diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDeliveryTest.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDeliveryTest.java
new file mode 100644
index 0000000..51195c6
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDeliveryTest.java
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.delivery;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+import java.util.Objects;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v1_0.LinkEndpoint;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.test.utils.UnitTestBase;
+
+public class UnsettledDeliveryTest extends UnitTestBase
+{
+
+    private static final byte[] DATA = new byte[]{(byte) 32, (byte) 33, (byte) 34};
+    private Binary _deliveryTag;
+    private LinkEndpoint<?, ?> _linkEndpoint;
+    private UnsettledDelivery _unsettledDelivery;
+
+    @Before
+    public void setUp()
+    {
+        _deliveryTag = new Binary(DATA);
+        _linkEndpoint = mock(LinkEndpoint.class);
+        _unsettledDelivery = new UnsettledDelivery(_deliveryTag, _linkEndpoint);
+    }
+
+    @Test
+    public void testGetDeliveryTag()
+    {
+        assertThat(_unsettledDelivery.getDeliveryTag(), is(equalTo(_deliveryTag)));
+    }
+
+    @Test
+    public void testGetLinkEndpoint()
+    {
+        assertThat(_unsettledDelivery.getLinkEndpoint(), is(equalTo(_linkEndpoint)));
+    }
+
+    @Test
+    public void testEqualsToNewUnsettledDeliveryWithTheSameFields()
+    {
+        assertThat(_unsettledDelivery.equals(new UnsettledDelivery(_deliveryTag, _linkEndpoint)), is(equalTo(true)));
+    }
+
+    @Test
+    public void testEqualsToNewUnsettledDeliveryWithEqualsFields()
+    {
+        assertThat(_unsettledDelivery.equals(new UnsettledDelivery(new Binary(DATA), _linkEndpoint)),
+                   is(equalTo(true)));
+    }
+
+    @Test
+    public void testNotEqualsWhenDeliveryTagIsDifferent()
+    {
+        assertThat(_unsettledDelivery.equals(new UnsettledDelivery(new Binary(new byte[]{(byte) 32, (byte) 33}),
+                                                                   _linkEndpoint)), is(equalTo(false)));
+    }
+
+    @Test
+    public void testNotEqualsWhenLinkEndpointIsDifferent()
+    {
+        final LinkEndpoint<?, ?> linkEndpoint = mock(LinkEndpoint.class);
+        assertThat(_unsettledDelivery.equals(new UnsettledDelivery(new Binary(new byte[]{(byte) 32, (byte) 33}),
+                                                                   linkEndpoint)), is(equalTo(false)));
+    }
+
+    @Test
+    public void testHashCode()
+    {
+        int expected = Objects.hash(_deliveryTag, _linkEndpoint);
+        assertThat(_unsettledDelivery.hashCode(), is(equalTo(expected)));
+    }
+}
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index 7d1b1b6..da7f82a 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -21,6 +21,7 @@
 package org.apache.qpid.tests.protocol.v1_0.messaging;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.hamcrest.CoreMatchers.anyOf;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -31,8 +32,8 @@ import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.isOneOf;
 import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.fail;
-import static org.junit.Assume.assumeFalse;
 import static org.junit.Assume.assumeThat;
 
 import java.net.InetSocketAddress;
@@ -79,7 +80,6 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-import org.apache.qpid.server.util.SystemUtils;
 import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
@@ -196,6 +196,64 @@ public class TransferTest extends BrokerAdminUsingTestBase
     }
 
     @Test
+    @SpecificationTest(section = "2.6.12 Transferring A Message",
+            description = "The delivery-tag MUST be unique amongst all deliveries"
+                          + " that could be considered unsettled by either end of the link.")
+    public void transferMessagesWithTheSameDeliveryTagOnSeparateLinksBelongingToTheSameSession() throws Exception
+    {
+        try (final FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final UnsignedInteger link1Handle = UnsignedInteger.ONE;
+            final UnsignedInteger link2Handle = UnsignedInteger.valueOf(2);
+            final Binary deliveryTag = new Binary("deliveryTag".getBytes(StandardCharsets.UTF_8));
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse()
+                                     .open().consumeResponse(Open.class)
+                                     .begin().consumeResponse(Begin.class)
+
+                                     .attachName("test1")
+                                     .attachRole(Role.SENDER)
+                                     .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                     .attachSndSettleMode(SenderSettleMode.UNSETTLED)
+                                     .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+                                     .attachHandle(link1Handle)
+                                     .attach().consumeResponse(Attach.class)
+                                     .consumeResponse(Flow.class)
+
+                                     .attachName("test2")
+                                     .attachHandle(link2Handle)
+                                     .attach().consumeResponse(Attach.class)
+                                     .consumeResponse(Flow.class)
+
+                                     .transferHandle(link1Handle)
+                                     .transferPayloadData("testData")
+                                     .transferDeliveryTag(deliveryTag)
+                                     .transferDeliveryId(UnsignedInteger.ZERO)
+                                     .transfer()
+                                     .transferHandle(link2Handle)
+                                     .transferDeliveryId(UnsignedInteger.ONE)
+                                     .transferPayloadData("testData2")
+                                     .transferDeliveryTag(deliveryTag)
+                                     .transfer();
+
+            final Disposition disposition1 = interaction.consumeResponse().getLatestResponse(Disposition.class);
+            final UnsignedInteger first = disposition1.getFirst();
+            final UnsignedInteger last = disposition1.getLast();
+
+            assertThat(first, anyOf(is(UnsignedInteger.ZERO), is(UnsignedInteger.ONE)));
+            assertThat(last, anyOf(nullValue(), is(UnsignedInteger.ZERO), is(UnsignedInteger.ONE)));
+
+            if (last == null || first.equals(last))
+            {
+                final Disposition disposition2 = interaction.consumeResponse().getLatestResponse(Disposition.class);
+                assertThat(disposition2.getFirst(), anyOf(is(UnsignedInteger.ZERO), is(UnsignedInteger.ONE)));
+                assertThat(disposition2.getLast(), anyOf(nullValue(), is(UnsignedInteger.ZERO), is(UnsignedInteger.ONE)));
+                assertThat(disposition2.getFirst(), is(not(equalTo(first))));
+            }
+        }
+    }
+
+    @Test
     @SpecificationTest(section = "2.7.5",
             description = "If first, this indicates that the receiver MUST settle the delivery once it has arrived without waiting for the sender to settle first")
     public void transferReceiverSettleModeFirst() throws Exception


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message