activemq-dev mailing list archives

From "Paul Morris (JIRA)" <j...@apache.org>
Subject [jira] Commented: (AMQ-2908) Slow consumer stops receiving messages because PrefetchSubscription.dispatched is filled with expired messages.
Date Fri, 15 Oct 2010 13:38:40 GMT

    [ https://issues.apache.org/activemq/browse/AMQ-2908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=62611#action_62611 ]

Paul Morris commented on AMQ-2908:
----------------------------------

Hmm, I am wondering whether I am seeing the same issue here. Unfortunately the original stack trace,
or any indication of why the reporter thinks this is a bug, was not included, but I am seeing the same
sort of thing, and the broker produced the following stack trace:

2010-09-05 09:07:51,844 | ERROR | Failed to page in more queue messages  | org.apache.activemq.broker.region.Queue | Queue:CORE-IN-QUEUE
java.lang.OutOfMemoryError: Java heap space
                at java.util.Arrays.copyOf(Unknown Source)
                at java.util.concurrent.CopyOnWriteArrayList.add(Unknown Source)
                at org.apache.activemq.broker.region.PrefetchSubscription.dispatch(PrefetchSubscription.java:630)
                at org.apache.activemq.broker.region.PrefetchSubscription.dispatchPending(PrefetchSubscription.java:592)
                at org.apache.activemq.broker.region.PrefetchSubscription.add(PrefetchSubscription.java:158)
                at org.apache.activemq.broker.region.Queue.doActualDispatch(Queue.java:1548)
                at org.apache.activemq.broker.region.Queue.doDispatch(Queue.java:1500)
                at org.apache.activemq.broker.region.Queue.pageInMessages(Queue.java:1585)
                at org.apache.activemq.broker.region.Queue.iterate(Queue.java:1219)
                at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:98)
                at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:36)

We are using 5.3.1, by the way; I hope to upgrade to 5.4.1 next week and retry.

In our case the consumers become very slow under certain conditions: they receive each message
and post it out to a web endpoint, and at various points during the year that web endpoint
is taken offline, which causes the problem above.

We do have a test case here, but it is tied to stopping and starting our services and
pulling the IP stack down, so it is difficult to post. It also takes about 2 days at
300 messages per second before it actually kills the system.

> Slow consumer stops receiving messages because PrefetchSubscription.dispatched is filled with expired messages.
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-2908
>                 URL: https://issues.apache.org/activemq/browse/AMQ-2908
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.3.2
>            Reporter: Siim Kaalep
>            Assignee: Gary Tully
>
> Slow consumer gets stuck when consuming from a queue that has expiring messages in it.
> Looked into the broker while it was stuck and saw that PrefetchSubscription.dispatched is full of expired messages.
> WORKAROUND
> Added a check to doActualDispatch: if the subscription is full, it removes all expired messages from the dispatched list.
> {code}
> Index: trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
> ===================================================================
> --- trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java    (revision 42304)
> +++ trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java    (working copy)
> @@ -400,6 +400,21 @@
>          }
>      }
>  
> +   public void removeExpiredMessagesFromDispatch() {
> +     synchronized(dispatchLock) {
> +                  for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); ) {
> +                    final MessageReference node = iter.next();
> +                    if (node.isExpired()) {
> +                        if (broker.isExpired(node)) {
> +                            node.getRegionDestination().messageExpired(context, this, node);
> +                        }
> +                        dispatched.remove(node);
> +                        node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
> +                    }
> +                  }
> +     }
> +   }
> +    
>      /**
>       * Checks an ack versus the contents of the dispatched list.
>       * 
> {code}
> {code}
> Index: trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
> ===================================================================
> --- trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java    (revision 42304)
> +++ trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java    (working copy)
> @@ -1543,6 +1543,9 @@
>                  }
>                  if (dispatchSelector.canSelect(s, node)) {
>                      if (!fullConsumers.contains(s)) {
> +                               if (s.isFull() && s instanceof PrefetchSubscription) {
> +                                             ((PrefetchSubscription)s).removeExpiredMessagesFromDispatch();
> +                               }
>                          if (!s.isFull()) {
>                              // Dispatch it.
>                              s.add(node);
> {code}
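
For readers who want to see the idea of the workaround in isolation, here is a minimal, self-contained sketch. It is not ActiveMQ code: the classes Ref and Subscription, the method tryDispatch, and the explicit nowMillis parameter are all hypothetical stand-ins invented for illustration. It also uses removeIf on the list instead of the patch's explicit loop, and it leaves out the broker callbacks and inflight statistics the real patch touches. It only shows the control flow: when the prefetch window is full, purge expired entries first, then check again for room.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-ins for illustration; these are NOT ActiveMQ classes.
class DispatchPurgeSketch {

    /** A dispatched-but-unacknowledged message with an absolute expiry time. */
    static final class Ref {
        final String id;
        final long expiresAtMillis; // 0 means "never expires"

        Ref(String id, long expiresAtMillis) {
            this.id = id;
            this.expiresAtMillis = expiresAtMillis;
        }

        boolean isExpired(long nowMillis) {
            return expiresAtMillis > 0 && nowMillis >= expiresAtMillis;
        }
    }

    /** Models a subscription whose prefetch window is its dispatched list. */
    static final class Subscription {
        private final int prefetch;
        private final Object dispatchLock = new Object();
        private final List<Ref> dispatched = new CopyOnWriteArrayList<>();

        Subscription(int prefetch) {
            this.prefetch = prefetch;
        }

        boolean isFull() {
            return dispatched.size() >= prefetch;
        }

        int inflight() {
            return dispatched.size();
        }

        /** Dispatches only when there is room; returns whether it did. */
        boolean add(Ref ref) {
            synchronized (dispatchLock) {
                if (isFull()) {
                    return false;
                }
                dispatched.add(ref);
                return true;
            }
        }

        /** Drops expired entries; returns how many were removed. */
        int removeExpiredFromDispatch(long nowMillis) {
            synchronized (dispatchLock) {
                int before = dispatched.size();
                dispatched.removeIf(ref -> ref.isExpired(nowMillis));
                return before - dispatched.size();
            }
        }
    }

    /** Mirrors the patched dispatch step: purge when full, then re-check. */
    static boolean tryDispatch(Subscription s, Ref ref, long nowMillis) {
        if (s.isFull()) {
            s.removeExpiredFromDispatch(nowMillis);
        }
        return s.add(ref);
    }

    public static void main(String[] args) {
        Subscription s = new Subscription(2);
        s.add(new Ref("a", 100));
        s.add(new Ref("b", 100));
        // Window is full and nothing has expired yet at t=50.
        System.out.println(tryDispatch(s, new Ref("c", 0), 50));
        // At t=150 both earlier entries have expired and are purged first.
        System.out.println(tryDispatch(s, new Ref("c", 0), 150));
        System.out.println(s.inflight());
    }
}
```

Without the purge step in tryDispatch, the second call would also be refused, which is the stuck-consumer symptom described in the issue: the window stays full of entries that a slow consumer may never acknowledge.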

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

