activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r647872 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: advisory/AdvisoryBroker.java broker/region/AbstractRegion.java broker/region/TopicRegion.java
Date Mon, 14 Apr 2008 16:02:16 GMT
Author: rajdavies
Date: Mon Apr 14 09:02:12 2008
New Revision: 647872

URL: http://svn.apache.org/viewvc?rev=647872&view=rev
Log:
Fix for:
https://issues.apache.org/activemq/browse/AMQ-1255

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=647872&r1=647871&r2=647872&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java Mon Apr 14 09:02:12 2008
@@ -86,7 +86,7 @@
         if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
             ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
             consumers.put(info.getConsumerId(), info);
-            fireConsumerAdvisory(context,info.getDestination(), topic, info);
+            fireConsumerAdvisory(context, info.getDestination(), topic, info);
         } else {
             // We need to replay all the previously collected state objects
             // for this newly added consumer.
@@ -179,7 +179,7 @@
             fireAdvisory(context, topic, info);
             try {
                 next.removeDestination(context, AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()), -1);
-            } catch (Exception expectedIfDestinationDidNotExistYet) {
+            } catch (Exception expectedIfDestinationDidNotExistYet) {                
             }
             try {
                 next.removeDestination(context, AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()), -1);
@@ -190,7 +190,7 @@
     }
 
     public void removeDestinationInfo(ConnectionContext context, DestinationInfo destInfo) throws Exception {
-        next.removeDestinationInfo(context, destInfo);
+        next.removeDestinationInfo(context, destInfo);   
         DestinationInfo info = destinations.remove(destInfo.getDestination());
         if (info != null) {
             info.setDestination(destInfo.getDestination());
@@ -203,6 +203,7 @@
             }
             try {
                 next.removeDestination(context, AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()), -1);
+            
             } catch (Exception expectedIfDestinationDidNotExistYet) {
             }
         }
@@ -221,10 +222,13 @@
         next.removeConsumer(context, info);
 
         // Don't advise advisory topics.
-        if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
-            ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
+        ActiveMQDestination dest = info.getDestination();
+        if (!AdvisorySupport.isAdvisoryTopic(dest)) {
+            ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest);
             consumers.remove(info.getConsumerId());
-            fireConsumerAdvisory(context,info.getDestination(), topic, info.createRemoveCommand());
+            if (!dest.isTemporary() || destinations.contains(dest)) {
+            	fireConsumerAdvisory(context,dest, topic, info.createRemoveCommand());
+            }
         }
     }
 
@@ -232,10 +236,13 @@
         next.removeProducer(context, info);
 
         // Don't advise advisory topics.
-        if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
-            ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(info.getDestination());
+        ActiveMQDestination dest = info.getDestination();
+        if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(dest)) {
+            ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(dest);
             producers.remove(info.getProducerId());
-            fireProducerAdvisory(context, info.getDestination(),topic, info.createRemoveCommand());
+            if (!dest.isTemporary() || destinations.contains(dest)) {
+                fireProducerAdvisory(context, dest,topic, info.createRemoveCommand());
+            }
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=647872&r1=647871&r2=647872&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Mon Apr 14 09:02:12 2008
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.broker.region;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -24,9 +23,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-
 import javax.jms.JMSException;
-
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ConsumerBrokerExchange;
 import org.apache.activemq.broker.DestinationAlreadyExistsException;
@@ -96,18 +93,21 @@
             context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
             context.getBroker().addDestination(context, dest);
         }
-
-        for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
-            Destination dest = i.next();
-            dest.start();
+        synchronized (destinationsMutex) {
+            for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
+                Destination dest = i.next();
+                dest.start();
+            }
         }
     }
 
     public void stop() throws Exception {
         started = false;
-        for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
-            Destination dest = i.next();
-            dest.stop();
+        synchronized (destinationsMutex) {
+            for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
+                Destination dest = i.next();
+                dest.stop();
+            }
         }
         destinations.clear();
     }
@@ -169,10 +169,10 @@
         }
 
         LOG.debug("Removing destination: " + destination);
+        
         synchronized (destinationsMutex) {
             Destination dest = destinations.remove(destination);
             if (dest != null) {
-
                 // timeout<0 or we timed out, we now force any remaining
                 // subscriptions to un-subscribe.
                 for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
@@ -181,11 +181,10 @@
                         dest.removeSubscription(context, sub);
                     }
                 }
-
                 destinationMap.removeAll(destination);
                 dispose(context,dest);
 
-            } else {
+            } else {   
                 LOG.debug("Destination doesn't exist: " + dest);
             }
         }
@@ -259,9 +258,12 @@
             // so everything after this point would be leaked.
 
             // Add the subscription to all the matching queues.
-            for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
-                Destination dest = (Destination)iter.next();
-                dest.addSubscription(context, sub);
+            
+            synchronized(destinationsMutex) {
+                for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
+                    Destination dest = (Destination)iter.next();
+                    dest.addSubscription(context, sub);
+                }
             }
 
             if (info.isBrowser()) {
@@ -286,7 +288,9 @@
      */
     protected Set<ActiveMQDestination> getInactiveDestinations() {
         Set<ActiveMQDestination> inactiveDests = destinationFactory.getDestinations();
-        inactiveDests.removeAll(destinations.keySet());
+        synchronized (destinationsMutex) {
+            inactiveDests.removeAll(destinations.keySet());
+        }
         return inactiveDests;
     }
 
@@ -298,10 +302,12 @@
         if (sub != null) {
 
             // remove the subscription from all the matching queues.
-            for (Iterator iter = destinationMap.get(info.getDestination())
-                    .iterator(); iter.hasNext();) {
-                Destination dest = (Destination) iter.next();
-                dest.removeSubscription(context, sub);
+            synchronized (destinationsMutex) {
+                for (Iterator iter = destinationMap.get(info.getDestination())
+                        .iterator(); iter.hasNext();) {
+                    Destination dest = (Destination) iter.next();
+                    dest.removeSubscription(context, sub);
+                }
             }
 
             destroySubscription(sub);
@@ -396,9 +402,11 @@
             Subscription sub = iter.next();
             sub.gc();
         }
-        for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
-            Destination dest = iter.next();
-            dest.gc();
+        synchronized (destinationsMutex) {
+            for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
+                Destination dest = iter.next();
+                dest.gc();
+            }
         }
     }
 
@@ -417,9 +425,11 @@
     }
     
     public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception{
-        for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
-            Destination dest = (Destination)iter.next();
-            dest.addProducer(context, info);
+        synchronized (destinationsMutex) {
+            for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
+                Destination dest = (Destination) iter.next();
+                dest.addProducer(context, info);
+            }
         }
     }
 
@@ -429,9 +439,11 @@
      * @throws Exception TODO
      */
     public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception{
-        for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
-            Destination dest = (Destination)iter.next();
-            dest.removeProducer(context, info);
+        synchronized (destinationsMutex) {
+            for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
+                Destination dest = (Destination)iter.next();
+                dest.removeProducer(context, info);
+            }
         }
     }
     

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=647872&r1=647871&r2=647872&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Mon Apr 14 09:02:12 2008
@@ -78,9 +78,11 @@
                 if (hasDurableSubChanged(info, sub.getConsumerInfo())) {
                     // Remove the consumer first then add it.
                     durableSubscriptions.remove(key);
-                    for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
-                        Topic topic = (Topic)iter.next();
-                        topic.deleteSubscription(context, key);
+                    synchronized (destinationsMutex) {
+                        for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
+                            Topic topic = (Topic)iter.next();
+                            topic.deleteSubscription(context, key);
+                        }
                     }
                     super.removeConsumer(context, sub.getConsumerInfo());
                     super.addConsumer(context, info);
@@ -132,9 +134,11 @@
         }
 
         durableSubscriptions.remove(key);
-        for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
-            Topic topic = (Topic)iter.next();
-            topic.deleteSubscription(context, key);
+        synchronized (destinationsMutex) {
+            for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
+                Topic topic = (Topic)iter.next();
+                topic.deleteSubscription(context, key);
+            }
         }
         super.removeConsumer(context, sub.getConsumerInfo());
     }



Mime
View raw message