activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r511078 - in /activemq/branches/activemq-4.1: ./ activemq-core/src/main/java/org/apache/activemq/ activemq-core/src/main/java/org/apache/activemq/broker/ activemq-core/src/main/java/org/apache/activemq/broker/region/ activemq-core/src/main/...
Date Fri, 23 Feb 2007 20:22:25 GMT
Author: chirino
Date: Fri Feb 23 12:22:24 2007
New Revision: 511078

URL: http://svn.apache.org/viewvc?view=rev&rev=511078
Log:
 r234@34:  chirino | 2007-02-23 14:47:41 -0500
 When a message send blocks on a destination-level usage manager, it blocks all publishers
on the same connection, even publishers that are publishing to destinations whose limits have
not been reached.  In some scenarios, this can result in a deadlock since it prevents publishing
to a destination that could otherwise receive messages.
 
 This patch delays sending the response to sync publishers until the destination usage allows
the message to be sent but does not block on the send.  This allows other producers on the
same connection to get serviced but flow-controls the producers on full destinations by delaying
the send response.
 
 In order to take advantage of this new producer flow control, which avoids the described deadlock,
sync sends must be used.  To force sync sends for all send requests, a new 'useSyncSend' option
should be set to true on the ActiveMQConnectionFactory.
 
 Hopefully a future version of this patch will be developed that provides the same feature but
works with async sends and a producer ack to flow-control the producer.
 
 

Added:
    activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java
    activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
Modified:
    activemq/branches/activemq-4.1/   (props changed)
    activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
    activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
    activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
    activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java

Propchange: activemq/branches/activemq-4.1/
------------------------------------------------------------------------------
    svk:merge = 635f1f41-eb29-0410-ac9d-be9e2c357fdd:/local/amq-4.1-port:234

Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?view=diff&rev=511078&r1=511077&r2=511078
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
(original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
Fri Feb 23 12:22:24 2007
@@ -128,6 +128,7 @@
     private boolean optimizeAcknowledge = false;
     private boolean nestedMapAndListEnabled = true;
     private boolean useRetroactiveConsumer;
+    private boolean useSyncSend=false;
     private int closeTimeout = 15000;
     
     private final Transport transport;
@@ -1903,5 +1904,11 @@
     }
 
 
+	public boolean isUseSyncSend() {
+		return useSyncSend;
+	}
+	public void setUseSyncSend(boolean forceSyncSend) {
+		this.useSyncSend = forceSyncSend;
+	}
     
 }

Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?view=diff&rev=511078&r1=511077&r2=511078
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
(original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
Fri Feb 23 12:22:24 2007
@@ -85,6 +85,8 @@
     private int closeTimeout = 15000;
     private boolean useRetroactiveConsumer;
     private boolean nestedMapAndListEnabled = true;
+    private boolean useSyncSend=false;
+
     JMSStatsImpl factoryStats = new JMSStatsImpl();
 
     static protected final Executor DEFAULT_CONNECTION_EXECUTOR = new ScheduledThreadPoolExecutor(5,
new ThreadFactory() {
@@ -256,6 +258,7 @@
             connection.setOptimizeAcknowledge(isOptimizeAcknowledge());
             connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
             connection.setRedeliveryPolicy(getRedeliveryPolicy());
+            connection.setUseSyncSend(isUseSyncSend());
 
             transport.start();
 
@@ -507,10 +510,13 @@
             props.setProperty("password", getPassword());
         }
 
+                
+        props.setProperty("useSyncSend", Boolean.toString(isUseSyncSend()));
         props.setProperty("useAsyncSend", Boolean.toString(isUseAsyncSend()));
         props.setProperty("useCompression", Boolean.toString(isUseCompression()));
         props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer()));
-
+        
+        
         if (getUserName() != null) {
             props.setProperty("userName", getUserName());
         }
@@ -678,4 +684,12 @@
     public void setStatsEnabled(boolean statsEnabled){
         this.factoryStats.setEnabled(statsEnabled);
     }
+
+	public boolean isUseSyncSend() {
+		return useSyncSend;
+	}
+
+	public void setUseSyncSend(boolean forceSyncSend) {
+		this.useSyncSend = forceSyncSend;
+	}
 }

Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?view=diff&rev=511078&r1=511077&r2=511078
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
(original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
Fri Feb 23 12:22:24 2007
@@ -1544,7 +1544,7 @@
             log.debug("Sending message: " + msg);
         }
 
-        if(!msg.isPersistent() || connection.isUseAsyncSend() || txid!=null) {
+        if( !connection.isUseSyncSend() && ( !msg.isPersistent() || connection.isUseAsyncSend()
|| txid!=null) ) {
             this.connection.asyncSendPacket(msg);
         } else {
             this.connection.syncSendPacket(msg);

Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java?view=diff&rev=511078&r1=511077&r2=511078
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
(original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
Fri Feb 23 12:22:24 2007
@@ -54,6 +54,7 @@
     private boolean producerFlowControl=true;
     private MessageAuthorizationPolicy messageAuthorizationPolicy;
     private AtomicInteger referenceCounter = new AtomicInteger();
+    private boolean dontSendReponse;
     
     private final MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext();
     
@@ -244,6 +245,14 @@
 	
 	public int decrementReference() {
 		return referenceCounter.decrementAndGet();
+	}
+
+	public boolean isDontSendReponse() {
+		return dontSendReponse;
+	}
+
+	public void setDontSendReponse(boolean dontSendReponse) {
+		this.dontSendReponse = dontSendReponse;
 	}
 
 }

Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=511078&r1=511077&r2=511078
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
(original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Fri Feb 23 12:22:24 2007
@@ -124,6 +124,7 @@
     protected final AtomicBoolean disposed=new AtomicBoolean(false);
     private CountDownLatch stopLatch = new CountDownLatch(1);
     protected final AtomicBoolean asyncException = new AtomicBoolean(false);
+    private ConnectionContext context;
     
     static class ConnectionState extends org.apache.activemq.state.ConnectionState {
         private final ConnectionContext context;
@@ -299,6 +300,18 @@
                 response = new Response();                
             }
             response.setCorrelationId(commandId);
+            if( context!=null && context.isDontSendReponse() ) {
+                // No need to send back a response at this time.
+            } else {
+                if( response == null ) {
+                    response = new Response();
+                }
+                response.setCorrelationId(commandId);
+            }
+            if( context!=null ) {
+                context.setDontSendReponse(false);
+                context=null;
+            }
         }
         return response;
         
@@ -461,7 +474,7 @@
         
         ProducerId producerId = messageSend.getProducerId();
         ConnectionState state = lookupConnectionState(producerId);
-        ConnectionContext context = state.getContext();
+        context = state.getContext();
         
         // If the message originates from this client connection, 
         // then, finde the associated producer state so we can do some dup detection.
@@ -671,7 +684,7 @@
     	
         // Setup the context.
         String clientId = info.getClientId();
-        ConnectionContext context = new ConnectionContext();
+        context = new ConnectionContext();
         context.setConnection(this);
         context.setBroker(broker);
         context.setConnector(connector);

Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=511078&r1=511077&r2=511078
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Fri Feb 23 12:22:24 2007
@@ -33,10 +33,12 @@
 import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ExceptionResponse;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.filter.BooleanExpression;
+import org.apache.activemq.command.Response;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.selector.SelectorParser;
@@ -51,6 +53,12 @@
 
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
+import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -267,19 +275,56 @@
         }
 
     }
+    
+    static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 10, 10, TimeUnit.SECONDS,
new LinkedBlockingQueue()); 
 
     public void send(final ConnectionContext context, final Message message) throws Exception
{
 
         if (context.isProducerFlowControl()) {
-            if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) {
-                throw new javax.jms.ResourceAllocationException("Usage Manager memory limit
reached");
-            }
-            else {
-                usageManager.waitForSpace();
-            }
+            if( message.isResponseRequired() ) {
+            	if( usageManager.isFull() ) {
+//            		System.out.println("Registering callback...");
+	            	Runnable callback = new Runnable() {
+	            		public void run() {
+//                    		System.out.println("Callback triggering async thread..");
+                    		threadPool.execute(new Runnable() {
+	            				public void run() {
+	    	            	        try {							
+//	    	                    		System.out.println("Async thread start..");
+	    	            	        	sendMessage(context, message);
+	    				                Response response = new Response();
+	    				                response.setCorrelationId(message.getCommandId());
+	    								context.getConnection().dispatchAsync(response);							
+	    							} catch (Exception e) {
+	    				                ExceptionResponse response = new ExceptionResponse(e);
+	    				                response.setCorrelationId(message.getCommandId());
+	    								context.getConnection().dispatchAsync(response);
+	    							} finally {
+//	    	                    		System.out.println("Async thread end..");
+	    							}
+	            				}
+	            			});
+	            		}
+	            	};
+	            	if( usageManager.notifyCallbackWhenNotFull(callback) ) {
+	            		context.setDontSendReponse(true);
+	            		return;
+	            	}
+            	}
+            } else {
+                if (usageManager.isSendFailIfNoSpace() ) {
+                    throw new javax.jms.ResourceAllocationException("Usage Manager memory
limit reached");
+                } else {
+                    usageManager.waitForSpace();
+                }
+            }        	
         }
 
-        message.setRegionDestination(this);
+        sendMessage(context, message);
+    }
+
+	private void sendMessage(final ConnectionContext context, final Message message) throws
IOException, Exception {
+		message.setRegionDestination(this);
 
         if (store != null && message.isPersistent())
             store.addMessage(context, message);
@@ -301,7 +346,7 @@
         finally {
             node.decrementReferenceCount();
         }
-    }
+	}
 
     public void dispose(ConnectionContext context) throws IOException {
         if (store != null) {

Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java?view=diff&rev=511078&r1=511077&r2=511078
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java
(original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java
Fri Feb 23 12:22:24 2007
@@ -18,6 +18,7 @@
 package org.apache.activemq.memory;
 
 import java.util.Iterator;
+import java.util.LinkedList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -49,6 +50,7 @@
     private final Object usageMutex = new Object();
     
     private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList();
+    private final LinkedList callbacks = new LinkedList();
     
     private boolean sendFailIfNoSpace;
 
@@ -92,6 +94,38 @@
             }
         }
     }
+
+    /**
+     * @param callback
+     * @return true if the UsageManager was full.  The callback will only be called if this
method returns true.
+     */
+    public boolean notifyCallbackWhenNotFull( final Runnable callback ) {
+        
+    	if(parent!=null) {
+    		Runnable r = new Runnable(){
+				public void run() {
+			        synchronized (usageMutex) {
+			            if( percentUsage >= 100 ) {
+			            	callbacks.add(callback);
+			            } else {
+			            	callback.run();
+			            }
+			        }
+				}
+            };
+    		if( parent.notifyCallbackWhenNotFull(r) ) {
+    			return true;
+    		}
+    	}
+        synchronized (usageMutex) {
+            if( percentUsage >= 100 ) {
+            	callbacks.add(callback);
+            	return true;
+            } else {
+            	return false;
+            }
+        }
+    }
     
     /**
      * Increases the usage by the value amount.  
@@ -247,6 +281,11 @@
         if( oldPercentUsage >= 100 && newPercentUsage < 100 ) {
             synchronized (usageMutex) {
                 usageMutex.notifyAll();
+                for (Iterator iter = callbacks.iterator(); iter.hasNext();) {
+					Runnable callback = (Runnable) iter.next();
+					callback.run();
+				}
+                callbacks.clear();
             }            
         }
         

Added: activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java?view=auto&rev=511078
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java
(added)
+++ activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java
Fri Feb 23 12:22:24 2007
@@ -0,0 +1,439 @@
+package org.apache.activemq;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Random;
+
+import javax.jms.BytesMessage;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.pool.PooledConnectionFactory;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
+import edu.emory.mathcs.backport.java.util.concurrent.Executors;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
+
+public class AMQDeadlockTest3 extends TestCase {
+
+	private static final String URL1 = "tcp://localhost:61616";
+
+	private static final String URL2 = "tcp://localhost:61617";
+
+	private static final String QUEUE1_NAME = "test.queue.1";
+
+	private static final String QUEUE2_NAME = "test.queue.2";
+
+	private static final int MAX_CONSUMERS = 1;
+
+	private static final int MAX_PRODUCERS = 1;
+
+	private static final int NUM_MESSAGE_TO_SEND = 10;
+
+	private AtomicInteger messageCount = new AtomicInteger();
+	private CountDownLatch doneLatch;
+
+	public void setUp() throws Exception {
+	}
+
+	public void tearDown() throws Exception {
+	}
+
+	// This should fail with incubator-activemq-fuse-4.1.0.5
+	public void testQueueLimitsWithOneBrokerSameConnection() throws Exception {
+
+		BrokerService brokerService1 = null;
+		ActiveMQConnectionFactory acf = null;
+		PooledConnectionFactory pcf = null;
+		DefaultMessageListenerContainer container1 = null;
+
+		try {
+			brokerService1 = createBrokerService("broker1", URL1, null);
+			brokerService1.start();
+
+			acf = createConnectionFactory(URL1);
+			pcf = new PooledConnectionFactory(acf);
+
+			// Only listen on the first queue.. let the 2nd queue fill up.
+			doneLatch = new CountDownLatch(NUM_MESSAGE_TO_SEND);
+			container1 = createDefaultMessageListenerContainer(acf,	new TestMessageListener1(500),
QUEUE1_NAME);
+			container1.afterPropertiesSet();
+
+			Thread.sleep(2000);
+
+			final ExecutorService executor = Executors.newCachedThreadPool();
+			for (int i = 0; i < MAX_PRODUCERS; i++) {
+				executor.submit(new PooledProducerTask(pcf, QUEUE2_NAME));
+				Thread.sleep(1000);
+				executor.submit(new PooledProducerTask(pcf, QUEUE1_NAME));
+			}
+			
+			// Wait for all message to arrive.
+			assertTrue(doneLatch.await(20, TimeUnit.SECONDS));			
+			executor.shutdownNow();
+
+			Assert.assertEquals(NUM_MESSAGE_TO_SEND, messageCount.get());
+
+		} finally {
+
+			container1.stop();
+			container1.destroy();
+			container1 = null;
+			brokerService1.stop();
+			brokerService1 = null;
+
+		}
+
+	}
+	
+
+
+	
+	// This should fail with incubator-activemq-fuse-4.1.0.5
+	public void testQueueLimitsWithTwoBrokerProduceandConsumeonDifferentBrokersWithOneConnectionForProducing()
+			throws Exception {
+
+		BrokerService brokerService1 = null;
+		BrokerService brokerService2 = null;
+		ActiveMQConnectionFactory acf1 = null;
+		ActiveMQConnectionFactory acf2 = null;
+		PooledConnectionFactory pcf = null;
+		DefaultMessageListenerContainer container1 = null;
+
+		try {
+			brokerService1 = createBrokerService("broker1", URL1, URL2);
+			brokerService1.start();
+			brokerService2 = createBrokerService("broker2", URL2, URL1);
+			brokerService2.start();
+
+			acf1 = createConnectionFactory(URL1);
+			acf2 = createConnectionFactory(URL2);
+
+			pcf = new PooledConnectionFactory(acf1);
+
+			Thread.sleep(1000);
+
+			doneLatch = new CountDownLatch(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND);
+			container1 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(500),
QUEUE1_NAME);
+			container1.afterPropertiesSet();
+
+			final ExecutorService executor = Executors.newCachedThreadPool();
+			for (int i = 0; i < MAX_PRODUCERS; i++) {
+				executor.submit(new PooledProducerTask(pcf, QUEUE2_NAME));
+				Thread.sleep(1000);
+				executor.submit(new PooledProducerTask(pcf, QUEUE1_NAME));
+			}
+
+			assertTrue(doneLatch.await(20, TimeUnit.SECONDS));			
+			executor.shutdownNow();
+
+			Assert.assertEquals(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND,
+					messageCount.get());
+		} finally {
+
+			container1.stop();
+			container1.destroy();
+			container1 = null;
+
+			brokerService1.stop();
+			brokerService1 = null;
+			brokerService2.stop();
+			brokerService2 = null;
+		}
+	}
+	
+	
+	// This should fail with incubator-activemq-fuse-4.1.0.5
+	public void testQueueLimitsWithTwoBrokerProduceandConsumeonDifferentBrokersWithSeperateConnectionsForProducing()
+			throws Exception {
+
+		BrokerService brokerService1 = null;
+		BrokerService brokerService2 = null;
+		ActiveMQConnectionFactory acf1 = null;
+		ActiveMQConnectionFactory acf2 = null;
+		DefaultMessageListenerContainer container1 = null;
+		DefaultMessageListenerContainer container2 = null;
+		
+		try {
+			brokerService1 = createBrokerService("broker1", URL1, URL2);
+			brokerService1.start();
+			brokerService2 = createBrokerService("broker2", URL2, URL1);
+			brokerService2.start();
+
+			acf1 = createConnectionFactory(URL1);
+			acf2 = createConnectionFactory(URL2);
+
+			Thread.sleep(1000);
+
+			doneLatch = new CountDownLatch(NUM_MESSAGE_TO_SEND*MAX_PRODUCERS);
+
+			container1 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(500),
QUEUE1_NAME);
+			container1.afterPropertiesSet();
+			container2 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(30000),
QUEUE2_NAME);
+			container2.afterPropertiesSet();
+
+			final ExecutorService executor = Executors.newCachedThreadPool();
+			for (int i = 0; i < MAX_PRODUCERS; i++) {
+				executor.submit(new NonPooledProducerTask(acf1, QUEUE2_NAME));
+				Thread.sleep(1000);
+				executor.submit(new NonPooledProducerTask(acf1, QUEUE1_NAME));
+			}
+
+			assertTrue(doneLatch.await(20, TimeUnit.SECONDS));			
+			executor.shutdownNow();
+
+			Assert.assertEquals(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND, messageCount.get());
+		} finally {
+
+			container1.stop();
+			container1.destroy();
+			container1 = null;
+			
+			container2.stop();
+			container2.destroy();
+			container2 = null;
+
+			brokerService1.stop();
+			brokerService1 = null;
+			brokerService2.stop();
+			brokerService2 = null;
+		}
+	}
+
+
+
+
+	private BrokerService createBrokerService(final String brokerName,
+			final String uri1, final String uri2) throws Exception {
+		final BrokerService brokerService = new BrokerService();
+
+		brokerService.setBrokerName(brokerName);
+		brokerService.setPersistent(false);
+		brokerService.setUseJmx(true);
+
+		final UsageManager memoryManager = new UsageManager();
+		memoryManager.setLimit(5000000);
+		brokerService.setMemoryManager(memoryManager);
+
+		final ArrayList policyEntries = new ArrayList();
+
+		final PolicyEntry entry = new PolicyEntry();
+		entry.setQueue(">");
+		// entry.setQueue(QUEUE1_NAME);
+		entry.setMemoryLimit(1000);
+		policyEntries.add(entry);
+
+		final PolicyMap policyMap = new PolicyMap();
+		policyMap.setPolicyEntries(policyEntries);
+		brokerService.setDestinationPolicy(policyMap);
+
+		final TransportConnector tConnector = new TransportConnector();
+		tConnector.setUri(new URI(uri1));
+		tConnector.setBrokerName(brokerName);
+		tConnector.setName(brokerName + ".transportConnector");
+		brokerService.addConnector(tConnector);
+
+		if (uri2 != null) {
+			final NetworkConnector nc = new DiscoveryNetworkConnector(new URI("static:" + uri2));
+			nc.setBridgeTempDestinations(true);
+			nc.setBrokerName(brokerName);
+			nc.setName(brokerName + ".nc");
+			brokerService.addNetworkConnector(nc);
+		}
+
+		return brokerService;
+
+	}
+
+	public DefaultMessageListenerContainer createDefaultMessageListenerContainer(
+			final ConnectionFactory acf, final MessageListener listener,
+			final String queue) {
+		final DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
+		container.setConnectionFactory(acf);
+		container.setDestinationName(queue);
+		container.setMessageListener(listener);
+		container.setSessionTransacted(false);
+		container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
+		container.setConcurrentConsumers(MAX_CONSUMERS);
+		return container;
+	}
+
+	public ActiveMQConnectionFactory createConnectionFactory(final String url) {
+		final ActiveMQConnectionFactory acf = new ActiveMQConnectionFactory(url);
+		acf.setCopyMessageOnSend(false);
+		acf.setUseAsyncSend(false);
+		acf.setDispatchAsync(true);
+		acf.setUseCompression(false);
+		acf.setOptimizeAcknowledge(false);
+		acf.setOptimizedMessageDispatch(true);
+		acf.setUseSyncSend(true);
+		return acf;
+	}
+
+	private class TestMessageListener1 implements MessageListener {
+
+		private final long waitTime;
+
+		public TestMessageListener1(long waitTime) {
+			this.waitTime = waitTime;
+		
+		}
+
+		public void onMessage(Message msg) {
+
+			try {
+				System.out.println("Listener1 Consumed message "+ msg.getIntProperty("count"));
+
+				messageCount.incrementAndGet();
+				doneLatch.countDown();
+				
+				Thread.sleep(waitTime);
+			} catch (JMSException e) {
+				// TODO Auto-generated catch block
+				e.printStackTrace();
+			} catch (InterruptedException e) {
+				// TODO Auto-generated catch block
+				e.printStackTrace();
+			}
+
+		}
+	}
+
+
+	private class PooledProducerTask implements Runnable {
+
+		private final String queueName;
+
+		private final PooledConnectionFactory pcf;
+
+		public PooledProducerTask(final PooledConnectionFactory pcf,
+				final String queueName) {
+			this.pcf = pcf;
+			this.queueName = queueName;
+		}
+
+		public void run() {
+
+			try {
+
+				final JmsTemplate jmsTemplate = new JmsTemplate(pcf);
+				jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+				jmsTemplate.setExplicitQosEnabled(true);
+				jmsTemplate.setMessageIdEnabled(false);
+				jmsTemplate.setMessageTimestampEnabled(false);
+				jmsTemplate.afterPropertiesSet();
+
+				final byte[] bytes = new byte[2048];
+				final Random r = new Random();
+				r.nextBytes(bytes);
+
+				Thread.sleep(2000);
+
+				final AtomicInteger count = new AtomicInteger();
+				for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
+					jmsTemplate.send(queueName, new MessageCreator() {
+
+						public Message createMessage(Session session)
+								throws JMSException {
+
+							final BytesMessage message = session.createBytesMessage();
+
+							message.writeBytes(bytes);
+							message.setIntProperty("count", count.incrementAndGet());
+							message.setStringProperty("producer", "pooled");
+							return message;
+						}
+					});
+
+					System.out.println("PooledProducer sent message: "+ count.get());
+					// Thread.sleep(1000);
+				}
+
+			} catch (final Throwable e) {
+				System.err.println("Producer 1 is exiting.");
+				e.printStackTrace();
+			}
+		}
+	}
+	
+	
+	private class NonPooledProducerTask implements Runnable {
+
+		private final String queueName;
+
+		private final ConnectionFactory cf;
+
+		public NonPooledProducerTask(final ConnectionFactory cf,
+				final String queueName) {
+			this.cf = cf;
+			this.queueName = queueName;
+		}
+
+		public void run() {
+
+			try {
+
+				final JmsTemplate jmsTemplate = new JmsTemplate(cf);
+				jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+				jmsTemplate.setExplicitQosEnabled(true);
+				jmsTemplate.setMessageIdEnabled(false);
+				jmsTemplate.setMessageTimestampEnabled(false);
+				jmsTemplate.afterPropertiesSet();
+
+				final byte[] bytes = new byte[2048];
+				final Random r = new Random();
+				r.nextBytes(bytes);
+
+				Thread.sleep(2000);
+
+				final AtomicInteger count = new AtomicInteger();
+				for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
+					jmsTemplate.send(queueName, new MessageCreator() {
+
+						public Message createMessage(Session session)
+								throws JMSException {
+
+							final BytesMessage message = session
+									.createBytesMessage();
+
+							message.writeBytes(bytes);
+							message.setIntProperty("count", count
+									.incrementAndGet());
+							message.setStringProperty("producer", "non-pooled");
+							return message;
+						}
+					});
+
+					System.out.println("Non-PooledProducer sent message: " + count.get());
+
+					// Thread.sleep(1000);
+				}
+
+			} catch (final Throwable e) {
+				System.err.println("Producer 1 is exiting.");
+				e.printStackTrace();
+			}
+		}
+	}
+
+}

Added: activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java?view=auto&rev=511078
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
(added)
+++ activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
Fri Feb 23 12:22:24 2007
@@ -0,0 +1,161 @@
+package org.apache.activemq;
+
+import java.io.IOException;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.transport.tcp.TcpTransport;
+
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
+public class ProducerFlowControlTest extends JmsTestSupport {
+	
+	ActiveMQQueue queueA = new ActiveMQQueue("QUEUE.A");
+	ActiveMQQueue queueB = new ActiveMQQueue("QUEUE.B");
+	private TransportConnector connector;
+	private ActiveMQConnection connection;
+
+    public void test2ndPubisherWithSyncSendConnectionThatIsBlocked() throws Exception {
+        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) createConnectionFactory();
+        factory.setUseSyncSend(true);
+        connection = (ActiveMQConnection) factory.createConnection();
+        connections.add(connection);
+    	connection.start();
+
+    	// Test sending to Queue A
+    	// 1st send should not block.
+    	fillQueue(queueA);
+    	
+    	Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+    	MessageConsumer consumer = session.createConsumer(queueB);
+
+    	// Test sending to Queue B it should block. 
+    	// Since even though  the it's queue limits have not been reached, the connection
+    	// is blocked.
+    	CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
+    	assertTrue( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) );
+    	
+    	TextMessage msg = (TextMessage) consumer.receive();
+    	assertEquals("Message 1", msg.getText());
+    	msg.acknowledge();
+    	
+    	pubishDoneToQeueuB = asyncSendTo(queueB, "Message 2");
+    	assertTrue( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) );
+    	
+    	msg = (TextMessage) consumer.receive();
+    	assertEquals("Message 2", msg.getText());
+    	msg.acknowledge();
+    }
+
+    public void test2ndPubisherWithStandardConnectionThatIsBlocked() throws Exception {
+        ConnectionFactory factory = createConnectionFactory();
+        connection = (ActiveMQConnection) factory.createConnection();
+        connections.add(connection);
+    	connection.start();
+
+    	// Test sending to Queue A
+    	// 1st send should not block.
+    	fillQueue(queueA);
+
+    	// Test sending to Queue B it should block. 
+    	// Since even though  the it's queue limits have not been reached, the connection
+    	// is blocked.
+    	CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
+    	assertFalse( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) );    	
+    }
+
+
+	private void fillQueue(final ActiveMQQueue queue) throws JMSException, InterruptedException
{
+		final AtomicBoolean done = new AtomicBoolean(true);
+		final AtomicBoolean keepGoing = new AtomicBoolean(true);
+		
+		// Starts an async thread that every time it publishes it sets the done flag to false.
+		// Once the send starts to block it will not reset the done flag anymore.
+		new Thread("Fill thread.") {
+			public void run() {
+				Session session=null;
+		    	try {
+					session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+					MessageProducer producer = session.createProducer(queue);
+					producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+					while( keepGoing.get() ) {
+						done.set(false);
+						producer.send(session.createTextMessage("Hello World"));						
+					}
+				} catch (JMSException e) {
+				} finally {
+					safeClose(session);
+				}
+			}
+		}.start();
+		
+		while( true ) {
+			Thread.sleep(1000);
+			// the producer is blocked once the done flag stays true.
+			if( done.get() )
+				break;
+			done.set(true);
+		}		
+		keepGoing.set(false);
+	}
+
+	private CountDownLatch asyncSendTo(final ActiveMQQueue queue, final String message) throws
JMSException {
+		final CountDownLatch done = new CountDownLatch(1);
+		new Thread("Send thread.") {
+			public void run() {
+				Session session=null;
+		    	try {
+					session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+					MessageProducer producer = session.createProducer(queue);
+					producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+					producer.send(session.createTextMessage(message));
+					done.countDown();
+				} catch (JMSException e) {
+				} finally {
+					safeClose(session);
+				}
+			}
+		}.start();    	
+		return done;
+	}
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService service = new BrokerService();
+        service.setPersistent(false);
+        service.setUseJmx(false);
+        
+        // Setup a destination policy where it takes only 1 message at a time.
+        PolicyMap policyMap = new PolicyMap();        
+        PolicyEntry policy = new PolicyEntry();
+        policy.setMemoryLimit(1);        
+        policyMap.setDefaultEntry(policy);        
+        service.setDestinationPolicy(policyMap);
+        
+        connector = service.addConnector("tcp://localhost:0");        
+        return service;
+    }
+    
+    protected void tearDown() throws Exception {
+    	TcpTransport t = (TcpTransport) connection.getTransport().narrow(TcpTransport.class);
+    	t.getTransportListener().onException(new IOException("Disposed."));
+    	connection.getTransport().stop();
+    	super.tearDown();
+    }
+    
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory(connector.getConnectUri());
+    }
+}



Mime
View raw message