Author: tabish
Date: Mon Feb 6 16:59:07 2012
New Revision: 1241077
URL: http://svn.apache.org/viewvc?rev=1241077&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3700
Prevent any calls to wakeup becoming recursive calls into iterate() and instead queue a wakeup
so that we don't miss dispatching any messages as things change in the memory usage.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1241077&r1=1241076&r2=1241077&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Mon Feb 6 16:59:07 2012
@@ -82,8 +82,6 @@ import org.slf4j.MDC;
/**
* The Queue is a List of MessageEntry objects that are dispatched to matching
* subscriptions.
- *
- *
*/
public class Queue extends BaseDestination implements Task, UsageListener {
protected static final Logger LOG = LoggerFactory.getLogger(Queue.class);
@@ -105,12 +103,12 @@ public class Queue extends BaseDestinati
private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
final Lock sendLock = new ReentrantLock();
private ExecutorService executor;
- protected final Map<MessageId, Runnable> messagesWaitingForSpace = Collections
- .synchronizedMap(new LinkedHashMap<MessageId, Runnable>());
+ private final Map<MessageId, Runnable> messagesWaitingForSpace = new LinkedHashMap<MessageId, Runnable>();
private boolean useConsumerPriority = true;
private boolean strictOrderDispatch = false;
private final QueueDispatchSelector dispatchSelector;
private boolean optimizedDispatch = false;
+ private boolean iterationRunning = false;
private boolean firstConsumer = false;
private int timeBeforeDispatchStarts = 0;
private int consumersBeforeDispatchStarts = 0;
@@ -1403,6 +1401,11 @@ public class Queue extends BaseDestinati
boolean pageInMoreMessages = false;
synchronized (iteratingMutex) {
+ // If optimize dispatch is on or this is a slave this method could be called recursively
+ // we set this state value to short-circuit wakeup in those cases to avoid that as it
+ // could lead to errors.
+ iterationRunning = true;
+
// do early to allow dispatch of these waiting messages
synchronized (messagesWaitingForSpace) {
Iterator<Runnable> it = messagesWaitingForSpace.values().iterator();
@@ -1454,14 +1457,14 @@ public class Queue extends BaseDestinati
messagesLock.readLock().lock();
try{
pageInMoreMessages |= !messages.isEmpty();
- }finally {
+ } finally {
messagesLock.readLock().unlock();
}
pagedInPendingDispatchLock.readLock().lock();
try {
pageInMoreMessages |= !pagedInPendingDispatch.isEmpty();
- }finally {
+ } finally {
pagedInPendingDispatchLock.readLock().unlock();
}
@@ -1517,6 +1520,8 @@ public class Queue extends BaseDestinati
pendingWakeups.decrementAndGet();
}
MDC.remove("activemq.destination");
+ iterationRunning = false;
+
return pendingWakeups.get() > 0;
}
}
@@ -1677,7 +1682,7 @@ public class Queue extends BaseDestinati
}
public void wakeup() {
- if (optimizedDispatch || isSlave()) {
+ if ((optimizedDispatch || isSlave()) && !iterationRunning) {
iterate();
pendingWakeups.incrementAndGet();
} else {
|