qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lqu...@apache.org
Subject [1/3] qpid-broker-j git commit: QPID-7634: [Java Broker] Ensure flow is sent after receiving Flow.drain=true
Date Wed, 10 May 2017 16:10:56 GMT
Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 66a15452a -> 074ccdd81


QPID-7634: [Java Broker] Ensure flow is sent after receiving Flow.drain=true


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/074ccdd8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/074ccdd8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/074ccdd8

Branch: refs/heads/master
Commit: 074ccdd81f1e103fb9a329ecd020540931e83002
Parents: 5017eb0
Author: Lorenz Quack <lquack@apache.org>
Authored: Wed May 10 16:41:23 2017 +0100
Committer: Lorenz Quack <lquack@apache.org>
Committed: Wed May 10 17:09:35 2017 +0100

----------------------------------------------------------------------
 .../protocol/v1_0/ConsumerTarget_1_0.java       |  5 +--
 .../protocol/v1_0/SendingLinkEndpoint.java      | 24 +++----------
 .../protocol/v1_0/transport/link/FlowTest.java  | 38 ++++++++++++++++++++
 3 files changed, 43 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/074ccdd8/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index 41a3fbc..2f98340 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -309,10 +309,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
 
     public void queueEmpty()
     {
-        if(_linkEndpoint.drained())
-        {
-            updateNotifyWorkDesired();
-        }
+
     }
 
     public void flowStateChanged()

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/074ccdd8/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index d89b637..0f6c7b4 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -48,7 +48,6 @@ import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.model.NotFoundException;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
-import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
@@ -89,7 +88,6 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source,
Target>
     private Integer _priority;
     private final List<Binary> _resumeAcceptedTransfers = new ArrayList<>();
     private final List<MessageInstance> _resumeFullTransfers = new ArrayList<>();
-    private volatile boolean _draining = false;
     private final ConcurrentMap<Binary, UnsettledAction> _unsettledActionMap = new
ConcurrentHashMap<>();
     private SendingDestination _destination;
     private EnumSet<ConsumerOption> _consumerOptions;
@@ -413,23 +411,6 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source,
Target>
         return true;
     }
 
-
-    public boolean drained()
-    {
-        if (_draining)
-        {
-            setDeliveryCount(getDeliveryCount().add(getLinkCredit()));
-            setLinkCredit(UnsignedInteger.ZERO);
-            sendFlow();
-            _draining = false;
-            return true;
-        }
-        else
-        {
-            return false;
-        }
-    }
-
     @Override
     public void receiveFlow(final Flow flow)
     {
@@ -468,8 +449,11 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source,
Target>
     {
         if(Boolean.TRUE.equals(getDrain()) && getLinkCredit().compareTo(UnsignedInteger.ZERO)
> 0)
         {
-            _draining = true;
             getConsumerTarget().flush();
+            setDeliveryCount(getDeliveryCount().add(getLinkCredit()));
+            setLinkCredit(UnsignedInteger.ZERO);
+            sendFlow();
+            getConsumerTarget().updateNotifyWorkDesired();
         }
 
         while(!_resumeAcceptedTransfers.isEmpty() && hasCreditToSend())

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/074ccdd8/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
index 564df3b..722432a 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
@@ -168,4 +168,42 @@ public class FlowTest extends ProtocolTestBase
             assertThat(data, is(equalTo("foo")));
         }
     }
+
+
+    @Test
+    @SpecificationTest(section = "2.6.7",
+            description = "If the sender's drain flag is set and there are no available messages,"
+                          + " the sender MUST advance its delivery-count until link-credit
is zero,"
+                          + " and send its updated flow state to the receiver.")
+    public void drainEmptyQueue() throws Exception
+    {
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        try (FrameTransport transport = new FrameTransport(addr))
+        {
+            transport.doAttachReceivingLink(BrokerAdmin.TEST_QUEUE_NAME);
+
+            // Flow{nextIncomingId=0,incomingWindow=2047,nextOutgoingId=1,outgoingWindow=2147483647,handle=1,deliveryCount=0,linkCredit=1,drain=true}
+
+            Flow flow = new Flow();
+            flow.setIncomingWindow(UnsignedInteger.valueOf(2047));
+            flow.setNextIncomingId(UnsignedInteger.ZERO);
+            flow.setOutgoingWindow(UnsignedInteger.valueOf(2147483647));
+            flow.setNextOutgoingId(UnsignedInteger.ONE);
+            flow.setDeliveryCount(UnsignedInteger.ZERO);
+            flow.setLinkCredit(UnsignedInteger.ONE);
+            flow.setDrain(Boolean.TRUE);
+            flow.setHandle(UnsignedInteger.ZERO);
+
+            transport.sendPerformative(flow);
+            PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
+
+            assertThat(response, is(notNullValue()));
+            assertThat(response.getFrameBody(), is(instanceOf(Flow.class)));
+            Flow responseFlow = (Flow) response.getFrameBody();
+            assertThat(responseFlow.getHandle(), is(notNullValue()));
+            assertThat(responseFlow.getLinkCredit(), is(equalTo(UnsignedInteger.ZERO)));
+            assertThat(responseFlow.getDrain(), is(equalTo(Boolean.TRUE)));
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message