Return-Path: Delivered-To: apmail-geronimo-activemq-commits-archive@www.apache.org Received: (qmail 49671 invoked from network); 25 Aug 2006 15:39:41 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 25 Aug 2006 15:39:41 -0000 Received: (qmail 45687 invoked by uid 500); 25 Aug 2006 15:39:41 -0000 Delivered-To: apmail-geronimo-activemq-commits-archive@geronimo.apache.org Received: (qmail 45644 invoked by uid 500); 25 Aug 2006 15:39:40 -0000 Mailing-List: contact activemq-commits-help@geronimo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: activemq-dev@geronimo.apache.org Delivered-To: mailing list activemq-commits@geronimo.apache.org Received: (qmail 45634 invoked by uid 99); 25 Aug 2006 15:39:40 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 25 Aug 2006 08:39:40 -0700 X-ASF-Spam-Status: No, hits=-9.4 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received-SPF: pass (asf.osuosl.org: local policy) Received: from [140.211.166.113] (HELO eris.apache.org) (140.211.166.113) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 25 Aug 2006 08:39:39 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id 6CE691A981A; Fri, 25 Aug 2006 08:39:19 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r436835 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/jmx/ main/java/org/apache/activemq/broker/region/ test/java/org/apache/activemq/broker/ Date: Fri, 25 Aug 2006 15:39:18 -0000 To: activemq-commits@geronimo.apache.org From: jstrachan@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20060825153919.6CE691A981A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Author: jstrachan Date: Fri Aug 25 08:39:16 2006 New Revision: 436835 URL: http://svn.apache.org/viewvc?rev=436835&view=rev Log: applied patch from John Heitmann to fix AMQ-889 to avoid duplicate consumers (such as on failover) leaking resources Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DoubleSubscriptionTest.java (with props) Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=436835&r1=436834&r2=436835&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java Fri Aug 25 08:39:16 2006 @@ -17,6 +17,8 @@ */ package org.apache.activemq.broker.jmx; +import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger; + import javax.management.ObjectName; import org.apache.activemq.broker.Broker; @@ -33,6 +35,7 @@ final ManagedRegionBroker broker; private final BrokerService brokerService; + private final AtomicInteger sessionIdCounter = new AtomicInteger(0); public BrokerView(BrokerService brokerService, ManagedRegionBroker managedBroker) throws Exception { this.brokerService = brokerService; @@ -156,7 +159,7 @@ ConsumerInfo info = new ConsumerInfo(); ConsumerId consumerId = new ConsumerId(); consumerId.setConnectionId(clientId); - consumerId.setSessionId(0); + consumerId.setSessionId(sessionIdCounter.incrementAndGet()); consumerId.setValue(0); info.setConsumerId(consumerId); info.setDestination(new ActiveMQTopic(topicName)); Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=436835&r1=436834&r2=436835&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Fri Aug 25 08:39:16 2006 @@ -43,11 +43,11 @@ import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; /** - * + * * @version $Revision: 1.14 $ */ abstract public class AbstractRegion implements Region { - + private static final Log log = LogFactory.getLog(AbstractRegion.class); protected final ConcurrentHashMap destinations = new ConcurrentHashMap(); @@ -60,7 +60,8 @@ protected boolean autoCreateDestinations=true; protected final TaskRunnerFactory taskRunnerFactory; protected final Object destinationsMutex = new Object(); - + protected final Map consumerChangeMutexMap = new HashMap(); + public AbstractRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) { this.broker = broker; this.destinationStatistics = destinationStatistics; @@ -111,7 +112,7 @@ public void removeDestination(ConnectionContext context,ActiveMQDestination destination,long timeout) throws Exception{ - + // No timeout.. then try to shut down right way, fails if there are current subscribers. if( timeout == 0 ) { for(Iterator iter=subscriptions.values().iterator();iter.hasNext();){ @@ -121,19 +122,19 @@ } } } - + if( timeout > 0 ) { - // TODO: implement a way to notify the subscribers that we want to take the down + // TODO: implement a way to notify the subscribers that we want to take the down // the destination and that they should un-subscribe.. Then wait up to timeout time before // dropping the subscription. - + } log.debug("Removing destination: "+destination); synchronized(destinationsMutex){ Destination dest=(Destination) destinations.remove(destination); if(dest!=null){ - + // timeout<0 or we timed out, we now force any remaining subscriptions to un-subscribe. for(Iterator iter=subscriptions.values().iterator();iter.hasNext();){ Subscription sub=(Subscription) iter.next(); @@ -141,20 +142,20 @@ dest.removeSubscription(context, sub); } } - + destinationMap.removeAll(destination); dest.dispose(context); dest.stop(); - + }else{ log.debug("Destination doesn't exist: " + dest); } } } - + /** * Provide an exact or wildcard lookup of destinations in the region - * + * * @return a set of matching destination objects. */ public Set getDestinations(ActiveMQDestination destination) { @@ -162,7 +163,7 @@ return destinationMap.get(destination); } } - + public Map getDestinationMap() { synchronized(destinationsMutex){ return new HashMap(destinations); @@ -177,43 +178,66 @@ // lets auto-create the destination lookup(context, destination); } - - Subscription sub = createSubscription(context, info); - // We may need to add some destinations that are in persistent store but not active - // in the broker. - // - // TODO: think about this a little more. This is good cause destinations are not loaded into - // memory until a client needs to use the queue, but a management agent viewing the - // broker will not see a destination that exists in persistent store. We may want to - // eagerly load all destinations into the broker but have an inactive state for the - // destination which has reduced memory usage. - // - if( persistenceAdapter!=null ) { - Set inactiveDests = getInactiveDestinations(); - for (Iterator iter = inactiveDests.iterator(); iter.hasNext();) { - ActiveMQDestination dest = (ActiveMQDestination) iter.next(); - if( sub.matches(dest) ) { - context.getBroker().addDestination(context, dest); - } + Object addGuard; + synchronized(consumerChangeMutexMap) { + addGuard = consumerChangeMutexMap.get(info.getConsumerId()); + if (addGuard == null) { + addGuard = new Object(); + consumerChangeMutexMap.put(info.getConsumerId(), addGuard); } } - - subscriptions.put(info.getConsumerId(), sub); + synchronized (addGuard) { + Object o = subscriptions.get(info.getConsumerId()); + if (o != null) { + log.warn("A duplicate subscription was detected. Clients may be misbehaving. Later warnings you may see about subscription removal are a consequence of this."); + return (Subscription)o; + } - // Add the subscription to all the matching queues. - for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) { - Destination dest = (Destination) iter.next(); - dest.addSubscription(context, sub); - } + Subscription sub = createSubscription(context, info); - if( info.isBrowser() ) { - ((QueueBrowserSubscription)sub).browseDone(); + // We may need to add some destinations that are in persistent store but not active + // in the broker. + // + // TODO: think about this a little more. This is good cause destinations are not loaded into + // memory until a client needs to use the queue, but a management agent viewing the + // broker will not see a destination that exists in persistent store. We may want to + // eagerly load all destinations into the broker but have an inactive state for the + // destination which has reduced memory usage. + // + if( persistenceAdapter!=null ) { + Set inactiveDests = getInactiveDestinations(); + for (Iterator iter = inactiveDests.iterator(); iter.hasNext();) { + ActiveMQDestination dest = (ActiveMQDestination) iter.next(); + if( sub.matches(dest) ) { + context.getBroker().addDestination(context, dest); + } + } + } + + subscriptions.put(info.getConsumerId(), sub); + + // At this point we're done directly manipulating subscriptions, + // but we need to retain the synchronized block here. Consider + // otherwise what would happen if at this point a second + // thread added, then removed, as would be allowed with + // no mutex held. Remove is only essentially run once + // so everything after this point would be leaked. + + // Add the subscription to all the matching queues. + for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) { + Destination dest = (Destination) iter.next(); + dest.addSubscription(context, sub); + } + + if( info.isBrowser() ) { + ((QueueBrowserSubscription)sub).browseDone(); + } + + return sub; } - - return sub; } - + /** * Get all the Destinations that are in storage * @return Set of all stored destinations @@ -230,26 +254,29 @@ inactiveDests.removeAll( destinations.keySet() ); return inactiveDests; } - + public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { - + log.debug("Removing consumer: "+info.getConsumerId()); - + Subscription sub = (Subscription) subscriptions.remove(info.getConsumerId()); if( sub==null ) throw new IllegalArgumentException("The subscription does not exist: "+info.getConsumerId()); - + // remove the subscription from all the matching queues. for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) { Destination dest = (Destination) iter.next(); dest.removeSubscription(context, sub); } - + destroySubscription(sub); - + + synchronized (consumerChangeMutexMap) { + consumerChangeMutexMap.remove(info.getConsumerId()); + } } - protected void destroySubscription(Subscription sub) { + protected void destroySubscription(Subscription sub) { sub.destroy(); } @@ -262,7 +289,7 @@ Destination dest = lookup(context, messageSend.getDestination()); dest.send(context, messageSend); } - + public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception { Subscription sub = (Subscription) subscriptions.get(ack.getConsumerId()); if( sub==null ) @@ -295,7 +322,7 @@ return dest; } } - + public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception{ Subscription sub = (Subscription) subscriptions.get(messageDispatchNotification.getConsumerId()); if (sub != null){ @@ -306,23 +333,23 @@ for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) { Subscription sub = (Subscription) iter.next(); sub.gc(); - } + } for (Iterator iter = destinations.values() .iterator(); iter.hasNext();) { Destination dest = (Destination) iter.next(); dest.gc(); - } + } } protected abstract Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws Exception; abstract protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception; - public boolean isAutoCreateDestinations() { + public boolean isAutoCreateDestinations() { return autoCreateDestinations; } public void setAutoCreateDestinations(boolean autoCreateDestinations) { this.autoCreateDestinations = autoCreateDestinations; } - + } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=436835&r1=436834&r2=436835&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Fri Aug 25 08:39:16 2006 @@ -105,6 +105,10 @@ else { super.addConsumer(context, info); sub = (DurableTopicSubscription) durableSubscriptions.get(key); + if (sub == null) { + throw new JMSException("Cannot use the same consumerId: " + info.getConsumerId() + " for two different durable subscriptions clientID: " + + key.getClientId() + " subscriberName: " + key.getSubscriptionName()); + } } sub.activate(context, info); Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DoubleSubscriptionTest.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DoubleSubscriptionTest.java?rev=436835&view=auto ============================================================================== --- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DoubleSubscriptionTest.java (added) +++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DoubleSubscriptionTest.java Fri Aug 25 08:39:16 2006 @@ -0,0 +1,118 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import javax.jms.DeliveryMode; +import junit.framework.Test; +import org.apache.activemq.broker.StubConnection; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.SessionInfo; +import org.apache.activemq.network.NetworkTestSupport; + +/** + * Pretend to be an abusive client that sends multiple + * identical ConsumerInfo commands and make sure the + * broker doesn't stall because of it. + */ + +public class DoubleSubscriptionTest extends NetworkTestSupport { + + public ActiveMQDestination destination; + public int deliveryMode; + + private String remoteURI = "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true"; + + public static Test suite() { + return suite(DoubleSubscriptionTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + + public void initCombosForTestDoubleSubscription() { + addCombinationValues("destination", new Object[] { new ActiveMQQueue("TEST"), new ActiveMQQueue("TEST"), }); + } + public void testDoubleSubscription() throws Exception { + + // Start a normal consumer on the remote broker + StubConnection connection1 = createRemoteConnection(); + ConnectionInfo connectionInfo1 = createConnectionInfo(); + SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); + ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); + connection1.send(connectionInfo1); + connection1.send(sessionInfo1); + connection1.request(consumerInfo1); + + // Start a normal producer on a remote broker + StubConnection connection2 = createRemoteConnection(); + ConnectionInfo connectionInfo2 = createConnectionInfo(); + SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); + ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2); + connection2.send(connectionInfo2); + connection2.send(sessionInfo2); + connection2.request(producerInfo2); + + // Send a message to make sure the basics are working + connection2.request(createMessage(producerInfo2, destination, DeliveryMode.PERSISTENT)); + + Message m1 = receiveMessage(connection1); + assertNotNull(m1); + assertNoMessagesLeft(connection1); + + connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE)); + + // Send a message to sit on the broker while we mess with it + connection2.request(createMessage(producerInfo2, destination, DeliveryMode.PERSISTENT)); + + // Now we're going to resend the same consumer commands again and see if the broker + // can handle it. + connection1.send(connectionInfo1); + connection1.send(sessionInfo1); + connection1.request(consumerInfo1); + + // After this there should be 2 messages on the broker... + connection2.request(createMessage(producerInfo2, destination, DeliveryMode.PERSISTENT)); + + // ... let's start a fresh consumer... + connection1.stop(); + StubConnection connection3 = createRemoteConnection(); + ConnectionInfo connectionInfo3 = createConnectionInfo(); + SessionInfo sessionInfo3 = createSessionInfo(connectionInfo3); + ConsumerInfo consumerInfo3 = createConsumerInfo(sessionInfo3, destination); + connection3.send(connectionInfo3); + connection3.send(sessionInfo3); + connection3.request(consumerInfo3); + + // ... and then grab the 2 that should be there. + assertNotNull(receiveMessage(connection3)); + assertNotNull(receiveMessage(connection3)); + assertNoMessagesLeft(connection3); + } + + protected String getRemoteURI() { + return remoteURI; + } + +} Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DoubleSubscriptionTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DoubleSubscriptionTest.java ------------------------------------------------------------------------------ svn:keywords = Date Author Id Revision HeadURL