activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r519233 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: Queue.java Topic.java
Date Sat, 17 Mar 2007 04:04:44 GMT
Author: chirino
Date: Fri Mar 16 21:04:43 2007
New Revision: 519233

URL: http://svn.apache.org/viewvc?view=rev&rev=519233
Log:
Implemented the per producer flow control on the Topic case too.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=519233&r1=519232&r2=519233
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java	(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java	Fri Mar 16 21:04:43 2007
@@ -352,73 +352,70 @@
             }
             return;
         }
-        if ( context.isProducerFlowControl() ) {
-        	if( usageManager.isFull() ) {
-	            if(usageManager.isSendFailIfNoSpace()){
-	                throw new javax.jms.ResourceAllocationException("Usage Manager memory limit
reached");
-	            }else{
-	            	
-	            	// We can avoid blocking due to low usage if the producer is sending a sync
message or
-	            	// if it is using a producer window
-	            	if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 ||
message.isResponseRequired() ) {
-	            		synchronized( messagesWaitingForSpace ) {
-		            		messagesWaitingForSpace.add(new Runnable() {
-	            				public void run() {
-	            					
-	            					// While waiting for space to free up... the message may have expired.
-	            			        if(message.isExpired()){
-	            			            if (log.isDebugEnabled()) {
-	            			                log.debug("Expired message: " + message);
-	            			            }
-	            			            
-	            			            if( !message.isResponseRequired() ) {
-	            			        		ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(),
message.getSize());
-	            							context.getConnection().dispatchAsync(ack);	    	            	      
 		
-	            			            }
-	            			            return;
-	            			        }
-	            					
-	            					
-	    	            	        try {							
-	    	            	        	doMessageSend(producerExchange, message);
-	    							} catch (Exception e) {
-	    	            	        	if( message.isResponseRequired() ) {
-		    				                ExceptionResponse response = new ExceptionResponse(e);
-		    				                response.setCorrelationId(message.getCommandId());
-		    								context.getConnection().dispatchAsync(response);	    								
-	    	            	        	}
-	    							}
-	            				}
-	            			});
-		            		
-		            		// If the user manager is not full, then the task will not get called..
-			            	if( !usageManager.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)
) {
-			            		// so call it directly here.
-			            		sendMessagesWaitingForSpaceTask.run();
-			            	}			            	
-		            		context.setDontSendReponse(true);
-		            		return;
-	            		}
-	            		
-	            	} else {
-	            		
-	            		// Producer flow control cannot be used, so we have do the flow control at
the broker 
-	            		// by blocking this thread until there is space available.	            		
-		                while( !usageManager.waitForSpace(1000) ) {
-		                    if( context.getStopping().get() )
-		                        throw new IOException("Connection closed, send aborted.");
-		                }
-		                
-		                // The usage manager could have delayed us by the time
-		                // we unblock the message could have expired..
-		                if(message.isExpired()){
-		                    if (log.isDebugEnabled()) {
-		                        log.debug("Expired message: " + message);
-		                    }
-		                    return;
-		                }
-	            	}
-	            }
+        if ( context.isProducerFlowControl() && usageManager.isFull() ) {
+            if(usageManager.isSendFailIfNoSpace()){
+                throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
+            } 
+            	
+        	// We can avoid blocking due to low usage if the producer is sending a sync message
or
+        	// if it is using a producer window
+        	if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || message.isResponseRequired()
) {
+        		synchronized( messagesWaitingForSpace ) {
+            		messagesWaitingForSpace.add(new Runnable() {
+        				public void run() {
+        					
+        					// While waiting for space to free up... the message may have expired.
+        			        if(message.isExpired()){
+        			            if (log.isDebugEnabled()) {
+        			                log.debug("Expired message: " + message);
+        			            }
+        			            
+        			            if( !message.isResponseRequired() ) {
+        			        		ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(),
message.getSize());
+        							context.getConnection().dispatchAsync(ack);	    	            	        		
+        			            }
+        			            return;
+        			        }
+        					
+        					
+	            	        try {							
+	            	        	doMessageSend(producerExchange, message);
+							} catch (Exception e) {
+	            	        	if( message.isResponseRequired() ) {
+    				                ExceptionResponse response = new ExceptionResponse(e);
+    				                response.setCorrelationId(message.getCommandId());
+    								context.getConnection().dispatchAsync(response);	    								
+	            	        	}
+							}
+        				}
+        			});
+            		
+            		// If the user manager is not full, then the task will not get called..
+	            	if( !usageManager.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)
) {
+	            		// so call it directly here.
+	            		sendMessagesWaitingForSpaceTask.run();
+	            	}			            	
+            		context.setDontSendReponse(true);
+            		return;
+        		}
+        		
+        	} else {
+        		
+        		// Producer flow control cannot be used, so we have do the flow control at the
broker 
+        		// by blocking this thread until there is space available.	            		
+                while( !usageManager.waitForSpace(1000) ) {
+                    if( context.getStopping().get() )
+                        throw new IOException("Connection closed, send aborted.");
+                }
+                
+                // The usage manager could have delayed us by the time
+                // we unblock the message could have expired..
+                if(message.isExpired()){
+                    if (log.isDebugEnabled()) {
+                        log.debug("Expired message: " + message);
+                    }
+                    return;
+                }
         	}
         }
         doMessageSend(producerExchange, message);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=519233&r1=519232&r2=519233
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java	(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java	Fri Mar 16 21:04:43 2007
@@ -18,6 +18,7 @@
 package org.apache.activemq.broker.region;
 
 import java.io.IOException;
+import java.util.LinkedList;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -33,9 +34,11 @@
 import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ExceptionResponse;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerAck;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.memory.UsageManager;
@@ -235,6 +238,22 @@
     }
     
 
+    private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
+    private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
+    	public void run() {
+    		
+    		// We may need to do this in async thread since this is run for within a synchronization
+    		// that the UsageManager is holding.
+    		
+    		synchronized( messagesWaitingForSpace ) {
+	    		while( !usageManager.isFull() && !messagesWaitingForSpace.isEmpty()) {
+	    			Runnable op = messagesWaitingForSpace.removeFirst();
+	    			op.run();
+	    		}
+    		}
+    		
+    	};
+    };
 
     public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
     	final ConnectionContext context = producerExchange.getConnectionContext();
@@ -242,27 +261,89 @@
     	// There is delay between the client sending it and it arriving at the
     	// destination.. it may have expired.
     	if( message.isExpired() ) {
+            if (log.isDebugEnabled()) {
+                log.debug("Expired message: " + message);
+            }
+            if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || !message.isResponseRequired()
) {
+        		ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(),
message.getSize());
+				context.getConnection().dispatchAsync(ack);	    	            	        		
+            }
     		return;
     	}
-    	if (context.isProducerFlowControl()  && !context.isNetworkConnection() ) {
-            if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) {
+    	
+        if ( context.isProducerFlowControl() && usageManager.isFull() ) {
+            if(usageManager.isSendFailIfNoSpace()){
                 throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
-            } else {
+            } 
+            	
+        	// We can avoid blocking due to low usage if the producer is sending a sync message
or
+        	// if it is using a producer window
+        	if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || message.isResponseRequired()
) {
+        		synchronized( messagesWaitingForSpace ) {
+            		messagesWaitingForSpace.add(new Runnable() {
+        				public void run() {
+        					
+        					// While waiting for space to free up... the message may have expired.
+        			        if(message.isExpired()){
+        			            if (log.isDebugEnabled()) {
+        			                log.debug("Expired message: " + message);
+        			            }
+        			            
+        			            if( !message.isResponseRequired() ) {
+        			        		ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(),
message.getSize());
+        							context.getConnection().dispatchAsync(ack);	    	            	        		
+        			            }
+        			            return;
+        			        }
+        					
+        					
+	            	        try {							
+	            	        	doMessageSend(producerExchange, message);
+							} catch (Exception e) {
+	            	        	if( message.isResponseRequired() ) {
+    				                ExceptionResponse response = new ExceptionResponse(e);
+    				                response.setCorrelationId(message.getCommandId());
+    								context.getConnection().dispatchAsync(response);	    								
+	            	        	}
+							}
+        				}
+        			});
+            		
+            		// If the user manager is not full, then the task will not get called..
+	            	if( !usageManager.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)
) {
+	            		// so call it directly here.
+	            		sendMessagesWaitingForSpaceTask.run();
+	            	}			            	
+            		context.setDontSendReponse(true);
+            		return;
+        		}
+        		
+        	} else {
+        		
+        		// Producer flow control cannot be used, so we have do the flow control at the
broker 
+        		// by blocking this thread until there is space available.	            		
                 while( !usageManager.waitForSpace(1000) ) {
                     if( context.getStopping().get() )
                         throw new IOException("Connection closed, send aborted.");
-                }            	
-                usageManager.waitForSpace();
+                }
                 
                 // The usage manager could have delayed us by the time
                 // we unblock the message could have expired..
-            	if( message.isExpired() ) {
-            		return;
-            	}
-            }    
+                if(message.isExpired()){
+                    if (log.isDebugEnabled()) {
+                        log.debug("Expired message: " + message);
+                    }
+                    return;
+                }
+        	}
         }
 
-        message.setRegionDestination(this);
+        doMessageSend(producerExchange, message);
+    }
+
+	private void doMessageSend(final ProducerBrokerExchange producerExchange, final Message
message) throws IOException, Exception {
+		final ConnectionContext context = producerExchange.getConnectionContext();
+		message.setRegionDestination(this);
 
         if (store != null && message.isPersistent() && !canOptimizeOutPersistence() )
             store.addMessage(context, message);
@@ -292,8 +373,7 @@
         finally {
             message.decrementReferenceCount();
         }
-
-    }
+	}
 
     private boolean canOptimizeOutPersistence() {
         return durableSubcribers.size()==0;



Mime
View raw message