activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject activemq git commit: AMQ-6422 - include the inflight count in the prefetch for positive remote credit flows. Fix and test
Date Wed, 07 Sep 2016 16:28:59 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 88af1c70d -> 88daeec28


AMQ-6422 - include the inflight count in the prefetch for positive remote credit flows. Fix
and test


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/88daeec2
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/88daeec2
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/88daeec2

Branch: refs/heads/master
Commit: 88daeec28f25266c6fce15e200ac6d2ca9d11eb6
Parents: 88af1c7
Author: gtully <gary.tully@gmail.com>
Authored: Wed Sep 7 17:28:35 2016 +0100
Committer: gtully <gary.tully@gmail.com>
Committed: Wed Sep 7 17:28:35 2016 +0100

----------------------------------------------------------------------
 .../transport/amqp/protocol/AmqpConnection.java |  13 +++
 .../transport/amqp/protocol/AmqpSender.java     |  24 ++++-
 .../amqp/interop/AmqpSendReceiveTest.java       | 106 +++++++++++++++++++
 .../activemq/broker/region/RegionBroker.java    |   2 +-
 4 files changed, 139 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/88daeec2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
index 929fa24..5a402ba 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
@@ -45,8 +45,10 @@ import java.util.concurrent.atomic.AtomicInteger;
 import javax.jms.InvalidClientIDException;
 
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.AbstractRegion;
 import org.apache.activemq.broker.region.DurableTopicSubscription;
 import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.TopicRegion;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTempDestination;
@@ -712,6 +714,17 @@ public class AmqpConnection implements AmqpProtocolConverter {
         return result;
     }
 
+
+    Subscription lookupPrefetchSubscription(ConsumerInfo consumerInfo)  {
+        Subscription subscription = null;
+        try {
+            subscription = ((AbstractRegion)((RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class)).getRegion(consumerInfo.getDestination())).getSubscriptions().get(consumerInfo.getConsumerId());
+        } catch (Exception e) {
+            LOG.warn("Error finding subscription for: " + consumerInfo + ": " + e.getMessage(),
false, e);
+        }
+        return subscription;
+    }
+
     ActiveMQDestination createTemporaryDestination(final Link link, Symbol[] capabilities)
{
         ActiveMQDestination rc = null;
         if (contains(capabilities, TEMP_TOPIC_CAPABILITY)) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/88daeec2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
index 12bd627..0b85858 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
@@ -21,6 +21,7 @@ import static org.apache.activemq.transport.amqp.AmqpSupport.toLong;
 import java.io.IOException;
 import java.util.LinkedList;
 
+import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ConsumerControl;
@@ -52,6 +53,7 @@ import org.apache.qpid.proton.amqp.transport.DeliveryState;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
 import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Link;
 import org.apache.qpid.proton.engine.Sender;
 import org.fusesource.hawtbuf.Buffer;
 import org.slf4j.Logger;
@@ -79,6 +81,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
     private final String MESSAGE_FORMAT_KEY = outboundTransformer.getPrefixVendor() + "MESSAGE_FORMAT";
 
     private final ConsumerInfo consumerInfo;
+    private Subscription subscription;
     private final boolean presettle;
 
     private boolean draining;
@@ -108,6 +111,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
     public void open() {
         if (!isClosed()) {
             session.registerSender(getConsumerId(), this);
+            subscription = session.getConnection().lookupPrefetchSubscription(consumerInfo);
         }
 
         super.open();
@@ -162,13 +166,14 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
 
     @Override
     public void flow() throws Exception {
+        Link endpoint = getEndpoint();
         if (LOG.isTraceEnabled()) {
-            LOG.trace("Flow: draining={}, drain={} credit={}, remoteCredit={}, queued={}",
-                      draining, getEndpoint().getDrain(),
-                      getEndpoint().getCredit(), getEndpoint().getRemoteCredit(), getEndpoint().getQueued());
+            LOG.trace("Flow: draining={}, drain={} credit={}, remoteCredit={}, queued={},
unsettled={}",
+                    draining, endpoint.getDrain(),
+                    endpoint.getCredit(), endpoint.getRemoteCredit(), endpoint.getQueued(),
endpoint.getUnsettled());
         }
 
-        if (getEndpoint().getDrain() && !draining) {
+        if (endpoint.getDrain() && !draining) {
 
             // Revert to a pull consumer.
             ConsumerControl control = new ConsumerControl();
@@ -207,7 +212,16 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
             ConsumerControl control = new ConsumerControl();
             control.setConsumerId(getConsumerId());
             control.setDestination(getDestination());
-            control.setPrefetch(getEndpoint().getCredit());
+
+            int remoteCredit = endpoint.getRemoteCredit();
+            if (remoteCredit > 0 && subscription != null) {
+                // ensure prefetch exceeds credit + inflight
+                if (remoteCredit + endpoint.getUnsettled() + endpoint.getQueued() > subscription.getPrefetchSize())
{
+                    LOG.trace("Adding dispatched size to credit for sub: " + subscription);
+                    remoteCredit += subscription.getDispatchedQueueSize();
+                }
+            }
+            control.setPrefetch(remoteCredit);
 
             LOG.trace("Flow: update -> consumer control with prefetch {}", control.getPrefetch());
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/88daeec2/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
index e48fef7..c27c0f9 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
@@ -22,6 +22,10 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.util.LinkedList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.broker.jmx.QueueViewMBean;
@@ -92,6 +96,108 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
     }
 
     @Test(timeout = 60000)
+    public void testReceiveFlowDispositionSingleCredit() throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+        for (int i=0;i<2; i++) {
+            AmqpMessage message = new AmqpMessage();
+            message.setMessageId("msg" + i);
+            sender.send(message);
+        }
+        sender.close();
+        connection.close();
+
+        LOG.info("Starting consumer connection");
+        connection = client.connect();
+        session = connection.createSession();
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+        receiver.flow(1);
+        AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+        assertNotNull(received);
+
+        receiver.flow(1);
+        received.accept();
+
+        received = receiver.receive(5, TimeUnit.SECONDS);
+        assertNotNull(received);
+        received.accept();
+
+        receiver.close();
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testReceiveFlowDispositionSingleCreditTopic() throws Exception {
+        final AmqpClient client = createAmqpClient();
+        final LinkedList<Throwable> errors = new LinkedList<Throwable>();
+        final CountDownLatch receiverReady = new CountDownLatch(1);
+        ExecutorService executorService = Executors.newCachedThreadPool();
+
+        executorService.submit(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    LOG.info("Starting consumer connection");
+                    AmqpConnection connection = client.connect();
+                    AmqpSession session = connection.createSession();
+                    AmqpReceiver receiver = session.createReceiver("topic://" + getTestName());
+                    receiver.flow(1);
+                    receiverReady.countDown();
+                    AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+                    assertNotNull(received);
+
+                    receiver.flow(1);
+                    received.accept();
+
+                    received = receiver.receive(5, TimeUnit.SECONDS);
+                    assertNotNull(received);
+                    received.accept();
+
+                    receiver.close();
+                    connection.close();
+
+                } catch (Exception error) {
+                    errors.add(error);
+                }
+
+            }
+        });
+
+        // producer
+        executorService.submit(new Runnable() {
+            @Override
+            public void run() {
+                try {
+
+                    receiverReady.await(20, TimeUnit.SECONDS);
+                    AmqpConnection connection = client.connect();
+                    AmqpSession session = connection.createSession();
+
+                    AmqpSender sender = session.createSender("topic://" + getTestName());
+                    for (int i = 0; i < 2; i++) {
+                        AmqpMessage message = new AmqpMessage();
+                        message.setMessageId("msg" + i);
+                        sender.send(message);
+                    }
+                    sender.close();
+                    connection.close();
+                } catch (Exception ignored) {
+                    ignored.printStackTrace();
+                }
+
+            }
+        });
+
+        executorService.shutdown();
+        executorService.awaitTermination(20, TimeUnit.SECONDS);
+        assertTrue("no errors: " + errors, errors.isEmpty());
+    }
+
+
+    @Test(timeout = 60000)
     public void testReceiveWithJMSSelectorFilter() throws Exception {
         AmqpClient client = createAmqpClient();
         AmqpConnection connection = client.connect();

http://git-wip-us.apache.org/repos/asf/activemq/blob/88daeec2/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
index 69e0930..036eed3 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
@@ -484,7 +484,7 @@ public class RegionBroker extends EmptyBroker {
         consumerExchange.getRegion().acknowledge(consumerExchange, ack);
     }
 
-    protected Region getRegion(ActiveMQDestination destination) throws JMSException {
+    public Region getRegion(ActiveMQDestination destination) throws JMSException {
         switch (destination.getDestinationType()) {
             case ActiveMQDestination.QUEUE_TYPE:
                 return queueRegion;


Mime
View raw message