activemq-issues mailing list archives

From "Ryan Doyle (JIRA)" <j...@apache.org>
Subject [jira] [Created] (AMQ-5927) Acknowledged messages marked for redelivery stop new messages being consumed
Date Fri, 14 Aug 2015 01:13:46 GMT
Ryan Doyle created AMQ-5927:
-------------------------------

             Summary: Acknowledged messages marked for redelivery stop new messages being consumed
                 Key: AMQ-5927
                 URL: https://issues.apache.org/jira/browse/AMQ-5927
             Project: ActiveMQ
          Issue Type: Bug
          Components: Broker
    Affects Versions: 5.11.1
         Environment: Ubuntu 12.04

java version "1.7.0_75"
Java(TM) SE Runtime Environment (build 1.7.0_75-b13)
Java HotSpot(TM) 64-Bit Server VM (build 24.75-b04, mixed mode)
            Reporter: Ryan Doyle
             Fix For: 5.x


Hi

I am seeing an issue where consumers get stuck, unable to consume messages, after the
underlying persistence store fails to store a message (e.g. out of disk space).

The issue I am seeing is that when a subscription is removed from a queue (the subscription
gets forcibly removed because of the persistence adapter failure), as part of the shutdown,
messages are taken off the subscription and then stored in {{redeliveredWaitingDispatch}}.

{code:title=org.apache.activemq.broker.region.Queue:547}
List<MessageReference> unAckedMessages = sub.remove(context, this);
{code}


{code:title=org.apache.activemq.broker.region.Queue:583}
if (!qmr.isDropped()) {
    redeliveredWaitingDispatch.addMessageLast(qmr);
}
{code}

The only problem is that /some/ of these messages are actually acked (see {{org.apache.activemq.broker.region.QueueMessageReference#isAcked}}).
I have put breakpoints in this section of the code and confirmed this.

This later becomes a problem. If any acked messages end up in {{redeliveredWaitingDispatch}},
all message consumption stops.

If you follow {{org.apache.activemq.broker.region.Queue#doDispatch:1957}}, you can see that
{{redeliveredWaitingDispatch}} /always/ takes precedence. New messages that come in don't
get dispatched until {{redeliveredWaitingDispatch}} is empty. The problem is that it /never/
gets emptied.

In {{org.apache.activemq.broker.region.Queue#doActualDispatch:2002}} at line {{2028}}, we
only dispatch and remove the message if the {{QueueMessageReference}} is not acknowledged.

{code}
if (dispatchSelector.canSelect(s, node) && assignMessageGroup(s, (QueueMessageReference) node)
        && !((QueueMessageReference) node).isAcked()) {
    // Dispatch it.
    s.add(node);
    LOG.trace("assigned {} to consumer {}", node.getMessageId(), s.getConsumerInfo().getConsumerId());
    iterator.remove();
    target = s;
    break;
}
{code}
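
To illustrate why the list never drains, here is a minimal, self-contained sketch of the behaviour described above. It is not ActiveMQ code: {{Ref}}, {{dispatchPass}} and the two deques are hypothetical stand-ins for {{QueueMessageReference}}, the dispatch loop and the pending lists, and the precedence rule is modelled only as I understand it from reading {{doDispatch}}.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

public class StalledDispatchSketch {

    /** Hypothetical stand-in for QueueMessageReference: only an id and the acked flag. */
    static final class Ref {
        final String id;
        final boolean acked;

        Ref(String id, boolean acked) {
            this.id = id;
            this.acked = acked;
        }
    }

    /**
     * One simplified dispatch pass. Redelivered entries take precedence, and
     * new entries are only considered once the redelivered list is empty.
     * Returns the ids dispatched during this pass.
     */
    static List<String> dispatchPass(Deque<Ref> redelivered, Deque<Ref> fresh) {
        List<String> dispatched = new ArrayList<>();
        Iterator<Ref> it = redelivered.iterator();
        while (it.hasNext()) {
            Ref r = it.next();
            if (!r.acked) {          // same kind of guard as in the quoted snippet
                dispatched.add(r.id);
                it.remove();         // only dispatched entries are ever removed
            }
        }
        if (redelivered.isEmpty()) { // assumed precedence rule
            while (!fresh.isEmpty()) {
                dispatched.add(fresh.poll().id);
            }
        }
        return dispatched;
    }

    public static void main(String[] args) {
        Deque<Ref> redelivered = new ArrayDeque<>();
        redelivered.add(new Ref("m1", true));   // acked, but still queued for redelivery
        redelivered.add(new Ref("m2", false));
        Deque<Ref> fresh = new ArrayDeque<>();
        fresh.add(new Ref("m3", false));

        System.out.println(dispatchPass(redelivered, fresh)); // m2 goes out, m1 stays behind
        System.out.println(dispatchPass(redelivered, fresh)); // nothing: m1 blocks m3
    }
}
```

In this model the acked entry {{m1}} is skipped on every pass and never removed, so the new message {{m3}} is never reached, no matter how many passes run.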


I have a simple patch that will fix the issue, but I am not sure of the full implications
of the patch. I am hoping one of the ActiveMQ developers could chime in here:

{code}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index c0a237f..64717f5 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -580,7 +580,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
                             }
                         }
                     }
-                    if (!qmr.isDropped()) {
+                    if (!qmr.isDropped() && !qmr.isAcked()) {
                         redeliveredWaitingDispatch.addMessageLast(qmr);
                     }
                 }

{code}
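
The intent of the one-line change can be sketched in isolation as follows. This is only a toy model under my own assumptions: {{Ref}} and {{requeue}} are hypothetical names, not ActiveMQ classes or methods, and the sketch says nothing about what else in the broker might depend on acked references being re-queued.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RequeueFilterSketch {

    /** Hypothetical stand-in for QueueMessageReference with the two flags the patch checks. */
    static final class Ref {
        final String id;
        final boolean dropped;
        final boolean acked;

        Ref(String id, boolean dropped, boolean acked) {
            this.id = id;
            this.dropped = dropped;
            this.acked = acked;
        }
    }

    /**
     * Models the patched condition: a reference taken off a removed subscription
     * is re-queued for redelivery only if it is neither dropped nor acked.
     * Returns the ids that would be re-queued, in order.
     */
    static List<String> requeue(List<Ref> takenOffSubscription) {
        List<String> redeliveredWaitingDispatch = new ArrayList<>();
        for (Ref r : takenOffSubscription) {
            if (!r.dropped && !r.acked) {
                redeliveredWaitingDispatch.add(r.id);
            }
        }
        return redeliveredWaitingDispatch;
    }

    public static void main(String[] args) {
        List<Ref> refs = Arrays.asList(
                new Ref("m1", false, true),    // acked: skipped with the patch
                new Ref("m2", false, false),   // still pending: re-queued
                new Ref("m3", true, false));   // dropped: skipped before and after the patch
        System.out.println(requeue(refs));
    }
}
```

With the extra check, only the reference that is still pending is re-queued, so nothing that the dispatch loop refuses to dispatch ends up in the redelivery list.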

To reproduce this issue, I filled the disk completely by putting messages on a queue. I then
used dd to zero out any remaining space on the disk. I then consumed the messages from the
queue. /Some/ messages will be consumed, but at some point the consumer will fail due to the
out-of-space errors. It's at this point that some acked messages are put on the {{redeliveredWaitingDispatch}}
list. This can be confirmed by adding a conditional breakpoint at {{org.apache.activemq.broker.region.Queue:584}}
with the condition {{qmr.isAcked() == true}}.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
