Return-Path: Delivered-To: apmail-geronimo-activemq-commits-archive@www.apache.org Received: (qmail 36159 invoked from network); 2 Jul 2006 13:43:42 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 2 Jul 2006 13:43:42 -0000 Received: (qmail 83906 invoked by uid 500); 2 Jul 2006 13:43:42 -0000 Delivered-To: apmail-geronimo-activemq-commits-archive@geronimo.apache.org Received: (qmail 83886 invoked by uid 500); 2 Jul 2006 13:43:41 -0000 Mailing-List: contact activemq-commits-help@geronimo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: activemq-dev@geronimo.apache.org Delivered-To: mailing list activemq-commits@geronimo.apache.org Received: (qmail 83877 invoked by uid 99); 2 Jul 2006 13:43:41 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 02 Jul 2006 06:43:41 -0700 X-ASF-Spam-Status: No, hits=-5.3 required=10.0 tests=ALL_TRUSTED,INFO_TLD,NO_REAL_NAME,URIBL_JP_SURBL X-Spam-Check-By: apache.org Received-SPF: pass (asf.osuosl.org: local policy) Received: from [140.211.166.113] (HELO eris.apache.org) (140.211.166.113) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 02 Jul 2006 06:43:41 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id CE1E61A983A; Sun, 2 Jul 2006 06:43:20 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r418592 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/AbstractConnection.java state/ConnectionState.java state/SessionState.java Date: Sun, 02 Jul 2006 13:43:20 -0000 To: activemq-commits@geronimo.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20060702134320.CE1E61A983A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Author: chirino Date: Sun 
Jul 2 06:43:19 2006 New Revision: 418592 URL: http://svn.apache.org/viewvc?rev=418592&view=rev Log: Fixing http://issues.apache.org/activemq/browse/AMQ-724, async exception could close a connection while a new consumer is being added which resulted in the consumer not being removed from the broker when the connection was shut down. Danielius Jurna, thanks for the great bug report and problem determination! Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java?rev=418592&r1=418591&r2=418592&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java Sun Jul 2 06:43:19 2006 @@ -421,7 +421,11 @@ if( ss == null ) throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "+sessionId); broker.addProducer(cs.getContext(), info); - ss.addProducer(info); + try { + ss.addProducer(info); + } catch (IllegalStateException e) { + broker.removeProducer(cs.getContext(), info); + } return null; } @@ -451,7 +455,12 @@ throw new IllegalStateException("Cannot add a consumer to a session that had not been registered: "+sessionId); broker.addConsumer(cs.getContext(), info); - ss.addConsumer(info); + try { + ss.addConsumer(info); + } catch (IllegalStateException e) { + 
broker.removeConsumer(cs.getContext(), info); + } + return null; } @@ -476,8 +485,12 @@ ConnectionId connectionId = info.getSessionId().getParentId(); ConnectionState cs = lookupConnectionState(connectionId); - broker.addSession(cs.getContext(), info); - cs.addSession(info); + broker.addSession(cs.getContext(), info); + try { + cs.addSession(info); + } catch (IllegalStateException e) { + broker.removeSession(cs.getContext(), info); + } return null; } @@ -487,6 +500,10 @@ ConnectionState cs = lookupConnectionState(connectionId); SessionState session = cs.getSessionState(id); + + // Don't let new consumers or producers get added while we are closing this down. + session.shutdown(); + if( session == null ) throw new IllegalStateException("Cannot remove session that had not been registered: "+id); @@ -543,6 +560,9 @@ public Response processRemoveConnection(ConnectionId id) { ConnectionState cs = lookupConnectionState(id); + + // Don't allow things to be added to the connection state while we are shutting down. + cs.shutdown(); // Cascade the connection stop to the sessions. 
for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) { Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java?rev=418592&r1=418591&r2=418592&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java Sun Jul 2 06:43:19 2006 @@ -23,6 +23,7 @@ import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConnectionInfo; @@ -37,6 +38,7 @@ final ConnectionInfo info; private final ConcurrentHashMap sessions = new ConcurrentHashMap(); private final List tempDestinations = Collections.synchronizedList(new ArrayList()); + private final AtomicBoolean shutdown = new AtomicBoolean(false); public ConnectionState(ConnectionInfo info) { this.info = info; @@ -49,10 +51,11 @@ } public void addTempDestination(DestinationInfo info) { + checkShutdown(); tempDestinations.add(info); } - public void removeTempDestination(ActiveMQDestination destination) { + public void removeTempDestination(ActiveMQDestination destination) { for (Iterator iter = tempDestinations.iterator(); iter.hasNext();) { DestinationInfo di = (DestinationInfo) iter.next(); if( di.getDestination().equals(destination) ) { @@ -62,6 +65,7 @@ } public void addSession(SessionInfo info) { + checkShutdown(); sessions.put(info.getSessionId(), new SessionState(info)); } public SessionState removeSession(SessionId id) { @@ -85,5 +89,19 @@ public Collection getSessionStates() { return sessions.values(); - } + } + + private void 
checkShutdown() { + if( shutdown.get() ) + throw new IllegalStateException("Disposed"); + } + + public void shutdown() { + if( shutdown.compareAndSet(false, true) ) { + for (Iterator iter = sessions.values().iterator(); iter.hasNext();) { + SessionState ss = (SessionState) iter.next(); + ss.shutdown(); + } + } + } } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java?rev=418592&r1=418591&r2=418592&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java Sun Jul 2 06:43:19 2006 @@ -19,6 +19,7 @@ import java.util.Collection; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; @@ -33,6 +34,7 @@ public final ConcurrentHashMap producers = new ConcurrentHashMap(); public final ConcurrentHashMap consumers = new ConcurrentHashMap(); + private final AtomicBoolean shutdown = new AtomicBoolean(false); public SessionState(SessionInfo info) { this.info = info; @@ -42,6 +44,7 @@ } public void addProducer(ProducerInfo info) { + checkShutdown(); producers.put(info.getProducerId(), new ProducerState(info)); } public ProducerState removeProducer(ProducerId id) { @@ -49,6 +52,7 @@ } public void addConsumer(ConsumerInfo info) { + checkShutdown(); consumers.put(info.getConsumerId(), new ConsumerState(info)); } public ConsumerState removeConsumer(ConsumerId id) { @@ -72,5 +76,15 @@ public Collection getConsumerStates() { return consumers.values(); - } + } + + private void checkShutdown() { + if( shutdown.get() ) + throw new 
IllegalStateException("Disposed"); + } + + public void shutdown() { + shutdown.set(false); + } + }