activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r516475 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ./ broker/ broker/region/ command/ memory/
Date Fri, 09 Mar 2007 17:29:32 GMT
Author: chirino
Date: Fri Mar  9 09:29:30 2007
New Revision: 516475

URL: http://svn.apache.org/viewvc?view=rev&rev=516475
Log:
Adding the bits need to do producer flow control with a window to the broker. Just implemented
on the Queue case for now.


Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ProducerAck.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java?view=diff&rev=516475&r1=516474&r2=516475
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
Fri Mar  9 09:29:30 2007
@@ -17,6 +17,9 @@
  */
 package org.apache.activemq;
 
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.IllegalStateException;
@@ -36,9 +39,6 @@
 import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.util.IntrospectionSupport;
 
-import java.util.HashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  * A client uses a <CODE>MessageProducer</CODE> object to send messages to a
  * destination. A <CODE>MessageProducer</CODE> object is created by passing a
@@ -496,11 +496,7 @@
 			}
         }
         
-        int size = this.session.send(this, dest, message, deliveryMode, priority, timeToLive);
-
-        if( producerWindow!=null ) {
-			producerWindow.increaseUsage(size);
-        }
+        this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow);
         
         stats.onMessage();            
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?view=diff&rev=516475&r1=516474&r2=516475
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Fri
Mar  9 09:29:30 2007
@@ -17,32 +17,78 @@
  */
 package org.apache.activemq;
 
-import org.apache.activemq.command.*;
+import java.io.File;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.net.URL;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import javax.jms.TransactionRolledBackException;
+
+import org.apache.activemq.blob.BlobTransferPolicy;
+import org.apache.activemq.blob.BlobUploader;
+import org.apache.activemq.command.ActiveMQBlobMessage;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQStreamMessage;
+import org.apache.activemq.command.ActiveMQTempDestination;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.management.JMSSessionStatsImpl;
 import org.apache.activemq.management.StatsCapable;
 import org.apache.activemq.management.StatsImpl;
+import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.transaction.Synchronization;
 import org.apache.activemq.util.Callback;
 import org.apache.activemq.util.LongSequenceGenerator;
-import org.apache.activemq.blob.BlobUploader;
-import org.apache.activemq.blob.BlobTransferPolicy;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import javax.jms.*;
-import javax.jms.IllegalStateException;
-import javax.jms.Message;
-import java.io.Serializable;
-import java.io.File;
-import java.io.InputStream;
-import java.net.URL;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 /**
  * <P>
  * A <CODE>Session</CODE> object is a single-threaded context for producing
@@ -1546,11 +1592,13 @@
      *            message priority.
      * @param timeToLive -
      *            message expiration.
+     * @param producerWindow 
      * @throws JMSException
      */
-    protected int send(ActiveMQMessageProducer producer,
+    protected void send(ActiveMQMessageProducer producer,
 	        ActiveMQDestination destination,Message message,int deliveryMode,
-	        int priority,long timeToLive) throws JMSException{
+	        int priority,long timeToLive, UsageManager producerWindow) throws JMSException{
+    	
 		checkClosed();
 		if(destination.isTemporary()&&connection.isDeleted(destination)){
 			throw new JMSException("Cannot publish to a deleted Destination: "
@@ -1598,15 +1646,18 @@
 			}
 			if(!connection.isAlwaysSyncSend()&&(!msg.isPersistent()||connection.isUseAsyncSend()||txid!=null)){
                 this.connection.asyncSendPacket(msg);
+    			if( producerWindow!=null ) {
+    				// Since we defer lots of the marshaling till we hit the wire, this might not 
+    				// provide and accurate size.  We may change over to doing more aggressive marshaling,
+    				// to get more accurate sizes.. this is more important once users start using producer
window
+    				// flow control.			
+    				int size = msg.getSize();
+    				producerWindow.increaseUsage(size);
+    			}
             }else{
                 this.connection.syncSendPacket(msg);
             }
 
-			// Since we defer lots of the marshaling till we hit the wire, this might not 
-			// provide and accurate size.  We may change over to doing more aggressive marshaling,
-			// to get more accurate sizes.. this is more important once users start using producer
window
-			// flow control.
-			return msg.getSize();
 		}
 	}
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java?view=diff&rev=516475&r1=516474&r2=516475
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
Fri Mar  9 09:29:30 2007
@@ -58,6 +58,7 @@
     private boolean networkConnection;
     private final AtomicBoolean stopping = new AtomicBoolean();
     private final MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext();
+	private boolean dontSendReponse;
     
     public ConnectionContext() {
     }
@@ -258,6 +259,14 @@
 	
 	public AtomicBoolean getStopping() {
 		return stopping;
+	}
+
+	public void setDontSendReponse(boolean b) {
+		this.dontSendReponse=b;		
+	}
+
+	public boolean isDontSendReponse() {
+		return dontSendReponse;
 	}	
 	
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=516475&r1=516474&r2=516475
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Fri Mar  9 09:29:30 2007
@@ -124,6 +124,7 @@
     private final Map<ConsumerId,ConsumerBrokerExchange>consumerExchanges = new HashMap<ConsumerId,ConsumerBrokerExchange>();
     private CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
     protected AtomicBoolean dispatchStopped=new AtomicBoolean(false);
+    private ConnectionContext context;
     private boolean networkConnection;
     private AtomicInteger protocolVersion=new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
     
@@ -284,6 +285,16 @@
             }
             response.setCorrelationId(commandId);
         }
+        
+        // The context may have been flagged so that the response is not sent.
+        if( context!=null ) {
+        	if( context.isDontSendReponse() ) {
+        		context.setDontSendReponse(false);
+        		response=null;
+        	}
+            context=null;
+        }
+        
         return response;
     }
 
@@ -344,7 +355,7 @@
 
     synchronized public Response processBeginTransaction(TransactionInfo info) throws Exception{
         ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
-        ConnectionContext context=null;
+        context=null;
         if(cs!=null){
             context=cs.getContext();
         }
@@ -365,7 +376,7 @@
 
     synchronized public Response processPrepareTransaction(TransactionInfo info) throws Exception{
         ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
-        ConnectionContext context=null;
+        context=null;
         if(cs!=null){
             context=cs.getContext();
         }
@@ -388,7 +399,7 @@
 
     synchronized public Response processCommitTransactionOnePhase(TransactionInfo info) throws
Exception{
         ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
-        ConnectionContext context=null;
+        context=null;
         if(cs!=null){
             context=cs.getContext();
         }
@@ -399,7 +410,7 @@
 
     synchronized public Response processCommitTransactionTwoPhase(TransactionInfo info) throws
Exception{
         ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
-        ConnectionContext context=null;
+        context=null;
         if(cs!=null){
             context=cs.getContext();
         }
@@ -410,7 +421,7 @@
 
     synchronized public Response processRollbackTransaction(TransactionInfo info) throws
Exception{
         ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
-        ConnectionContext context=null;
+        context=null;
         if(cs!=null){
             context=cs.getContext();
         }
@@ -421,7 +432,7 @@
 
     synchronized public Response processForgetTransaction(TransactionInfo info) throws Exception{
         ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
-        ConnectionContext context=null;
+        context=null;
         if(cs!=null){
             context=cs.getContext();
         }
@@ -431,7 +442,7 @@
 
     synchronized public Response processRecoverTransactions(TransactionInfo info) throws
Exception{
         ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
-        ConnectionContext context=null;
+        context=null;
         if(cs!=null){
             context=cs.getContext();
         }
@@ -626,7 +637,7 @@
         log.debug("Setting up new connection: "+this);
         // Setup the context.
         String clientId=info.getClientId();
-        ConnectionContext context=new ConnectionContext();
+        context=new ConnectionContext();
         context.setConnection(this);
         context.setBroker(broker);
         context.setConnector(connector);
@@ -1096,7 +1107,7 @@
             synchronized(producerExchanges){
                 result=new ProducerBrokerExchange();
                 ConnectionState state=lookupConnectionState(id);
-                ConnectionContext context=state.getContext();
+                context=state.getContext();
                 result.setConnectionContext(context);
                 SessionState ss=state.getSessionState(id.getParentId());
                 if(ss!=null){
@@ -1125,7 +1136,7 @@
             synchronized(consumerExchanges){
                 result=new ConsumerBrokerExchange();
                 ConnectionState state=lookupConnectionState(id);
-                ConnectionContext context=state.getContext();
+                context=state.getContext();
                 result.setConnectionContext(context);
                 SessionState ss=state.getSessionState(id.getParentId());
                 if(ss!=null){

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=516475&r1=516474&r2=516475
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Fri Mar  9 09:29:30 2007
@@ -42,9 +42,12 @@
 import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ExceptionResponse;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerAck;
+import org.apache.activemq.command.Response;
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.kaha.Store;
@@ -78,7 +81,6 @@
     private final DestinationStatistics destinationStatistics = new DestinationStatistics();
     private  PendingMessageCursor messages;
     private final LinkedList pagedInMessages = new LinkedList();
-
     private LockOwner exclusiveOwner;
     private MessageGroupMap messageGroupOwners;
 
@@ -95,7 +97,7 @@
     private final Object doDispatchMutex = new Object();
     private TaskRunner taskRunner;
     private boolean started = false;
-
+    
     public Queue(ActiveMQDestination destination, final UsageManager memoryManager, MessageStore
store, DestinationStatistics parentStats,
             TaskRunnerFactory taskFactory, Store tmpStore) throws Exception {
         this.destination = destination;
@@ -318,6 +320,23 @@
         }
 
     }
+    
+    private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
+    private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
+    	public void run() {
+    		
+    		// We may need to do this in async thread since this is run for within a synchronization
+    		// that the UsageManager is holding.
+    		
+    		synchronized( messagesWaitingForSpace ) {
+	    		while( !usageManager.isFull() && !messagesWaitingForSpace.isEmpty()) {
+	    			Runnable op = messagesWaitingForSpace.removeFirst();
+	    			op.run();
+	    		}
+    		}
+    		
+    	};
+    };
 
     public void send(final ProducerBrokerExchange producerExchange,final Message message)
throws Exception {
     	final ConnectionContext context = producerExchange.getConnectionContext(); 
@@ -327,27 +346,88 @@
             if (log.isDebugEnabled()) {
                 log.debug("Expired message: " + message);
             }
+            if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || !message.isResponseRequired()
) {
+        		ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(),
message.getSize());
+				context.getConnection().dispatchAsync(ack);	    	            	        		
+            }
             return;
         }
-        if (context.isProducerFlowControl() && !context.isNetworkConnection()) {
-            if(usageManager.isSendFailIfNoSpace()&&usageManager.isFull()){
-                throw new javax.jms.ResourceAllocationException("Usage Manager memory limit
reached");
-            }else{
-                while( !usageManager.waitForSpace(1000) ) {
-                    if( context.getStopping().get() )
-                        throw new IOException("Connection closed, send aborted.");
-                }
-                // The usage manager could have delayed us by the time
-                // we unblock the message could have expired..
-                if(message.isExpired()){
-                    if (log.isDebugEnabled()) {
-                        log.debug("Expired message: " + message);
-                    }
-                    return;
-                }
-            }
+        if ( context.isProducerFlowControl() ) {
+        	if( usageManager.isFull() ) {
+	            if(usageManager.isSendFailIfNoSpace()){
+	                throw new javax.jms.ResourceAllocationException("Usage Manager memory limit
reached");
+	            }else{
+	            	
+	            	// We can avoid blocking due to low usage if the producer is sending a sync
message or
+	            	// if it is using a producer window
+	            	if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 ||
message.isResponseRequired() ) {
+	            		synchronized( messagesWaitingForSpace ) {
+		            		messagesWaitingForSpace.add(new Runnable() {
+	            				public void run() {
+	    	            	        try {							
+	    	            	        	doMessageSend(producerExchange, message);
+	    	            	        	if( message.isResponseRequired() ) {
+		    				                Response response = new Response();
+		    				                response.setCorrelationId(message.getCommandId());
+		    								context.getConnection().dispatchAsync(response);
+	    	            	        	} else {
+	    	            	        		ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(),
message.getSize());
+		    								context.getConnection().dispatchAsync(ack);	    	            	        		
+	    	            	        	}
+	    							} catch (Exception e) {
+	    	            	        	if( message.isResponseRequired() ) {
+		    				                ExceptionResponse response = new ExceptionResponse(e);
+		    				                response.setCorrelationId(message.getCommandId());
+		    								context.getConnection().dispatchAsync(response);	    								
+	    	            	        	} else {
+	    	            	        		ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(),
message.getSize());
+		    								context.getConnection().dispatchAsync(ack);	    	            	        		
+	    	            	        	}
+	    							}
+	            				}
+	            			});
+		            		
+		            		// If the user manager is not full, then the task will not get called..
+			            	if( !usageManager.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)
) {
+			            		// so call it directly here.
+			            		sendMessagesWaitingForSpaceTask.run();
+			            	}
+			            	
+		            		context.setDontSendReponse(true);
+		            		return;
+	            		}
+	            		
+	            	} else {
+	            		
+	            		// Producer flow control cannot be used, so we have do the flow control at
the broker 
+	            		// by blocking this thread until there is space available.	            		
+		                while( !usageManager.waitForSpace(1000) ) {
+		                    if( context.getStopping().get() )
+		                        throw new IOException("Connection closed, send aborted.");
+		                }
+		                
+		                // The usage manager could have delayed us by the time
+		                // we unblock the message could have expired..
+		                if(message.isExpired()){
+		                    if (log.isDebugEnabled()) {
+		                        log.debug("Expired message: " + message);
+		                    }
+		                    if( producerExchange.getProducerState().getInfo().getWindowSize() >
0 || !message.isResponseRequired() ) {
+		                		ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(),
message.getSize());
+		        				context.getConnection().dispatchAsync(ack);	    	            	        		
+		                    }
+		                    return;
+		                }
+	            	}
+	            }
+        	}
         }
-        message.setRegionDestination(this);
+        doMessageSend(producerExchange, message);
+    }
+
+	private void doMessageSend(final ProducerBrokerExchange producerExchange, final Message
message) throws IOException, Exception {
+		final ConnectionContext context = producerExchange.getConnectionContext();
+		message.setRegionDestination(this);
         if(store!=null&&message.isPersistent()){
             store.addMessage(context,message);
         }
@@ -361,12 +441,16 @@
                         messages.addMessageLast(message);
                     }
                     // It could take while before we receive the commit
-                    // operration.. by that time the message could have expired..
+                    // op, by that time the message could have expired..
                     if(message.isExpired()){
                         // TODO: remove message from store.
                         if (log.isDebugEnabled()) {
                             log.debug("Expired message: " + message);
                         }
+                        if( producerExchange.getProducerState().getInfo().getWindowSize()
> 0 || !message.isResponseRequired() ) {
+                    		ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(),
message.getSize());
+            				context.getConnection().dispatchAsync(ack);	    	            	        		
+                        }
                         return;
                     }
                     sendMessage(context,message);
@@ -379,9 +463,7 @@
             sendMessage(context,message);
             
         }
-    }
-       
-    
+	}    
 
     public void dispose(ConnectionContext context) throws IOException {
         if (store != null) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ProducerAck.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ProducerAck.java?view=diff&rev=516475&r1=516474&r2=516475
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ProducerAck.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ProducerAck.java
Fri Mar  9 09:29:30 2007
@@ -38,6 +38,11 @@
     public ProducerAck() {
     }
     
+    public ProducerAck(ProducerId producerId, int size) {
+    	this.producerId = producerId;
+    	this.size = size;
+    }
+    
     public void copy(ProducerAck copy) {
         super.copy(copy);
         copy.producerId = producerId;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java?view=diff&rev=516475&r1=516474&r2=516475
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java
Fri Mar  9 09:29:30 2007
@@ -18,14 +18,14 @@
 package org.apache.activemq.memory;
 
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.activemq.Service;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import java.util.concurrent.CopyOnWriteArrayList;
-
 
 /**
  * Used to keep track of how much of something is being used so that 
@@ -60,6 +60,7 @@
     private String name = "";
     private float usagePortion = 1.0f;
     private List<UsageManager> children = new CopyOnWriteArrayList<UsageManager>();
+    private final LinkedList<Runnable> callbacks = new LinkedList<Runnable>();
 
     public UsageManager() {
         this(null,"default");
@@ -292,6 +293,11 @@
         if(oldPercentUsage>=100&&newPercentUsage<100){
             synchronized(usageMutex){
                 usageMutex.notifyAll();
+                for (Iterator iter = callbacks.iterator(); iter.hasNext();) {
+					Runnable callback = (Runnable) iter.next();
+					callback.run();
+				}
+                callbacks.clear();
             }
         }
         // Let the listeners know
@@ -331,4 +337,37 @@
     private void removeChild(UsageManager child){
         children.remove(child);
     }
+    
+    /**
+     * @param callback
+     * @return true if the UsageManager was full.  The callback will only be called if this
method returns true.
+     */
+    public boolean notifyCallbackWhenNotFull( final Runnable callback ) {
+        
+    	if(parent!=null) {
+    		Runnable r = new Runnable(){
+				public void run() {
+			        synchronized (usageMutex) {
+			            if( percentUsage >= 100 ) {
+			            	callbacks.add(callback);
+			            } else {
+			            	callback.run();
+			            }
+			        }
+				}
+            };
+    		if( parent.notifyCallbackWhenNotFull(r) ) {
+    			return true;
+    		}
+    	}
+        synchronized (usageMutex) {
+            if( percentUsage >= 100 ) {
+            	callbacks.add(callback);
+            	return true;
+            } else {
+            	return false;
+            }
+        }
+    }
+
 }



Mime
View raw message