activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6151 - respect prioritizeMessages for pending and redelivered messages
Date Mon, 01 Feb 2016 12:46:25 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 186b5d0f3 -> 5af5b59d3


https://issues.apache.org/jira/browse/AMQ-6151 - respect prioritizeMessages for pending and
redelivered messages


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/5af5b59d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/5af5b59d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/5af5b59d

Branch: refs/heads/master
Commit: 5af5b59d3bf3c84098e55b6cb87631c061990666
Parents: 186b5d0
Author: gtully <gary.tully@gmail.com>
Authored: Mon Feb 1 12:19:24 2016 +0000
Committer: gtully <gary.tully@gmail.com>
Committed: Mon Feb 1 12:19:49 2016 +0000

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    | 10 ++++
 .../cursors/QueueDispatchPendingList.java       | 11 ++++-
 .../activemq/store/MessagePriorityTest.java     | 51 ++++++++++++++++++++
 3 files changed, 70 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/5af5b59d/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 9a7feef..960ac9c 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -2001,6 +2001,16 @@ public class Queue extends BaseDestination implements Task, UsageListener,
Index
 
         pagedInPendingDispatchLock.writeLock().lock();
         try {
+            if (isPrioritizedMessages() && !dispatchPendingList.isEmpty() &&
list != null && !list.isEmpty()) {
+                // merge all to select priority order
+                for (MessageReference qmr : list) {
+                    if (!dispatchPendingList.contains(qmr)) {
+                        dispatchPendingList.addMessageLast(qmr);
+                    }
+                }
+                list = null;
+            }
+
             doActualDispatch(dispatchPendingList);
             // and now see if we can dispatch the new stuff.. and append to the pending
             // list anything that does not actually get dispatched.

http://git-wip-us.apache.org/repos/asf/activemq/blob/5af5b59d/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
index cdddd4c..385e2b8 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
@@ -40,6 +40,8 @@ public class QueueDispatchPendingList implements PendingList {
 
     private PendingList pagedInPendingDispatch = new OrderedPendingList();
     private PendingList redeliveredWaitingDispatch = new OrderedPendingList();
+    // when true use one PrioritizedPendingList for everything
+    private boolean prioritized = false;
 
 
     @Override
@@ -160,6 +162,7 @@ public class QueueDispatchPendingList implements PendingList {
     }
 
     public void setPrioritizedMessages(boolean prioritizedMessages) {
+        prioritized = prioritizedMessages;
         if (prioritizedMessages && this.pagedInPendingDispatch instanceof OrderedPendingList)
{
             pagedInPendingDispatch = new PrioritizedPendingList();
             redeliveredWaitingDispatch = new PrioritizedPendingList();
@@ -170,10 +173,14 @@ public class QueueDispatchPendingList implements PendingList {
     }
 
     public void addMessageForRedelivery(QueueMessageReference qmr) {
-        redeliveredWaitingDispatch.addMessageLast(qmr);
+        if (prioritized) {
+            pagedInPendingDispatch.addMessageLast(qmr);
+        } else {
+            redeliveredWaitingDispatch.addMessageLast(qmr);
+        }
     }
 
     public boolean hasRedeliveries(){
-        return !redeliveredWaitingDispatch.isEmpty();
+        return prioritized ? !pagedInPendingDispatch.isEmpty() : !redeliveredWaitingDispatch.isEmpty();
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/5af5b59d/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
index 789e45f..e7c746c 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
@@ -110,6 +110,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport
{
         broker.waitUntilStarted();
 
         factory = new ActiveMQConnectionFactory("vm://priorityTest");
+        factory.setMessagePrioritySupported(true);
         ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
         prefetch.setAll(prefetchVal);
         factory.setPrefetchPolicy(prefetch);
@@ -668,6 +669,56 @@ abstract public class MessagePriorityTest extends CombinationTestSupport
{
         queueConsumer.close();
     }
 
+    public void testInterleaveHiNewConsumerGetsHi() throws Exception {
+        ActiveMQQueue queue = (ActiveMQQueue) sess.createQueue("TEST");
+        doTestInterleaveHiNewConsumerGetsHi(queue);
+    }
+
+    public void testInterleaveHiNewConsumerGetsHiPull() throws Exception {
+        ActiveMQQueue queue = (ActiveMQQueue) sess.createQueue("TEST?consumer.prefetchSize=0");
+        doTestInterleaveHiNewConsumerGetsHi(queue);
+    }
+
+    public void doTestInterleaveHiNewConsumerGetsHi(ActiveMQQueue queue) throws Exception
{
+
+        // one hi sandwich
+        ProducerThread producerThread = new ProducerThread(queue, 3, LOW_PRI);
+        producerThread.run();
+        producerThread = new ProducerThread(queue, 1, HIGH_PRI);
+        producerThread.run();
+        producerThread = new ProducerThread(queue, 3, LOW_PRI);
+        producerThread.run();
+
+        // consume hi
+        MessageConsumer queueConsumer = sess.createConsumer(queue);
+        Message message = queueConsumer.receive(10000);
+        assertNotNull("expect #", message);
+        assertEquals("correct priority", HIGH_PRI, message.getJMSPriority());
+        queueConsumer.close();
+
+        // last hi
+        producerThread = new ProducerThread(queue, 3, LOW_PRI);
+        producerThread.run();
+        producerThread = new ProducerThread(queue, 1, HIGH_PRI);
+        producerThread.run();
+
+        // consume hi
+        queueConsumer = sess.createConsumer(queue);
+        message = queueConsumer.receive(10000);
+        assertNotNull("expect #", message);
+        assertEquals("correct priority", HIGH_PRI, message.getJMSPriority());
+        queueConsumer.close();
+
+        // consume the rest
+        queueConsumer = sess.createConsumer(queue);
+        for (int i = 0; i < 9; i++) {
+            message = queueConsumer.receive(10000);
+            assertNotNull("expect #" + i, message);
+            assertEquals("correct priority", LOW_PRI, message.getJMSPriority());
+        }
+        queueConsumer.close();
+    }
+
     public void initCombosForTestEveryXHi() {
         // the cache limits the priority ordering to available memory
         addCombinationValues("useCache", new Object[] {Boolean.FALSE, Boolean.TRUE});


Mime
View raw message