activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r518638 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/Queue.java test/java/org/apache/activemq/ProducerFlowControlTest.java
Date Thu, 15 Mar 2007 14:25:41 GMT
Author: chirino
Date: Thu Mar 15 07:25:40 2007
New Revision: 518638

URL: http://svn.apache.org/viewvc?view=rev&rev=518638
Log:
Added test case that makes use of producer window flow control.  So now even async sends can
be flow controled so that an individual publisher can be stopped without stopping the entire
connection. 

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=518638&r1=518637&r2=518638
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Thu Mar 15 07:25:40 2007
@@ -364,24 +364,28 @@
 	            		synchronized( messagesWaitingForSpace ) {
 		            		messagesWaitingForSpace.add(new Runnable() {
 	            				public void run() {
+	            					
+	            					// While waiting for space to free up... the message may have expired.
+	            			        if(message.isExpired()){
+	            			            if (log.isDebugEnabled()) {
+	            			                log.debug("Expired message: " + message);
+	            			            }
+	            			            
+	            			            if( !message.isResponseRequired() ) {
+	            			        		ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(),
message.getSize());
+	            							context.getConnection().dispatchAsync(ack);	    	            	      
 		
+	            			            }
+	            			            return;
+	            			        }
+	            					
+	            					
 	    	            	        try {							
 	    	            	        	doMessageSend(producerExchange, message);
-	    	            	        	if( message.isResponseRequired() ) {
-		    				                Response response = new Response();
-		    				                response.setCorrelationId(message.getCommandId());
-		    								context.getConnection().dispatchAsync(response);
-	    	            	        	} else {
-	    	            	        		ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(),
message.getSize());
-		    								context.getConnection().dispatchAsync(ack);	    	            	        		
-	    	            	        	}
 	    							} catch (Exception e) {
 	    	            	        	if( message.isResponseRequired() ) {
 		    				                ExceptionResponse response = new ExceptionResponse(e);
 		    				                response.setCorrelationId(message.getCommandId());
 		    								context.getConnection().dispatchAsync(response);	    								
-	    	            	        	} else {
-	    	            	        		ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(),
message.getSize());
-		    								context.getConnection().dispatchAsync(ack);	    	            	        		
 	    	            	        	}
 	    							}
 	            				}
@@ -391,8 +395,7 @@
 			            	if( !usageManager.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)
) {
 			            		// so call it directly here.
 			            		sendMessagesWaitingForSpaceTask.run();
-			            	}
-			            	
+			            	}			            	
 		            		context.setDontSendReponse(true);
 		            		return;
 	            		}
@@ -412,10 +415,6 @@
 		                    if (log.isDebugEnabled()) {
 		                        log.debug("Expired message: " + message);
 		                    }
-		                    if( producerExchange.getProducerState().getInfo().getWindowSize() >
0 || !message.isResponseRequired() ) {
-		                		ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(),
message.getSize());
-		        				context.getConnection().dispatchAsync(ack);	    	            	        		
-		                    }
 		                    return;
 		                }
 	            	}
@@ -431,7 +430,10 @@
         if(store!=null&&message.isPersistent()){
             store.addMessage(context,message);
         }
-        
+        if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || !message.isResponseRequired()
) {
+    		ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(),
message.getSize());
+			context.getConnection().dispatchAsync(ack);	    	            	        		
+        }
         if(context.isInTransaction()){
         	// If this is a transacted message.. increase the usage now so that a big TX does
not blow up
         	// our memory.  This increment is decremented once the tx finishes..
@@ -445,10 +447,6 @@
 	                        // TODO: remove message from store.
 	                        if (log.isDebugEnabled()) {
 	                            log.debug("Expired message: " + message);
-	                        }
-	                        if( producerExchange.getProducerState().getInfo().getWindowSize()
> 0 || !message.isResponseRequired() ) {
-	                    		ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(),
message.getSize());
-	            				context.getConnection().dispatchAsync(ack);	    	            	        	

 	                        }
 	                        return;
 	                    }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java?view=diff&rev=518638&r1=518637&r2=518638
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
Thu Mar 15 07:25:40 2007
@@ -30,6 +30,36 @@
 	private TransportConnector connector;
 	private ActiveMQConnection connection;
 
+    public void test2ndPubisherWithProducerWindowSendConnectionThatIsBlocked() throws Exception
{
+        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) createConnectionFactory();
+        factory.setProducerWindowSize(1024*64);
+        connection = (ActiveMQConnection) factory.createConnection();
+        connections.add(connection);
+    	connection.start();
+
+    	Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+    	MessageConsumer consumer = session.createConsumer(queueB);
+
+    	// Test sending to Queue A
+    	// 1 few sends should not block until the producer window is used up. 
+    	fillQueue(queueA);
+
+    	// Test sending to Queue B it should not block since the connection should not be blocked.
+    	CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
+    	assertTrue( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) );
+    	
+    	TextMessage msg = (TextMessage) consumer.receive();
+    	assertEquals("Message 1", msg.getText());
+    	msg.acknowledge();
+    	
+    	pubishDoneToQeueuB = asyncSendTo(queueB, "Message 2");
+    	assertTrue( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) );
+    	
+    	msg = (TextMessage) consumer.receive();
+    	assertEquals("Message 2", msg.getText());
+    	msg.acknowledge();
+    }
+
     public void test2ndPubisherWithSyncSendConnectionThatIsBlocked() throws Exception {
         ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) createConnectionFactory();
         factory.setAlwaysSyncSend(true);



Mime
View raw message