activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r436835 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/jmx/ main/java/org/apache/activemq/broker/region/ test/java/org/apache/activemq/broker/
Date Fri, 25 Aug 2006 15:39:18 GMT
Author: jstrachan
Date: Fri Aug 25 08:39:16 2006
New Revision: 436835

URL: http://svn.apache.org/viewvc?rev=436835&view=rev
Log:
Applied patch from John Heitmann to fix AMQ-889: avoid duplicate consumers (such as on failover) leaking resources.

Added:
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DoubleSubscriptionTest.java
  (with props)
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=436835&r1=436834&r2=436835&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
Fri Aug 25 08:39:16 2006
@@ -17,6 +17,8 @@
  */
 package org.apache.activemq.broker.jmx;
 
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
+
 import javax.management.ObjectName;
 
 import org.apache.activemq.broker.Broker;
@@ -33,6 +35,7 @@
     
     final ManagedRegionBroker broker;
 	private final BrokerService brokerService;
+    private final AtomicInteger sessionIdCounter = new AtomicInteger(0);
 
     public BrokerView(BrokerService brokerService, ManagedRegionBroker managedBroker) throws Exception {
         this.brokerService = brokerService;
@@ -156,7 +159,7 @@
         ConsumerInfo info = new ConsumerInfo();
         ConsumerId consumerId = new ConsumerId();
         consumerId.setConnectionId(clientId);
-        consumerId.setSessionId(0);
+        consumerId.setSessionId(sessionIdCounter.incrementAndGet());
         consumerId.setValue(0);
         info.setConsumerId(consumerId);
         info.setDestination(new ActiveMQTopic(topicName));

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=436835&r1=436834&r2=436835&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
Fri Aug 25 08:39:16 2006
@@ -43,11 +43,11 @@
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
 
 /**
- * 
+ *
  * @version $Revision: 1.14 $
  */
 abstract public class AbstractRegion implements Region {
-    
+
     private static final Log log = LogFactory.getLog(AbstractRegion.class);
 
     protected final ConcurrentHashMap destinations = new ConcurrentHashMap();
@@ -60,7 +60,8 @@
     protected boolean autoCreateDestinations=true;
     protected final TaskRunnerFactory taskRunnerFactory;
     protected final Object destinationsMutex = new Object();
-    
+    protected final Map consumerChangeMutexMap = new HashMap();
+
     public AbstractRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) {
         this.broker = broker;
         this.destinationStatistics = destinationStatistics;
@@ -111,7 +112,7 @@
 
     public void removeDestination(ConnectionContext context,ActiveMQDestination destination,long timeout)
                     throws Exception{
-        
+
         // No timeout.. then try to shut down right way, fails if there are current subscribers.
         if( timeout == 0 ) {
             for(Iterator iter=subscriptions.values().iterator();iter.hasNext();){
@@ -121,19 +122,19 @@
                 }
             }
         }
-        
+
         if( timeout > 0 ) {
-            // TODO: implement a way to notify the subscribers that we want to take the down

+            // TODO: implement a way to notify the subscribers that we want to take the down
             // the destination and that they should un-subscribe..  Then wait up to timeout
time before
             // dropping the subscription.
-        
+
         }
 
         log.debug("Removing destination: "+destination);
         synchronized(destinationsMutex){
             Destination dest=(Destination) destinations.remove(destination);
             if(dest!=null){
-                
+
                 // timeout<0 or we timed out, we now force any remaining subscriptions
to un-subscribe.
                 for(Iterator iter=subscriptions.values().iterator();iter.hasNext();){
                     Subscription sub=(Subscription) iter.next();
@@ -141,20 +142,20 @@
                         dest.removeSubscription(context, sub);
                     }
                 }
-                
+
                 destinationMap.removeAll(destination);
                 dest.dispose(context);
                 dest.stop();
-                
+
             }else{
                 log.debug("Destination doesn't exist: " + dest);
             }
         }
     }
-    
+
     /**
      * Provide an exact or wildcard lookup of destinations in the region
-     * 
+     *
      * @return a set of matching destination objects.
      */
     public Set getDestinations(ActiveMQDestination destination) {
@@ -162,7 +163,7 @@
             return destinationMap.get(destination);
         }
     }
-    
+
     public Map getDestinationMap() {
         synchronized(destinationsMutex){
             return new HashMap(destinations);
@@ -177,43 +178,66 @@
             // lets auto-create the destination
             lookup(context, destination);
         }
-        
-        Subscription sub = createSubscription(context, info);
 
-        // We may need to add some destinations that are in persistent store but not active

-        // in the broker.
-        //
-        // TODO: think about this a little more.  This is good cause destinations are not
loaded into 
-        // memory until a client needs to use the queue, but a management agent viewing the

-        // broker will not see a destination that exists in persistent store.  We may want
to
-        // eagerly load all destinations into the broker but have an inactive state for the
-        // destination which has reduced memory usage.
-        //
-        if( persistenceAdapter!=null ) {
-            Set inactiveDests = getInactiveDestinations();
-            for (Iterator iter = inactiveDests.iterator(); iter.hasNext();) {
-                ActiveMQDestination dest = (ActiveMQDestination) iter.next();
-                if( sub.matches(dest) ) {
-                    context.getBroker().addDestination(context, dest);
-                }
+        Object addGuard;
+        synchronized(consumerChangeMutexMap) {
+            addGuard = consumerChangeMutexMap.get(info.getConsumerId());
+            if (addGuard == null) {
+                addGuard = new Object();
+                consumerChangeMutexMap.put(info.getConsumerId(), addGuard);
             }
         }
-                
-        subscriptions.put(info.getConsumerId(), sub);
+        synchronized (addGuard) {
+            Object o = subscriptions.get(info.getConsumerId());
+            if (o != null) {
+                log.warn("A duplicate subscription was detected. Clients may be misbehaving. Later warnings you may see about subscription removal are a consequence of this.");
+                return (Subscription)o;
+            }
 
-        // Add the subscription to all the matching queues.
-        for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();)
{
-            Destination dest = (Destination) iter.next();            
-            dest.addSubscription(context, sub);
-        }        
+            Subscription sub = createSubscription(context, info);
 
-        if( info.isBrowser() ) {
-            ((QueueBrowserSubscription)sub).browseDone();
+            // We may need to add some destinations that are in persistent store but not active
+            // in the broker.
+            //
+            // TODO: think about this a little more.  This is good cause destinations are not loaded into
+            // memory until a client needs to use the queue, but a management agent viewing the
+            // broker will not see a destination that exists in persistent store.  We may want to
+            // eagerly load all destinations into the broker but have an inactive state for the
+            // destination which has reduced memory usage.
+            //
+            if( persistenceAdapter!=null ) {
+                Set inactiveDests = getInactiveDestinations();
+                for (Iterator iter = inactiveDests.iterator(); iter.hasNext();) {
+                    ActiveMQDestination dest = (ActiveMQDestination) iter.next();
+                    if( sub.matches(dest) ) {
+                        context.getBroker().addDestination(context, dest);
+                    }
+                }
+            }
+
+            subscriptions.put(info.getConsumerId(), sub);
+
+            // At this point we're done directly manipulating subscriptions,
+            // but we need to retain the synchronized block here. Consider
+            // otherwise what would happen if at this point a second
+            // thread added, then removed, as would be allowed with
+            // no mutex held. Remove is only essentially run once
+            // so everything after this point would be leaked.
+
+            // Add the subscription to all the matching queues.
+            for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
+                Destination dest = (Destination) iter.next();
+                dest.addSubscription(context, sub);
+            }
+
+            if( info.isBrowser() ) {
+                ((QueueBrowserSubscription)sub).browseDone();
+            }
+
+            return sub;
         }
-        
-        return sub;
     }
-    
+
     /**
      * Get all the Destinations that are in storage
      * @return Set of all stored destinations
@@ -230,26 +254,29 @@
         inactiveDests.removeAll( destinations.keySet() );
         return inactiveDests;
     }
-    
+
     public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception
{
-    	
+
         log.debug("Removing consumer: "+info.getConsumerId());
-        
+
         Subscription sub = (Subscription) subscriptions.remove(info.getConsumerId());
         if( sub==null )
             throw new IllegalArgumentException("The subscription does not exist: "+info.getConsumerId());
-        
+
         // remove the subscription from all the matching queues.
         for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();)
{
             Destination dest = (Destination) iter.next();
             dest.removeSubscription(context, sub);
         }
-        
+
         destroySubscription(sub);
-        
+
+        synchronized (consumerChangeMutexMap) {
+            consumerChangeMutexMap.remove(info.getConsumerId());
+        }
     }
 
-    protected void destroySubscription(Subscription sub) {        
+    protected void destroySubscription(Subscription sub) {
         sub.destroy();
     }
 
@@ -262,7 +289,7 @@
         Destination dest = lookup(context, messageSend.getDestination());
         dest.send(context, messageSend);
     }
-    
+
     public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
         Subscription sub = (Subscription) subscriptions.get(ack.getConsumerId());
         if( sub==null )
@@ -295,7 +322,7 @@
             return dest;
         }
     }
-    
+
     public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification)
throws Exception{
         Subscription sub = (Subscription) subscriptions.get(messageDispatchNotification.getConsumerId());
         if (sub != null){
@@ -306,23 +333,23 @@
         for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) {
             Subscription sub = (Subscription) iter.next();
             sub.gc();
-        }        
+        }
         for (Iterator iter = destinations.values()  .iterator(); iter.hasNext();) {
             Destination dest = (Destination) iter.next();
             dest.gc();
-        }        
+        }
     }
 
     protected abstract Subscription createSubscription(ConnectionContext context, ConsumerInfo
info) throws Exception;
     abstract protected Destination createDestination(ConnectionContext context, ActiveMQDestination
destination) throws Exception;
 
-    public boolean isAutoCreateDestinations() { 
+    public boolean isAutoCreateDestinations() {
         return autoCreateDestinations;
     }
 
     public void setAutoCreateDestinations(boolean autoCreateDestinations) {
         this.autoCreateDestinations = autoCreateDestinations;
     }
-    
+
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=436835&r1=436834&r2=436835&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
Fri Aug 25 08:39:16 2006
@@ -105,6 +105,10 @@
             else {
                 super.addConsumer(context, info);
                 sub = (DurableTopicSubscription) durableSubscriptions.get(key);
+                if (sub == null) {
+                    throw new JMSException("Cannot use the same consumerId: " + info.getConsumerId() + " for two different durable subscriptions clientID: "
+                            + key.getClientId() + " subscriberName: " + key.getSubscriptionName());
+                }
             }
             
             sub.activate(context, info);

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DoubleSubscriptionTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DoubleSubscriptionTest.java?rev=436835&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DoubleSubscriptionTest.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DoubleSubscriptionTest.java
Fri Aug 25 08:39:16 2006
@@ -0,0 +1,118 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import javax.jms.DeliveryMode;
+import junit.framework.Test;
+import org.apache.activemq.broker.StubConnection;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.network.NetworkTestSupport;
+
+/**
+ * Pretend to be an abusive client that sends multiple
+ * identical ConsumerInfo commands and make sure the
+ * broker doesn't stall because of it.
+ */
+
+public class DoubleSubscriptionTest extends NetworkTestSupport {
+
+    public ActiveMQDestination destination;
+    public int deliveryMode;
+
+    private String remoteURI = "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true";
+
+    public static Test suite() {
+        return suite(DoubleSubscriptionTest.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+    public void initCombosForTestDoubleSubscription() {
+        addCombinationValues("destination", new Object[] { new ActiveMQQueue("TEST"), new ActiveMQQueue("TEST"), });
+    }
+    public void testDoubleSubscription() throws Exception {
+
+        // Start a normal consumer on the remote broker
+        StubConnection connection1 = createRemoteConnection();
+        ConnectionInfo connectionInfo1 = createConnectionInfo();
+        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        connection1.request(consumerInfo1);
+
+        // Start a normal producer on a remote broker
+        StubConnection connection2 = createRemoteConnection();
+        ConnectionInfo connectionInfo2 = createConnectionInfo();
+        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+        ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
+        connection2.send(connectionInfo2);
+        connection2.send(sessionInfo2);
+        connection2.request(producerInfo2);
+
+        // Send a message to make sure the basics are working
+        connection2.request(createMessage(producerInfo2, destination, DeliveryMode.PERSISTENT));
+
+        Message m1 = receiveMessage(connection1);
+        assertNotNull(m1);
+        assertNoMessagesLeft(connection1);
+
+        connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
+
+        // Send a message to sit on the broker while we mess with it
+        connection2.request(createMessage(producerInfo2, destination, DeliveryMode.PERSISTENT));
+
+        // Now we're going to resend the same consumer commands again and see if the broker
+        // can handle it.
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        connection1.request(consumerInfo1);
+
+        // After this there should be 2 messages on the broker...
+        connection2.request(createMessage(producerInfo2, destination, DeliveryMode.PERSISTENT));
+
+        // ... let's start a fresh consumer...
+        connection1.stop();
+        StubConnection connection3 = createRemoteConnection();
+        ConnectionInfo connectionInfo3 = createConnectionInfo();
+        SessionInfo sessionInfo3 = createSessionInfo(connectionInfo3);
+        ConsumerInfo consumerInfo3 = createConsumerInfo(sessionInfo3, destination);
+        connection3.send(connectionInfo3);
+        connection3.send(sessionInfo3);
+        connection3.request(consumerInfo3);
+
+        // ... and then grab the 2 that should be there.
+        assertNotNull(receiveMessage(connection3));
+        assertNotNull(receiveMessage(connection3));
+        assertNoMessagesLeft(connection3);
+    }
+
+    protected String getRemoteURI() {
+        return remoteURI;
+    }
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DoubleSubscriptionTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DoubleSubscriptionTest.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL



Mime
View raw message