qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From oru...@apache.org
Subject [1/2] qpid-broker-j git commit: QPID-XXXX: [AMQP 1.0] Fix evaluation of sending link credit
Date Sun, 05 Aug 2018 21:29:38 GMT
Repository: qpid-broker-j
Updated Branches:
  refs/heads/master ac3b1d0e9 -> c03452289


QPID-XXXX: [AMQP 1.0] Fix evaluation of sending link credit


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/42bbdb65
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/42bbdb65
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/42bbdb65

Branch: refs/heads/master
Commit: 42bbdb6598f402fb7f2cb7ab194db27de9653e2c
Parents: ac3b1d0
Author: Alex Rudyy <orudyy@apache.org>
Authored: Sat Aug 4 22:35:26 2018 +0100
Committer: Alex Rudyy <orudyy@apache.org>
Committed: Sat Aug 4 22:35:26 2018 +0100

----------------------------------------------------------------------
 .../protocol/v1_0/SendingLinkEndpoint.java      |   7 +-
 .../server/protocol/v1_0/SequenceNumber.java    |   5 +
 .../protocol/v1_0/SendingLinkEndpointTest.java  | 110 +++++++++++++++++++
 3 files changed, 119 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/42bbdb65/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index a69b96a..17bcfa1 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -488,14 +488,15 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
         else
         {
             // 2.6.7 Flow Control : link_credit_snd := delivery_count_rcv + link_credit_rcv - delivery_count_snd
-            UnsignedInteger limit = receiverDeliveryCount.add(receiverLinkCredit);
-            if(limit.compareTo(getDeliveryCount().unsignedIntegerValue())<=0)
+            SequenceNumber limit =
+                    new SequenceNumber(receiverDeliveryCount.intValue()).add(receiverLinkCredit.intValue());
+            if (limit.compareTo(getDeliveryCount()) <= 0)
             {
                 setLinkCredit(UnsignedInteger.valueOf(0));
             }
             else
             {
-                setLinkCredit(limit.subtract(getDeliveryCount().unsignedIntegerValue()));
+                setLinkCredit(limit.subtract(getDeliveryCount().intValue()).unsignedIntegerValue());
             }
         }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/42bbdb65/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SequenceNumber.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SequenceNumber.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SequenceNumber.java
index 5195b00..e3bef20 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SequenceNumber.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SequenceNumber.java
@@ -49,6 +49,11 @@ public class SequenceNumber implements Comparable<SequenceNumber>
         _seqNo+=i;
         return this;
     }
+    public SequenceNumber subtract(final int value)
+    {
+        _seqNo-=value;
+        return this;
+    }
 
     @Override
     public boolean equals(Object o)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/42bbdb65/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpointTest.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpointTest.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpointTest.java
new file mode 100644
index 0000000..3ce0f9f
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpointTest.java
@@ -0,0 +1,110 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.protocol.v1_0.delivery.DeliveryRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+
+public class SendingLinkEndpointTest
+{
+    private static final String ADDRESS = "test";
+
+    private SendingLinkEndpoint _sendingLinkEndpoint;
+
+    @Before
+    public void setUp() throws Exception
+    {
+        NamedAddressSpace addressSpace = mock(NamedAddressSpace.class);
+
+        final LinkImpl<Source, Target> link = mock(LinkImpl.class);
+        when(link.getSource()).thenReturn(new Source());
+        Target target = new Target();
+        target.setAddress(ADDRESS);
+        when(link.getTarget()).thenReturn(target);
+
+        final AMQPConnection_1_0 connection = mock(AMQPConnection_1_0.class);
+        when(connection.getAddressSpace()).thenReturn(addressSpace);
+        when(connection.getContextValue(Long.class, Consumer.SUSPEND_NOTIFICATION_PERIOD)).thenReturn(1L);
+        final Session_1_0 session = mock(Session_1_0.class);
+        when(session.getConnection()).thenReturn(connection);
+        when(session.getAMQPConnection()).thenReturn(connection);
+        when(session.getOutgoingDeliveryRegistry()).thenReturn(mock(DeliveryRegistry.class));
+        final SendingDestination destination = new StandardSendingDestination(mock(MessageSource.class));
+        when(session.getSendingDestination(any(Link_1_0.class), any(Source.class))).thenReturn(destination);
+        _sendingLinkEndpoint = new SendingLinkEndpoint(session, link);
+    }
+
+    @Test
+    public void receiveFlow() throws Exception
+    {
+        receiveAttach(_sendingLinkEndpoint);
+
+        _sendingLinkEndpoint.setDeliveryCount(new SequenceNumber(-1));
+
+        Flow flow = new Flow();
+        flow.setDeliveryCount(new SequenceNumber(-1).unsignedIntegerValue());
+        flow.setLinkCredit(UnsignedInteger.ONE);
+
+        _sendingLinkEndpoint.receiveFlow(flow);
+
+        UnsignedInteger linkCredit = _sendingLinkEndpoint.getLinkCredit();
+        assertThat(linkCredit, is(equalTo(UnsignedInteger.ONE)));
+    }
+
+    private void receiveAttach(final SendingLinkEndpoint sendingLinkEndpoint) throws Exception
+    {
+        Attach attach = new Attach();
+        Source source = new Source();
+        source.setDurable(TerminusDurability.NONE);
+        source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+
+        attach.setSource(source);
+        Target target = new Target();
+        attach.setTarget(target);
+        attach.setHandle(new UnsignedInteger(0));
+        attach.setIncompleteUnsettled(false);
+        attach.setName("test");
+        attach.setRole(Role.RECEIVER);
+        source.setAddress(ADDRESS);
+
+        sendingLinkEndpoint.receiveAttach(attach);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message