activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1241077 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Date Mon, 06 Feb 2012 16:59:08 GMT
Author: tabish
Date: Mon Feb  6 16:59:07 2012
New Revision: 1241077

URL: http://svn.apache.org/viewvc?rev=1241077&view=rev
Log:
fix for: for: https://issues.apache.org/jira/browse/AMQ-3700

Prevent any calls to wakeup becoming recursive calls into iterate() and instead queue a wakeup
so that we don't miss dispatching any messages as things change in the memory usage.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1241077&r1=1241076&r2=1241077&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Mon Feb  6 16:59:07 2012
@@ -82,8 +82,6 @@ import org.slf4j.MDC;
 /**
  * The Queue is a List of MessageEntry objects that are dispatched to matching
  * subscriptions.
- *
- *
  */
 public class Queue extends BaseDestination implements Task, UsageListener {
     protected static final Logger LOG = LoggerFactory.getLogger(Queue.class);
@@ -105,12 +103,12 @@ public class Queue extends BaseDestinati
     private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
     final Lock sendLock = new ReentrantLock();
     private ExecutorService executor;
-    protected final Map<MessageId, Runnable> messagesWaitingForSpace = Collections
-            .synchronizedMap(new LinkedHashMap<MessageId, Runnable>());
+    private final Map<MessageId, Runnable> messagesWaitingForSpace = new LinkedHashMap<MessageId,
Runnable>();
     private boolean useConsumerPriority = true;
     private boolean strictOrderDispatch = false;
     private final QueueDispatchSelector dispatchSelector;
     private boolean optimizedDispatch = false;
+    private boolean iterationRunning = false;
     private boolean firstConsumer = false;
     private int timeBeforeDispatchStarts = 0;
     private int consumersBeforeDispatchStarts = 0;
@@ -1403,6 +1401,11 @@ public class Queue extends BaseDestinati
         boolean pageInMoreMessages = false;
         synchronized (iteratingMutex) {
 
+            // If optimize dispatch is on or this is a slave this method could be called
recursively
+            // we set this state value to short-circuit wakeup in those cases to avoid that
as it
+            // could lead to errors.
+            iterationRunning = true;
+
             // do early to allow dispatch of these waiting messages
             synchronized (messagesWaitingForSpace) {
                 Iterator<Runnable> it = messagesWaitingForSpace.values().iterator();
@@ -1454,14 +1457,14 @@ public class Queue extends BaseDestinati
             messagesLock.readLock().lock();
             try{
                 pageInMoreMessages |= !messages.isEmpty();
-            }finally {
+            } finally {
                 messagesLock.readLock().unlock();
             }
 
             pagedInPendingDispatchLock.readLock().lock();
             try {
                 pageInMoreMessages |= !pagedInPendingDispatch.isEmpty();
-            }finally {
+            } finally {
                 pagedInPendingDispatchLock.readLock().unlock();
             }
 
@@ -1517,6 +1520,8 @@ public class Queue extends BaseDestinati
                 pendingWakeups.decrementAndGet();
             }
             MDC.remove("activemq.destination");
+            iterationRunning = false;
+
             return pendingWakeups.get() > 0;
         }
     }
@@ -1677,7 +1682,7 @@ public class Queue extends BaseDestinati
     }
 
     public void wakeup() {
-        if (optimizedDispatch || isSlave()) {
+        if ((optimizedDispatch || isSlave()) && !iterationRunning) {
             iterate();
             pendingWakeups.incrementAndGet();
         } else {



Mime
View raw message