Return-Path: Delivered-To: apmail-geronimo-activemq-commits-archive@www.apache.org Received: (qmail 94517 invoked from network); 14 Dec 2006 15:24:34 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 14 Dec 2006 15:24:34 -0000 Received: (qmail 19590 invoked by uid 500); 14 Dec 2006 15:24:42 -0000 Delivered-To: apmail-geronimo-activemq-commits-archive@geronimo.apache.org Received: (qmail 19575 invoked by uid 500); 14 Dec 2006 15:24:42 -0000 Mailing-List: contact activemq-commits-help@geronimo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: activemq-dev@geronimo.apache.org Delivered-To: mailing list activemq-commits@geronimo.apache.org Received: (qmail 19566 invoked by uid 99); 14 Dec 2006 15:24:42 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 14 Dec 2006 07:24:42 -0800 X-ASF-Spam-Status: No, hits=-9.4 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 14 Dec 2006 07:24:33 -0800 Received: by eris.apache.org (Postfix, from userid 65534) id 04C4D1A981A; Thu, 14 Dec 2006 07:23:48 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r487235 - /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Date: Thu, 14 Dec 2006 15:23:47 -0000 To: activemq-commits@geronimo.apache.org From: jstrachan@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20061214152348.04C4D1A981A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: jstrachan Date: Thu Dec 14 07:23:47 2006 New Revision: 487235 URL: http://svn.apache.org/viewvc?view=rev&rev=487235 Log: Patch for AMQ-1093 to avoid a deadlock if the transport is being 
reconnected from inside a MessageListener which is calling a send(); let's make the explicit clear of the consumer dispatch list asynchronous and within the existing mutex Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?view=diff&rev=487235&r1=487234&r2=487235 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Thu Dec 14 07:23:47 2006 @@ -17,24 +17,7 @@ */ package org.apache.activemq; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; - -import javax.jms.IllegalStateException; -import javax.jms.InvalidDestinationException; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; - -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQMessage; -import org.apache.activemq.command.ConsumerId; -import org.apache.activemq.command.ConsumerInfo; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.MessageDispatch; -import org.apache.activemq.command.MessagePull; +import org.apache.activemq.command.*; import org.apache.activemq.management.JMSConsumerStatsImpl; import org.apache.activemq.management.StatsCapable; import org.apache.activemq.management.StatsImpl; @@ -47,6 +30,12 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import javax.jms.IllegalStateException; +import javax.jms.*; +import 
javax.jms.Message; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -121,6 +110,7 @@ private AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean(); private ExecutorService executorService = null; private MessageTransformer transformer; + private boolean clearDispatchList; /** * Create a MessageConsumer @@ -569,7 +559,14 @@ } void clearMessagesInProgress(){ - unconsumedMessages.clear(); + // we are called from inside the transport reconnection logic + // which involves us clearing all the connections' consumers + // dispatch lists and clearing them + // so rather than trying to grab a mutex (which could be already + // owned by the message listener calling the send) we will just set + // a flag so that the list can be cleared as soon as the + // dispatch thread is ready to flush the dispatch list + clearDispatchList= true; } void deliverAcks(){ @@ -859,7 +856,13 @@ MessageListener listener = this.messageListener; try { synchronized(unconsumedMessages.getMutex()){ - if (!unconsumedMessages.isClosed()) { + if (clearDispatchList) { + // we are reconnecting so lets flush the in progress messages + clearDispatchList = false; + unconsumedMessages.clear(); + } + + if (!unconsumedMessages.isClosed()) { if (listener != null && unconsumedMessages.isRunning() ) { ActiveMQMessage message = createActiveMQMessage(md); beforeMessageIsConsumed(md);