Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D30F7200B3E for ; Wed, 7 Sep 2016 18:29:01 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id D173C160AC1; Wed, 7 Sep 2016 16:29:01 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id CB4CA160ABF for ; Wed, 7 Sep 2016 18:29:00 +0200 (CEST) Received: (qmail 34365 invoked by uid 500); 7 Sep 2016 16:29:00 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 34356 invoked by uid 99); 7 Sep 2016 16:29:00 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Sep 2016 16:29:00 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E1939DFBBB; Wed, 7 Sep 2016 16:28:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: gtully@apache.org To: commits@activemq.apache.org Message-Id: <026bd7819e164f47af27a2cb7f2c8c90@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: activemq git commit: AMQ-6422 - include the inflight count in the prefetch for positive remote credit flows. Fix and test Date: Wed, 7 Sep 2016 16:28:59 +0000 (UTC) archived-at: Wed, 07 Sep 2016 16:29:02 -0000 Repository: activemq Updated Branches: refs/heads/master 88af1c70d -> 88daeec28 AMQ-6422 - include the inflight count in the prefetch for positive remote credit flows. Fix and test Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/88daeec2 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/88daeec2 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/88daeec2 Branch: refs/heads/master Commit: 88daeec28f25266c6fce15e200ac6d2ca9d11eb6 Parents: 88af1c7 Author: gtully Authored: Wed Sep 7 17:28:35 2016 +0100 Committer: gtully Committed: Wed Sep 7 17:28:35 2016 +0100 ---------------------------------------------------------------------- .../transport/amqp/protocol/AmqpConnection.java | 13 +++ .../transport/amqp/protocol/AmqpSender.java | 24 ++++- .../amqp/interop/AmqpSendReceiveTest.java | 106 +++++++++++++++++++ .../activemq/broker/region/RegionBroker.java | 2 +- 4 files changed, 139 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/88daeec2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java index 929fa24..5a402ba 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java @@ -45,8 +45,10 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.jms.InvalidClientIDException; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.AbstractRegion; import org.apache.activemq.broker.region.DurableTopicSubscription; import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.TopicRegion; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQTempDestination; @@ -712,6 +714,17 @@ public class AmqpConnection implements AmqpProtocolConverter { return result; } + + Subscription lookupPrefetchSubscription(ConsumerInfo consumerInfo) { + Subscription subscription = null; + try { + subscription = ((AbstractRegion)((RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class)).getRegion(consumerInfo.getDestination())).getSubscriptions().get(consumerInfo.getConsumerId()); + } catch (Exception e) { + LOG.warn("Error finding subscription for: " + consumerInfo + ": " + e.getMessage(), false, e); + } + return subscription; + } + ActiveMQDestination createTemporaryDestination(final Link link, Symbol[] capabilities) { ActiveMQDestination rc = null; if (contains(capabilities, TEMP_TOPIC_CAPABILITY)) { http://git-wip-us.apache.org/repos/asf/activemq/blob/88daeec2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java index 12bd627..0b85858 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java @@ -21,6 +21,7 @@ import static org.apache.activemq.transport.amqp.AmqpSupport.toLong; import java.io.IOException; import java.util.LinkedList; +import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ConsumerControl; @@ -52,6 +53,7 @@ import org.apache.qpid.proton.amqp.transport.DeliveryState; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Link; import org.apache.qpid.proton.engine.Sender; import org.fusesource.hawtbuf.Buffer; import org.slf4j.Logger; @@ -79,6 +81,7 @@ public class AmqpSender extends AmqpAbstractLink { private final String MESSAGE_FORMAT_KEY = outboundTransformer.getPrefixVendor() + "MESSAGE_FORMAT"; private final ConsumerInfo consumerInfo; + private Subscription subscription; private final boolean presettle; private boolean draining; @@ -108,6 +111,7 @@ public class AmqpSender extends AmqpAbstractLink { public void open() { if (!isClosed()) { session.registerSender(getConsumerId(), this); + subscription = session.getConnection().lookupPrefetchSubscription(consumerInfo); } super.open(); @@ -162,13 +166,14 @@ public class AmqpSender extends AmqpAbstractLink { @Override public void flow() throws Exception { + Link endpoint = getEndpoint(); if (LOG.isTraceEnabled()) { - LOG.trace("Flow: draining={}, drain={} credit={}, remoteCredit={}, queued={}", - draining, getEndpoint().getDrain(), - getEndpoint().getCredit(), getEndpoint().getRemoteCredit(), getEndpoint().getQueued()); + LOG.trace("Flow: draining={}, drain={} credit={}, remoteCredit={}, queued={}, unsettled={}", + draining, endpoint.getDrain(), + endpoint.getCredit(), endpoint.getRemoteCredit(), endpoint.getQueued(), endpoint.getUnsettled()); } - if (getEndpoint().getDrain() && !draining) { + if (endpoint.getDrain() && !draining) { // Revert to a pull consumer. ConsumerControl control = new ConsumerControl(); @@ -207,7 +212,16 @@ public class AmqpSender extends AmqpAbstractLink { ConsumerControl control = new ConsumerControl(); control.setConsumerId(getConsumerId()); control.setDestination(getDestination()); - control.setPrefetch(getEndpoint().getCredit()); + + int remoteCredit = endpoint.getRemoteCredit(); + if (remoteCredit > 0 && subscription != null) { + // ensure prefetch exceeds credit + inflight + if (remoteCredit + endpoint.getUnsettled() + endpoint.getQueued() > subscription.getPrefetchSize()) { + LOG.trace("Adding dispatched size to credit for sub: " + subscription); + remoteCredit += subscription.getDispatchedQueueSize(); + } + } + control.setPrefetch(remoteCredit); LOG.trace("Flow: update -> consumer control with prefetch {}", control.getPrefetch()); http://git-wip-us.apache.org/repos/asf/activemq/blob/88daeec2/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java index e48fef7..c27c0f9 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java @@ -22,6 +22,10 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import java.util.LinkedList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.activemq.broker.jmx.QueueViewMBean; @@ -92,6 +96,108 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { } @Test(timeout = 60000) + public void testReceiveFlowDispositionSingleCredit() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender("queue://" + getTestName()); + for (int i=0;i<2; i++) { + AmqpMessage message = new AmqpMessage(); + message.setMessageId("msg" + i); + sender.send(message); + } + sender.close(); + connection.close(); + + LOG.info("Starting consumer connection"); + connection = client.connect(); + session = connection.createSession(); + AmqpReceiver receiver = session.createReceiver("queue://" + getTestName()); + receiver.flow(1); + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + + receiver.flow(1); + received.accept(); + + received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + received.accept(); + + receiver.close(); + connection.close(); + } + + @Test(timeout = 60000) + public void testReceiveFlowDispositionSingleCreditTopic() throws Exception { + final AmqpClient client = createAmqpClient(); + final LinkedList errors = new LinkedList(); + final CountDownLatch receiverReady = new CountDownLatch(1); + ExecutorService executorService = Executors.newCachedThreadPool(); + + executorService.submit(new Runnable() { + @Override + public void run() { + try { + LOG.info("Starting consumer connection"); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + AmqpReceiver receiver = session.createReceiver("topic://" + getTestName()); + receiver.flow(1); + receiverReady.countDown(); + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + + receiver.flow(1); + received.accept(); + + received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + received.accept(); + + receiver.close(); + connection.close(); + + } catch (Exception error) { + errors.add(error); + } + + } + }); + + // producer + executorService.submit(new Runnable() { + @Override + public void run() { + try { + + receiverReady.await(20, TimeUnit.SECONDS); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender("topic://" + getTestName()); + for (int i = 0; i < 2; i++) { + AmqpMessage message = new AmqpMessage(); + message.setMessageId("msg" + i); + sender.send(message); + } + sender.close(); + connection.close(); + } catch (Exception ignored) { + ignored.printStackTrace(); + } + + } + }); + + executorService.shutdown(); + executorService.awaitTermination(20, TimeUnit.SECONDS); + assertTrue("no errors: " + errors, errors.isEmpty()); + } + + + @Test(timeout = 60000) public void testReceiveWithJMSSelectorFilter() throws Exception { AmqpClient client = createAmqpClient(); AmqpConnection connection = client.connect(); http://git-wip-us.apache.org/repos/asf/activemq/blob/88daeec2/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 69e0930..036eed3 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -484,7 +484,7 @@ public class RegionBroker extends EmptyBroker { consumerExchange.getRegion().acknowledge(consumerExchange, ack); } - protected Region getRegion(ActiveMQDestination destination) throws JMSException { + public Region getRegion(ActiveMQDestination destination) throws JMSException { switch (destination.getDestinationType()) { case ActiveMQDestination.QUEUE_TYPE: return queueRegion;