activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1214964 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory: AdvisoryBroker.java AdvisorySupport.java
Date Thu, 15 Dec 2011 21:49:44 GMT
Author: tabish
Date: Thu Dec 15 21:49:44 2011
New Revision: 1214964

URL: http://svn.apache.org/viewvc?rev=1214964&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3615

Reduce contention by not sending an advisory for every destination when not all destination
types are requested.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=1214964&r1=1214963&r2=1214964&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
Thu Dec 15 21:49:44 2011
@@ -40,8 +40,8 @@ import org.slf4j.LoggerFactory;
 /**
  * This broker filter handles tracking the state of the broker for purposes of
  * publishing advisory messages to advisory consumers.
- * 
- * 
+ *
+ *
  */
 public class AdvisoryBroker extends BrokerFilter {
 
@@ -54,9 +54,9 @@ public class AdvisoryBroker extends Brok
     protected final ConcurrentHashMap<ActiveMQDestination, DestinationInfo> destinations
= new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>();
     protected final ConcurrentHashMap<BrokerInfo, ActiveMQMessage> networkBridges =
new ConcurrentHashMap<BrokerInfo, ActiveMQMessage>();
     protected final ProducerId advisoryProducerId = new ProducerId();
-    
+
     private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
-    
+
     public AdvisoryBroker(Broker next) {
         super(next);
         advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
@@ -78,7 +78,7 @@ public class AdvisoryBroker extends Brok
     @Override
     public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws
Exception {
         Subscription answer = super.addConsumer(context, info);
-        
+
         // Don't advise advisory topics.
         if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
             ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
@@ -96,15 +96,23 @@ public class AdvisoryBroker extends Brok
                 }
             }
 
-            // We need to replay all the previously collected destination
-            // objects
-            // for this newly added consumer.
-            if (AdvisorySupport.isDestinationAdvisoryTopic(info.getDestination())) {
-                // Replay the destinations.
-                for (Iterator<DestinationInfo> iter = destinations.values().iterator();
iter.hasNext();) {
-                    DestinationInfo value = iter.next();
-                    ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(value.getDestination());
-                    fireAdvisory(context, topic, value, info.getConsumerId());
+            // We check here whether the Destination is Temporary Destination specific or
not since we
+            // can avoid sending advisory messages to the consumer if it only wants Temporary
Destination
+            // notifications.  If it's not just temporary destination related destinations
then we have
+            // to send them all, a composite destination could want both.
+            if (AdvisorySupport.isTempDestinationAdvisoryTopic(info.getDestination())) {
+                // Replay the temporary destinations.
+                for (DestinationInfo destination : destinations.values()) {
+                    if (destination.getDestination().isTemporary()) {
+                        ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination.getDestination());
+                        fireAdvisory(context, topic, destination, info.getConsumerId());
+                    }
+                }
+            } else if (AdvisorySupport.isDestinationAdvisoryTopic(info.getDestination()))
{
+                // Replay all the destinations.
+                for (DestinationInfo destination : destinations.values()) {
+                    ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination.getDestination());
+                    fireAdvisory(context, topic, destination, info.getConsumerId());
                 }
             }
 
@@ -191,7 +199,7 @@ public class AdvisoryBroker extends Brok
             fireAdvisory(context, topic, info);
             try {
                 next.removeDestination(context, AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()),
-1);
-            } catch (Exception expectedIfDestinationDidNotExistYet) {                
+            } catch (Exception expectedIfDestinationDidNotExistYet) {
             }
             try {
                 next.removeDestination(context, AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()),
-1);
@@ -203,7 +211,7 @@ public class AdvisoryBroker extends Brok
 
     @Override
     public void removeDestinationInfo(ConnectionContext context, DestinationInfo destInfo)
throws Exception {
-        super.removeDestinationInfo(context, destInfo);   
+        super.removeDestinationInfo(context, destInfo);
         DestinationInfo info = destinations.remove(destInfo.getDestination());
         if (info != null) {
             // ensure we don't modify (and lose/overwrite) an in-flight add advisory, so
duplicate
@@ -243,7 +251,7 @@ public class AdvisoryBroker extends Brok
             ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest);
             consumers.remove(info.getConsumerId());
             if (!dest.isTemporary() || destinations.containsKey(dest)) {
-            	fireConsumerAdvisory(context,dest, topic, info.createRemoveCommand());
+                fireConsumerAdvisory(context,dest, topic, info.createRemoveCommand());
             }
         }
     }
@@ -279,7 +287,7 @@ public class AdvisoryBroker extends Brok
             handleFireFailure("expired", e);
         }
     }
-    
+
     @Override
     public void messageConsumed(ConnectionContext context, MessageReference messageReference)
{
         super.messageConsumed(context, messageReference);
@@ -294,7 +302,7 @@ public class AdvisoryBroker extends Brok
             handleFireFailure("consumed", e);
         }
     }
-    
+
     @Override
     public void messageDelivered(ConnectionContext context, MessageReference messageReference)
{
         super.messageDelivered(context, messageReference);
@@ -309,7 +317,7 @@ public class AdvisoryBroker extends Brok
             handleFireFailure("delivered", e);
         }
     }
-    
+
     @Override
     public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference
messageReference) {
         super.messageDiscarded(context, sub, messageReference);
@@ -329,7 +337,7 @@ public class AdvisoryBroker extends Brok
             handleFireFailure("discarded", e);
         }
     }
-    
+
     @Override
     public void slowConsumer(ConnectionContext context, Destination destination,Subscription
subs) {
         super.slowConsumer(context, destination,subs);
@@ -342,7 +350,7 @@ public class AdvisoryBroker extends Brok
             handleFireFailure("slow consumer", e);
         }
     }
-    
+
     @Override
     public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
         super.fastProducer(context, producerInfo);
@@ -355,7 +363,7 @@ public class AdvisoryBroker extends Brok
             handleFireFailure("fast producer", e);
         }
     }
-    
+
     @Override
     public void isFull(ConnectionContext context, Destination destination, Usage usage) {
         super.isFull(context, destination, usage);
@@ -372,13 +380,13 @@ public class AdvisoryBroker extends Brok
             }
         }
     }
-    
+
     @Override
-    public void nowMasterBroker() {   
+    public void nowMasterBroker() {
         super.nowMasterBroker();
         try {
             ActiveMQTopic topic = AdvisorySupport.getMasterBrokerAdvisoryTopic();
-            ActiveMQMessage advisoryMessage = new ActiveMQMessage();                    
  
+            ActiveMQMessage advisoryMessage = new ActiveMQMessage();
             ConnectionContext context = new ConnectionContext();
             context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
             context.setBroker(getBrokerService().getBroker());
@@ -387,7 +395,7 @@ public class AdvisoryBroker extends Brok
             handleFireFailure("now master broker", e);
         }
     }
-    
+
     @Override
     public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
                                       Subscription subscription){
@@ -401,7 +409,7 @@ public class AdvisoryBroker extends Brok
             }
         } catch (Exception e) {
             handleFireFailure("add to DLQ", e);
-        } 
+        }
     }
 
     @Override
@@ -476,7 +484,7 @@ public class AdvisoryBroker extends Brok
             }
         }
         advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_COUNT, count);
-        
+
         fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
     }
 
@@ -505,13 +513,13 @@ public class AdvisoryBroker extends Brok
             advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME,
getBrokerName());
             String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET";
             advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID,
id);
-            
+
             String url = getBrokerService().getVmConnectorURI().toString();
             if (getBrokerService().getDefaultSocketURIString() != null) {
                 url = getBrokerService().getDefaultSocketURIString();
             }
             advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL,
url);
-            
+
             //set the data structure
             advisoryMessage.setDataStructure(command);
             advisoryMessage.setPersistent(false);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java?rev=1214964&r1=1214963&r2=1214964&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
Thu Dec 15 21:49:44 2011
@@ -59,7 +59,10 @@ public final class AdvisorySupport {
     public static final String MSG_PROPERTY_MESSAGE_ID = "orignalMessageId";
     public static final String MSG_PROPERTY_CONSUMER_COUNT = "consumerCount";
     public static final String MSG_PROPERTY_DISCARDED_COUNT = "discardedCount";
-    
+
+    public static final ActiveMQTopic ALL_DESTINATIONS_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(
+            TOPIC_ADVISORY_TOPIC.getPhysicalName() + "," + QUEUE_ADVISORY_TOPIC.getPhysicalName()
+ "," +
+            TEMP_QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," + TEMP_TOPIC_ADVISORY_TOPIC.getPhysicalName());
     public static final ActiveMQTopic TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(
             TEMP_QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," + TEMP_TOPIC_ADVISORY_TOPIC.getPhysicalName());
     private static final ActiveMQTopic AGENT_TOPIC_DESTINATION = new ActiveMQTopic(AGENT_TOPIC);
@@ -187,7 +190,7 @@ public final class AdvisorySupport {
                 + destination.getPhysicalName();
         return new ActiveMQTopic(name);
     }
-    
+
     public static ActiveMQTopic getMessageDLQdAdvisoryTopic(ActiveMQDestination destination)
{
         String name = MESSAGE_DLQ_TOPIC_PREFIX + destination.getDestinationTypeAsString()
+ "."
                 + destination.getPhysicalName();
@@ -239,6 +242,20 @@ public final class AdvisorySupport {
         return isDestinationAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
     }
 
+    public static boolean isTempDestinationAdvisoryTopic(ActiveMQDestination destination)
{
+        if (destination.isComposite()) {
+            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+            for (int i = 0; i < compositeDestinations.length; i++) {
+                if (!isTempDestinationAdvisoryTopic(compositeDestinations[i])) {
+                    return false;
+                }
+            }
+            return true;
+        } else {
+            return destination.equals(TEMP_QUEUE_ADVISORY_TOPIC) || destination.equals(TEMP_TOPIC_ADVISORY_TOPIC);
+        }
+    }
+
     public static boolean isDestinationAdvisoryTopic(ActiveMQDestination destination) {
         if (destination.isComposite()) {
             ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();



Mime
View raw message