activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Dave cawthorn <dave.cawth...@empired.com>
Subject Re: AMQ 4.0.1 Memory Leak/Hang
Date Thu, 06 Jul 2006 10:39:47 GMT


Actually it didn't end up being the solution to the problem in my
application :(

However it did lead me to realise that the problem is related to synchronous
messages and specifically i think it may be the temporary queues used to do
the acknowledging.

I have hurriedly created a junit test case that causes my broker to grow in
size untill i get an error message on the client like this:

javax.jms.JMSException: The transport tcp://localhost/127.0.0.1:61616 of
type: org.apache.activemq.transport.tcp.TcpTransport is not running.
	at
org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:57)
	at
org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1122)
	at
org.apache.activemq.ActiveMQConnection.ensureConnectionInfoSent(ActiveMQConnection.java:1200)
	at
org.apache.activemq.ActiveMQConnection.createSession(ActiveMQConnection.java:271)
	at
com.vieo.ccs.activemq.AMQSynchronousTest$1.run(AMQSynchronousTest.java:82)
Caused by: java.io.IOException: The transport
tcp://localhost/127.0.0.1:61616 of type:
org.apache.activemq.transport.tcp.TcpTransport is not running.
	at
org.apache.activemq.transport.TransportSupport.checkStarted(TransportSupport.java:108)
	at
org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:123)
	at
org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:141)
	at
org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:78)
	at
org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:77)
	at
org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:44)
	at
org.apache.activemq.transport.ResponseCorrelator.asyncRequest(ResponseCorrelator.java:68)
	at
org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:73)
	at
org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1112)

I was running the broker with its JVM memory size limited to 256M (even
though i saw it grow to 270??) to save a bit of time. When i ran it with
512M i once saw everything stop for a while then suddenly the brokers jvm
size dropped to abot 10M and i saw it log into the database again and then
everything started working again???

Here is the test case:



import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.management.MBeanServer;
import javax.management.ObjectName;


import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.*;
import org.apache.log4j.Logger;

import com.vieo.ccs.common.messages.TestMessage;
import com.vieo.ccs.common.queue.AMQQueueConsumer;
import com.vieo.ccs.common.queue.CommunicationException;

import junit.framework.TestCase;

public class AMQSynchronousTest extends TestCase {
	static Logger logger = Logger.getLogger("AMQStressTest");

	
	
 public void testAMQSendReceive(){	
	
	
	final String connectionString = "tcp://localhost:61616";
	 

    ActiveMQConnectionFactory amqFactory = null;
    
    Destination destination = null;
    long counter=0;
    
    try {
   	    	
    	// get amq connection factory
        amqFactory =  new ActiveMQConnectionFactory(connectionString);
    
        while(true){
            
        	counter++;
        	logger.info("iteration: "+counter);
        	
        	amqFactory =  new ActiveMQConnectionFactory(connectionString);
        	
        	ActiveMQConnection producerConnection ;
            
            
            Session producerSession ;
           
            MessageProducer producer;
        	
            
            producerConnection =
(ActiveMQConnection)amqFactory.createConnection();
	    	producerConnection.setCopyMessageOnSend(false);
	     	producerSession = producerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
	      	if (destination==null){
	    		destination =  producerSession.createQueue("MemoryLeakQueue");
	    	}
	      	producer = producerSession.createProducer(destination);
	      	producerConnection.start();
            
            
            Thread thread = new Thread() {
				@Override
				public void run() {
					try {
						ActiveMQConnectionFactory amqFactory2 =  new
ActiveMQConnectionFactory(connectionString);
						ActiveMQConnection consumerConnection =
(ActiveMQConnection)amqFactory2.createConnection();
				    	Session consumerSession = consumerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
				    	
				    	// Not sure if this is part of the problem - normally a jndi lookup
				    	//ActiveMQQueue queue = new ActiveMQQueue("MemoryLeakQueue");
				    	Destination queue = consumerSession.createQueue("MemoryLeakQueue"); 
				    	
				    	
				    	MessageConsumer consumer = consumerSession.createConsumer(queue);    
				    	consumerConnection.start();
				
						TextMessage msg = (TextMessage)consumer.receive(0);
						
						assertTrue( msg.getText().equals("Test Message") );
						
												
						Destination tempQueue = (Destination)msg.getJMSReplyTo();
						MessageProducer prod = consumerSession.createProducer(tempQueue);
						
						TextMessage reply = consumerSession.createTextMessage("Reply");
						
						prod.send(reply);
						
						tempQueue = null;
						prod.close();
						prod=null;
						consumerSession.close();
						consumerSession=null;
						consumerConnection.close();
						consumerConnection=null;
						
						amqFactory2= null;
						
					} catch (JMSException e) {
						e.printStackTrace();
						fail();
					}

					
				}
			};
			thread.start();
	   
	      	TemporaryQueue tempQueue = producerSession.createTemporaryQueue();
			MessageConsumer tempConsumer = producerSession.createConsumer(tempQueue);
			
	        // Create and send the message
			
			TextMessage message= producerSession.createTextMessage("Test Message");
	        message.setJMSReplyTo(tempQueue);
	        producer.send(message);
	        
	        // Wait for the reply
	        Message reply = tempConsumer.receive(0);
	        // Delete the temporary consumer and queue
	        
	        try {
				thread.join();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
	        
	        
	        tempConsumer.close();
	        tempQueue.delete();
	    	
	    	producerConnection.stop();
	    	producer.close();
			producer = null;
			producerSession.close();
			producerSession = null;
			producerConnection.close();
			producerConnection = null;
			message = null;
			amqFactory = null;
			thread = null;
			
			System.gc();

        }
    	
    } catch (JMSException e) {
    	logger.error("JMS Exception occurred: " + e.getMessage());
    	fail();
    
	} catch (Exception e){
		e.printStackTrace();
		fail();
	}
	
 }
	
}


I was a bit rushed so there may be a stupid error in there that is causing
the broker error. 

ps I was running this on a windows machine - not sure if this is relevent
but i did notice a lot of connections in a TIME_WAIT state when i did a
netstat while the test was running.


-- 
View this message in context: http://www.nabble.com/AMQ-4.0.1-Memory-Leak-Hang-tf1889180.html#a5197284
Sent from the ActiveMQ - User forum at Nabble.com.


Mime
View raw message