activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Gary Tully (JIRA)" <j...@apache.org>
Subject [jira] Commented: (AMQ-2908) Slow consumer stops receiving messages because PrefetchSubscription.dispatched is filled with expired messages.
Date Fri, 10 Sep 2010 12:50:41 GMT

    [ https://issues.apache.org/activemq/browse/AMQ-2908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=61769#action_61769
] 

Gary Tully commented on AMQ-2908:
---------------------------------

a code change like that without a test to validate is just a good start. To have it committed
to trunk it needs a test to validate: 
1) the issue still exists
2) to protect the fix into the future

I guess a slow client ack consumer can fill the prefetch and an appropriate ttl can have them
expire, but I wonder what type of client would cause this behavior is real life, until it
acks there will be no further dispatch.
There is an existing check for message expiry on dispatch and on receiving an ack.

> Slow consumer stops receiving messages because PrefetchSubscription.dispatched is filled
with expired messages.
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-2908
>                 URL: https://issues.apache.org/activemq/browse/AMQ-2908
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.3.2
>            Reporter: Siim Kaalep
>
> Slow consumer gets stuck when consuming from queue that has expiring messages in it.

> Looked into broker while it got stuck and saw that PrefetchSubscription.dispatched is
full of expired messages.
> WORKAROUND
> Into doActualDispatch added check that if subscription is full, it will remove all expired
message from dispatch.
> {code}
> Index: trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
> ===================================================================
> --- trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
              (revision 42304)
> +++ trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
           (working copy)
> @@ -400,6 +400,21 @@
>          }
>      }
>  
> +   public void removeExpiredMessagesFromDispatch() {
> +     synchronized(dispatchLock) {
> +                  for (Iterator<MessageReference> iter = dispatched.iterator();
iter.hasNext(); ) {
> +                    final MessageReference node = iter.next();
> +                    if (node.isExpired()) {
> +                        if (broker.isExpired(node)) {
> +                            node.getRegionDestination().messageExpired(context, this,
node);
> +                        }
> +                        dispatched.remove(node);
> +                        node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
> +                    }
> +                  }
> +     }
> +   }
> +    
>      /**
>       * Checks an ack versus the contents of the dispatched list.
>       * 
> {code}
> {code}
> Index: trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
> ===================================================================
> --- trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java  
        (revision 42304)
> +++ trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java  
     (working copy)
> @@ -1543,6 +1543,9 @@
>                  }
>                  if (dispatchSelector.canSelect(s, node)) {
>                      if (!fullConsumers.contains(s)) {
> +                               if (s.isFull() && s instanceof PrefetchSubscription)
{
> +                                             ((PrefetchSubscription)s).removeExpiredMessagesFromDispatch();
> +                               }
>                          if (!s.isFull()) {
>                              // Dispatch it.
>                              s.add(node);
> {code}

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


Mime
View raw message