activemq-dev mailing list archives

From "Siim Kaalep (JIRA)" <j...@apache.org>
Subject [jira] Created: (AMQ-2908) Slow consumer stops receiving messages because PrefetchSubscription.dispatched is filled with expired messages.
Date Fri, 10 Sep 2010 06:28:40 GMT
Slow consumer stops receiving messages because PrefetchSubscription.dispatched is filled with
expired messages.
---------------------------------------------------------------------------------------------------------------

                 Key: AMQ-2908
                 URL: https://issues.apache.org/activemq/browse/AMQ-2908
             Project: ActiveMQ
          Issue Type: Bug
          Components: Broker
    Affects Versions: 5.3.2
            Reporter: Siim Kaalep


A slow consumer gets stuck when consuming from a queue that contains expiring messages.

I inspected the broker while the consumer was stuck and saw that PrefetchSubscription.dispatched
was full of expired messages.
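
As a rough illustration of why a full dispatched list stalls delivery, here is a toy model
of a prefetch window. It is not ActiveMQ code: PrefetchWindowSketch, Ref, tryDispatch and
purgeExpired are hypothetical names, and the real broker tracks far more state. The sketch
only shows that expired entries still count against the prefetch limit until something
removes them.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Toy model of a prefetch window; an assumption-laden sketch, not ActiveMQ code. */
public class PrefetchWindowSketch {

    /** Hypothetical stand-in for a dispatched message reference. */
    static final class Ref {
        final String id;
        final long expiresAtMillis; // absolute expiry time

        Ref(String id, long expiresAtMillis) {
            this.id = id;
            this.expiresAtMillis = expiresAtMillis;
        }

        boolean isExpired(long nowMillis) {
            return nowMillis >= expiresAtMillis;
        }
    }

    private final int prefetchSize;
    private final List<Ref> dispatched = new ArrayList<>();

    PrefetchWindowSketch(int prefetchSize) {
        this.prefetchSize = prefetchSize;
    }

    /** Expired entries still occupy slots, so they keep the window full. */
    boolean isFull() {
        return dispatched.size() >= prefetchSize;
    }

    /** Dispatches only when the window has room; returns whether it did. */
    boolean tryDispatch(Ref ref) {
        if (isFull()) {
            return false;
        }
        dispatched.add(ref);
        return true;
    }

    /** Drops expired entries and returns how many were removed. */
    int purgeExpired(long nowMillis) {
        int removed = 0;
        for (Iterator<Ref> it = dispatched.iterator(); it.hasNext(); ) {
            if (it.next().isExpired(nowMillis)) {
                it.remove(); // Iterator.remove is safe on an ArrayList
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        PrefetchWindowSketch sub = new PrefetchWindowSketch(2);
        sub.tryDispatch(new Ref("a", 100));
        sub.tryDispatch(new Ref("b", 100));

        long now = 200; // both entries have expired by this time
        System.out.println("dispatched while full: " + sub.tryDispatch(new Ref("c", 1000)));
        System.out.println("purged: " + sub.purgeExpired(now));
        System.out.println("dispatched after purge: " + sub.tryDispatch(new Ref("c", 1000)));
    }
}
```

In this model a slow consumer that never acknowledges the two expired entries keeps
isFull() true forever, which matches the stuck state observed in the broker.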

WORKAROUND
I added a check to doActualDispatch: if a subscription is full, all expired messages are
removed from its dispatched list.

{code}
Index: trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
===================================================================
--- trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (revision 42304)
+++ trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (working copy)
@@ -400,6 +400,21 @@
         }
     }
 
+   public void removeExpiredMessagesFromDispatch() {
+     synchronized(dispatchLock) {
+                  for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); ) {
+                    final MessageReference node = iter.next();
+                    if (node.isExpired()) {
+                        if (broker.isExpired(node)) {
+                            node.getRegionDestination().messageExpired(context, this, node);
+                        }
+                        dispatched.remove(node);
+                        node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
+                    }
+                  }
+     }
+   }
+    
     /**
      * Checks an ack versus the contents of the dispatched list.
      * 
{code}

{code}
Index: trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
===================================================================
--- trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (revision 42304)
+++ trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (working copy)
@@ -1543,6 +1543,9 @@
                 }
                 if (dispatchSelector.canSelect(s, node)) {
                     if (!fullConsumers.contains(s)) {
+                               if (s.isFull() && s instanceof PrefetchSubscription) {
+                                             ((PrefetchSubscription)s).removeExpiredMessagesFromDispatch();
+                               }
                         if (!s.isFull()) {
                             // Dispatch it.
                             s.add(node);

{code}

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

