activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1202153 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java test/java/org/apache/activemq/store/MessagePriorityTest.java
Date Tue, 15 Nov 2011 12:21:20 GMT
Author: gtully
Date: Tue Nov 15 12:21:20 2011
New Revision: 1202153

URL: http://svn.apache.org/viewvc?rev=1202153&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3596 - FilePendingMessageCursor memory list does
not respect priority for non persistent messages. Fix with test. Reuse pendinglist from vm
cursor for the file pending message cursor in memory list

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=1202153&r1=1202152&r2=1202153&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
Tue Nov 15 12:21:20 2011
@@ -53,7 +53,7 @@ public class FilePendingMessageCursor ex
     protected Broker broker;
     private final PListStore store;
     private final String name;
-    private LinkedList<MessageReference> memoryList = new LinkedList<MessageReference>();
+    private PendingList memoryList;
     private PList diskList;
     private Iterator<MessageReference> iter;
     private Destination regionDestination;
@@ -68,6 +68,11 @@ public class FilePendingMessageCursor ex
      */
     public FilePendingMessageCursor(Broker broker, String name, boolean prioritizedMessages)
{
         super(prioritizedMessages);
+        if (this.prioritizedMessages) {
+            this.memoryList = new PrioritizedPendingList();
+        } else {
+            this.memoryList = new OrderedPendingList();
+        }
         this.broker = broker;
         // the store can be null if the BrokerService has persistence
         // turned off
@@ -204,7 +209,7 @@ public class FilePendingMessageCursor ex
                 regionDestination = node.getMessage().getRegionDestination();
                 if (isDiskListEmpty()) {
                     if (hasSpace() || this.store == null) {
-                        memoryList.add(node);
+                        memoryList.addMessageLast(node);
                         node.incrementReferenceCount();
                         setCacheEnabled(true);
                         return true;
@@ -214,7 +219,7 @@ public class FilePendingMessageCursor ex
                     if (isDiskListEmpty()) {
                         expireOldMessages();
                         if (hasSpace()) {
-                            memoryList.add(node);
+                            memoryList.addMessageLast(node);
                             node.incrementReferenceCount();
                             return true;
                         } else {
@@ -252,7 +257,7 @@ public class FilePendingMessageCursor ex
                 regionDestination = node.getMessage().getRegionDestination();
                 if (isDiskListEmpty()) {
                     if (hasSpace()) {
-                        memoryList.addFirst(node);
+                        memoryList.addMessageFirst(node);
                         node.incrementReferenceCount();
                         setCacheEnabled(true);
                         return;
@@ -262,7 +267,7 @@ public class FilePendingMessageCursor ex
                     if (isDiskListEmpty()) {
                         expireOldMessages();
                         if (hasSpace()) {
-                            memoryList.addFirst(node);
+                            memoryList.addMessageFirst(node);
                             node.incrementReferenceCount();
                             return;
                         } else {
@@ -325,7 +330,7 @@ public class FilePendingMessageCursor ex
      */
     @Override
     public synchronized void remove(MessageReference node) {
-        if (memoryList.remove(node)) {
+        if (memoryList.remove(node) != null) {
             node.decrementReferenceCount();
         }
         if (!isDiskListEmpty()) {
@@ -406,19 +411,15 @@ public class FilePendingMessageCursor ex
 
     protected synchronized void expireOldMessages() {
         if (!memoryList.isEmpty()) {
-            LinkedList<MessageReference> tmpList = new LinkedList<MessageReference>(this.memoryList);
-            this.memoryList = new LinkedList<MessageReference>();
-            while (!tmpList.isEmpty()) {
-                MessageReference node = tmpList.removeFirst();
+            for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();)
{
+                MessageReference node = iterator.next();
                 if (node.isExpired()) {
                     node.decrementReferenceCount();
                     discardExpiredMessage(node);
-                } else {
-                    memoryList.add(node);
+                    iterator.remove();
                 }
             }
         }
-
     }
 
     protected synchronized void flushToDisk() {
@@ -428,8 +429,8 @@ public class FilePendingMessageCursor ex
                 start = System.currentTimeMillis();
                 LOG.trace("" + name + ", flushToDisk() mem list size: " +memoryList.size()
 + " " +  (systemUsage != null ? systemUsage.getMemoryUsage() : "") );
              }
-            while (!memoryList.isEmpty()) {
-                MessageReference node = memoryList.removeFirst();
+            for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();)
{
+                MessageReference node = iterator.next();
                 node.decrementReferenceCount();
                 ByteSequence bs;
                 try {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java?rev=1202153&r1=1202152&r2=1202153&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
Tue Nov 15 12:21:20 2011
@@ -18,6 +18,7 @@
 package org.apache.activemq.store;
 
 import javax.jms.Connection;
+import javax.jms.DeliveryMode;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
@@ -53,6 +54,7 @@ abstract public class MessagePriorityTes
     protected Session sess;
     
     public boolean useCache = true;
+    public int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
     public boolean dispatchAsync = true;
     public boolean prioritizeMessages = true;
     public boolean immediatePriorityDispatch = true;
@@ -150,6 +152,7 @@ abstract public class MessagePriorityTes
             try {
                 MessageProducer producer = sess.createProducer(dest);
                 producer.setPriority(priority);
+                producer.setDeliveryMode(deliveryMode);
                 for (int i = 0; i < messageCount; i++) {
                     producer.send(sess.createTextMessage("message priority: " + priority));
                 }
@@ -170,6 +173,7 @@ abstract public class MessagePriorityTes
     
     public void initCombosForTestQueues() {
         addCombinationValues("useCache", new Object[] {new Boolean(true), new Boolean(false)});
+        addCombinationValues("deliveryMode", new Object[] {new Integer(DeliveryMode.NON_PERSISTENT),
new Integer(DeliveryMode.PERSISTENT)});
     }
     
     public void testQueues() throws Exception {



Mime
View raw message