activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r698573 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Date Wed, 24 Sep 2008 13:51:18 GMT
Author: rajdavies
Date: Wed Sep 24 06:51:18 2008
New Revision: 698573

URL: http://svn.apache.org/viewvc?rev=698573&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1947

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=698573&r1=698572&r2=698573&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Wed Sep 24 06:51:18 2008
@@ -18,13 +18,11 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
@@ -86,7 +84,7 @@
     protected PendingMessageCursor messages;
     private final LinkedHashMap<MessageId,QueueMessageReference> pagedInMessages =
new LinkedHashMap<MessageId,QueueMessageReference>();
     // Messages that are paged in but have not yet been targeted at a subscription
-    private LinkedHashSet<QueueMessageReference> pagedInPendingDispatch = new LinkedHashSet<QueueMessageReference>(100);
+    private List<QueueMessageReference> pagedInPendingDispatch = new ArrayList<QueueMessageReference>(100);
     private MessageGroupMap messageGroupOwners;
     private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
     private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
@@ -968,7 +966,9 @@
 	                            dispatchLock.lock();
 	                            try {
 	                                synchronized(pagedInPendingDispatch) {
-	                                    pagedInPendingDispatch.add(node);
+	                                    if (!pagedInPendingDispatch.contains(node)) {
+	                                        pagedInPendingDispatch.add(node);
+	                                    }
 	                                }
 	                            } finally {
 	                                dispatchLock.unlock();
@@ -1219,7 +1219,15 @@
                 // the pending
                 // list anything that does not actually get dispatched.
                 if (list != null && !list.isEmpty()) {
-                    pagedInPendingDispatch.addAll(doActualDispatch(list));
+                    if (pagedInPendingDispatch.isEmpty()) {
+                        pagedInPendingDispatch.addAll(doActualDispatch(list));
+                    } else {
+                        for (QueueMessageReference qmr : list) {
+                            if (!pagedInPendingDispatch.contains(qmr)) {
+                                pagedInPendingDispatch.add(qmr);
+                            }
+                        }
+                    }
                 }
             }
         } finally {
@@ -1231,8 +1239,8 @@
      * @return list of messages that could get dispatched to consumers if they
      *         were not full.
      */
-    private LinkedHashSet<QueueMessageReference> doActualDispatch(Collection<QueueMessageReference>
collection) throws Exception {
-        LinkedHashSet<QueueMessageReference> rc = new LinkedHashSet<QueueMessageReference>(collection.size());
+    private List<QueueMessageReference> doActualDispatch(List<QueueMessageReference>
list) throws Exception {
+        List<QueueMessageReference> rc = new ArrayList<QueueMessageReference>(list.size());
         Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size());
         List<Subscription> consumers;
         
@@ -1240,7 +1248,7 @@
             consumers = new ArrayList<Subscription>(this.consumers);
         }
 
-        for (MessageReference node : collection) {
+        for (MessageReference node : list) {
             Subscription target = null;
             int interestCount=0;
             for (Subscription s : consumers) {



Mime
View raw message