activemq-users mailing list archives

From mpont <mpont...@gmail.com>
Subject Memory Leak Using Temporary Queues
Date Wed, 20 Jun 2007 17:22:23 GMT

Hello All,

I am grappling with what appears to be a memory leak when creating and
destroying Temporary Queues.

Suppose an application is consuming X bytes of memory and then creates new
Temporary Queues that consume Z additional bytes, so that total application
memory consumption is X+Z bytes.  Should destroying/closing those Temporary
Queues return the application's memory consumption to X bytes?

It seems like Temporary Queues should behave this way, but they don't seem
to.
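
To make the "back to X bytes" expectation measurable, here is a minimal
sketch that uses only the JDK and nothing from ActiveMQ.  The names
HeapBaselineCheck, usedHeapAfterGc and growthPerRound are hypothetical
helpers invented for illustration.  It assumes that requesting a few GC
passes before each reading is enough to make a steady upward trend visible;
a GC request is only a hint to the JVM, so single readings are noisy and
only the trend over several rounds is meaningful.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not an ActiveMQ API) for checking whether heap returns to a baseline. */
final class HeapBaselineCheck {

    /** Requests a few GC passes, then returns the used heap in bytes. */
    static long usedHeapAfterGc() {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        for (int i = 0; i < 3; i++) {
            memory.gc(); // a request only; the JVM decides how much to collect
        }
        return memory.getHeapMemoryUsage().getUsed();
    }

    /** Runs the cycle once per round; returns used heap after each round minus the starting baseline. */
    static long[] growthPerRound(Runnable cycle, int rounds) {
        long baseline = usedHeapAfterGc();
        long[] growth = new long[rounds];
        for (int r = 0; r < rounds; r++) {
            cycle.run();
            growth[r] = usedHeapAfterGc() - baseline;
        }
        return growth;
    }

    public static void main(String[] args) {
        // Stand-in for a leaking create/destroy cycle: 1 MB stays reachable per round.
        List<byte[]> retained = new ArrayList<byte[]>();
        long[] leaking = growthPerRound(() -> retained.add(new byte[1024 * 1024]), 5);

        // Stand-in for a clean cycle: the allocation is garbage as soon as the cycle returns.
        long[] clean = growthPerRound(() -> {
            byte[] scratch = new byte[1024 * 1024];
            scratch[0] = 1;
        }, 5);

        for (int r = 0; r < 5; r++) {
            System.out.println("round " + r + ": leaking=" + leaking[r]
                + " bytes, clean=" + clean[r] + " bytes");
        }
        System.out.println("still holding " + retained.size() + " arrays");
    }
}
```

In a real run, the Runnable passed to growthPerRound would be one full
create/send/close pass over the Temporary Queues.  Growth that keeps rising
round after round points to something still being referenced; growth that
hovers around a constant is what "back to X bytes" would look like.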

Here is a simple test rig that exhibits the apparent memory leak.  On my
machine it produces an OutOfMemoryError in two to three minutes (ActiveMQ
4.1.1/Sun 1.6 JVM/WinXP).  Am I doing something wrong?

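import java.util.ArrayList;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;

public class TestActiveMQ {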
  public static void main(String[] args) throws Exception {
    BrokerService brokerService = new BrokerService();
    brokerService.setUseJmx(false);
    brokerService.setPersistent(false);
    brokerService.addConnector(ActiveMQConnection.DEFAULT_BROKER_URL);
    brokerService.start();
    
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
        ActiveMQConnection.DEFAULT_USER,
        ActiveMQConnection.DEFAULT_PASSWORD,
        ActiveMQConnection.DEFAULT_BROKER_URL);
    
    Connection producerConnection = factory.createConnection();
    Session producerSession = producerConnection.createSession(false,
        Session.AUTO_ACKNOWLEDGE);
    MessageProducer producer = producerSession.createProducer(null);
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    producerConnection.start();
    
    for (int j = 0; j < 1000; j++) {
      ArrayList<MyConsumerInfo> consumers = new ArrayList<MyConsumerInfo>();
      
      // Create 50 consumers
      for (int i = 0; i < 50; i++) {
        System.out.println(">>> Creating Consumer");
        
        Connection connection = factory.createConnection();
        Session consumerSession = connection.createSession(false,
            Session.AUTO_ACKNOWLEDGE);
        TemporaryQueue consumerQueue =
            consumerSession.createTemporaryQueue();
        MessageConsumer consumer =
            consumerSession.createConsumer(consumerQueue);
        
        MyMessageReceiver receiver = new MyMessageReceiver();
        consumer.setMessageListener(receiver);
        
        MyConsumerInfo consumerInfo = new MyConsumerInfo();
        consumerInfo.receiver = receiver;
        consumerInfo.connection = connection;
        consumerInfo.session = consumerSession;
        consumerInfo.queue = consumerQueue;
        consumerInfo.consumer = consumer;
        
        connection.start();
        
        consumers.add(consumerInfo);
      }
      
      // Send message to each consumer created above
      for (MyConsumerInfo consumerInfo : consumers) {
        System.out.println("--> Sending Message");
        
        TextMessage message = producerSession.createTextMessage("Hello");
        producer.send(consumerInfo.queue, message);
        
        while (!consumerInfo.receiver.done)
          Thread.sleep(500);
      }
      
      Thread.sleep(5000);
      
      // Destroy all consumers
      for (MyConsumerInfo consumerInfo : consumers) {
        System.out.println("<<< Closing Consumer");
        
        consumerInfo.consumer.close();
        consumerInfo.session.close();
        consumerInfo.connection.stop();
        consumerInfo.connection.close();
      }
      
      consumers.clear();
      consumers = null;
      
      System.gc();
      
      Thread.sleep(5000);
    }
    
    producer.close();
    producerSession.close();
    producerConnection.stop();
    producerConnection.close();
    
    Thread.sleep(60 * 60 * 1000);
  }

  // Holds everything created for one consumer so it can be closed later.
  private static class MyConsumerInfo {
    public MyMessageReceiver receiver;
    public Session session;
    public Connection connection;
    public Queue queue;
    public MessageConsumer consumer;
  }
  
  private static class MyMessageReceiver implements MessageListener {
    // volatile so the polling loop in main() sees the listener thread's write
    public volatile boolean done;
    public void onMessage(Message message) {
      done = true;
    }
  }
}

Things I have tried:
  * One Connection for all Consumers.  This made things worse because
     apparently Temporary Queue resources are tied to their parent
     Connection.
  * One Factory per Connection
  * One Producer per Consumer
  * No Producers or Messages
  * No Message Prefetch
  * Adjusting the Heap size up and down
  * Adjusting the MemoryManager limit
  * Adjusting the Connection inactivity timeouts
  * Persistence with Derby - made things worse
  * Explicitly deleting the Temporary Queues via ActiveMQTempQueue.delete
     and ActiveMQConnection.deleteTempDestination

Any ideas are greatly appreciated,
/Matt


