Repository: qpid-broker-j
Updated Branches:
refs/heads/master 66a15452a -> 074ccdd81
QPID-7634: [Java Broker] Ensure flow is sent after receiving Flow.drain=true
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/074ccdd8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/074ccdd8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/074ccdd8
Branch: refs/heads/master
Commit: 074ccdd81f1e103fb9a329ecd020540931e83002
Parents: 5017eb0
Author: Lorenz Quack <lquack@apache.org>
Authored: Wed May 10 16:41:23 2017 +0100
Committer: Lorenz Quack <lquack@apache.org>
Committed: Wed May 10 17:09:35 2017 +0100
----------------------------------------------------------------------
.../protocol/v1_0/ConsumerTarget_1_0.java | 5 +--
.../protocol/v1_0/SendingLinkEndpoint.java | 24 +++----------
.../protocol/v1_0/transport/link/FlowTest.java | 38 ++++++++++++++++++++
3 files changed, 43 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/074ccdd8/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index 41a3fbc..2f98340 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -309,10 +309,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
public void queueEmpty()
{
- if(_linkEndpoint.drained())
- {
- updateNotifyWorkDesired();
- }
+
}
public void flowStateChanged()
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/074ccdd8/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index d89b637..0f6c7b4 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -48,7 +48,6 @@ import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.NotFoundException;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
-import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
@@ -89,7 +88,6 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
private Integer _priority;
private final List<Binary> _resumeAcceptedTransfers = new ArrayList<>();
private final List<MessageInstance> _resumeFullTransfers = new ArrayList<>();
- private volatile boolean _draining = false;
private final ConcurrentMap<Binary, UnsettledAction> _unsettledActionMap = new ConcurrentHashMap<>();
private SendingDestination _destination;
private EnumSet<ConsumerOption> _consumerOptions;
@@ -413,23 +411,6 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
return true;
}
-
- public boolean drained()
- {
- if (_draining)
- {
- setDeliveryCount(getDeliveryCount().add(getLinkCredit()));
- setLinkCredit(UnsignedInteger.ZERO);
- sendFlow();
- _draining = false;
- return true;
- }
- else
- {
- return false;
- }
- }
-
@Override
public void receiveFlow(final Flow flow)
{
@@ -468,8 +449,11 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
{
if(Boolean.TRUE.equals(getDrain()) && getLinkCredit().compareTo(UnsignedInteger.ZERO) > 0)
{
- _draining = true;
getConsumerTarget().flush();
+ setDeliveryCount(getDeliveryCount().add(getLinkCredit()));
+ setLinkCredit(UnsignedInteger.ZERO);
+ sendFlow();
+ getConsumerTarget().updateNotifyWorkDesired();
}
while(!_resumeAcceptedTransfers.isEmpty() && hasCreditToSend())
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/074ccdd8/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
index 564df3b..722432a 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
@@ -168,4 +168,42 @@ public class FlowTest extends ProtocolTestBase
assertThat(data, is(equalTo("foo")));
}
}
+
+
+ @Test
+ @SpecificationTest(section = "2.6.7",
+ description = "If the sender's drain flag is set and there are no available messages,"
+ + " the sender MUST advance its delivery-count until link-credit is zero,"
+ + " and send its updated flow state to the receiver.")
+ public void drainEmptyQueue() throws Exception
+ {
+ getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+ final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ try (FrameTransport transport = new FrameTransport(addr))
+ {
+ transport.doAttachReceivingLink(BrokerAdmin.TEST_QUEUE_NAME);
+
+ // Flow{nextIncomingId=0,incomingWindow=2047,nextOutgoingId=1,outgoingWindow=2147483647,handle=1,deliveryCount=0,linkCredit=1,drain=true}
+
+ Flow flow = new Flow();
+ flow.setIncomingWindow(UnsignedInteger.valueOf(2047));
+ flow.setNextIncomingId(UnsignedInteger.ZERO);
+ flow.setOutgoingWindow(UnsignedInteger.valueOf(2147483647));
+ flow.setNextOutgoingId(UnsignedInteger.ONE);
+ flow.setDeliveryCount(UnsignedInteger.ZERO);
+ flow.setLinkCredit(UnsignedInteger.ONE);
+ flow.setDrain(Boolean.TRUE);
+ flow.setHandle(UnsignedInteger.ZERO);
+
+ transport.sendPerformative(flow);
+ PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
+
+ assertThat(response, is(notNullValue()));
+ assertThat(response.getFrameBody(), is(instanceOf(Flow.class)));
+ Flow responseFlow = (Flow) response.getFrameBody();
+ assertThat(responseFlow.getHandle(), is(notNullValue()));
+ assertThat(responseFlow.getLinkCredit(), is(equalTo(UnsignedInteger.ZERO)));
+ assertThat(responseFlow.getDrain(), is(equalTo(Boolean.TRUE)));
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
|