activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Heiko Bobzin" <brzzlw...@gmx.de>
Subject removeMatchingMessages in low memory conditions
Date Tue, 18 Sep 2012 15:05:12 GMT
Hello,
I'm trying to delete a persistent message from a large queue with removeMatchingMessages.
The code works well until the memory that is used to browse the queue hits the systemUsage.memoryUsage.limit.

I've changed the LargeQueueSparseDelete test case to reproduce this problem (see below). In
the Queue.java code I can see that doPageIn(true) will never page in any more messages and
the loop in removeMatchingMessages will never end. It seems that the reference count for the
messages in that loop is never decreased.

Thanks,
Heiko

package org.apache.activemq.usecases;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.ConnectionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.Assert;

/**
 * Fills a queue with a fixed number of persistent messages while the broker's memory usage
 * limit is capped, then removes the last message by selector via
 * {@link org.apache.activemq.broker.region.Queue#removeMatchingMessages(String)}.
 * <p>
 * The test logs how long the removal took. As reported by the author, the removal call does
 * not return when the memory limit is lower than what is needed to browse the whole queue.
 */
public class LargeQueueLowMemoryRemoveTest extends EmbeddedBrokerTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(LargeQueueLowMemoryRemoveTest.class);

    /** Total number of messages placed on the test queue. */
    private static final int QUEUE_SIZE = 30000;

    /** Number of messages sent per JMS session before the session is closed and reopened. */
    private static final int QUEUE_CHUNK_SIZE = 100;

    /**
     * Configures the test to use a queue (not a topic) before delegating to the parent set-up.
     */
    @Override
    protected void setUp() throws Exception {
        super.useTopic = false;
        super.setUp();
    }

    /**
     * @return whether or not persistence should be used; always {@code true} for this test
     */
    protected boolean isPersistent() {
        return true;
    }

    /**
     * Populates the queue with {@link #QUEUE_SIZE} uniquely identifiable persistent messages
     * and asserts that exactly one message is removed when selecting the last id.
     *
     * @throws Exception if connecting, sending, or removing fails
     */
    public void testRemoveMessages() throws Exception {
        // Populate a test queue with uniquely-identifiable messages.
        Connection conn = createConnection();
        try {
            // Set the maximum memory to be used by the broker:
            this.broker.getSystemUsage().getMemoryUsage().setLimit(10000000);
            this.broker.getSystemUsage().getStoreUsage().setLimit(200000000);
            this.broker.getSystemUsage().getTempUsage().setLimit(300000000);

            int count = 0;
            conn.start();
            while (count < QUEUE_SIZE) {
                Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
                try {
                    MessageProducer producer = session.createProducer(destination);
                    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                    for (int i = 0; i < QUEUE_CHUNK_SIZE && count < QUEUE_SIZE; i++) {
                        Message message = session.createMessage();
                        // The "id" property is what the removal selector matches on.
                        message.setIntProperty("id", count);

                        producer.send(message);
                        count += 1;
                    }
                    LOG.info("count =  {}", count);
                } finally {
                    // Close the session even when sending fails so it is not leaked.
                    session.close();
                }
            }
        } finally {
            // Always release the connection, including on a failed send.
            conn.close();
        }

        // Access the implementation of the test queue and remove a message.
        Queue queue = (Queue) broker.getRegionBroker().getDestinationMap().get(
                destination);
        Assert.assertNotNull("Test queue not found in the broker's destination map", queue);

        long startTimeMillis = System.currentTimeMillis();
        Assert.assertEquals(1,
            queue.removeMatchingMessages("id=" + (QUEUE_SIZE - 1)));

        // Never reached this end, if the memory limit is lower than the queue size.
        long durationMillis = System.currentTimeMillis() - startTimeMillis;

        LOG.info("It took {}ms to remove the last message from a queue a {} messages.",
                durationMillis, QUEUE_SIZE);
    }
}



Mime
View raw message