activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmacn...@apache.org
Subject svn commit: r888227 [2/2] - in /activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker: jmx/ region/ region/policy/
Date Tue, 08 Dec 2009 02:27:30 GMT
Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=888227&r1=888226&r2=888227&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Tue Dec  8 02:27:30 2009
@@ -76,7 +76,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-
 /**
  * The Queue is a List of MessageEntry objects that are dispatched to matching
  * subscriptions.
@@ -86,10 +85,10 @@
 public class Queue extends BaseDestination implements Task, UsageListener {
     protected static final Log LOG = LogFactory.getLog(Queue.class);
     protected final TaskRunnerFactory taskFactory;
-    protected TaskRunner taskRunner;    
+    protected TaskRunner taskRunner;
     protected final List<Subscription> consumers = new ArrayList<Subscription>(50);
     protected PendingMessageCursor messages;
-    private final LinkedHashMap<MessageId,QueueMessageReference> pagedInMessages = new LinkedHashMap<MessageId,QueueMessageReference>();
+    private final LinkedHashMap<MessageId, QueueMessageReference> pagedInMessages = new LinkedHashMap<MessageId, QueueMessageReference>();
     // Messages that are paged in but have not yet been targeted at a subscription
     private List<QueueMessageReference> pagedInPendingDispatch = new ArrayList<QueueMessageReference>(100);
     private MessageGroupMap messageGroupOwners;
@@ -99,16 +98,16 @@
     private ExecutorService executor;
     protected final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
     private final Object dispatchMutex = new Object();
-    private boolean useConsumerPriority=true;
-    private boolean strictOrderDispatch=false;
+    private boolean useConsumerPriority = true;
+    private boolean strictOrderDispatch = false;
     private QueueDispatchSelector dispatchSelector;
-    private boolean optimizedDispatch=false;
+    private boolean optimizedDispatch = false;
     private boolean firstConsumer = false;
     private int timeBeforeDispatchStarts = 0;
     private int consumersBeforeDispatchStarts = 0;
     private CountDownLatch consumersBeforeStartsLatch;
     private AtomicLong pendingWakeups = new AtomicLong();
-    
+
     private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
         public void run() {
             asyncWakeup();
@@ -116,25 +115,24 @@
     };
     private final Runnable expireMessagesTask = new Runnable() {
         public void run() {
-            expireMessages();          
+            expireMessages();
         }
     };
     private final Object iteratingMutex = new Object() {};
     private static final Scheduler scheduler = Scheduler.getInstance();
-    
-    private static final Comparator<Subscription>orderedCompare = new Comparator<Subscription>() {
+
+    private static final Comparator<Subscription> orderedCompare = new Comparator<Subscription>() {
 
         public int compare(Subscription s1, Subscription s2) {
             //We want the list sorted in descending order
             return s2.getConsumerInfo().getPriority() - s1.getConsumerInfo().getPriority();
-        }        
+        }
     };
-               
-    public Queue(BrokerService brokerService, final ActiveMQDestination destination, MessageStore store,DestinationStatistics parentStats,
-                 TaskRunnerFactory taskFactory) throws Exception {
+
+    public Queue(BrokerService brokerService, final ActiveMQDestination destination, MessageStore store, DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
         super(brokerService, store, destination, parentStats);
-        this.taskFactory=taskFactory;       
-        this.dispatchSelector=new QueueDispatchSelector(destination);
+        this.taskFactory = taskFactory;
+        this.dispatchSelector = new QueueDispatchSelector(destination);
     }
 
     public List<Subscription> getConsumers() {
@@ -146,13 +144,13 @@
     // make the queue easily visible in the debugger from its task runner threads
     final class QueueThread extends Thread {
         final Queue queue;
-        public QueueThread(Runnable runnable, String name,
-                Queue queue) {
+
+        public QueueThread(Runnable runnable, String name, Queue queue) {
             super(runnable, name);
             this.queue = queue;
         }
     }
-    
+
     public void initialize() throws Exception {
         if (this.messages == null) {
             if (destination.isTemporary() || broker == null || store == null) {
@@ -169,10 +167,9 @@
             this.systemUsage = brokerService.getSystemUsage();
             memoryUsage.setParent(systemUsage.getMemoryUsage());
         }
-        
-        this.taskRunner =
-            taskFactory.createTaskRunner(this, "Queue:" + destination.getPhysicalName());
-        
+
+        this.taskRunner = taskFactory.createTaskRunner(this, "Queue:" + destination.getPhysicalName());
+
         super.initialize();
         if (store != null) {
             // Restore the persistent messages.
@@ -218,12 +215,12 @@
                     public boolean hasSpace() {
                         return true;
                     }
-                    
+
                     public boolean isDuplicate(MessageId id) {
                         return false;
                     }
                 });
-            }else {
+            } else {
                 int messageCount = store.getMessageCount();
                 destinationStatistics.getMessages().setCount(messageCount);
             }
@@ -231,22 +228,20 @@
     }
 
     /*
-     * Holder for subscription and pagedInMessages as a browser
-     * needs access to existing messages in the queue that have
-     * already been dispatched
+     * Holder for subscription and pagedInMessages as a browser needs access to
+     * existing messages in the queue that have already been dispatched
      */
     class BrowserDispatch {
         ArrayList<QueueMessageReference> messages;
         QueueBrowserSubscription browser;
-        
-        public BrowserDispatch(QueueBrowserSubscription browserSubscription,
-                Collection<QueueMessageReference> values) {
-            
-            messages =  new ArrayList<QueueMessageReference>(values);
+
+        public BrowserDispatch(QueueBrowserSubscription browserSubscription, Collection<QueueMessageReference> values) {
+
+            messages = new ArrayList<QueueMessageReference>(values);
             browser = browserSubscription;
             browser.incrementQueueRef();
         }
-        
+
         void done() {
             try {
                 browser.decrementQueueRef();
@@ -259,57 +254,57 @@
             return browser;
         }
     }
-   
+
     LinkedList<BrowserDispatch> browserDispatches = new LinkedList<BrowserDispatch>();
 
     public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
         // synchronize with dispatch method so that no new messages are sent
         // while setting up a subscription. avoid out of order messages,
         // duplicates, etc.
-        synchronized(dispatchMutex) {
-        
+        synchronized (dispatchMutex) {
+
             sub.add(context, this);
             destinationStatistics.getConsumers().increment();
 
             // needs to be synchronized - so no contention with dispatching
             synchronized (consumers) {
-            	
-            	// set a flag if this is a first consumer
-            	if (consumers.size() == 0) {
-            		firstConsumer = true;
-            		if (consumersBeforeDispatchStarts != 0) {
-            			consumersBeforeStartsLatch = new CountDownLatch(consumersBeforeDispatchStarts - 1);
-            		}
-            	} else {
-                	if (consumersBeforeStartsLatch != null) {
-                		consumersBeforeStartsLatch.countDown();
-                	}
-            	}
-            	
+
+                // set a flag if this is a first consumer
+                if (consumers.size() == 0) {
+                    firstConsumer = true;
+                    if (consumersBeforeDispatchStarts != 0) {
+                        consumersBeforeStartsLatch = new CountDownLatch(consumersBeforeDispatchStarts - 1);
+                    }
+                } else {
+                    if (consumersBeforeStartsLatch != null) {
+                        consumersBeforeStartsLatch.countDown();
+                    }
+                }
+
                 addToConsumerList(sub);
                 if (sub.getConsumerInfo().isExclusive()) {
                     Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer();
-                    if(exclusiveConsumer==null) {
-                        exclusiveConsumer=sub;
-                    }else if (sub.getConsumerInfo().getPriority() > exclusiveConsumer.getConsumerInfo().getPriority()){
-                        exclusiveConsumer=sub;
+                    if (exclusiveConsumer == null) {
+                        exclusiveConsumer = sub;
+                    } else if (sub.getConsumerInfo().getPriority() > exclusiveConsumer.getConsumerInfo().getPriority()) {
+                        exclusiveConsumer = sub;
                     }
                     dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
                 }
             }
-            
-            if (sub instanceof QueueBrowserSubscription ) { 
+
+            if (sub instanceof QueueBrowserSubscription) {
                 QueueBrowserSubscription browserSubscription = (QueueBrowserSubscription) sub;
-            	
+
                 // do again in iterate to ensure new messages are dispatched
                 pageInMessages(false);
-                
-            	synchronized (pagedInMessages) {
-            	    if (!pagedInMessages.isEmpty()) {
-            	        BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription, pagedInMessages.values());
-            	        browserDispatches.addLast(browserDispatch);
-            	    }
-            	}
+
+                synchronized (pagedInMessages) {
+                    if (!pagedInMessages.isEmpty()) {
+                        BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription, pagedInMessages.values());
+                        browserDispatches.addLast(browserDispatch);
+                    }
+                }
             }
             if (!(this.optimizedDispatch || isSlave())) {
                 wakeup();
@@ -322,30 +317,23 @@
         }
     }
 
-    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeiveredSequenceId)
-            throws Exception {
+    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeiveredSequenceId) throws Exception {
         destinationStatistics.getConsumers().decrement();
         // synchronize with dispatch method so that no new messages are sent
         // while removing up a subscription.
-        synchronized(dispatchMutex) {
+        synchronized (dispatchMutex) {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("remove sub: " + sub + ", lastDeliveredSeqId: " + lastDeiveredSequenceId
-                        + ", dequeues: " + getDestinationStatistics().getDequeues().getCount()
-                        + ", dispatched: " + getDestinationStatistics().getDispatched().getCount()
-                        + ", inflight: " + getDestinationStatistics().getInflight().getCount());
+                LOG.debug("remove sub: " + sub + ", lastDeliveredSeqId: " + lastDeiveredSequenceId + ", dequeues: " + getDestinationStatistics().getDequeues().getCount() + ", dispatched: "
+                        + getDestinationStatistics().getDispatched().getCount() + ", inflight: " + getDestinationStatistics().getInflight().getCount());
             }
             synchronized (consumers) {
                 removeFromConsumerList(sub);
                 if (sub.getConsumerInfo().isExclusive()) {
-                    Subscription exclusiveConsumer = dispatchSelector
-                            .getExclusiveConsumer();
+                    Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer();
                     if (exclusiveConsumer == sub) {
                         exclusiveConsumer = null;
                         for (Subscription s : consumers) {
-                            if (s.getConsumerInfo().isExclusive()
-                                    && (exclusiveConsumer == null
-                                    || s.getConsumerInfo().getPriority() > exclusiveConsumer
-                                            .getConsumerInfo().getPriority())) {
+                            if (s.getConsumerInfo().isExclusive() && (exclusiveConsumer == null || s.getConsumerInfo().getPriority() > exclusiveConsumer.getConsumerInfo().getPriority())) {
                                 exclusiveConsumer = s;
 
                             }
@@ -355,12 +343,12 @@
                 }
                 ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
                 getMessageGroupOwners().removeConsumer(consumerId);
-                
+
                 // redeliver inflight messages
                 List<QueueMessageReference> list = new ArrayList<QueueMessageReference>();
                 for (MessageReference ref : sub.remove(context, this)) {
-                    QueueMessageReference qmr = (QueueMessageReference)ref;
-                    if( qmr.getLockOwner()==sub ) {
+                    QueueMessageReference qmr = (QueueMessageReference) ref;
+                    if (qmr.getLockOwner() == sub) {
                         qmr.unlock();
                         // only increment redelivery if it was delivered or we have no delivery information
                         if (lastDeiveredSequenceId == 0 || qmr.getMessageId().getBrokerSequenceId() <= lastDeiveredSequenceId) {
@@ -369,7 +357,7 @@
                     }
                     list.add(qmr);
                 }
-                
+
                 if (!list.isEmpty()) {
                     doDispatch(list);
                 }
@@ -401,32 +389,33 @@
             }
             return;
         }
-        if(memoryUsage.isFull()) {
+        if (memoryUsage.isFull()) {
             isFull(context, memoryUsage);
             fastProducer(context, producerInfo);
             if (isProducerFlowControl() && context.isProducerFlowControl()) {
-                if(warnOnProducerFlowControl) {
+                if (warnOnProducerFlowControl) {
                     warnOnProducerFlowControl = false;
-                    LOG.info("Usage Manager memory limit reached on " +getActiveMQDestination().getQualifiedName() + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it." +
-                            " See http://activemq.apache.org/producer-flow-control.html for more info");
+                    LOG.info("Usage Manager Memory Limit reached on " + getActiveMQDestination().getQualifiedName()
+                            + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
+                            + " See http://activemq.apache.org/producer-flow-control.html for more info");
                 }
-                
+
                 if (systemUsage.isSendFailIfNoSpace()) {
-                    throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding " +getActiveMQDestination().getQualifiedName() + "." +
-                            " See http://activemq.apache.org/producer-flow-control.html for more info");
+                    throw new javax.jms.ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding "
+                            + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
                 }
-    
+
                 // We can avoid blocking due to low usage if the producer is sending
                 // a sync message or
                 // if it is using a producer window
                 if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
                     // copy the exchange state since the context will be modified while we are waiting
                     // for space.
-                    final ProducerBrokerExchange producerExchangeCopy = producerExchange.copy(); 
+                    final ProducerBrokerExchange producerExchangeCopy = producerExchange.copy();
                     synchronized (messagesWaitingForSpace) {
                         messagesWaitingForSpace.add(new Runnable() {
                             public void run() {
-    
+
                                 try {
                                     // While waiting for space to free up... the
                                     // message may have expired.
@@ -437,7 +426,7 @@
                                     } else {
                                         doMessageSend(producerExchangeCopy, message);
                                     }
-    
+
                                     if (sendProducerAck) {
                                         ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
                                         context.getConnection().dispatchAsync(ack);
@@ -446,7 +435,7 @@
                                         response.setCorrelationId(message.getCommandId());
                                         context.getConnection().dispatchAsync(response);
                                     }
-    
+
                                 } catch (Exception e) {
                                     if (!sendProducerAck && !context.isInRecoveryMode()) {
                                         ExceptionResponse response = new ExceptionResponse(e);
@@ -456,7 +445,7 @@
                                 }
                             }
                         });
-    
+
                         // If the user manager is not full, then the task will not
                         // get called..
                         if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
@@ -466,18 +455,14 @@
                         context.setDontSendReponse(true);
                         return;
                     }
-    
+
                 } else {
-    
-                    // Producer flow control cannot be used, so we have do the flow
-                    // control at the broker
-                    // by blocking this thread until there is space available.
-                    while (!memoryUsage.waitForSpace(1000)) {
-                        if (context.getStopping().get()) {
-                            throw new IOException("Connection closed, send aborted.");
-                        }
+
+                    if (memoryUsage.isFull()) {
+                        waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer (" + message.getProducerId() + ") stopped to prevent flooding "
+                                + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
                     }
-    
+
                     // The usage manager could have delayed us by the time
                     // we unblock the message could have expired..
                     if (message.isExpired()) {
@@ -502,19 +487,15 @@
         synchronized (sendLock) {
             if (store != null && message.isPersistent()) {
                 if (systemUsage.getStoreUsage().isFull()) {
-                    final String logMessage = "Usage Manager Store is Full. Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." +
-                            " See http://activemq.apache.org/producer-flow-control.html for more info";
-                    LOG.info(logMessage);
+
+                    String logMessage = "Usage Manager Store is Full. Producer (" + message.getProducerId() + ") stopped to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
+                            + " See http://activemq.apache.org/producer-flow-control.html for more info";
+
                     if (systemUsage.isSendFailIfNoSpace()) {
                         throw new javax.jms.ResourceAllocationException(logMessage);
                     }
-                }
-                while (!systemUsage.getStoreUsage().waitForSpace(1000)) {
-                    if (context.getStopping().get()) {
-                        throw new IOException(
-                                "Connection closed, send aborted.");
-                    }
-                    LOG.debug(this  + ", waiting for store space... msg: " + message);
+
+                    waitForSpace(context, systemUsage.getStoreUsage(), logMessage);
                 }
                 message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
                 store.addMessage(context, message);
@@ -553,12 +534,12 @@
             sendMessage(context, message);
         }
     }
-    
+
     private void expireMessages() {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Expiring messages ..");
         }
-        
+
         // just track the insertion count
         List<Message> browsedMessages = new AbstractList<Message>() {
             int size = 0;
@@ -582,9 +563,9 @@
         asyncWakeup();
     }
 
-    public void gc(){
+    public void gc() {
     }
-    
+
     public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException {
         messageConsumed(context, node);
         if (store != null && node.isPersistent()) {
@@ -617,8 +598,8 @@
         synchronized (messages) {
             size = messages.size();
         }
-        return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size() + ", memory=" + memoryUsage.getPercentUsage() + "%, size=" + size
-               + ", in flight groups=" + messageGroupOwners;
+        return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size() + ", memory=" + memoryUsage.getPercentUsage() + "%, size=" + size + ", in flight groups="
+                + messageGroupOwners;
     }
 
     public void start() throws Exception {
@@ -633,25 +614,25 @@
         doPageIn(false);
     }
 
-    public void stop() throws Exception{
+    public void stop() throws Exception {
         if (taskRunner != null) {
             taskRunner.shutdown();
         }
         if (this.executor != null) {
             this.executor.shutdownNow();
         }
-        
+
         scheduler.cancel(expireMessagesTask);
-        
+
         if (messages != null) {
             messages.stop();
         }
-        
+
         systemUsage.getMemoryUsage().removeUsageListener(this);
         if (memoryUsage != null) {
             memoryUsage.stop();
         }
-        if (store!=null) {
+        if (store != null) {
             store.stop();
         }
     }
@@ -662,7 +643,6 @@
         return destination;
     }
 
-    
     public MessageGroupMap getMessageGroupOwners() {
         if (messageGroupOwners == null) {
             messageGroupOwners = getMessageGroupMapFactory().createMessageGroupMap();
@@ -693,7 +673,7 @@
     public void setMessages(PendingMessageCursor messages) {
         this.messages = messages;
     }
-    
+
     public boolean isUseConsumerPriority() {
         return useConsumerPriority;
     }
@@ -709,7 +689,6 @@
     public void setStrictOrderDispatch(boolean strictOrderDispatch) {
         this.strictOrderDispatch = strictOrderDispatch;
     }
-    
 
     public boolean isOptimizedDispatch() {
         return optimizedDispatch;
@@ -718,21 +697,22 @@
     public void setOptimizedDispatch(boolean optimizedDispatch) {
         this.optimizedDispatch = optimizedDispatch;
     }
-	public int getTimeBeforeDispatchStarts() {
-		return timeBeforeDispatchStarts;
-	}
-
-	public void setTimeBeforeDispatchStarts(int timeBeforeDispatchStarts) {
-		this.timeBeforeDispatchStarts = timeBeforeDispatchStarts;
-	}
-
-	public int getConsumersBeforeDispatchStarts() {
-		return consumersBeforeDispatchStarts;
-	}
-
-	public void setConsumersBeforeDispatchStarts(int consumersBeforeDispatchStarts) {
-		this.consumersBeforeDispatchStarts = consumersBeforeDispatchStarts;
-	}
+
+    public int getTimeBeforeDispatchStarts() {
+        return timeBeforeDispatchStarts;
+    }
+
+    public void setTimeBeforeDispatchStarts(int timeBeforeDispatchStarts) {
+        this.timeBeforeDispatchStarts = timeBeforeDispatchStarts;
+    }
+
+    public int getConsumersBeforeDispatchStarts() {
+        return consumersBeforeDispatchStarts;
+    }
+
+    public void setConsumersBeforeDispatchStarts(int consumersBeforeDispatchStarts) {
+        this.consumersBeforeDispatchStarts = consumersBeforeDispatchStarts;
+    }
 
     // Implementation methods
     // -------------------------------------------------------------------------
@@ -741,19 +721,18 @@
         return result;
     }
 
-    public Message[] browse() {    
+    public Message[] browse() {
         List<Message> l = new ArrayList<Message>();
         doBrowse(l, getMaxBrowsePageSize());
         return l.toArray(new Message[l.size()]);
     }
-    
-    
+
     public void doBrowse(List<Message> l, int max) {
         final ConnectionContext connectionContext = createConnectionContext();
         try {
             pageInMessages(false);
             List<MessageReference> toExpire = new ArrayList<MessageReference>();
-            synchronized(dispatchMutex) {
+            synchronized (dispatchMutex) {
                 synchronized (pagedInPendingDispatch) {
                     addAll(pagedInPendingDispatch, l, max, toExpire);
                     for (MessageReference ref : toExpire) {
@@ -776,17 +755,16 @@
                         }
                     }
                 }
-                
+
                 if (l.size() < getMaxBrowsePageSize()) {
                     synchronized (messages) {
                         try {
                             messages.reset();
                             while (messages.hasNext() && l.size() < max) {
-                                MessageReference node = messages.next();        
+                                MessageReference node = messages.next();
                                 if (node.isExpired()) {
                                     if (broker.isExpired(node)) {
-                                        messageExpired(connectionContext,
-                                                createMessageReference(node.getMessage()));
+                                        messageExpired(connectionContext, createMessageReference(node.getMessage()));
                                     }
                                     messages.remove();
                                 } else {
@@ -801,16 +779,14 @@
                         }
                     }
                 }
-            } 
+            }
         } catch (Exception e) {
             LOG.error("Problem retrieving message for browse", e);
-        }     
+        }
     }
 
-    private void addAll(Collection<QueueMessageReference> refs,
-            List<Message> l, int maxBrowsePageSize, List<MessageReference> toExpire) throws Exception {
-        for (Iterator<QueueMessageReference> i = refs.iterator(); i.hasNext()
-                && l.size() < getMaxBrowsePageSize();) {
+    private void addAll(Collection<QueueMessageReference> refs, List<Message> l, int maxBrowsePageSize, List<MessageReference> toExpire) throws Exception {
+        for (Iterator<QueueMessageReference> i = refs.iterator(); i.hasNext() && l.size() < getMaxBrowsePageSize();) {
             QueueMessageReference ref = i.next();
             if (ref.isExpired()) {
                 toExpire.add(ref);
@@ -844,8 +820,7 @@
                                 break;
                             }
                         } catch (IOException e) {
-                            LOG.error("got an exception retrieving message "
-                                    + id);
+                            LOG.error("got an exception retrieving message " + id);
                         }
                     }
                 } finally {
@@ -858,10 +833,10 @@
         return null;
     }
 
-    public void purge() throws Exception {   
+    public void purge() throws Exception {
         ConnectionContext c = createConnectionContext();
         List<MessageReference> list = null;
-        do {        
+        do {
             doPageIn(true);
             synchronized (pagedInMessages) {
                 list = new ArrayList<MessageReference>(pagedInMessages.values());
@@ -870,11 +845,11 @@
             for (MessageReference ref : list) {
                 try {
                     QueueMessageReference r = (QueueMessageReference) ref;
-                        removeMessage(c,(IndirectMessageReference) r);
+                    removeMessage(c, (IndirectMessageReference) r);
                 } catch (IOException e) {
                 }
             }
-            
+
         } while (!pagedInMessages.isEmpty() || this.destinationStatistics.getMessages().getCount() > 0);
         gc();
         this.destinationStatistics.getMessages().setCount(0);
@@ -922,15 +897,14 @@
             synchronized (pagedInMessages) {
                 set.addAll(pagedInMessages.values());
             }
-            List <MessageReference>list = new ArrayList<MessageReference>(set);
+            List<MessageReference> list = new ArrayList<MessageReference>(set);
             for (MessageReference ref : list) {
                 IndirectMessageReference r = (IndirectMessageReference) ref;
                 if (filter.evaluate(context, r)) {
 
                     removeMessage(context, r);
                     set.remove(r);
-                    if (++movedCounter >= maximumMessages
-                            && maximumMessages > 0) {
+                    if (++movedCounter >= maximumMessages && maximumMessages > 0) {
                         return movedCounter;
                     }
                 }
@@ -976,24 +950,23 @@
         int count = 0;
         Set<MessageReference> set = new CopyOnWriteArraySet<MessageReference>();
         do {
-            int oldMaxSize=getMaxPageSize();
+            int oldMaxSize = getMaxPageSize();
             setMaxPageSize((int) this.destinationStatistics.getMessages().getCount());
             doPageIn(true);
             setMaxPageSize(oldMaxSize);
             synchronized (pagedInMessages) {
                 set.addAll(pagedInMessages.values());
             }
-            List <MessageReference>list = new ArrayList<MessageReference>(set);
+            List<MessageReference> list = new ArrayList<MessageReference>(set);
             for (MessageReference ref : list) {
                 IndirectMessageReference r = (IndirectMessageReference) ref;
                 if (filter.evaluate(context, r)) {
-                    
-                    r.incrementReferenceCount();                    
+
+                    r.incrementReferenceCount();
                     try {
                         Message m = r.getMessage();
                         BrokerSupport.resend(context, m, dest);
-                        if (++movedCounter >= maximumMessages
-                                && maximumMessages > 0) {
+                        if (++movedCounter >= maximumMessages && maximumMessages > 0) {
                             return movedCounter;
                         }
                     } finally {
@@ -1005,15 +978,16 @@
         } while (count < this.destinationStatistics.getMessages().getCount());
         return movedCounter;
     }
-    
+
     /**
      * Move a message
+     * 
      * @param context connection context
      * @param m message
      * @param dest ActiveMQDestination
      * @throws Exception
      */
-    public boolean moveMessageTo(ConnectionContext context,Message m,ActiveMQDestination dest) throws Exception {
+    public boolean moveMessageTo(ConnectionContext context, Message m, ActiveMQDestination dest) throws Exception {
         QueueMessageReference r = createMessageReference(m);
         BrokerSupport.resend(context, m, dest);
         removeMessage(context, r);
@@ -1036,7 +1010,7 @@
      * @return the number of messages removed
      */
     public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest) throws Exception {
-        return moveMatchingMessagesTo(context, selector, dest,Integer.MAX_VALUE);
+        return moveMatchingMessagesTo(context, selector, dest, Integer.MAX_VALUE);
     }
 
     /**
@@ -1051,9 +1025,7 @@
      * Moves the messages matching the given filter up to the maximum number of
      * matched messages
      */
-    public int moveMatchingMessagesTo(ConnectionContext context,
-            MessageReferenceFilter filter, ActiveMQDestination dest,
-            int maximumMessages) throws Exception {
+    public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception {
         int movedCounter = 0;
         Set<MessageReference> set = new CopyOnWriteArraySet<MessageReference>();
         do {
@@ -1068,20 +1040,18 @@
                     // We should only move messages that can be locked.
                     moveMessageTo(context, ref.getMessage(), dest);
                     set.remove(r);
-                    if (++movedCounter >= maximumMessages
-                            && maximumMessages > 0) {
+                    if (++movedCounter >= maximumMessages && maximumMessages > 0) {
                         return movedCounter;
                     }
                 }
             }
-        } while (set.size() < this.destinationStatistics.getMessages().getCount()
-                && set.size() < maximumMessages);
+        } while (set.size() < this.destinationStatistics.getMessages().getCount() && set.size() < maximumMessages);
         return movedCounter;
     }
-    
+
     BrowserDispatch getNextBrowserDispatch() {
         synchronized (pagedInMessages) {
-            if( browserDispatches.isEmpty() ) {
+            if (browserDispatches.isEmpty()) {
                 return null;
             }
             return browserDispatches.removeFirst();
@@ -1094,93 +1064,93 @@
      * @see org.apache.activemq.thread.Task#iterate()
      */
     public boolean iterate() {
-        boolean pageInMoreMessages = false;   
-        synchronized(iteratingMutex) {
-            
+        boolean pageInMoreMessages = false;
+        synchronized (iteratingMutex) {
+
             // do early to allow dispatch of these waiting messages
-            synchronized(messagesWaitingForSpace) {
+            synchronized (messagesWaitingForSpace) {
                 while (!messagesWaitingForSpace.isEmpty() && !memoryUsage.isFull()) {
                     Runnable op = messagesWaitingForSpace.removeFirst();
                     op.run();
                 }
             }
-            
+
             BrowserDispatch rd;
-	        while ((rd = getNextBrowserDispatch()) != null) {
-	            pageInMoreMessages = true;
-	            
-	            try {
-	                MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
-	                msgContext.setDestination(destination);
-	    
-	                QueueBrowserSubscription browser = rd.getBrowser();
-	                for (QueueMessageReference node : rd.messages) {
-	                    if (!node.isAcked()) {
-	                        msgContext.setMessageReference(node);
-	                        if (browser.matches(node, msgContext)) {
-	                            browser.add(node);
-	                        }
-	                    }
-	                }
-	                                    
+            while ((rd = getNextBrowserDispatch()) != null) {
+                pageInMoreMessages = true;
+
+                try {
+                    MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
+                    msgContext.setDestination(destination);
+
+                    QueueBrowserSubscription browser = rd.getBrowser();
+                    for (QueueMessageReference node : rd.messages) {
+                        if (!node.isAcked()) {
+                            msgContext.setMessageReference(node);
+                            if (browser.matches(node, msgContext)) {
+                                browser.add(node);
+                            }
+                        }
+                    }
+
                     rd.done();
 
-	            } catch (Exception e) {
-	                LOG.warn("exception on dispatch to browser: " + rd.getBrowser(), e);
-	            }
-	        }
-	        
-	        if (firstConsumer) {
-	        	firstConsumer = false;
-	        	try {
-	        		if (consumersBeforeDispatchStarts > 0) {
-	        			int timeout = 1000; // wait one second by default if consumer count isn't reached  
-	        			if (timeBeforeDispatchStarts > 0) {
-	        				timeout = timeBeforeDispatchStarts;
-	        			}
-	        			if (consumersBeforeStartsLatch.await(timeout, TimeUnit.MILLISECONDS)) {
-	        				if (LOG.isDebugEnabled()) {
-	        					LOG.debug(consumers.size() + " consumers subscribed. Starting dispatch.");
-	        				}
-	        			} else {
-	        				if (LOG.isDebugEnabled()) {
-	        					LOG.debug(timeout + " ms elapsed and " +  consumers.size() + " consumers subscribed. Starting dispatch.");
-	        				}
-	        			}
-	        		}	        		
-	        		if (timeBeforeDispatchStarts > 0 && consumersBeforeDispatchStarts <= 0) {
-	        			iteratingMutex.wait(timeBeforeDispatchStarts);
-	        			if (LOG.isDebugEnabled()) {
-	        				LOG.debug(timeBeforeDispatchStarts + " ms elapsed. Starting dispatch.");
-	        			}
-	        		}
-	        	} catch (Exception e) {
-	        		LOG.error(e);
-	        	}
-	        }
-	        
-	        synchronized (messages) {
-	            pageInMoreMessages |= !messages.isEmpty();
-	        }               
-	        
-	        // Kinda ugly.. but I think dispatchLock is the only mutex protecting the 
-	        // pagedInPendingDispatch variable. 	        
-	        synchronized(dispatchMutex) {
-	            pageInMoreMessages |= !pagedInPendingDispatch.isEmpty();
-	        } 
-	        
-	        // Perhaps we should page always into the pagedInPendingDispatch list if 
-	        // !messages.isEmpty(), and then if !pagedInPendingDispatch.isEmpty()
-	        // then we do a dispatch.
-	        if (pageInMoreMessages) {
-	            try {
-	               pageInMessages(false);
-	               
-	            } catch (Throwable e) {
-	                LOG.error("Failed to page in more queue messages ", e);
+                } catch (Exception e) {
+                    LOG.warn("exception on dispatch to browser: " + rd.getBrowser(), e);
+                }
+            }
+
+            if (firstConsumer) {
+                firstConsumer = false;
+                try {
+                    if (consumersBeforeDispatchStarts > 0) {
+                        int timeout = 1000; // wait one second by default if consumer count isn't reached  
+                        if (timeBeforeDispatchStarts > 0) {
+                            timeout = timeBeforeDispatchStarts;
+                        }
+                        if (consumersBeforeStartsLatch.await(timeout, TimeUnit.MILLISECONDS)) {
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug(consumers.size() + " consumers subscribed. Starting dispatch.");
+                            }
+                        } else {
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug(timeout + " ms elapsed and " + consumers.size() + " consumers subscribed. Starting dispatch.");
+                            }
+                        }
+                    }
+                    if (timeBeforeDispatchStarts > 0 && consumersBeforeDispatchStarts <= 0) {
+                        iteratingMutex.wait(timeBeforeDispatchStarts);
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug(timeBeforeDispatchStarts + " ms elapsed. Starting dispatch.");
+                        }
+                    }
+                } catch (Exception e) {
+                    LOG.error(e);
+                }
+            }
+
+            synchronized (messages) {
+                pageInMoreMessages |= !messages.isEmpty();
+            }
+
+            // Kinda ugly.. but I think dispatchLock is the only mutex protecting the 
+            // pagedInPendingDispatch variable. 	        
+            synchronized (dispatchMutex) {
+                pageInMoreMessages |= !pagedInPendingDispatch.isEmpty();
+            }
+
+            // Perhaps we should page always into the pagedInPendingDispatch list if 
+            // !messages.isEmpty(), and then if !pagedInPendingDispatch.isEmpty()
+            // then we do a dispatch.
+            if (pageInMoreMessages) {
+                try {
+                    pageInMessages(false);
+
+                } catch (Throwable e) {
+                    LOG.error("Failed to page in more queue messages ", e);
                 }
-	        }        
-	        return pendingWakeups.decrementAndGet() > 0;
+            }
+            return pendingWakeups.decrementAndGet() > 0;
         }
     }
 
@@ -1189,8 +1159,9 @@
             public boolean evaluate(ConnectionContext context, MessageReference r) {
                 return messageId.equals(r.getMessageId().toString());
             }
+
             public String toString() {
-                return "MessageIdFilter: "+messageId;
+                return "MessageIdFilter: " + messageId;
             }
         };
     }
@@ -1214,22 +1185,22 @@
 
     protected void removeMessage(ConnectionContext c, QueueMessageReference r) throws IOException {
         removeMessage(c, null, r);
-        synchronized(dispatchMutex) {            
+        synchronized (dispatchMutex) {
             synchronized (pagedInPendingDispatch) {
                 pagedInPendingDispatch.remove(r);
             }
         }
     }
-    
-    protected void removeMessage(ConnectionContext c, Subscription subs,QueueMessageReference r) throws IOException {
+
+    protected void removeMessage(ConnectionContext c, Subscription subs, QueueMessageReference r) throws IOException {
         MessageAck ack = new MessageAck();
         ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
         ack.setDestination(destination);
         ack.setMessageID(r.getMessageId());
         removeMessage(c, subs, r, ack);
     }
-    
-    protected void removeMessage(ConnectionContext context,Subscription sub,final QueueMessageReference reference,MessageAck ack) throws IOException {
+
+    protected void removeMessage(ConnectionContext context, Subscription sub, final QueueMessageReference reference, MessageAck ack) throws IOException {
         reference.setAcked(true);
         // This sends the ack the the journal..
         if (!ack.isInTransaction()) {
@@ -1241,13 +1212,13 @@
                 acknowledge(context, sub, ack, reference);
             } finally {
                 context.getTransaction().addSynchronization(new Synchronization() {
-                
+
                     public void afterCommit() throws Exception {
                         getDestinationStatistics().getDequeues().increment();
                         dropMessage(reference);
                         wakeup();
                     }
-                
+
                     public void afterRollback() throws Exception {
                         reference.setAcked(false);
                     }
@@ -1256,38 +1227,38 @@
         }
         if (ack.isPoisonAck()) {
             // message gone to DLQ, is ok to allow redelivery
-            synchronized(messages) {
+            synchronized (messages) {
                 messages.rollback(reference.getMessageId());
             }
         }
 
     }
-    
+
     private void dropMessage(QueueMessageReference reference) {
         reference.drop();
         destinationStatistics.getMessages().decrement();
-        synchronized(pagedInMessages) {
+        synchronized (pagedInMessages) {
             pagedInMessages.remove(reference.getMessageId());
         }
     }
-    
-    public void messageExpired(ConnectionContext context,MessageReference reference) {
-        messageExpired(context,null,reference);
+
+    public void messageExpired(ConnectionContext context, MessageReference reference) {
+        messageExpired(context, null, reference);
     }
-    
-    public void messageExpired(ConnectionContext context,Subscription subs, MessageReference reference) {
+
+    public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("message expired: " + reference);
         }
         broker.messageExpired(context, reference);
         destinationStatistics.getExpired().increment();
         try {
-            removeMessage(context,subs,(QueueMessageReference)reference);
+            removeMessage(context, subs, (QueueMessageReference) reference);
         } catch (IOException e) {
-            LOG.error("Failed to remove expired Message from the store ",e);
+            LOG.error("Failed to remove expired Message from the store ", e);
         }
     }
-    
+
     protected ConnectionContext createConnectionContext() {
         ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext());
         answer.setBroker(this.broker);
@@ -1298,17 +1269,18 @@
 
     final void sendMessage(final ConnectionContext context, Message msg) throws Exception {
         if (!msg.isPersistent() && messages.getSystemUsage() != null) {
-        	if (systemUsage.getTempUsage().isFull()) {
-                final String logMessage = "Usage Manager Temp Store is Full. Stopping producer (" + msg.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." +
-                        " See http://activemq.apache.org/producer-flow-control.html for more info";
-                LOG.info(logMessage);
+            if (systemUsage.getTempUsage().isFull()) {
+                final String logMessage = "Usage Manager Temp Store is Full. Stopping producer (" + msg.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
+                        + " See http://activemq.apache.org/producer-flow-control.html for more info";
                 if (systemUsage.isSendFailIfNoSpace()) {
                     throw new javax.jms.ResourceAllocationException(logMessage);
                 }
+
+                waitForSpace(context, messages.getSystemUsage().getTempUsage(), logMessage);
             }
-            messages.getSystemUsage().getTempUsage().waitForSpace();
+
         }
-        synchronized(messages) {
+        synchronized (messages) {
             messages.addMessageLast(msg);
         }
         destinationStatistics.getEnqueues().increment();
@@ -1321,7 +1293,7 @@
         }
         wakeup();
     }
-    
+
     public void wakeup() {
         if (optimizedDispatch || isSlave()) {
             iterate();
@@ -1334,12 +1306,12 @@
     private void asyncWakeup() {
         try {
             pendingWakeups.incrementAndGet();
-            this.taskRunner.wakeup();    
+            this.taskRunner.wakeup();
         } catch (InterruptedException e) {
             LOG.warn("Async task tunner failed to wakeup ", e);
         }
     }
-  
+
     private boolean isSlave() {
         return broker.getBrokerService().isSlave();
     }
@@ -1347,14 +1319,13 @@
     private List<QueueMessageReference> doPageIn(boolean force) throws Exception {
         List<QueueMessageReference> result = null;
         List<QueueMessageReference> resultList = null;
-        synchronized(dispatchMutex) {
+        synchronized (dispatchMutex) {
             int toPageIn = Math.min(getMaxPageSize(), messages.size());
             if (LOG.isDebugEnabled()) {
-                LOG.debug(destination.getPhysicalName() + " toPageIn: "  + toPageIn + ", Inflight: "
-                        + destinationStatistics.getInflight().getCount()
-                        + ", pagedInMessages.size " + pagedInMessages.size());
+                LOG.debug(destination.getPhysicalName() + " toPageIn: " + toPageIn + ", Inflight: " + destinationStatistics.getInflight().getCount() + ", pagedInMessages.size "
+                        + pagedInMessages.size());
             }
-           
+
             if (isLazyDispatch() && !force) {
                 // Only page in the minimum number of messages which can be dispatched immediately.
                 toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
@@ -1378,7 +1349,7 @@
                             } else {
                                 result.add(ref);
                                 count++;
-                            }   
+                            }
                         }
                     } finally {
                         messages.release();
@@ -1387,7 +1358,7 @@
                 // Only add new messages, not already pagedIn to avoid multiple dispatch attempts
                 synchronized (pagedInMessages) {
                     resultList = new ArrayList<QueueMessageReference>(result.size());
-                    for(QueueMessageReference ref : result) {
+                    for (QueueMessageReference ref : result) {
                         if (!pagedInMessages.containsKey(ref.getMessageId())) {
                             pagedInMessages.put(ref.getMessageId(), ref);
                             resultList.add(ref);
@@ -1404,8 +1375,8 @@
 
     private void doDispatch(List<QueueMessageReference> list) throws Exception {
         boolean doWakeUp = false;
-        synchronized(dispatchMutex) {
-       
+        synchronized (dispatchMutex) {
+
             synchronized (pagedInPendingDispatch) {
                 if (!pagedInPendingDispatch.isEmpty()) {
                     // Try to first dispatch anything that had not been
@@ -1427,20 +1398,20 @@
                     }
                 }
             }
-        } 
+        }
         if (doWakeUp) {
             // avoid lock order contention
             asyncWakeup();
         }
     }
-    
+
     /**
      * @return list of messages that could get dispatched to consumers if they
      *         were not full.
      */
     private List<QueueMessageReference> doActualDispatch(List<QueueMessageReference> list) throws Exception {
         List<Subscription> consumers;
-        
+
         synchronized (this.consumers) {
             if (this.consumers.isEmpty() || isSlave()) {
                 // slave dispatch happens in processDispatchNotification
@@ -1451,15 +1422,15 @@
 
         List<QueueMessageReference> rc = new ArrayList<QueueMessageReference>(list.size());
         Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size());
-        
+
         for (MessageReference node : list) {
             Subscription target = null;
-            int interestCount=0;
+            int interestCount = 0;
             for (Subscription s : consumers) {
-            	if (s instanceof QueueBrowserSubscription) {
-            		interestCount++;
-            		continue;
-            	}
+                if (s instanceof QueueBrowserSubscription) {
+                    interestCount++;
+                    continue;
+                }
                 if (dispatchSelector.canSelect(s, node)) {
                     if (!fullConsumers.contains(s)) {
                         if (!s.isFull()) {
@@ -1474,23 +1445,22 @@
                     }
                     interestCount++;
                 } else {
-                	// makes sure it gets dispatched again
-                	if (!node.isDropped() && !((QueueMessageReference)node).isAcked() && (!node.isDropped() || s.getConsumerInfo().isBrowser())) {
-                		interestCount++;
-                	}
+                    // makes sure it gets dispatched again
+                    if (!node.isDropped() && !((QueueMessageReference) node).isAcked() && (!node.isDropped() || s.getConsumerInfo().isBrowser())) {
+                        interestCount++;
+                    }
                 }
             }
-            
-            if ((target == null && interestCount>0) || consumers.size() == 0) {
+
+            if ((target == null && interestCount > 0) || consumers.size() == 0) {
                 // This means all subs were full or that there are no consumers...
-                rc.add((QueueMessageReference)node);
+                rc.add((QueueMessageReference) node);
             }
 
             // If it got dispatched, rotate the consumer list to get round robin distribution. 
-            if (target != null && !strictOrderDispatch && consumers.size() > 1 &&
-                    !dispatchSelector.isExclusiveConsumer(target)) {
+            if (target != null && !strictOrderDispatch && consumers.size() > 1 && !dispatchSelector.isExclusiveConsumer(target)) {
                 synchronized (this.consumers) {
-                    if( removeFromConsumerList(target) ) {
+                    if (removeFromConsumerList(target)) {
                         addToConsumerList(target);
                         consumers = new ArrayList<Subscription>(this.consumers);
                     }
@@ -1498,15 +1468,13 @@
             }
         }
 
-        
         return rc;
     }
 
-
     protected void pageInMessages(boolean force) throws Exception {
-            doDispatch(doPageIn(force));
+        doDispatch(doPageIn(force));
     }
-    
+
     private void addToConsumerList(Subscription sub) {
         if (useConsumerPriority) {
             consumers.add(sub);
@@ -1515,42 +1483,43 @@
             consumers.add(sub);
         }
     }
-    
+
     private boolean removeFromConsumerList(Subscription sub) {
         return consumers.remove(sub);
     }
-    
+
     private int getConsumerMessageCountBeforeFull() throws Exception {
         int total = 0;
         boolean zeroPrefetch = false;
         synchronized (consumers) {
             for (Subscription s : consumers) {
-            	zeroPrefetch |= s.getPrefetchSize() == 0;
-            	int countBeforeFull = s.countBeforeFull();
+                zeroPrefetch |= s.getPrefetchSize() == 0;
+                int countBeforeFull = s.countBeforeFull();
                 total += countBeforeFull;
             }
         }
-        if (total==0 && zeroPrefetch){
-        	total=1;
+        if (total == 0 && zeroPrefetch) {
+            total = 1;
         }
         return total;
     }
 
-    /* 
-     * In slave mode, dispatch is ignored till we get this notification as the dispatch
-     * process is non deterministic between master and slave.
-     * On a notification, the actual dispatch to the subscription (as chosen by the master) 
-     * is completed. 
-     * (non-Javadoc)
-     * @see org.apache.activemq.broker.region.BaseDestination#processDispatchNotification(org.apache.activemq.command.MessageDispatchNotification)
+    /*
+     * In slave mode, dispatch is ignored till we get this notification as the
+     * dispatch process is non deterministic between master and slave. On a
+     * notification, the actual dispatch to the subscription (as chosen by the
+     * master) is completed. (non-Javadoc)
+     * 
+     * @see
+     * org.apache.activemq.broker.region.BaseDestination#processDispatchNotification
+     * (org.apache.activemq.command.MessageDispatchNotification)
      */
-    public void processDispatchNotification(
-            MessageDispatchNotification messageDispatchNotification) throws Exception {
+    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
         // do dispatch
         Subscription sub = getMatchingSubscription(messageDispatchNotification);
         if (sub != null) {
             MessageReference message = getMatchingMessage(messageDispatchNotification);
-            sub.add(message);   
+            sub.add(message);
             sub.processMessageDispatchNotification(messageDispatchNotification);
         }
     }
@@ -1558,25 +1527,25 @@
     private QueueMessageReference getMatchingMessage(MessageDispatchNotification messageDispatchNotification) throws Exception {
         QueueMessageReference message = null;
         MessageId messageId = messageDispatchNotification.getMessageId();
-        
-       synchronized(dispatchMutex) {
+
+        synchronized (dispatchMutex) {
             synchronized (pagedInPendingDispatch) {
-               for(QueueMessageReference ref : pagedInPendingDispatch) {
-                   if (messageId.equals(ref.getMessageId())) {
-                       message = ref;
-                       pagedInPendingDispatch.remove(ref);
-                       break;
-                   }
-               }
+                for (QueueMessageReference ref : pagedInPendingDispatch) {
+                    if (messageId.equals(ref.getMessageId())) {
+                        message = ref;
+                        pagedInPendingDispatch.remove(ref);
+                        break;
+                    }
+                }
             }
-    
+
             if (message == null) {
                 synchronized (pagedInMessages) {
                     message = pagedInMessages.get(messageId);
                 }
             }
-            
-            if (message == null) {            
+
+            if (message == null) {
                 synchronized (messages) {
                     try {
                         messages.setMaxBatchSize(getMaxPageSize());
@@ -1595,28 +1564,25 @@
                     }
                 }
             }
-            
+
             if (message == null) {
                 Message msg = loadMessage(messageId);
                 if (msg != null) {
                     message = this.createMessageReference(msg);
                 }
-            }          
-            
-        } 
+            }
+
+        }
         if (message == null) {
-            throw new JMSException(
-                    "Slave broker out of sync with master - Message: "
-                    + messageDispatchNotification.getMessageId()
-                    + " on " + messageDispatchNotification.getDestination()
-                    + " does not exist among pending(" + pagedInPendingDispatch.size() + ") for subscription: "
-                    + messageDispatchNotification.getConsumerId());
+            throw new JMSException("Slave broker out of sync with master - Message: " + messageDispatchNotification.getMessageId() + " on " + messageDispatchNotification.getDestination()
+                    + " does not exist among pending(" + pagedInPendingDispatch.size() + ") for subscription: " + messageDispatchNotification.getConsumerId());
         }
         return message;
     }
 
     /**
      * Find a consumer that matches the id in the message dispatch notification
+     * 
      * @param messageDispatchNotification
      * @return sub or null if the subscription has been removed before dispatch
      * @throws JMSException
@@ -1639,4 +1605,20 @@
             asyncWakeup();
         }
     }
+
+    private final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException {
+        long start = System.currentTimeMillis();
+        long nextWarn = start + blockedProducerWarningInterval;
+        while (!usage.waitForSpace(1000)) {
+            if (context.getStopping().get()) {
+                throw new IOException("Connection closed, send aborted.");
+            }
+
+            long now = System.currentTimeMillis();
+            if (now >= nextWarn) {
+                LOG.info(warning + " (blocking for: " + (now - start) / 1000 + "s)");
+                nextWarn = now + blockedProducerWarningInterval;
+            }
+        }
+    }
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=888227&r1=888226&r2=888227&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Tue Dec  8 02:27:30 2009
@@ -41,6 +41,7 @@
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.thread.Valve;
 import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.usage.Usage;
 import org.apache.activemq.util.SubscriptionKey;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -59,11 +60,11 @@
  * 
  * @version $Revision: 1.21 $
  */
-public class Topic  extends BaseDestination  implements Task{
+public class Topic extends BaseDestination implements Task {
     protected static final Log LOG = LogFactory.getLog(Topic.class);
     private final TopicMessageStore topicStore;
     protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
-    protected final Valve dispatchValve = new Valve(true);   
+    protected final Valve dispatchValve = new Valve(true);
     private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
     private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
     private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubcribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
@@ -71,24 +72,22 @@
     private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
     private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
         public void run() {
-                try {
-                    Topic.this.taskRunner.wakeup();
-                } catch (InterruptedException e) {
-                }
+            try {
+                Topic.this.taskRunner.wakeup();
+            } catch (InterruptedException e) {
+            }
         };
     };
-   
 
-    public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store, DestinationStatistics parentStats,
-                 TaskRunnerFactory taskFactory) throws Exception {
+    public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store, DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
         super(brokerService, store, destination, parentStats);
-        this.topicStore=store;
+        this.topicStore = store;
         //set default subscription recovery policy
-        subscriptionRecoveryPolicy= new NoSubscriptionRecoveryPolicy();
+        subscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy();
         this.taskRunner = taskFactory.createTaskRunner(this, "Topic  " + destination.getPhysicalName());
     }
-    
-    public void initialize() throws Exception{
+
+    public void initialize() throws Exception {
         super.initialize();
         if (store != null) {
             int messageCount = store.getMessageCount();
@@ -140,7 +139,7 @@
             }
         } else {
             sub.add(context, this);
-            DurableTopicSubscription dsub = (DurableTopicSubscription)sub;
+            DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
             durableSubcribers.put(dsub.getSubscriptionKey(), dsub);
         }
     }
@@ -171,7 +170,7 @@
         // we are recovering a subscription to avoid out of order messages.
         dispatchValve.turnOff();
         try {
-        	
+
             if (topicStore == null) {
                 return;
             }
@@ -195,21 +194,20 @@
                 }
             }
             // Do we need to create the subscription?
-            if(info==null){
-                info=new SubscriptionInfo();
+            if (info == null) {
+                info = new SubscriptionInfo();
                 info.setClientId(clientId);
                 info.setSelector(selector);
                 info.setSubscriptionName(subscriptionName);
-                info.setDestination(getActiveMQDestination()); 
+                info.setDestination(getActiveMQDestination());
                 // This destination is an actual destination id.
-                info.setSubscribedDestination(subscription.getConsumerInfo().getDestination()); 
+                info.setSubscribedDestination(subscription.getConsumerInfo().getDestination());
                 // This destination might be a pattern
                 synchronized (consumers) {
                     consumers.add(subscription);
-                    topicStore.addSubsciption(info,subscription.getConsumerInfo().isRetroactive());
+                    topicStore.addSubsciption(info, subscription.getConsumerInfo().isRetroactive());
                 }
             }
-            
 
             final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
             msgContext.setDestination(destination);
@@ -223,7 +221,7 @@
                                 subscription.add(message);
                             }
                         } catch (IOException e) {
-                           LOG.error("Failed to recover this message " + message);
+                            LOG.error("Failed to recover this message " + message);
                         }
                         return true;
                     }
@@ -235,7 +233,7 @@
                     public boolean hasSpace() {
                         return true;
                     }
-                    
+
                     public boolean isDuplicate(MessageId id) {
                         return false;
                     }
@@ -277,23 +275,24 @@
             return;
         }
 
-        if(memoryUsage.isFull()) {
+        if (memoryUsage.isFull()) {
             isFull(context, memoryUsage);
             fastProducer(context, producerInfo);
-            
+
             if (isProducerFlowControl() && context.isProducerFlowControl()) {
-                
-                if(warnOnProducerFlowControl) {
+
+                if (warnOnProducerFlowControl) {
                     warnOnProducerFlowControl = false;
-                    LOG.info("Usage Manager memory limit reached for " +getActiveMQDestination().getQualifiedName() + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it." +
-                            " See http://activemq.apache.org/producer-flow-control.html for more info");
+                    LOG.info("Usage Manager memory limit reached for " + getActiveMQDestination().getQualifiedName()
+                            + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
+                            + " See http://activemq.apache.org/producer-flow-control.html for more info");
                 }
-                
+
                 if (systemUsage.isSendFailIfNoSpace()) {
-                    throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding " +getActiveMQDestination().getQualifiedName() + "." +
-                            " See http://activemq.apache.org/producer-flow-control.html for more info");
+                    throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding "
+                            + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
                 }
-   
+
                 // We can avoid blocking due to low usage if the producer is sending
                 // a sync message or
                 // if it is using a producer window
@@ -301,9 +300,9 @@
                     synchronized (messagesWaitingForSpace) {
                         messagesWaitingForSpace.add(new Runnable() {
                             public void run() {
-                                
+
                                 try {
-    
+
                                     // While waiting for space to free up... the
                                     // message may have expired.
                                     if (message.isExpired()) {
@@ -312,7 +311,7 @@
                                     } else {
                                         doMessageSend(producerExchange, message);
                                     }
-    
+
                                     if (sendProducerAck) {
                                         ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
                                         context.getConnection().dispatchAsync(ack);
@@ -321,7 +320,7 @@
                                         response.setCorrelationId(message.getCommandId());
                                         context.getConnection().dispatchAsync(response);
                                     }
-    
+
                                 } catch (Exception e) {
                                     if (!sendProducerAck && !context.isInRecoveryMode()) {
                                         ExceptionResponse response = new ExceptionResponse(e);
@@ -329,10 +328,10 @@
                                         context.getConnection().dispatchAsync(response);
                                     }
                                 }
-                                
+
                             }
                         });
-    
+
                         // If the user manager is not full, then the task will not
                         // get called..
                         if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
@@ -342,24 +341,32 @@
                         context.setDontSendReponse(true);
                         return;
                     }
-    
+
                 } else {
-    
                     // Producer flow control cannot be used, so we have do the flow
                     // control at the broker
                     // by blocking this thread until there is space available.
-                    int count = 0;
-                    while (!memoryUsage.waitForSpace(1000)) {
-                        if (context.getStopping().get()) {
-                            throw new IOException("Connection closed, send aborted.");
-                        }
-                        if (count > 2 && context.isInTransaction()) {
-                            count =0;
-                            int size = context.getTransaction().size();
-                            LOG.warn("Waiting for space to send  transacted message - transaction elements = " + size + " need more space to commit. Message = " + message);
+                    
+                    if (memoryUsage.isFull()) {
+                        if (context.isInTransaction()) {
+
+                            int count = 0;
+                            while (!memoryUsage.waitForSpace(1000)) {
+                                if (context.getStopping().get()) {
+                                    throw new IOException("Connection closed, send aborted.");
+                                }
+                                if (count > 2 && context.isInTransaction()) {
+                                    count = 0;
+                                    int size = context.getTransaction().size();
+                                    LOG.warn("Waiting for space to send  transacted message - transaction elements = " + size + " need more space to commit. Message = " + message);
+                                }
+                            }
+                        } else {
+                            waitForSpace(context, memoryUsage, "Usage Manager memory limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding "
+                                    + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
                         }
                     }
-    
+
                     // The usage manager could have delayed us by the time
                     // we unblock the message could have expired..
                     if (message.isExpired()) {
@@ -382,35 +389,28 @@
     }
 
     /**
-     * do send the message - this needs to be synchronized to ensure messages are stored AND dispatched in 
-     * the right order
+     * do send the message - this needs to be synchronized to ensure messages
+     * are stored AND dispatched in the right order
+     * 
      * @param producerExchange
      * @param message
      * @throws IOException
      * @throws Exception
      */
-    synchronized void doMessageSend(
-            final ProducerBrokerExchange producerExchange, final Message message)
-            throws IOException, Exception {
-        final ConnectionContext context = producerExchange
-                .getConnectionContext();
+    synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
+        final ConnectionContext context = producerExchange.getConnectionContext();
         message.setRegionDestination(this);
         message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
 
-        if (topicStore != null && message.isPersistent()
-                && !canOptimizeOutPersistence()) {
+        if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) {
             if (systemUsage.getStoreUsage().isFull()) {
-                final String logMessage = "Usage Manager Store is Full. Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." +
-                        " See http://activemq.apache.org/producer-flow-control.html for more info";
-                LOG.info(logMessage);
+                final String logMessage = "Usage Manager Store is Full. Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
+                        + " See http://activemq.apache.org/producer-flow-control.html for more info";
                 if (systemUsage.isSendFailIfNoSpace()) {
-            	    throw new javax.jms.ResourceAllocationException(logMessage);
-                }
-            }
-            while (!systemUsage.getStoreUsage().waitForSpace(1000)) {
-                if (context.getStopping().get()) {
-                    throw new IOException("Connection closed, send aborted.");
+                    throw new javax.jms.ResourceAllocationException(logMessage);
                 }
+
+                waitForSpace(context, systemUsage.getStoreUsage(), logMessage);
             }
             topicStore.addMessage(context, message);
         }
@@ -457,14 +457,13 @@
 
     public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final MessageReference node) throws IOException {
         if (topicStore != null && node.isPersistent()) {
-            DurableTopicSubscription dsub = (DurableTopicSubscription)sub;
+            DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
             SubscriptionKey key = dsub.getSubscriptionKey();
             topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId());
         }
         messageConsumed(context, node);
     }
 
-    
     public void gc() {
     }
 
@@ -488,7 +487,7 @@
         if (memoryUsage != null) {
             memoryUsage.stop();
         }
-        if(this.topicStore != null) {
+        if (this.topicStore != null) {
             this.topicStore.stop();
         }
     }
@@ -510,7 +509,7 @@
                     public boolean hasSpace() {
                         return true;
                     }
-                    
+
                     public boolean isDuplicate(MessageId id) {
                         return false;
                     }
@@ -527,9 +526,9 @@
         }
         return result.toArray(new Message[result.size()]);
     }
-    
+
     public boolean iterate() {
-        synchronized(messagesWaitingForSpace) {
+        synchronized (messagesWaitingForSpace) {
             while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
                 Runnable op = messagesWaitingForSpace.removeFirst();
                 op.run();
@@ -538,12 +537,9 @@
         return false;
     }
 
-
     // Properties
     // -------------------------------------------------------------------------
 
-    
-
     public DispatchPolicy getDispatchPolicy() {
         return dispatchPolicy;
     }
@@ -560,17 +556,16 @@
         this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
     }
 
-    
     // Implementation methods
     // -------------------------------------------------------------------------
-    
+
     public final void wakeup() {
     }
-    
+
     protected void dispatch(final ConnectionContext context, Message message) throws Exception {
         destinationStatistics.getMessages().increment();
         destinationStatistics.getEnqueues().increment();
-        dispatchValve.increment();   
+        dispatchValve.increment();
         MessageEvaluationContext msgContext = null;
         try {
             if (!subscriptionRecoveryPolicy.add(context, message)) {
@@ -587,17 +582,17 @@
             msgContext.setMessageReference(message);
             if (!dispatchPolicy.dispatch(message, msgContext, consumers)) {
                 onMessageWithNoConsumers(context, message);
-            }  
-            
+            }
+
         } finally {
             dispatchValve.decrement();
-            if(msgContext != null) {
+            if (msgContext != null) {
                 msgContext.clear();
             }
         }
     }
-    
-    public void messageExpired(ConnectionContext context,Subscription subs, MessageReference reference) {
+
+    public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
         broker.messageExpired(context, reference);
         destinationStatistics.getMessages().decrement();
         destinationStatistics.getEnqueues().decrement();
@@ -609,9 +604,24 @@
         try {
             acknowledge(context, subs, ack, reference);
         } catch (IOException e) {
-            LOG.error("Failed to remove expired Message from the store ",e);
+            LOG.error("Failed to remove expired Message from the store ", e);
         }
     }
 
+    private final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException {
+        long start = System.currentTimeMillis();
+        long nextWarn = start + blockedProducerWarningInterval;
+        while (!usage.waitForSpace(1000)) {
+            if (context.getStopping().get()) {
+                throw new IOException("Connection closed, send aborted.");
+            }
+
+            long now = System.currentTimeMillis();
+            if (now >= nextWarn) {
+                LOG.info(warning + " (blocking for: " + (now - start) / 1000 + "s)");
+                nextWarn = now + blockedProducerWarningInterval;
+            }
+        }
+    }
 
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=888227&r1=888226&r2=888227&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Tue Dec  8 02:27:30 2009
@@ -60,6 +60,7 @@
     private int maxQueueAuditDepth=2048;
     private boolean enableAudit=true;
     private boolean producerFlowControl = true;
+    private long blockedProducerWarningInterval = Destination.DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL;
     private boolean optimizedDispatch=false;
     private int maxPageSize=BaseDestination.MAX_PAGE_SIZE;
     private int maxBrowsePageSize=BaseDestination.MAX_BROWSE_PAGE_SIZE;
@@ -125,6 +126,7 @@
     
     public void baseConfiguration(BaseDestination destination) {
         destination.setProducerFlowControl(isProducerFlowControl());
+        destination.setBlockedProducerWarningInterval(getBlockedProducerWarningInterval());
         destination.setEnableAudit(isEnableAudit());
         destination.setMaxAuditDepth(getMaxQueueAuditDepth());
         destination.setMaxProducersToAudit(getMaxProducersToAudit());
@@ -373,6 +375,27 @@
     }
 
     /**
+     * Set's the interval at which warnings about producers being blocked by
+     * resource usage will be triggered. Values of 0 or less will disable
+     * warnings
+     * 
+     * @param blockedProducerWarningInterval the interval at which warning about
+     *            blocked producers will be triggered.
+     */
+    public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
+        this.blockedProducerWarningInterval = blockedProducerWarningInterval;
+    }
+
+    /**
+     * 
+     * @return the interval at which warning about blocked producers will be
+     *         triggered.
+     */
+    public long getBlockedProducerWarningInterval() {
+        return blockedProducerWarningInterval;
+    }
+    
+    /**
      * @return the maxProducersToAudit
      */
     public int getMaxProducersToAudit() {



Mime
View raw message