activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Rob Davies <rajdav...@gmail.com>
Subject Re: Expired messages
Date Mon, 19 May 2008 11:38:39 GMT
Ah - penny drops - I see what you mean now. There's an outstanding  
enhancement (can't get access to jira at the moment) - to optionally  
allow for messages to actively be expired from in-flight or from the  
store instead of waiting for message consumption to trigger the  
expiration - you'll have to wait for it to be implemented!

cheers,

Rob

http://open.iona.com/products/enterprise-activemq
http://rajdavies.blogspot.com/

On 19 May 2008, at 12:25, Steven Van Loon wrote:

>
> Still no success on this one...
>
> I was able to create the following small testcase.
>
> First, make sure DLQ are enabled in the activemq.xml configuration  
> file:
> <policyEntry queue=">" memoryLimit="5mb">
>    <deadLetterStrategy>
>      <individualDeadLetterStrategy queuePrefix="DLQ."
> useQueueForQueueMessages="true"/>
>    </deadLetterStrategy>
> </policyEntry>
>
> (I use individual DLQ's only for simplicity, same results with a  
> shared
> DLQ)
>
> When I run the test with CREATE_CONSUMER = false, a message is  
> placed on
> the queue MY.QUEUE and it stays there forever.
>
> When I run the test with CREATE_CONSUMER = true, the expired message  
> is
> moved to the DLQ.MY.QUEUE the moment the consumer is created (not when
> the message actually expires)
>
> Also when you browse the queue MY.QUEUE when it contains an expired
> message (e.g. via http://localhost:8161/admin/queues.jsp), the expired
> message is moved to the DLQ the moment the queue is browsed. I already
> tried to create a plugin for activemq which iterates each second over
> all messages in all queues to force the expiration of messages. No
> success on this one either. It works when there are no consumers at  
> all
> listening on the queue, the moment, another consumer is listening to  
> the
> queue, the expired messages will stay on the queue.
>
> Is there anyone who has suggestions what to try next?
>
>
> import java.util.Properties;
>
> import javax.jms.DeliveryMode;
> import javax.jms.JMSException;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.Queue;
> import javax.jms.QueueConnection;
> import javax.jms.QueueConnectionFactory;
> import javax.jms.QueueSender;
> import javax.jms.QueueSession;
> import javax.naming.Context;
> import javax.naming.InitialContext;
> import javax.naming.NamingException;
>
> public class TestCase {
>
>
> 	public final static String INITIAL_CONTEXT_FACTORY =
> "org.apache.activemq.jndi.ActiveMQInitialContextFactory";
> 	public final static String PROVIDER_URL =
> "tcp://localhost:61616";
> 	public static final String CONNECTION_FACTORY =
> "ConnectionFactory";
> 	public final static String QUEUE_NAME = "MY.QUEUE";
>
> 	public final static int MESSAGE_TTL = 1000; // milliseconds
>
> 	public final static boolean CREATE_CONSUMER = false;
>
> 	public static void main(String[] args) {
>
> 		QueueConnection queueConnection = null;
> 		QueueSession queueSession = null;
> 		Queue queue = null;
> 		try {
> 			QueueConnectionFactory queueConnectionFactory =
> (QueueConnectionFactory) jndiLookup(CONNECTION_FACTORY);
> 			queueConnection =
> queueConnectionFactory.createQueueConnection();
> 			queueConnection.start();
> 			queueSession =
> queueConnection.createQueueSession(false,
> 					DeliveryMode.NON_PERSISTENT);
> 			queue = queueSession.createQueue(QUEUE_NAME);
>
> 			// Send a message
> 			System.out.println("Creating sender to " +
> queue.getQueueName());
> 			QueueSender queueSender =
> queueSession.createSender(queue);
> 			Message message =
> queueSession.createTextMessage("This is a test");
> 			message.setJMSCorrelationID("CID:1");
> 			queueSender.send(message,
> DeliveryMode.PERSISTENT, 5, MESSAGE_TTL);
> 			System.out.println("Message sent to " +
> queue.getQueueName());
>
> 			if (CREATE_CONSUMER) {
> 				// Wait until message is expired
> 				Thread.sleep(MESSAGE_TTL + 100);
>
> 				// Receive that message --> this should
> hang the process since
> 				// the message will be expired at this
> moment.
> 				// Check out the queue
> (http://localhost:8161/admin/queues.jsp)
> 				System.out.println("Creating receiver on
> "
> 						+ queue.getQueueName());
> 				System.out
> 						.println("It will hang
> because no message will be available");
> 				MessageConsumer consumer =
> queueSession.createConsumer(queue);
> 				Message received = consumer.receive();
> 				if (received != null) {
> 					System.out.println("Received
> message unexpectedly!!");
> 				}
> 			} else {
> 				// When no consumer is defined, the
> expired message will stay
> 				// forever on the queue instead of being
> moved to the DLQ
> 			}
>
> 		} catch (Exception e) {
> 			System.err.println("Problem occurred: " +
> e.getMessage());
> 			e.printStackTrace();
> 		} finally {
> 			if (queueConnection != null) {
> 				try {
> 					queueConnection.stop();
> 					queueConnection.close();
> 				} catch (JMSException e) {
> 				}
> 			}
>
> 			if (queueSession != null) {
> 				try {
> 					queueSession.close();
> 				} catch (JMSException e) {
> 				}
> 			}
> 		}
> 	}
>
> 	public static Object jndiLookup(String name) throws
> NamingException {
> 		Context ctxt = getJndiContext();
> 		return ctxt.lookup(name);
> 	}
>
> 	public static Context getJndiContext() throws NamingException {
> 		Context jndiContext = null;
> 		Properties props = new Properties();
> 		props.setProperty(Context.INITIAL_CONTEXT_FACTORY,
> 				INITIAL_CONTEXT_FACTORY);
> 		props.setProperty(Context.PROVIDER_URL, PROVIDER_URL);
> 		jndiContext = new InitialContext(props);
> 		return jndiContext;
> 	}
> }
>
>
>
>
>
>
> -----Original Message-----
> From: Rob Davies [mailto:rajdavies@gmail.com]
> Sent: donderdag 15 mei 2008 19:02
> To: users@activemq.apache.org
> Subject: Re: Expired messages
>
> will look into it!
>
> cheers,
>
> Rob
>
> http://open.iona.com/products/enterprise-activemq
> http://rajdavies.blogspot.com/
>
> On 15 May 2008, at 13:04, Steven Van Loon wrote:
>
>> Thanks for the reply Rob.
>>
>> The DLQ seems to be the queue to look for indeed, but it seems that
>> the
>> expired messages are not send to this queue the moment they actually
>> expire.
>>
>> I have created the following test scenario:
>>
>> - I create a receiver for queue://ActiveMQ.DLQ and start it up
>> - I create a receiver for queue://MY.QUEUE.C which leaves 1000 ms
>> between two receive's (Thread.sleep(1000) after 1 message reception)
>> - I create a producer for queue://MY.QUEUE.C with a TTL = 100 ms  
>> which
>> will send 10 messages
>> - I start the receiver for queue://MY.QUEUE.C
>> - I start the producer for queue://MY.QUEUE.C
>>
>> After running this, I see this:
>>
>> ActiveMQ.DLQ 	
>> 	Number Of Pending Messages : 0
>> 	Number Of Consumers: 1
>> 	Messages Sent: 0
>> 	Messages Received: 0
>> MY.QUEUE.C
>> 	Number Of Pending Messages : 9
>> 	Number Of Consumers: 1
>> 	Messages Sent: 10
>> 	Messages Received: 1
>>
>> queue://MY.QUEUE.C contains 9 expired messages after execution, DLQ
>> has
>> received nothing.
>>
>> When I run my test an second time, I get the following stats:
>>
>> ActiveMQ.DLQ 	
>> 	Number Of Pending Messages : 0
>> 	Number Of Consumers: 1
>> 	Messages Sent: 9
>> 	Messages Received: 9
>> MY.QUEUE.C
>> 	Number Of Pending Messages : 9
>> 	Number Of Consumers: 1
>> 	Messages Sent: 20
>> 	Messages Received: 2
>>
>> Which means the DLQ consumer has received the expired messages of my
>> first run and the expired message of the second run are not sent to
>> the
>> DLQ. (confirmed by my logging).
>>
>> Now, how can this be avoided? I want the expired messages to be sent
>> immediately to the DLQ.
>>
>> Anybody any ideas?
>>
>> Thanks!
>> Steven.
>>
>>
>>
>>
>>
>>
>>
>> -----Original Message-----
>> From: Rob Davies [mailto:rajdavies@gmail.com]
>> Sent: maandag 12 mei 2008 9:21
>> To: users@activemq.apache.org
>> Subject: Re: Expired messages
>>
>>
>> On 9 May 2008, at 10:35, Steven Van Loon wrote:
>>
>>> Hi,
>>>
>>>
>>>
>>> Does anybody knows whether and how it is possible to act on expired
>>> messages?
>>>
>>>
>>>
>>> Thanks!
>>>
>>> Steven.
>>>
>>
>> Expired messages have taken too long to deliver to a consumer - they
>> are sent to a dead letter queue - see
>> http://activemq.apache.org/message-redelivery-and-dlq-handling.html
>> or you can listen for advisories - see
>> http://activemq.apache.org/advisory-message.html
>>
>>
>>
>>
>> cheers,
>>
>> Rob
>>
>> http://open.iona.com/ -Enterprise Open Integration
>> http://rajdavies.blogspot.com/
>>
>>
>>
>
>
>
>
>








Mime
View raw message