activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Steven Van Loon" <steven.vanl...@invenso.com>
Subject RE: Expired messages
Date Mon, 19 May 2008 11:25:56 GMT

Still no success on this one...

I was able to create the following small testcase.

First, make sure DLQ are enabled in the activemq.xml configuration file:
<policyEntry queue=">" memoryLimit="5mb">
    <deadLetterStrategy>
      <individualDeadLetterStrategy queuePrefix="DLQ."
useQueueForQueueMessages="true"/>
    </deadLetterStrategy>
</policyEntry>

(I use individual DLQ's only for simplicity, same results with a shared
DLQ)

When I run the test with CREATE_CONSUMER = false, a message is placed on
the queue MY.QUEUE and it stays there forever.

When I run the test with CREATE_CONSUMER = true, the expired message is
moved to the DLQ.MY.QUEUE the moment the consumer is created (not when
the message actually expires)

Also when you browse the queue MY.QUEUE when it contains an expired
message (e.g. via http://localhost:8161/admin/queues.jsp), the expired
message is moved to the DLQ the moment the queue is browsed. I already
tried to create a plugin for activemq which iterates each second over
all messages in all queues to force the expiration of messages. No
success on this one either. It works when there are no consumers at all
listening on the queue, the moment, another consumer is listening to the
queue, the expired messages will stay on the queue.

Is there anyone who has suggestions what to try next? 


import java.util.Properties;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class TestCase {


	public final static String INITIAL_CONTEXT_FACTORY =
"org.apache.activemq.jndi.ActiveMQInitialContextFactory";
	public final static String PROVIDER_URL =
"tcp://localhost:61616";
	public static final String CONNECTION_FACTORY =
"ConnectionFactory";
	public final static String QUEUE_NAME = "MY.QUEUE";

	public final static int MESSAGE_TTL = 1000; // milliseconds

	public final static boolean CREATE_CONSUMER = false;

	public static void main(String[] args) {

		QueueConnection queueConnection = null;
		QueueSession queueSession = null;
		Queue queue = null;
		try {
			QueueConnectionFactory queueConnectionFactory =
(QueueConnectionFactory) jndiLookup(CONNECTION_FACTORY);
			queueConnection =
queueConnectionFactory.createQueueConnection();
			queueConnection.start();
			queueSession =
queueConnection.createQueueSession(false,
					DeliveryMode.NON_PERSISTENT);
			queue = queueSession.createQueue(QUEUE_NAME);

			// Send a message
			System.out.println("Creating sender to " +
queue.getQueueName());
			QueueSender queueSender =
queueSession.createSender(queue);
			Message message =
queueSession.createTextMessage("This is a test");
			message.setJMSCorrelationID("CID:1");
			queueSender.send(message,
DeliveryMode.PERSISTENT, 5, MESSAGE_TTL);
			System.out.println("Message sent to " +
queue.getQueueName());

			if (CREATE_CONSUMER) {
				// Wait until message is expired
				Thread.sleep(MESSAGE_TTL + 100);

				// Receive that message --> this should
hang the process since
				// the message will be expired at this
moment.
				// Check out the queue
(http://localhost:8161/admin/queues.jsp)
				System.out.println("Creating receiver on
"
						+ queue.getQueueName());
				System.out
						.println("It will hang
because no message will be available");
				MessageConsumer consumer =
queueSession.createConsumer(queue);
				Message received = consumer.receive();
				if (received != null) {
					System.out.println("Received
message unexpectedly!!");
				}
			} else {
				// When no consumer is defined, the
expired message will stay
				// forever on the queue instead of being
moved to the DLQ
			}

		} catch (Exception e) {
			System.err.println("Problem occurred: " +
e.getMessage());
			e.printStackTrace();
		} finally {
			if (queueConnection != null) {
				try {
					queueConnection.stop();
					queueConnection.close();
				} catch (JMSException e) {
				}
			}

			if (queueSession != null) {
				try {
					queueSession.close();
				} catch (JMSException e) {
				}
			}
		}
	}

	public static Object jndiLookup(String name) throws
NamingException {
		Context ctxt = getJndiContext();
		return ctxt.lookup(name);
	}

	public static Context getJndiContext() throws NamingException {
		Context jndiContext = null;
		Properties props = new Properties();
		props.setProperty(Context.INITIAL_CONTEXT_FACTORY,
				INITIAL_CONTEXT_FACTORY);
		props.setProperty(Context.PROVIDER_URL, PROVIDER_URL);
		jndiContext = new InitialContext(props);
		return jndiContext;
	}
}






-----Original Message-----
From: Rob Davies [mailto:rajdavies@gmail.com] 
Sent: donderdag 15 mei 2008 19:02
To: users@activemq.apache.org
Subject: Re: Expired messages

will look into it!

cheers,

Rob

http://open.iona.com/products/enterprise-activemq
http://rajdavies.blogspot.com/

On 15 May 2008, at 13:04, Steven Van Loon wrote:

> Thanks for the reply Rob.
>
> The DLQ seems to be the queue to look for indeed, but it seems that  
> the
> expired messages are not send to this queue the moment they actually
> expire.
>
> I have created the following test scenario:
>
> - I create a receiver for queue://ActiveMQ.DLQ and start it up
> - I create a receiver for queue://MY.QUEUE.C which leaves 1000 ms
> between two receive's (Thread.sleep(1000) after 1 message reception)
> - I create a producer for queue://MY.QUEUE.C with a TTL = 100 ms which
> will send 10 messages
> - I start the receiver for queue://MY.QUEUE.C
> - I start the producer for queue://MY.QUEUE.C
>
> After running this, I see this:
>
> ActiveMQ.DLQ 	
> 	Number Of Pending Messages : 0
> 	Number Of Consumers: 1
> 	Messages Sent: 0
> 	Messages Received: 0
> MY.QUEUE.C
> 	Number Of Pending Messages : 9
> 	Number Of Consumers: 1
> 	Messages Sent: 10
> 	Messages Received: 1
>
> queue://MY.QUEUE.C contains 9 expired messages after execution, DLQ  
> has
> received nothing.
>
> When I run my test an second time, I get the following stats:
>
> ActiveMQ.DLQ 	
> 	Number Of Pending Messages : 0
> 	Number Of Consumers: 1
> 	Messages Sent: 9
> 	Messages Received: 9
> MY.QUEUE.C
> 	Number Of Pending Messages : 9
> 	Number Of Consumers: 1
> 	Messages Sent: 20
> 	Messages Received: 2
>
> Which means the DLQ consumer has received the expired messages of my
> first run and the expired message of the second run are not sent to  
> the
> DLQ. (confirmed by my logging).
>
> Now, how can this be avoided? I want the expired messages to be sent
> immediately to the DLQ.
>
> Anybody any ideas?
>
> Thanks!
> Steven.
>
>
>
>
>
>
>
> -----Original Message-----
> From: Rob Davies [mailto:rajdavies@gmail.com]
> Sent: maandag 12 mei 2008 9:21
> To: users@activemq.apache.org
> Subject: Re: Expired messages
>
>
> On 9 May 2008, at 10:35, Steven Van Loon wrote:
>
>> Hi,
>>
>>
>>
>> Does anybody knows whether and how it is possible to act on expired
>> messages?
>>
>>
>>
>> Thanks!
>>
>> Steven.
>>
>
> Expired messages have taken too long to deliver to a consumer - they
> are sent to a dead letter queue - see
> http://activemq.apache.org/message-redelivery-and-dlq-handling.html
> or you can listen for advisories - see
> http://activemq.apache.org/advisory-message.html
>
>
>
>
> cheers,
>
> Rob
>
> http://open.iona.com/ -Enterprise Open Integration
> http://rajdavies.blogspot.com/
>
>
>






Mime
View raw message