activemq-dev mailing list archives

From David Sitsky <s...@nuix.com>
Subject Re: DLQ ignored when consumer terminates abnormally (potential patch included)
Date Tue, 17 Mar 2009 04:21:57 GMT
I have a revised patch here which I have been using for a while now and 
it seems to do the job.  I would love it if Rob or somebody else could 
comment on it.

The only downside at the moment is that the retry count is still 
hard-coded to 5, since I can't see any way of accessing the 
RedeliveryPolicy for this consumer.

Comments?

Cheers,
David

Index: activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
===================================================================
--- activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java	(revision 739923)
+++ activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java	Tue Mar 17 15:17:23 EST 2009
@@ -30,6 +30,8 @@
  import org.apache.activemq.broker.ConnectionContext;
  import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
  import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
  import org.apache.activemq.command.ConsumerControl;
  import org.apache.activemq.command.ConsumerInfo;
  import org.apache.activemq.command.Message;
@@ -541,10 +543,23 @@
              synchronized(dispatchLock) {
  	            for (MessageReference r : dispatched) {
  	                if( r.getRegionDestination() == destination) {
-	                	rc.add((QueueMessageReference)r);
+                        // It's possible this message caused the consumer to abnormally terminate.
+                        // Check if it should be moved to the DLQ.
+                        if (r.getRedeliveryCounter() >= 5)
+                        {
+                            MessageAck ack = new MessageAck();
+                            ack.setMessageID(r.getMessageId());
+                            ack.setDestination(destination.getActiveMQDestination());
+                            ack.setAckType(MessageAck.POSION_ACK_TYPE);
+                            acknowledge(context, ack);
-	                }
+                        }
+                        else
+                        {
+	                	    rc.add(r);
-	            }
-            }
+                        }
+	                }
+	            }
+            }
              // TODO Dispatched messages should be decremented from Inflight stat
              // Here is a potential problem concerning Inflight stat:
              // Messages not already committed or rolled back may not be removed from dispatched list at the moment
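
For anyone who wants to reason about the decision in the patch without a broker checkout, here is a minimal stand-alone sketch of the same split. It is only a model: Ref, Split and split() are hypothetical names invented for illustration and are not ActiveMQ classes, and the limit is passed in as a parameter purely to show what replacing the hard-coded 5 might look like. It says nothing about where the broker would actually obtain that value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Stand-alone model of the patched loop; none of these types are ActiveMQ classes. */
public class PoisonSplitSketch {

    /** Hypothetical stand-in for a dispatched message reference. */
    static final class Ref {
        final String id;
        final int redeliveryCounter;

        Ref(String id, int redeliveryCounter) {
            this.id = id;
            this.redeliveryCounter = redeliveryCounter;
        }
    }

    /** Outcome of the split: ids to dead-letter and ids to hand back to the queue. */
    static final class Split {
        final List<String> deadLettered = new ArrayList<>();
        final List<String> requeued = new ArrayList<>();
    }

    /**
     * Mirrors the test in the patch (counter >= limit means dead-letter),
     * but takes the limit as an argument instead of hard-coding it.
     */
    static Split split(List<Ref> dispatched, int maxRedeliveries) {
        Split out = new Split();
        for (Ref r : dispatched) {
            if (r.redeliveryCounter >= maxRedeliveries) {
                out.deadLettered.add(r.id);
            } else {
                out.requeued.add(r.id);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Ref> dispatched = Arrays.asList(
                new Ref("m1", 0), new Ref("m2", 5), new Ref("m3", 7));
        Split s = split(dispatched, 5);
        System.out.println("DLQ: " + s.deadLettered + ", requeue: " + s.requeued);
    }
}
```

Note that the comparison is inclusive, as in the patch, so a message whose counter equals the limit is dead-lettered rather than requeued.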


On 16/03/2009 5:28 PM, David Sitsky wrote:
> I have an application which uses the DLQ for messages which have been
> redelivered a certain number of times.
>
> All goes well when the consumer calls rollback(), since the consumer has
> the opportunity to send back a poisoned ack, and this gets handled well.
>
> However there are situations where some messages can cause the entire
> JVM to exit abnormally in my application. In this situation, if the
> message has been redelivered enough, I'd like the message to go to the
> DLQ as well in the same way.
>
> The current code however will prevent this from occurring. Below is a
> very rough patch to PrefetchSubscription.remove(ConnectionContext
> context, Destination destination) to use the sendToDLQ() method.
>
> I have run out of time today to test this (I originally had these
> changes in Queue) - but would love some feedback from the activemq
> developers on this. I know 6 is hardwired in the code - I couldn't see
> an easy way to get the DLQ policy.
>
> Without this change, this "poison message" which kills my consumer JVM
> just keeps getting resent forever.
>
> Index: activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
> ===================================================================
> --- activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java	(revision 739923)
> +++ activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java	Mon Mar 16 17:21:02 EST 2009
> @@ -541,10 +541,19 @@
> synchronized(dispatchLock) {
> for (MessageReference r : dispatched) {
> if( r.getRegionDestination() == destination) {
> + // Its possible this message caused the consumer to abnormally terminate.
> + // Check if it should be moved to the DLQ.
> + if (r.getRedeliveryCounter() >= 6)
> + {
> + sendToDLQ(context, r);
> + }
> + else
> + {
> - rc.add((QueueMessageReference)r);
> - }
> - }
> - }
> + rc.add((QueueMessageReference)r);
> + }
> + }
> + }
> + }
> // TODO Dispatched messages should be decremented from Inflight stat
> // Here is a potential problem concerning Inflight stat:
> // Messages not already committed or rolled back may not be removed from dispatched list at the moment
>
>
>
>

-- 
Cheers,
David

Nuix Pty Ltd
Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 0699
Web: http://www.nuix.com                            Fax: +61 2 9212 6902
