activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cshan...@apache.org
Subject [2/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6027
Date Mon, 09 Nov 2015 20:29:04 GMT
https://issues.apache.org/jira/browse/AMQ-6027

Adding support for consumers on virtual destinations to create network
demand. This behavior is turned off by default but can be enabled.

For example, if a consumer comes online for a queue that is part of a
VirtualTopic, this will cause a network of brokers to forward messages
because a demand subscription will be created. The same applies when a
consumer comes online for a destination that a composite destination
forwards to.

There is also an option to enable flow based on the existence of a
virtual destination if the virtual destination is forwarding to a
Queue.

Full configuration instructions for this feature will be on the wiki page.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/cc81680e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/cc81680e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/cc81680e

Branch: refs/heads/master
Commit: cc81680e10e5c7140ec3e28091df23e9d3c3233b
Parents: 480b3e7
Author: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Authored: Tue Oct 20 18:15:30 2015 +0000
Committer: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Committed: Mon Nov 9 20:07:43 2015 +0000

----------------------------------------------------------------------
 .../activemq/advisory/AdvisoryBroker.java       |  267 ++++
 ...tinationFilterVirtualDestinationMatcher.java |   53 +
 .../advisory/VirtualDestinationMatcher.java     |   29 +
 .../java/org/apache/activemq/broker/Broker.java |    5 +
 .../apache/activemq/broker/BrokerFilter.java    |   13 +
 .../apache/activemq/broker/BrokerService.java   |   35 +
 .../org/apache/activemq/broker/EmptyBroker.java |   11 +
 .../org/apache/activemq/broker/ErrorBroker.java |   13 +
 .../activemq/broker/MutableBrokerFilter.java    |   13 +
 .../region/virtual/CompositeDestination.java    |   41 +
 .../broker/region/virtual/CompositeQueue.java   |    5 +
 .../broker/region/virtual/CompositeTopic.java   |    5 +
 .../broker/region/virtual/VirtualTopic.java     |   49 +
 .../network/DemandForwardingBridgeSupport.java  |    3 +-
 .../network/NetworkBridgeConfiguration.java     |   28 +-
 .../activemq/advisory/AdvisorySupport.java      |   31 +
 .../activemq/command/NetworkBridgeFilter.java   |    8 +-
 .../plugin/UpdateVirtualDestinationsTask.java   |   48 +
 activemq-unit-tests/pom.xml                     |    4 +
 .../network/VirtualConsumerDemandTest.java      | 1418 ++++++++++++++++++
 20 files changed, 2076 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
index 36f5f0b..bc5f105 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
@@ -18,6 +18,7 @@ package org.apache.activemq.advisory;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -40,6 +41,7 @@ import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.TopicRegion;
 import org.apache.activemq.broker.region.TopicSubscription;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -55,6 +57,7 @@ import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.command.SessionId;
 import org.apache.activemq.security.SecurityContext;
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.usage.Usage;
@@ -78,6 +81,22 @@ public class AdvisoryBroker extends BrokerFilter {
     private final ReentrantReadWriteLock consumersLock = new ReentrantReadWriteLock();
     protected final Map<ConsumerId, ConsumerInfo> consumers = new LinkedHashMap<ConsumerId, ConsumerInfo>();
 
+    /**
+     * This is a set to track all of the virtual destinations that have been added to the broker so
+     * they can be easily referenced later.
+     */
+    protected final Set<VirtualDestination> virtualDestinations = Collections.newSetFromMap(new ConcurrentHashMap<VirtualDestination, Boolean>());
+    /**
+     * This is a map to track all consumers that exist on the virtual destination so that we can fire
+     * an advisory later when they go away to remove the demand.
+     */
+    protected final ConcurrentMap<ConsumerInfo, VirtualDestination> virtualDestinationConsumers = new ConcurrentHashMap<>();
+    /**
+     * This is a map to track unique demand for the existence of a virtual destination so we make sure
+     * we don't send duplicate advisories.
+     */
+    protected final ConcurrentMap<VirtualConsumerPair, ConsumerInfo> brokerConsumerDests = new ConcurrentHashMap<>();
+
     protected final ConcurrentMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId, ProducerInfo>();
     protected final ConcurrentMap<ActiveMQDestination, DestinationInfo> destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>();
     protected final ConcurrentMap<BrokerInfo, ActiveMQMessage> networkBridges = new ConcurrentHashMap<BrokerInfo, ActiveMQMessage>();
@@ -85,6 +104,8 @@ public class AdvisoryBroker extends BrokerFilter {
 
     private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
 
+    private VirtualDestinationMatcher virtualDestinationMatcher = new DestinationFilterVirtualDestinationMatcher();
+
     public AdvisoryBroker(Broker next) {
         super(next);
         advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
@@ -112,6 +133,15 @@ public class AdvisoryBroker extends BrokerFilter {
             consumersLock.writeLock().lock();
             try {
                 consumers.put(info.getConsumerId(), info);
+
+                //check if this is a consumer on a destination that matches a virtual destination
+                if (getBrokerService().isUseVirtualDestSubs()) {
+                    for (VirtualDestination virtualDestination : virtualDestinations) {
+                        if (virtualDestinationMatcher.matches(virtualDestination, info.getDestination())) {
+                            fireVirtualDestinationAddAdvisory(context, info, info.getDestination(), virtualDestination);
+                        }
+                    }
+                }
             } finally {
                 consumersLock.writeLock().unlock();
             }
@@ -171,6 +201,15 @@ public class AdvisoryBroker extends BrokerFilter {
                 }
             }
 
+            // Replay the virtual destination consumers.
+            if (AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(info.getDestination())) {
+                for (Iterator<ConsumerInfo> iter = virtualDestinationConsumers.keySet().iterator(); iter.hasNext(); ) {
+                    ConsumerInfo key = iter.next();
+                    ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(key.getDestination());
+                    fireConsumerAdvisory(context, key.getDestination(), topic, key);
+              }
+            }
+
             // Replay network bridges
             if (AdvisorySupport.isNetworkBridgeAdvisoryTopic(info.getDestination())) {
                 for (Iterator<BrokerInfo> iter = networkBridges.keySet().iterator(); iter.hasNext(); ) {
@@ -199,6 +238,16 @@ public class AdvisoryBroker extends BrokerFilter {
     public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean create) throws Exception {
         Destination answer = super.addDestination(context, destination, create);
         if (!AdvisorySupport.isAdvisoryTopic(destination)) {
+            //for queues, create demand if isUseVirtualDestSubsOnCreation is true
+            if (getBrokerService().isUseVirtualDestSubsOnCreation() && destination.isQueue()) {
+                //check if this new destination matches a virtual destination that exists
+                for (VirtualDestination virtualDestination : virtualDestinations) {
+                    if (virtualDestinationMatcher.matches(virtualDestination, destination)) {
+                        fireVirtualDestinationAddAdvisory(context, null, destination, virtualDestination);
+                    }
+                }
+            }
+
             DestinationInfo info = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
             DestinationInfo previous = destinations.putIfAbsent(destination, info);
             if (previous == null) {
@@ -228,6 +277,28 @@ public class AdvisoryBroker extends BrokerFilter {
         super.removeDestination(context, destination, timeout);
         DestinationInfo info = destinations.remove(destination);
         if (info != null) {
+
+            //on destination removal, remove all demand if using virtual dest subs
+            if (getBrokerService().isUseVirtualDestSubs()) {
+                for (ConsumerInfo consumerInfo : virtualDestinationConsumers.keySet()) {
+                    //find all consumers for this virtual destination
+                    VirtualDestination virtualDestination = virtualDestinationConsumers.get(consumerInfo);
+
+                    //find a consumer that matches this virtualDest and destination
+                    if (virtualDestinationMatcher.matches(virtualDestination, destination)) {
+                        //in case of multiple matches
+                        VirtualConsumerPair key = new VirtualConsumerPair(virtualDestination, destination);
+                        ConsumerInfo i = brokerConsumerDests.get(key);
+                        if (consumerInfo.equals(i)) {
+                            if (brokerConsumerDests.remove(key) != null) {
+                                fireVirtualDestinationRemoveAdvisory(context, consumerInfo);
+                                break;
+                            }
+                        }
+                    }
+                }
+            }
+
             // ensure we don't modify (and loose/overwrite) an in-flight add advisory, so duplicate
             info = info.copy();
             info.setDestination(destination);
@@ -285,6 +356,11 @@ public class AdvisoryBroker extends BrokerFilter {
             consumersLock.writeLock().lock();
             try {
                 consumers.remove(info.getConsumerId());
+
+                //remove the demand for this consumer if it matches a virtual destination
+                if(getBrokerService().isUseVirtualDestSubs()) {
+                    fireVirtualDestinationRemoveAdvisory(context, info);
+                }
             } finally {
                 consumersLock.writeLock().unlock();
             }
@@ -467,6 +543,140 @@ public class AdvisoryBroker extends BrokerFilter {
         }
     }
 
+    private final IdGenerator connectionIdGenerator = new IdGenerator("advisory");
+    private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
+    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
+
+    @Override
+    public void virtualDestinationAdded(ConnectionContext context,
+            VirtualDestination virtualDestination) {
+        super.virtualDestinationAdded(context, virtualDestination);
+
+        if (virtualDestinations.add(virtualDestination)) {
+            try {
+                // Don't advise advisory topics.
+                if (!AdvisorySupport.isAdvisoryTopic(virtualDestination.getVirtualDestination())) {
+
+                    //create demand for consumers on virtual destinations
+                    consumersLock.readLock().lock();
+                    try {
+                        //loop through existing destinations to see if any match this newly
+                        //created virtual destination
+                        if (getBrokerService().isUseVirtualDestSubsOnCreation()) {
+                            //for matches that are a queue, fire an advisory for demand
+                            for (ActiveMQDestination destination : destinations.keySet()) {
+                                if(destination.isQueue()) {
+                                    if (virtualDestinationMatcher.matches(virtualDestination, destination)) {
+                                        fireVirtualDestinationAddAdvisory(context, null, destination, virtualDestination);
+                                    }
+                                }
+                            }
+                        }
+
+                        //loop through existing consumers to see if any of them are consuming on a destination
+                        //that matches the new virtual destination
+                        for (Iterator<ConsumerInfo> iter = consumers.values().iterator(); iter.hasNext(); ) {
+                            ConsumerInfo info = iter.next();
+                            if (virtualDestinationMatcher.matches(virtualDestination, info.getDestination())) {
+                                fireVirtualDestinationAddAdvisory(context, info, info.getDestination(), virtualDestination);
+                            }
+                        }
+                    } finally {
+                        consumersLock.readLock().unlock();
+                    }
+                }
+            } catch (Exception e) {
+                handleFireFailure("virtualDestinationAdded", e);
+            }
+        }
+    }
+
+    private void fireVirtualDestinationAddAdvisory(ConnectionContext context, ConsumerInfo info, ActiveMQDestination activeMQDest,
+            VirtualDestination virtualDestination) throws Exception {
+        //if no consumer info, we need to create one - this is the case when an advisory is fired
+        //because of the existence of a destination matching a virtual destination
+        if (info == null) {
+            ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId());
+            SessionId sessionId = new SessionId(connectionId, sessionIdGenerator.getNextSequenceId());
+            ConsumerId consumerId = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
+
+            info = new ConsumerInfo(consumerId);
+
+            //store the virtual destination and the activeMQDestination as a pair so that we can keep track
+            //of all matching forwarded destinations that caused demand
+            if(brokerConsumerDests.putIfAbsent(new VirtualConsumerPair(virtualDestination, activeMQDest), info) == null) {
+                info.setDestination(virtualDestination.getVirtualDestination());
+                ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination());
+
+                if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) {
+                    fireConsumerAdvisory(context, info.getDestination(), topic, info);
+                }
+            }
+        //this is the case of a real consumer coming online
+        } else {
+            info = info.copy();
+            info.setDestination(virtualDestination.getVirtualDestination());
+            ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination());
+
+            if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) {
+                fireConsumerAdvisory(context, info.getDestination(), topic, info);
+            }
+        }
+    }
+
+    /** Stops tracking a removed virtual destination and, if demand-on-creation is enabled, fires remove advisories for its consumers. */
+    @Override
+    public void virtualDestinationRemoved(ConnectionContext context, VirtualDestination virtualDestination) {
+        super.virtualDestinationRemoved(context, virtualDestination);
+        // only act when this broker was actually tracking the virtual destination
+        if (virtualDestinations.remove(virtualDestination)) {
+            try {
+                consumersLock.readLock().lock();
+                try {
+                    // remove the demand created by the addition of the virtual destination
+                    if (getBrokerService().isUseVirtualDestSubsOnCreation()) {
+                        if (!AdvisorySupport.isAdvisoryTopic(virtualDestination.getVirtualDestination())) {
+                            for (ConsumerInfo info : virtualDestinationConsumers.keySet()) {
+                                //find all consumers for this virtual destination (null-safe: the entry may already be gone)
+                                if (virtualDestination.equals(virtualDestinationConsumers.get(info))) {
+                                    fireVirtualDestinationRemoveAdvisory(context, info);
+                                }
+                                // NOTE(review): the cleanup below runs for every tracked consumer, not only those matched above - confirm intended
+                                //check consumers created for the existence of a destination to see if they
+                                //match the consumerinfo and clean up
+                                for (VirtualConsumerPair activeMQDest : brokerConsumerDests.keySet()) {
+                                    ConsumerInfo i = brokerConsumerDests.get(activeMQDest);
+                                    if (info.equals(i)) {
+                                        brokerConsumerDests.remove(activeMQDest);
+                                    }
+                                }
+                            }
+                        }
+                    }
+                } finally {
+                    consumersLock.readLock().unlock();
+                }
+            } catch (Exception e) {
+                handleFireFailure("virtualDestinationRemoved", e);
+            }
+        }
+    }
+
+    private void fireVirtualDestinationRemoveAdvisory(ConnectionContext context,
+            ConsumerInfo info) throws Exception {
+        // stop tracking the consumer; a null result means no virtual destination demand is recorded for it
+        VirtualDestination virtualDestination = virtualDestinationConsumers.remove(info);
+        if (virtualDestination != null) {
+            ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(virtualDestination.getVirtualDestination());
+
+            ActiveMQDestination dest = info.getDestination();
+            // no advisory for a temporary destination that is no longer present in the destinations map
+            if (!dest.isTemporary() || destinations.containsKey(dest)) {
+                fireConsumerAdvisory(context, dest, topic, info.createRemoveCommand());
+            }
+        }
+    }
+
     @Override
     public void isFull(ConnectionContext context, Destination destination, Usage usage) {
         super.isFull(context, destination, usage);
@@ -681,4 +891,61 @@ public class AdvisoryBroker extends BrokerFilter {
     public Map<ActiveMQDestination, DestinationInfo> getAdvisoryDestinations() {
         return destinations;
     }
+
+    private class VirtualConsumerPair {
+        private final VirtualDestination virtualDestination;
+
+        //destination that matches this virtualDestination as part target
+        //this is so we can keep track of more than one destination that might
+        //match the virtualDestination and cause demand
+        private final ActiveMQDestination activeMQDestination;
+
+        public VirtualConsumerPair(VirtualDestination virtualDestination,
+                ActiveMQDestination activeMQDestination) {
+            super();
+            this.virtualDestination = virtualDestination;
+            this.activeMQDestination = activeMQDestination;
+        }
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + getOuterType().hashCode();
+            result = prime
+                    * result
+                    + ((activeMQDestination == null) ? 0 : activeMQDestination
+                            .hashCode());
+            result = prime
+                    * result
+                    + ((virtualDestination == null) ? 0 : virtualDestination
+                            .hashCode());
+            return result;
+        }
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj)
+                return true;
+            if (obj == null)
+                return false;
+            if (getClass() != obj.getClass())
+                return false;
+            VirtualConsumerPair other = (VirtualConsumerPair) obj;
+            if (!getOuterType().equals(other.getOuterType()))
+                return false;
+            if (activeMQDestination == null) {
+                if (other.activeMQDestination != null)
+                    return false;
+            } else if (!activeMQDestination.equals(other.activeMQDestination))
+                return false;
+            if (virtualDestination == null) {
+                if (other.virtualDestination != null)
+                    return false;
+            } else if (!virtualDestination.equals(other.virtualDestination))
+                return false;
+            return true;
+        }
+        private AdvisoryBroker getOuterType() {
+            return AdvisoryBroker.this;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-broker/src/main/java/org/apache/activemq/advisory/DestinationFilterVirtualDestinationMatcher.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/DestinationFilterVirtualDestinationMatcher.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/DestinationFilterVirtualDestinationMatcher.java
new file mode 100644
index 0000000..5c57cf0
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/DestinationFilterVirtualDestinationMatcher.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+import org.apache.activemq.broker.region.virtual.CompositeDestination;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
+import org.apache.activemq.broker.region.virtual.VirtualTopic;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.filter.DestinationFilter;
+
+/**
+ * Uses a {@link DestinationFilter} to decide whether an ActiveMQ destination matches a virtual
+ * destination: a CompositeDestination is matched against its mapped destinations, and a
+ * VirtualTopic against the queue wildcard built from its prefix; any other type never matches.
+ */
+public class DestinationFilterVirtualDestinationMatcher implements VirtualDestinationMatcher {
+
+    /* Returns true when the destination filter derived from virtualDestination matches activeMQDest.
+     * @see org.apache.activemq.advisory.VirtualDestinationMatcher#matches(VirtualDestination, ActiveMQDestination)
+     */
+    @Override
+    public boolean matches(VirtualDestination virtualDestination, ActiveMQDestination activeMQDest) {
+        if (virtualDestination instanceof CompositeDestination) {
+            DestinationFilter filter = DestinationFilter.parseFilter(virtualDestination.getMappedDestinations());
+            if (filter.matches(activeMQDest)) {
+                return true;
+            }
+        } else if (virtualDestination instanceof VirtualTopic) {
+            DestinationFilter filter = DestinationFilter.parseFilter(new ActiveMQQueue(((VirtualTopic) virtualDestination).getPrefix() + DestinationFilter.ANY_DESCENDENT));
+            if (filter.matches(activeMQDest)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-broker/src/main/java/org/apache/activemq/advisory/VirtualDestinationMatcher.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/VirtualDestinationMatcher.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/VirtualDestinationMatcher.java
new file mode 100644
index 0000000..571a311
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/VirtualDestinationMatcher.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
+import org.apache.activemq.command.ActiveMQDestination;
+
+/**
+ * Decides whether an ActiveMQ destination matches a given virtual destination.
+ * Implementations supply the matching rule through {@link #matches}.
+ */
+public interface VirtualDestinationMatcher {
+
+    public boolean matches(VirtualDestination virtualDestination, ActiveMQDestination activeMQDest);
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java
index fa8e4fd..87cb3bc 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java
@@ -26,6 +26,7 @@ import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Region;
 import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
@@ -385,6 +386,10 @@ public interface Broker extends Region, Service {
      */
     void isFull(ConnectionContext context,Destination destination,Usage usage);
 
+    void virtualDestinationAdded(ConnectionContext context, VirtualDestination virtualDestination);
+
+    void virtualDestinationRemoved(ConnectionContext context, VirtualDestination virtualDestination);
+
     /**
      *  called when the broker becomes the master in a master/slave
      *  configuration

http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java
index 132b46d..2a8ae71 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
@@ -360,6 +361,18 @@ public class BrokerFilter implements Broker {
     }
 
     @Override
+    public void virtualDestinationAdded(ConnectionContext context,
+            VirtualDestination virtualDestination) {
+        next.virtualDestinationAdded(context, virtualDestination);
+    }
+
+    @Override
+    public void virtualDestinationRemoved(ConnectionContext context,
+            VirtualDestination virtualDestination) {
+        next.virtualDestinationRemoved(context, virtualDestination);
+    }
+
+    @Override
     public void nowMasterBroker() {
         next.nowMasterBroker();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
index 61a4cef..5e7dd97 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -203,6 +203,15 @@ public class BrokerService implements Service {
     private boolean useVirtualTopics = true;
     private boolean useMirroredQueues = false;
     private boolean useTempMirroredQueues = true;
+    /**
+     * Whether or not virtual destination subscriptions should cause network demand
+     */
+    private boolean useVirtualDestSubs = false;
+    /**
+     * Whether or not the creation of destinations that match virtual destinations
+     * should cause network demand
+     */
+    private boolean useVirtualDestSubsOnCreation = false;
     private BrokerId brokerId;
     private volatile DestinationInterceptor[] destinationInterceptors;
     private ActiveMQDestination[] destinations;
@@ -2699,6 +2708,14 @@ public class BrokerService implements Service {
                             if (virtualDestination instanceof VirtualTopic) {
                                 consumerDestinations.add(new ActiveMQQueue(((VirtualTopic) virtualDestination).getPrefix() + DestinationFilter.ANY_DESCENDENT));
                             }
+                            if (isUseVirtualDestSubs()) {
+                                try {
+                                    broker.virtualDestinationAdded(getAdminConnectionContext(), virtualDestination);
+                                    LOG.debug("Adding virtual destination: {}", virtualDestination);
+                                } catch (Exception e) {
+                                    LOG.warn("Could not fire virtual destination consumer advisory", e);
+                                }
+                            }
                         }
                     }
                 }
@@ -3133,4 +3150,22 @@ public class BrokerService implements Service {
     public void setRejectDurableConsumers(boolean rejectDurableConsumers) {
         this.rejectDurableConsumers = rejectDurableConsumers;
     }
+
+    public boolean isUseVirtualDestSubs() {
+        return useVirtualDestSubs;
+    }
+
+    public void setUseVirtualDestSubs(
+            boolean useVirtualDestSubs) {
+        this.useVirtualDestSubs = useVirtualDestSubs;
+    }
+
+    public boolean isUseVirtualDestSubsOnCreation() {
+        return useVirtualDestSubsOnCreation;
+    }
+
+    public void setUseVirtualDestSubsOnCreation(
+            boolean useVirtualDestSubsOnCreation) {
+        this.useVirtualDestSubsOnCreation = useVirtualDestSubsOnCreation;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java
index 8185554..c4059a0 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
@@ -346,6 +347,16 @@ public class EmptyBroker implements Broker {
     }
 
     @Override
+    public void virtualDestinationAdded(ConnectionContext context,
+            VirtualDestination virtualDestination) {
+    }
+
+    @Override
+    public void virtualDestinationRemoved(ConnectionContext context,
+            VirtualDestination virtualDestination) {
+    }
+
+    @Override
     public void nowMasterBroker() {
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java
index ae42141..35501e3 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
@@ -362,6 +363,18 @@ public class ErrorBroker implements Broker {
     }
 
     @Override
+    public void virtualDestinationAdded(ConnectionContext context,
+            VirtualDestination virtualDestination) {
+        throw new BrokerStoppedException(this.message);
+    }
+
+    @Override
+    public void virtualDestinationRemoved(ConnectionContext context,
+            VirtualDestination virtualDestination) {
+        throw new BrokerStoppedException(this.message);
+    }
+
+    @Override
     public void nowMasterBroker() {
         throw new BrokerStoppedException(this.message);
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
index 2eea2e8..6306325 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
@@ -371,6 +372,18 @@ public class MutableBrokerFilter implements Broker {
     }
 
     @Override
+    public void virtualDestinationAdded(ConnectionContext context,
+            VirtualDestination virtualDestination) {
+        getNext().virtualDestinationAdded(context, virtualDestination);
+    }
+
+    @Override
+    public void virtualDestinationRemoved(ConnectionContext context,
+            VirtualDestination virtualDestination) {
+        getNext().virtualDestinationRemoved(context, virtualDestination);
+    }
+
+    @Override
     public void nowMasterBroker() {
        getNext().nowMasterBroker();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
index 5658839..1b976c0 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
@@ -143,4 +143,45 @@ public abstract class CompositeDestination implements VirtualDestination {
             }
         };
     }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + (concurrentSend ? 1231 : 1237);
+        result = prime * result + (copyMessage ? 1231 : 1237);
+        result = prime * result + (forwardOnly ? 1231 : 1237);
+        result = prime * result
+                + ((forwardTo == null) ? 0 : forwardTo.hashCode());
+        result = prime * result + ((name == null) ? 0 : name.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        CompositeDestination other = (CompositeDestination) obj;
+        if (concurrentSend != other.concurrentSend)
+            return false;
+        if (copyMessage != other.copyMessage)
+            return false;
+        if (forwardOnly != other.forwardOnly)
+            return false;
+        if (forwardTo == null) {
+            if (other.forwardTo != null)
+                return false;
+        } else if (!forwardTo.equals(other.forwardTo))
+            return false;
+        if (name == null) {
+            if (other.name != null)
+                return false;
+        } else if (!name.equals(other.name))
+            return false;
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeQueue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeQueue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeQueue.java
index 1b0f75d..d253d9f 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeQueue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeQueue.java
@@ -38,4 +38,9 @@ public class CompositeQueue extends CompositeDestination {
         // nothing to do for mapped destinations
         return destination;
     }
+
+    @Override
+    public String toString() {
+        return "CompositeQueue [" + getName() + "]";
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeTopic.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeTopic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeTopic.java
index 667a80c..9b817d0 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeTopic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeTopic.java
@@ -41,4 +41,9 @@ public class CompositeTopic extends CompositeDestination {
         }
         return destination;
     }
+
+    @Override
+    public String toString() {
+        return "CompositeTopic [" + getName() + "]";
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
index 14ea3fe..7049ccb 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
@@ -194,4 +194,53 @@ public class VirtualTopic implements VirtualDestination {
     public void setTransactedSend(boolean transactedSend) {
         this.transactedSend = transactedSend;
     }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + (concurrentSend ? 1231 : 1237);
+        result = prime * result + (local ? 1231 : 1237);
+        result = prime * result + ((name == null) ? 0 : name.hashCode());
+        result = prime * result + ((postfix == null) ? 0 : postfix.hashCode());
+        result = prime * result + ((prefix == null) ? 0 : prefix.hashCode());
+        result = prime * result + (selectorAware ? 1231 : 1237);
+        result = prime * result + (transactedSend ? 1231 : 1237);
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        VirtualTopic other = (VirtualTopic) obj;
+        if (concurrentSend != other.concurrentSend)
+            return false;
+        if (local != other.local)
+            return false;
+        if (name == null) {
+            if (other.name != null)
+                return false;
+        } else if (!name.equals(other.name))
+            return false;
+        if (postfix == null) {
+            if (other.postfix != null)
+                return false;
+        } else if (!postfix.equals(other.postfix))
+            return false;
+        if (prefix == null) {
+            if (other.prefix != null)
+                return false;
+        } else if (!prefix.equals(other.prefix))
+            return false;
+        if (selectorAware != other.selectorAware)
+            return false;
+        if (transactedSend != other.transactedSend)
+            return false;
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index ad6fd61..fac39ac 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -1352,7 +1352,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
     }
 
     protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException {
-        if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())) {
+        if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination()) ||
+                AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(info.getDestination())) {
             sub.getLocalInfo().setDispatchAsync(true);
         } else {
             sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync());

http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
index 3a59f30..09127a1 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
@@ -29,6 +29,7 @@ import org.apache.activemq.command.ConsumerInfo;
 public class NetworkBridgeConfiguration {
 
     private boolean conduitSubscriptions = true;
+    private boolean useVirtualDestSubs;
     private boolean dynamicOnly;
     private boolean dispatchAsync = true;
     private boolean decreaseNetworkConsumerPriority;
@@ -237,11 +238,27 @@ public class NetworkBridgeConfiguration {
                         filter.append(".");
                         filter.append(destination.getPhysicalName());
                         delimiter = ",";
+
+                        if (useVirtualDestSubs) {
+                            filter.append(delimiter);
+                            filter.append(AdvisorySupport.VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX);
+                            filter.append(destination.getDestinationTypeAsString());
+                            filter.append(".");
+                            filter.append(destination.getPhysicalName());
+                        }
                     }
                 }
                 return filter.toString();
             }   else {
-                return AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + ">";
+                StringBuffer filter = new StringBuffer();
+                filter.append(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX);
+                filter.append(">");
+                if (useVirtualDestSubs) {
+                    filter.append(",");
+                    filter.append(AdvisorySupport.VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX);
+                    filter.append(">");
+                }
+                return filter.toString();
             }
         } else {
             // prepend consumer advisory prefix
@@ -449,4 +466,13 @@ public class NetworkBridgeConfiguration {
         this.checkDuplicateMessagesOnDuplex = checkDuplicateMessagesOnDuplex;
     }
 
+    public boolean isUseVirtualDestSus() {
+        return useVirtualDestSubs;
+    }
+
+    public void setUseVirtualDestSubs(
+            boolean useVirtualDestSubs) {
+        this.useVirtualDestSubs = useVirtualDestSubs;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java b/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
index b26c600..ac3ee03 100755
--- a/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
@@ -37,8 +37,11 @@ public final class AdvisorySupport {
     public static final String QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Queue.";
     public static final String TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Topic.";
     public static final String CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Consumer.";
+    public static final String VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "VirtualDestination.Consumer.";
     public static final String QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Queue.";
     public static final String TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Topic.";
+    public static final String QUEUE_VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX = VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX + "Queue.";
+    public static final String TOPIC_VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX = VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX + "Topic.";
     public static final String EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Topic.";
     public static final String EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Queue.";
     public static final String NO_TOPIC_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Topic.";
@@ -116,6 +119,16 @@ public final class AdvisorySupport {
         return getAdvisoryTopic(destination, prefix, true);
     }
 
+    public static ActiveMQTopic getVirtualDestinationConsumerAdvisoryTopic(ActiveMQDestination destination) {
+        String prefix;
+        if (destination.isQueue()) {
+            prefix = QUEUE_VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX;
+        } else {
+            prefix = TOPIC_VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX;
+        }
+        return getAdvisoryTopic(destination, prefix, true);
+    }
+
     public static ActiveMQTopic getProducerAdvisoryTopic(Destination destination) throws JMSException {
         return getProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
     }
@@ -389,6 +402,24 @@ public final class AdvisorySupport {
         }
     }
 
+    public static boolean isVirtualDestinationConsumerAdvisoryTopic(Destination destination) throws JMSException {
+        return isVirtualDestinationConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
+    }
+
+    public static boolean isVirtualDestinationConsumerAdvisoryTopic(ActiveMQDestination destination) {
+        if (destination.isComposite()) {
+            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+            for (int i = 0; i < compositeDestinations.length; i++) {
+                if (isVirtualDestinationConsumerAdvisoryTopic(compositeDestinations[i])) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            return destination.isTopic() && destination.getPhysicalName().startsWith(VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX);
+        }
+    }
+
     public static boolean isSlowConsumerAdvisoryTopic(Destination destination) throws JMSException {
         return isSlowConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java b/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java
index 245c098..5bd80b0 100644
--- a/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java
@@ -51,14 +51,17 @@ public class NetworkBridgeFilter implements DataStructure, BooleanExpression {
         this.consumerInfo = consumerInfo;
     }
 
+    @Override
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
     }
 
+    @Override
     public boolean isMarshallAware() {
         return false;
     }
 
+    @Override
     public boolean matches(MessageEvaluationContext mec) throws JMSException {
         try {
             // for Queues - the message can be acknowledged and dropped whilst
@@ -72,6 +75,7 @@ public class NetworkBridgeFilter implements DataStructure, BooleanExpression {
         }
     }
 
+    @Override
     public Object evaluate(MessageEvaluationContext message) throws JMSException {
         return matches(message) ? Boolean.TRUE : Boolean.FALSE;
     }
@@ -125,7 +129,9 @@ public class NetworkBridgeFilter implements DataStructure, BooleanExpression {
     }
 
     public static boolean isAdvisoryInterpretedByNetworkBridge(Message message) {
-        return AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination()) || AdvisorySupport.isTempDestinationAdvisoryTopic(message.getDestination());
+        return AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination()) ||
+                AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(message.getDestination()) ||
+                AdvisorySupport.isTempDestinationAdvisoryTopic(message.getDestination());
     }
 
     public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/UpdateVirtualDestinationsTask.java
----------------------------------------------------------------------
diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/UpdateVirtualDestinationsTask.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/UpdateVirtualDestinationsTask.java
index cd0121c..ef7e1b5 100644
--- a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/UpdateVirtualDestinationsTask.java
+++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/UpdateVirtualDestinationsTask.java
@@ -18,16 +18,23 @@ package org.apache.activemq.plugin;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
+import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
 import org.apache.activemq.broker.region.DestinationInterceptor;
 import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.virtual.VirtualDestination;
 import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public abstract class UpdateVirtualDestinationsTask implements Runnable {
 
+    public static final Logger LOG = LoggerFactory.getLogger(UpdateVirtualDestinationsTask.class);
     private final AbstractRuntimeConfigurationBroker plugin;
 
     public UpdateVirtualDestinationsTask(
@@ -49,11 +56,52 @@ public abstract class UpdateVirtualDestinationsTask implements Runnable {
                 // update existing interceptor
                 final VirtualDestinationInterceptor virtualDestinationInterceptor = (VirtualDestinationInterceptor) destinationInterceptor;
 
+                Set<VirtualDestination> existingVirtualDests = new HashSet<>();
+                Collections.addAll(existingVirtualDests, virtualDestinationInterceptor.getVirtualDestinations());
+
+                Set<VirtualDestination> newVirtualDests = new HashSet<>();
+                Collections.addAll(newVirtualDests, getVirtualDestinations());
+
+                Set<VirtualDestination> addedVirtualDests = new HashSet<>();
+                Set<VirtualDestination> removedVirtualDests = new HashSet<>();
+                //detect new virtual destinations
+                for (VirtualDestination newVirtualDest : newVirtualDests) {
+                    if (!existingVirtualDests.contains(newVirtualDest)) {
+                        addedVirtualDests.add(newVirtualDest);
+                    }
+                }
+                //detect removed virtual destinations
+                for (VirtualDestination existingVirtualDest : existingVirtualDests) {
+                    if (!newVirtualDests.contains(existingVirtualDest)) {
+                        removedVirtualDests.add(existingVirtualDest);
+                    }
+                }
+
                 virtualDestinationInterceptor
                         .setVirtualDestinations(getVirtualDestinations());
                 plugin.info("applied updates to: "
                         + virtualDestinationInterceptor);
                 updatedExistingInterceptor = true;
+
+                ConnectionContext connectionContext;
+                try {
+                    connectionContext = plugin.getBrokerService().getAdminConnectionContext();
+                    //signal updates
+                    if (plugin.getBrokerService().isUseVirtualDestSubs()) {
+                        for (VirtualDestination removedVirtualDest : removedVirtualDests) {
+                            plugin.virtualDestinationRemoved(connectionContext, removedVirtualDest);
+                            LOG.info("Removing virtual destination: {}", removedVirtualDest);
+                        }
+
+                        for (VirtualDestination addedVirtualDest : addedVirtualDests) {
+                            plugin.virtualDestinationAdded(connectionContext, addedVirtualDest);
+                            LOG.info("Adding virtual destination: {}", addedVirtualDest);
+                        }
+                    }
+
+                } catch (Exception e) {
+                    LOG.warn("Could not process virtual destination advisories", e);
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-unit-tests/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/pom.xml b/activemq-unit-tests/pom.xml
index 5df7fcf..e03b246 100755
--- a/activemq-unit-tests/pom.xml
+++ b/activemq-unit-tests/pom.xml
@@ -71,6 +71,10 @@
       <artifactId>activemq-partition</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-runtime-config</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.geronimo.specs</groupId>
       <artifactId>geronimo-jms_1.1_spec</artifactId>
     </dependency>


Mime
View raw message