activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r559915 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Date Thu, 26 Jul 2007 18:07:23 GMT
Author: chirino
Date: Thu Jul 26 11:07:16 2007
New Revision: 559915

URL: http://svn.apache.org/viewvc?view=rev&rev=559915
Log:
- Made the dispatchValve handling a little safer: it was previously possible for an exception
to cause the broker to skip turning the valve off before trying to turn it back on.
- Better producer flow control logic; it was not working for sync send producers without a window.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=559915&r1=559914&r2=559915
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Thu Jul 26 11:07:16 2007
@@ -19,10 +19,12 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
@@ -48,6 +50,7 @@
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerAck;
+import org.apache.activemq.command.Response;
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.kaha.Store;
@@ -217,28 +220,31 @@
 			// duplicates
             // etc.
             dispatchValve.turnOff();
-            msgContext.setDestination(destination);
-            synchronized(pagedInMessages){
-                // Add all the matching messages in the queue to the
-                // subscription.
-                for(Iterator i=pagedInMessages.iterator();i.hasNext();){
-                    QueueMessageReference node=(QueueMessageReference)i.next();
-                    if(node.isDropped()){
-                        continue;
-                    }
-                    try{
-                        msgContext.setMessageReference(node);
-                        if(sub.matches(node,msgContext)){
-                            sub.add(node);
-                        }
-                    }catch(IOException e){
-                        log.warn("Could not load message: "+e,e);
-                    }
-                }
+            try { 
+	            msgContext.setDestination(destination);
+	            synchronized(pagedInMessages){
+	                // Add all the matching messages in the queue to the
+	                // subscription.
+	                for(Iterator i=pagedInMessages.iterator();i.hasNext();){
+	                    QueueMessageReference node=(QueueMessageReference)i.next();
+	                    if(node.isDropped()){
+	                        continue;
+	                    }
+	                    try{
+	                        msgContext.setMessageReference(node);
+	                        if(sub.matches(node,msgContext)){
+	                            sub.add(node);
+	                        }
+	                    }catch(IOException e){
+	                        log.warn("Could not load message: "+e,e);
+	                    }
+	                }
+	            }
+            } finally {
+                dispatchValve.turnOn();
             }
         }finally{
             msgContext.clear();
-            dispatchValve.turnOn();
         }
     }
 
@@ -251,7 +257,6 @@
         // while
         // removing up a subscription.
         dispatchValve.turnOff();
-
         try {
 
             synchronized (consumers) {
@@ -353,10 +358,12 @@
     	final ConnectionContext context = producerExchange.getConnectionContext(); 
         // There is delay between the client sending it and it arriving at the
         // destination.. it may have expired.
+    	
+    	final boolean sendProducerAck = ( !message.isResponseRequired() || producerExchange.getProducerState().getInfo().getWindowSize()
> 0 ) && !context.isInRecoveryMode();
         if(message.isExpired()){
             broker.messageExpired(context,message);
             destinationStatistics.getMessages().decrement();
-            if( ( !message.isResponseRequired() || producerExchange.getProducerState().getInfo().getWindowSize()
> 0 ) && !context.isInRecoveryMode() ) {
+            if( sendProducerAck ) {
         		ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(),
message.getSize());
 				context.getConnection().dispatchAsync(ack);	    	            	        		
             }
@@ -373,24 +380,28 @@
         		synchronized( messagesWaitingForSpace ) {
             		messagesWaitingForSpace.add(new Runnable() {
         				public void run() {
-        					
-        					// While waiting for space to free up... the message may have expired.
-        			        if(message.isExpired()){
-        			            broker.messageExpired(context,message);
-                                destinationStatistics.getMessages().decrement();
-        			            
-        			            if( !message.isResponseRequired() && !context.isInRecoveryMode()
) {
-        			        		ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(),
message.getSize());
-        							context.getConnection().dispatchAsync(ack);	    	            	        		
-        			            }
-        			            return;
-        			        }
-        					
-        					
+        					        			                    					
 	            	        try {							
-	            	        	doMessageSend(producerExchange, message);
+	        			        
+	        					// While waiting for space to free up... the message may have expired.
+	            	        	if(message.isExpired()) {
+	        			            broker.messageExpired(context,message);
+	                                destinationStatistics.getMessages().decrement();
+	        			        } else {
+	        			        	doMessageSend(producerExchange, message);
+	        			        }
+	        			        
+	            	            if( sendProducerAck ) {
+	            	        		ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(),
message.getSize());
+	            					context.getConnection().dispatchAsync(ack);	    	            	        
	
+	            	            } else {
+	            	            	Response response = new Response();
+    				                response.setCorrelationId(message.getCommandId());
+    								context.getConnection().dispatchAsync(response);	    								
+	            	            }
+	            	            
 							} catch (Exception e) {
-	            	        	if( message.isResponseRequired() && !context.isInRecoveryMode()
) {
+	            	        	if( !sendProducerAck && !context.isInRecoveryMode() ) {
     				                ExceptionResponse response = new ExceptionResponse(e);
     				                response.setCorrelationId(message.getCommandId());
     								context.getConnection().dispatchAsync(response);	    								
@@ -428,6 +439,10 @@
         	}
         }
         doMessageSend(producerExchange, message);
+        if( sendProducerAck ) {
+    		ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(),
message.getSize());
+			context.getConnection().dispatchAsync(ack);	    	            	        		
+        }
     }
 
 	void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message)
throws IOException, Exception {
@@ -436,10 +451,6 @@
         if(store!=null&&message.isPersistent()){
             store.addMessage(context,message);
         }
-        if( ( !message.isResponseRequired() || producerExchange.getProducerState().getInfo().getWindowSize()
> 0 ) && !context.isInRecoveryMode() ) {
-    		ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(),
message.getSize());
-			context.getConnection().dispatchAsync(ack);	    	            	        		
-        }
         if(context.isInTransaction()){
         	// If this is a transacted message.. increase the usage now so that a big TX does
not blow up
         	// our memory.  This increment is decremented once the tx finishes..
@@ -986,6 +997,7 @@
     }
     
     private List doPageIn(boolean force) throws Exception{
+    	        
         final int toPageIn=maximumPagedInMessages-pagedInMessages.size();
         List result=null;
         if((force||!consumers.isEmpty())&&toPageIn>0){
@@ -995,6 +1007,7 @@
                 int count=0;
                 result=new ArrayList(toPageIn);
                 synchronized(messages){
+                	
                     try{
                         messages.reset();
                         while(messages.hasNext()&&count<toPageIn){



Mime
View raw message