activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Gary Tully (JIRA)" <j...@apache.org>
Subject [jira] Commented: (AMQ-2908) Slow consumer stops receiving messages because PrefetchSubscription.dispatched is filled with expired messages.
Date Thu, 04 Nov 2010 18:46:01 GMT

    [ https://issues.apache.org/activemq/browse/AMQ-2908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=63059#action_63059
] 

Gary Tully commented on AMQ-2908:
---------------------------------

I tried to reproduce this issue with a test case. The test case is committed at apache as
it works ok: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java?view=diff&r1=1031135&r2=1031136&pathrev=1031136
It is a variant of a slow consumer test, in the original the consumer blocks and only acks
when all the messages in the queue have expired. So it will have received 600 (prefetch) messages
and they will be in the prefetchsubscription.dispatched list on the broker. The test verifies
that all of these messages expire.
The variation of that test to validate this issue adds another batch of messages with no expiry
and validates that the consumer can receive them, so it proves that expiring messages in the
dispatched list does not block the consumer.

Could you look at the test case to see if any of the options used in the test do not match
your use case?

> Slow consumer stops receiving messages because PrefetchSubscription.dispatched is filled
with expired messages.
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-2908
>                 URL: https://issues.apache.org/activemq/browse/AMQ-2908
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.3.2
>            Reporter: Siim Kaalep
>            Assignee: Gary Tully
>
> Slow consumer gets stuck when consuming from queue that has expiring messages in it.

> Looked into broker while it got stuck and saw that PrefetchSubscription.dispatched is
full of expired messages.
> WORKAROUND
> Into doActualDispatch added check that if subscription is full, it will remove all expired
message from dispatch.
> {code}
> Index: trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
> ===================================================================
> --- trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
              (revision 42304)
> +++ trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
           (working copy)
> @@ -400,6 +400,21 @@
>          }
>      }
>  
> +   public void removeExpiredMessagesFromDispatch() {
> +     synchronized(dispatchLock) {
> +                  for (Iterator<MessageReference> iter = dispatched.iterator();
iter.hasNext(); ) {
> +                    final MessageReference node = iter.next();
> +                    if (node.isExpired()) {
> +                        if (broker.isExpired(node)) {
> +                            node.getRegionDestination().messageExpired(context, this,
node);
> +                        }
> +                        dispatched.remove(node);
> +                        node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
> +                    }
> +                  }
> +     }
> +   }
> +    
>      /**
>       * Checks an ack versus the contents of the dispatched list.
>       * 
> {code}
> {code}
> Index: trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
> ===================================================================
> --- trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java  
        (revision 42304)
> +++ trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java  
     (working copy)
> @@ -1543,6 +1543,9 @@
>                  }
>                  if (dispatchSelector.canSelect(s, node)) {
>                      if (!fullConsumers.contains(s)) {
> +                               if (s.isFull() && s instanceof PrefetchSubscription)
{
> +                                             ((PrefetchSubscription)s).removeExpiredMessagesFromDispatch();
> +                               }
>                          if (!s.isFull()) {
>                              // Dispatch it.
>                              s.add(node);
> {code}

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


Mime
View raw message