Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 017AB19794 for ; Wed, 6 Apr 2016 14:10:07 +0000 (UTC) Received: (qmail 68019 invoked by uid 500); 6 Apr 2016 14:10:06 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 67941 invoked by uid 500); 6 Apr 2016 14:10:06 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 67932 invoked by uid 99); 6 Apr 2016 14:10:06 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 06 Apr 2016 14:10:06 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EED43DFD5B; Wed, 6 Apr 2016 14:10:05 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: gtully@apache.org To: commits@activemq.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: activemq git commit: https://issues.apache.org/jira/browse/AMQ-6151 - retain list for redeliveries and combine for dispatch/iteration such that redeliveries retain per priority order after prefetch Date: Wed, 6 Apr 2016 14:10:05 +0000 (UTC) Repository: activemq Updated Branches: refs/heads/master a3a8c1c52 -> 2a8218a9a https://issues.apache.org/jira/browse/AMQ-6151 - retain list for redeliveries and combine for dispatch/iteration such that redeliveries retain per priority order after prefetch Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2a8218a9 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2a8218a9 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2a8218a9 Branch: refs/heads/master Commit: 2a8218a9a8cfa74e1049249481f601e042f33358 Parents: a3a8c1c Author: gtully Authored: Wed Apr 6 15:09:36 2016 +0100 Committer: gtully Committed: Wed Apr 6 15:09:36 2016 +0100 ---------------------------------------------------------------------- .../cursors/QueueDispatchPendingList.java | 86 +++++--- .../usecases/PriorityRedeliveryOrderTest.java | 210 +++++++++++++++++++ 2 files changed, 267 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/2a8218a9/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java index 385e2b8..788b5e5 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java @@ -40,7 +40,6 @@ public class QueueDispatchPendingList implements PendingList { private PendingList pagedInPendingDispatch = new OrderedPendingList(); private PendingList redeliveredWaitingDispatch = new OrderedPendingList(); - // when true use one PrioritizedPendingList for everything private boolean prioritized = false; @@ -87,7 +86,7 @@ public class QueueDispatchPendingList implements PendingList { public PendingNode remove(MessageReference message) { if (pagedInPendingDispatch.contains(message)) { return pagedInPendingDispatch.remove(message); - }else if (redeliveredWaitingDispatch.contains(message)) { + } else if (redeliveredWaitingDispatch.contains(message)) { return redeliveredWaitingDispatch.remove(message); } return null; @@ -105,31 +104,64 @@ public class QueueDispatchPendingList implements PendingList { @Override public Iterator iterator() { - return new Iterator() { + if (prioritized && hasRedeliveries()) { + final QueueDispatchPendingList delegate = this; + final PrioritizedPendingList priorityOrderedRedeliveredAndPending = new PrioritizedPendingList(); + priorityOrderedRedeliveredAndPending.addAll(redeliveredWaitingDispatch); + priorityOrderedRedeliveredAndPending.addAll(pagedInPendingDispatch); - Iterator redeliveries = redeliveredWaitingDispatch.iterator(); - Iterator pendingDispatch = pagedInPendingDispatch.iterator(); - Iterator current = redeliveries; + return new Iterator() { + Iterator combinedIterator = priorityOrderedRedeliveredAndPending.iterator(); + MessageReference current = null; - @Override - public boolean hasNext() { - if (!redeliveries.hasNext() && (current == redeliveries)) { - current = pendingDispatch; + @Override + public boolean hasNext() { + return combinedIterator.hasNext(); } - return current.hasNext(); - } - - @Override - public MessageReference next() { - return current.next(); - } - - @Override - public void remove() { - current.remove(); - } - }; + + @Override + public MessageReference next() { + current = combinedIterator.next(); + return current; + } + + @Override + public void remove() { + if (current!=null) { + delegate.remove(current); + } + } + }; + + } else { + + return new Iterator() { + + Iterator redeliveries = redeliveredWaitingDispatch.iterator(); + Iterator pendingDispatch = pagedInPendingDispatch.iterator(); + Iterator current = redeliveries; + + + @Override + public boolean hasNext() { + if (!redeliveries.hasNext() && (current == redeliveries)) { + current = pendingDispatch; + } + return current.hasNext(); + } + + @Override + public MessageReference next() { + return current.next(); + } + + @Override + public void remove() { + current.remove(); + } + }; + } } @Override @@ -173,14 +205,10 @@ public class QueueDispatchPendingList implements PendingList { } public void addMessageForRedelivery(QueueMessageReference qmr) { - if (prioritized) { - pagedInPendingDispatch.addMessageLast(qmr); - } else { - redeliveredWaitingDispatch.addMessageLast(qmr); - } + redeliveredWaitingDispatch.addMessageLast(qmr); } public boolean hasRedeliveries(){ - return prioritized ? !pagedInPendingDispatch.isEmpty() : !redeliveredWaitingDispatch.isEmpty(); + return !redeliveredWaitingDispatch.isEmpty(); } } http://git-wip-us.apache.org/repos/asf/activemq/blob/2a8218a9/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/PriorityRedeliveryOrderTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/PriorityRedeliveryOrderTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/PriorityRedeliveryOrderTest.java new file mode 100644 index 0000000..35f68f5 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/PriorityRedeliveryOrderTest.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import java.util.ArrayList; +import java.util.List; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + + +/** + * Sends X messages with a sequence number held in a JMS property "appId" + * Uses all priority 4 message (normal priority) + * closed the consumer connection multiple times so the already prefetched messages will be available + * for dispatch again. + */ + +public class PriorityRedeliveryOrderTest { + + private static final Logger LOG = LoggerFactory.getLogger(PriorityRedeliveryOrderTest.class); + + private static final String DESTINATION = "testQ1"; + private static final int MESSAGES_TO_SEND = 1000; + private static final int MESSAGES_PER_CONSUMER = 200; + private int consumedAppId = -1; + private int totalConsumed; + BrokerService broker; + + @Before + public void createBroker() throws Exception { + + broker = new BrokerService(); + broker.setDeleteAllMessagesOnStartup(true); + + PolicyMap policyMap = new PolicyMap(); + List entries = new ArrayList(); + PolicyEntry pe = new PolicyEntry(); + + pe.setPrioritizedMessages(true); + + pe.setQueue(">"); + entries.add(pe); + policyMap.setPolicyEntries(entries); + broker.setDestinationPolicy(policyMap); + + + broker.addConnector("tcp://0.0.0.0:0"); + + broker.start(); + broker.waitUntilStarted(); + } + + @After + public void stopBroker() throws Exception { + broker.stop(); + broker.waitUntilStopped(); + } + + @Test + public void testMessageDeliveryOrderAfterPrefetch() throws Exception { + + //send X messages with with a sequence number number in the message property. + sendMessages(MESSAGES_TO_SEND); + + for (int i = 0; i < (MESSAGES_TO_SEND / MESSAGES_PER_CONSUMER); i++) { + totalConsumed += consumeMessages(MESSAGES_PER_CONSUMER); + } + assertEquals("number of messages consumed should be equal to number of messages sent", MESSAGES_TO_SEND, totalConsumed); + } + + private Long sendMessages(int messageCount) throws Exception { + + long numberOfMessageSent = 0; + + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString()); + + Connection connection = connectionFactory.createConnection(); + connection.start(); + + try { + + Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer jmsProducer = producerSession.createProducer(producerSession.createQueue(DESTINATION)); + + Message sendMessage = producerSession.createTextMessage("test_message"); + + for (int i = 0; i < messageCount; i++) { + + sendMessage.setIntProperty("appID", i); + jmsProducer.send(sendMessage); + producerSession.commit(); + numberOfMessageSent++; + + } + + LOG.info(" Finished after producing : " + numberOfMessageSent); + return numberOfMessageSent; + + } catch (Exception ex) { + LOG.info("Exception received producing ", ex); + LOG.info("finishing after exception :" + numberOfMessageSent); + return numberOfMessageSent; + } finally { + if (connection != null) { + connection.close(); + } + } + } + + /* + Ensure messages are consumed in the expected sequence + */ + + private int consumeMessages(int numberOfMessage) throws Exception { + + LOG.info("Creating new consumer for:" + numberOfMessage); + + + int numberConsumedMessage = 0; + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString()); + ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); + + try { + + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer jmsConsumer = session.createConsumer(session.createQueue(DESTINATION)); + boolean consume = true; + + + while (consume) { + + Message message = jmsConsumer.receive(4000); + + if (message == null) { + LOG.info("Break on:" + numberConsumedMessage); + break; + } + + + int newAppId = message.getIntProperty("appID"); + + numberConsumedMessage++; + + LOG.debug("Message newAppID" + newAppId); + + //check it is next appID in sequence + + if (newAppId != (consumedAppId + 1)) { + fail(" newAppId is " + newAppId + " expected " + (consumedAppId + 1)); + } + + //increase next AppID + consumedAppId = newAppId; + + session.commit(); + + if (numberConsumedMessage == numberOfMessage) { + LOG.info("closing consumer after 200 message, consumedAppID is " + consumedAppId); + return numberConsumedMessage; + } + + } + } finally { + + if (connection != null) { + try { + connection.close(); + } catch (Exception ex) { + + } + } + } + return numberConsumedMessage; + } + +} +