Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 51C5710329 for ; Thu, 20 Mar 2014 17:44:25 +0000 (UTC) Received: (qmail 23812 invoked by uid 500); 20 Mar 2014 17:44:23 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 23695 invoked by uid 500); 20 Mar 2014 17:44:22 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 23184 invoked by uid 99); 20 Mar 2014 17:44:13 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 20 Mar 2014 17:44:13 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 56F8398681C; Thu, 20 Mar 2014 17:44:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hadrian@apache.org To: commits@activemq.apache.org Date: Thu, 20 Mar 2014 17:44:20 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [09/29] git commit: https://issues.apache.org/jira/browse/AMQ-5003 https://issues.apache.org/jira/browse/AMQ-5003 Gate the session clear in progress code so that overlapping transportInterrupted calls don't start consuming lots of memory for no reason. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7143ba2b Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7143ba2b Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7143ba2b Branch: refs/heads/activemq-5.9 Commit: 7143ba2bcad1eedb931f0b2fd632a097fba8e487 Parents: 8c7e55a Author: Timothy Bish Authored: Wed Jan 29 16:57:33 2014 -0500 Committer: Hadrian Zbarcea Committed: Thu Mar 20 13:08:10 2014 -0400 ---------------------------------------------------------------------- .../org/apache/activemq/ActiveMQSession.java | 41 ++++++++++++++------ 1 file changed, 30 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/7143ba2b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java index 0a96134..47ed980 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -221,6 +221,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta protected boolean sessionAsyncDispatch; protected final boolean debug; protected Object sendMutex = new Object(); + private final AtomicBoolean clearInProgress = new AtomicBoolean(); private MessageListener messageListener; private final JMSSessionStatsImpl stats; @@ -650,21 +651,39 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta void clearMessagesInProgress(AtomicInteger transportInterruptionProcessingComplete) { executor.clearMessagesInProgress(); - // we are called from inside the transport reconnection logic - // which involves us clearing all the connections' consumers - // dispatch and delivered lists. So rather than trying to - // grab a mutex (which could be already owned by the message - // listener calling the send or an ack) we allow it to complete in - // a separate thread via the scheduler and notify us via - // connection.transportInterruptionProcessingComplete() + // we are called from inside the transport reconnection logic which involves us + // clearing all the connections' consumers dispatch and delivered lists. So rather + // than trying to grab a mutex (which could be already owned by the message listener + // calling the send or an ack) we allow it to complete in a separate thread via the + // scheduler and notify us via connection.transportInterruptionProcessingComplete() // - for (final ActiveMQMessageConsumer consumer : consumers) { - consumer.inProgressClearRequired(); - transportInterruptionProcessingComplete.incrementAndGet(); + // We must be careful though not to allow multiple calls to this method from a + // connection that is having issue becoming fully established from causing a large + // build up of scheduled tasks to clear the same consumers over and over. + if (consumers.isEmpty()) { + return; + } + + if (clearInProgress.compareAndSet(false, true)) { + for (final ActiveMQMessageConsumer consumer : consumers) { + consumer.inProgressClearRequired(); + transportInterruptionProcessingComplete.incrementAndGet(); + try { + connection.getScheduler().executeAfterDelay(new Runnable() { + @Override + public void run() { + consumer.clearMessagesInProgress(); + }}, 0l); + } catch (JMSException e) { + connection.onClientInternalException(e); + } + } + try { connection.getScheduler().executeAfterDelay(new Runnable() { + @Override public void run() { - consumer.clearMessagesInProgress(); + clearInProgress.set(false); }}, 0l); } catch (JMSException e) { connection.onClientInternalException(e);