activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "James Mason (JIRA)" <>
Subject [jira] Created: (AMQ-2937) kahadb log files not deleted when exceptions are thrown by consumers
Date Thu, 23 Sep 2010 15:05:40 GMT
kahadb log files not deleted when exceptions are thrown by consumers

                 Key: AMQ-2937
             Project: ActiveMQ
          Issue Type: Bug
          Components: Message Store
    Affects Versions: 5.4.1
         Environment: OSX 10.5.8, Spring 2.5.6
            Reporter: James Mason

kahadb log files are not being cleaned up, and are building up over a number of days. I've
created a simple test that can reproduce the problem, and it only reproduces the problem if
my message consumer throws errors (in the test case they are thrown 1 in 100 times). 

The messages are re-consumed, and none of them are added to a DLQ. I can provide the test
code if this helps. I'm using AMQ v5.4.0, persistent queues, and using AMQ embedded with Spring.

Test code:
(The test needs to be kept running until several log files have been created)

<beans xmlns=""

	<broker xmlns=""
				enableJournalDiskSyncs="false" />

			<transportConnector uri="tcp://localhost:61615?jms.prefetchPolicy.all=1" />

		<!-- JMX active -->
			<managementContext createConnector="true" />

			<statisticsBrokerPlugin />

'Unit' Test (long running test...)

public class AmqKahaDbTest extends IntegrationTestBase {

	public void testKahaDbLogs() throws Exception {
		for(int i = 0; i < 1000000; i++) {
	private void sendMessage(final int i) {
		jmsTemplate.send(testDestination, new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				MapMessage message = session.createMapMessage();
				message.setString("testField1", "Test Message " + i);
				return message;
	private JmsTemplate jmsTemplate;
	public void setJmsTemplate(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	private Destination testDestination;
	public void setTestDestination(Destination testDestination) {
		this.testDestination = testDestination;

Message listener:

public class TestMessageListener implements MessageListener {
	public void onMessage(Message message) {
        if (message instanceof MapMessage) {
	        	//read the type of subtask
	        	MapMessage mapMessage = (MapMessage)message;
	        	String messageText;
				try {
					messageText = mapMessage.getString("testField1");
					long sleepTime = (long)(Math.random() * 100d);
	"Test message consume start, sleep time: " + sleepTime);
					if(sleepTime == 50) {
						//if commented out then the old log files are deleted
						throw new RuntimeException("Random error!!!");
	"Test message consume start: " + messageText);
				} catch (Exception e) {
					log.error("Error consuming message", e);
					throw new RuntimeException(e);
    	} else {
            throw new IllegalArgumentException("Message must be of type Test");
	private Logger log = Logger.getLogger(TestMessageListener.class);

Spring config:

	<bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
	    <property name="config" value="classpath:activemq.xml" />
	    <property name="start" value="true" />

	<amq:connectionFactory id="amqConnectionFactory">
		<property name="brokerURL" value="vm://amq-broker:61615?jms.prefetchPolicy.all=1" />
	<!-- CachingConnectionFactory Definition, sessionCacheSize property is the number of sessions
to cache -->
	<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
	    <constructor-arg ref="amqConnectionFactory" />
	    <property name="exceptionListener" ref="jmsExceptionListener" />
	    <property name="sessionCacheSize" value="100" />

	<bean id="testDestination" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg index="0" value="test.queue" />
	<bean id="testListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	    <property name="connectionFactory" ref="connectionFactory"/>
	    <property name="destination" ref="testDestination"/>
	    <property name="messageListener" ref="testMessageListener"/>
	    <property name="sessionTransacted" value="true"/>
	    <property name="maxConcurrentConsumers" value="10" />
	    <property name="exceptionListener" ref="jmsExceptionListener" />
	<bean id="testMessageListener" class="TestMessageListener" />

This message is automatically generated by JIRA.
You can reply to this email to add a comment to the issue online.

View raw message