Return-Path:
Delivered-To: apmail-activemq-dev-archive@www.apache.org
Received: (qmail 7492 invoked from network); 5 Nov 2010 09:20:53 -0000
Received: from unknown (HELO mail.apache.org) (140.211.11.3)
    by 140.211.11.9 with SMTP; 5 Nov 2010 09:20:53 -0000
Received: (qmail 23396 invoked by uid 500); 5 Nov 2010 09:21:24 -0000
Delivered-To: apmail-activemq-dev-archive@activemq.apache.org
Received: (qmail 23283 invoked by uid 500); 5 Nov 2010 09:21:22 -0000
Mailing-List: contact dev-help@activemq.apache.org; run by ezmlm
Precedence: bulk
List-Help:
List-Unsubscribe:
List-Post:
List-Id:
Reply-To: dev@activemq.apache.org
Delivered-To: mailing list dev@activemq.apache.org
Received: (qmail 23275 invoked by uid 99); 5 Nov 2010 09:21:22 -0000
Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136)
    by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 05 Nov 2010 09:21:22 +0000
X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED
X-Spam-Check-By: apache.org
Received: from [140.211.11.22] (HELO thor.apache.org) (140.211.11.22)
    by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 05 Nov 2010 09:21:21 +0000
Received: from thor (localhost [127.0.0.1])
    by thor.apache.org (8.13.8+Sun/8.13.8) with ESMTP id oA59L0G9003851
    for ; Fri, 5 Nov 2010 09:21:01 GMT
Message-ID: <1052785.9071288948860782.JavaMail.jira@thor>
Date: Fri, 5 Nov 2010 05:21:00 -0400 (EDT)
From: "Siim Kaalep (JIRA)"
To: dev@activemq.apache.org
Subject: [jira] Issue Comment Edited: (AMQ-2908) Slow consumer stops receiving messages because PrefetchSubscription.dispatched is filled with expired messages.
In-Reply-To: <15342604.13361284100120379.JavaMail.jira@thor>
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 7bit
X-JIRA-FingerPrint: ae95407df07c98740808b2ef9da0087c


    [ https://issues.apache.org/activemq/browse/AMQ-2908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=63092#action_63092 ]

Siim Kaalep edited comment on AMQ-2908 at 11/5/10 5:19 AM:
-----------------------------------------------------------

After performing more tests, I also observed an OutOfMemoryError. I would like to raise the theoretical possibility that the OutOfMemoryError killed a thread that was supposed to perform maintenance on the dispatched list, leading to a situation in which expired messages clogged the processing of the dispatched list. If that is the case, then the current fix is redundant and the real problem is whatever caused that OutOfMemoryError.

      was (Author: siim_kaalep):
    After performing more tests, I also observed an OutOfMemoryError. I would like to raise the theoretical possibility that the OutOfMemoryError killed a thread that was supposed to perform maintenance on the dispatched list, leading to a situation in which expired messages clogged the processing of the dispatched list. If that is the case, then the current fix is redundant and the real problem is caused by OutOfMemoryError.

> Slow consumer stops receiving messages because PrefetchSubscription.dispatched is filled with expired messages.
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-2908
>                 URL: https://issues.apache.org/activemq/browse/AMQ-2908
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.3.2
>            Reporter: Siim Kaalep
>            Assignee: Gary Tully
>
> A slow consumer gets stuck when consuming from a queue that has expiring messages in it.
> I looked into the broker while it was stuck and saw that PrefetchSubscription.dispatched was full of expired messages.
> WORKAROUND
> I added a check to doActualDispatch: if the subscription is full, it removes all expired messages from the dispatched list.
> {code}
> Index: trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
> ===================================================================
> --- trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java	(revision 42304)
> +++ trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java	(working copy)
> @@ -400,6 +400,21 @@
>          }
>      }
>  
> +    public void removeExpiredMessagesFromDispatch() {
> +        synchronized(dispatchLock) {
> +            for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); ) {
> +                final MessageReference node = iter.next();
> +                if (node.isExpired()) {
> +                    if (broker.isExpired(node)) {
> +                        node.getRegionDestination().messageExpired(context, this, node);
> +                    }
> +                    dispatched.remove(node);
> +                    node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
> +                }
> +            }
> +        }
> +    }
> +
>      /**
>       * Checks an ack versus the contents of the dispatched list.
>       *
> {code}
> {code}
> Index: trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
> ===================================================================
> --- trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java	(revision 42304)
> +++ trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java	(working copy)
> @@ -1543,6 +1543,9 @@
>                      }
>                      if (dispatchSelector.canSelect(s, node)) {
>                          if (!fullConsumers.contains(s)) {
> +                            if (s.isFull() && s instanceof PrefetchSubscription) {
> +                                ((PrefetchSubscription)s).removeExpiredMessagesFromDispatch();
> +                            }
>                              if (!s.isFull()) {
>                                  // Dispatch it.
>                                  s.add(node);
> {code}

-- 
This message is automatically generated by JIRA.
- You can reply to this email to add a comment to the issue online.
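
For readers without the broker source at hand, the idea behind the workaround can be sketched in isolation. The following is only an illustration under stated assumptions, not ActiveMQ code: Entry and Subscription are hypothetical stand-ins for MessageReference and PrefetchSubscription, and the dispatched list is assumed to be a copy-on-write list, which is what makes removing from the list while iterating over it safe. With a plain ArrayList the same loop would need Iterator.remove() instead.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class ExpiredDispatchSketch {

    /** Hypothetical stand-in for a dispatched message (not ActiveMQ's MessageReference). */
    static final class Entry {
        final String id;
        final long expiresAtMillis; // 0 means "never expires"

        Entry(String id, long expiresAtMillis) {
            this.id = id;
            this.expiresAtMillis = expiresAtMillis;
        }

        boolean isExpired(long nowMillis) {
            return expiresAtMillis > 0 && nowMillis >= expiresAtMillis;
        }
    }

    /** Hypothetical stand-in for a subscription with a bounded dispatched list. */
    static final class Subscription {
        private final Object dispatchLock = new Object();
        // Assumption: copy-on-write list, so iteration works on a snapshot.
        private final List<Entry> dispatched = new CopyOnWriteArrayList<>();
        private final int prefetchSize;

        Subscription(int prefetchSize) {
            this.prefetchSize = prefetchSize;
        }

        boolean isFull() {
            synchronized (dispatchLock) {
                return dispatched.size() >= prefetchSize;
            }
        }

        /** Drops every expired entry from the dispatched list; returns how many were dropped. */
        int removeExpired(long nowMillis) {
            synchronized (dispatchLock) {
                int removed = 0;
                for (Entry e : dispatched) {
                    if (e.isExpired(nowMillis) && dispatched.remove(e)) {
                        removed++;
                    }
                }
                return removed;
            }
        }

        /** Mirrors the patched dispatch path: purge expired entries only when the list is full. */
        boolean tryDispatch(Entry e, long nowMillis) {
            synchronized (dispatchLock) {
                if (isFull()) {
                    removeExpired(nowMillis);
                }
                if (isFull()) {
                    return false; // still full of live messages
                }
                dispatched.add(e);
                return true;
            }
        }

        int inFlight() {
            return dispatched.size();
        }
    }

    public static void main(String[] args) {
        Subscription s = new Subscription(2);
        s.tryDispatch(new Entry("a", 100), 0);
        s.tryDispatch(new Entry("b", 100), 0);
        System.out.println("dispatch before expiry: " + s.tryDispatch(new Entry("c", 0), 50));
        System.out.println("dispatch after expiry:  " + s.tryDispatch(new Entry("c", 0), 200));
    }
}
```

The sketch deliberately leaves out what the real patch also does (notifying the destination that a message expired and decrementing the in-flight statistic), and it says nothing about whether an OutOfMemoryError is the underlying cause.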