Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 94FEC200B33 for ; Wed, 29 Jun 2016 18:57:48 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 937F8160A57; Wed, 29 Jun 2016 16:57:48 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B2AB6160A3C for ; Wed, 29 Jun 2016 18:57:47 +0200 (CEST) Received: (qmail 86442 invoked by uid 500); 29 Jun 2016 16:57:46 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 86433 invoked by uid 99); 29 Jun 2016 16:57:46 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 29 Jun 2016 16:57:46 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5CE26E5CE1; Wed, 29 Jun 2016 16:57:46 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tabish@apache.org To: commits@activemq.apache.org Message-Id: <030d45b4b0724b3e8b84823c18f28a1a@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: activemq git commit: https://issues.apache.org/jira/browse/AMQ-6340 Date: Wed, 29 Jun 2016 16:57:46 +0000 (UTC) archived-at: Wed, 29 Jun 2016 16:57:48 -0000 Repository: activemq Updated Branches: refs/heads/master 03a211ec0 -> 4e23adfcc https://issues.apache.org/jira/browse/AMQ-6340 combine the lists in the correct order for later redispatch. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4e23adfc Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4e23adfc Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4e23adfc Branch: refs/heads/master Commit: 4e23adfcc981d3e13e7f1b1182b89a954160a26a Parents: 03a211e Author: Timothy Bish Authored: Wed Jun 29 12:57:30 2016 -0400 Committer: Timothy Bish Committed: Wed Jun 29 12:57:30 2016 -0400 ---------------------------------------------------------------------- .../amqp/JmsTransactedMessageOrderTest.java | 6 ++---- .../broker/region/PrefetchSubscription.java | 18 ++++++++++-------- 2 files changed, 12 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/4e23adfc/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsTransactedMessageOrderTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsTransactedMessageOrderTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsTransactedMessageOrderTest.java index c286497..2134759 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsTransactedMessageOrderTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsTransactedMessageOrderTest.java @@ -69,8 +69,6 @@ public class JmsTransactedMessageOrderTest extends JMSClientTestSupport { policyEntry.setQueue(">"); policyEntry.setStrictOrderDispatch(true); - policyEntry.setProducerFlowControl(true); - policyEntry.setMemoryLimit(1024 * 1024); policyEntries.add(policyEntry); @@ -85,7 +83,7 @@ public class JmsTransactedMessageOrderTest extends JMSClientTestSupport { sendMessages(5); int counter = 0; - while (counter++ < 10) { + while (counter++ < 20) { LOG.info("Creating connection using prefetch of: {}", prefetch); JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI("jms.prefetchPolicy.all=" + prefetch)); @@ -100,11 +98,11 @@ public class JmsTransactedMessageOrderTest extends JMSClientTestSupport { Message message = consumer.receive(5000); assertNotNull(message); assertTrue(message instanceof TextMessage); + LOG.info("Read message = {}", ((TextMessage) message).getText()); int sequenceID = message.getIntProperty("sequenceID"); assertEquals(0, sequenceID); - LOG.info("Read message = {}", ((TextMessage) message).getText()); session.rollback(); session.close(); connection.close(); http://git-wip-us.apache.org/repos/asf/activemq/blob/4e23adfc/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 0b2935c..74658cc 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -19,6 +19,7 @@ package org.apache.activemq.broker.region; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -639,31 +640,32 @@ public abstract class PrefetchSubscription extends AbstractSubscription { } public List remove(ConnectionContext context, Destination destination, List dispatched) throws Exception { - List rc = new ArrayList(); + LinkedList redispatch = new LinkedList(); synchronized(pendingLock) { super.remove(context, destination); // Here is a potential problem concerning Inflight stat: // Messages not already committed or rolled back may not be removed from dispatched list at the moment // Except if each commit or rollback callback action comes before remove of subscriber. - rc.addAll(pending.remove(context, destination)); + redispatch.addAll(pending.remove(context, destination)); if (dispatched == null) { - return rc; + return redispatch; } // Synchronized to DispatchLock if necessary if (dispatched == this.dispatched) { synchronized(dispatchLock) { - updateDestinationStats(rc, destination, dispatched); + addReferencesAndUpdateRedispatch(redispatch, destination, dispatched); } } else { - updateDestinationStats(rc, destination, dispatched); + addReferencesAndUpdateRedispatch(redispatch, destination, dispatched); } } - return rc; + + return redispatch; } - private void updateDestinationStats(List rc, Destination destination, List dispatched) { + private void addReferencesAndUpdateRedispatch(LinkedList redispatch, Destination destination, List dispatched) { ArrayList references = new ArrayList(); for (MessageReference r : dispatched) { if (r.getRegionDestination() == destination) { @@ -671,7 +673,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { getSubscriptionStatistics().getInflightMessageSize().addSize(-r.getSize()); } } - rc.addAll(references); + redispatch.addAll(0, references); destination.getDestinationStatistics().getInflight().subtract(references.size()); dispatched.removeAll(references); }