Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 072EB200BA3 for ; Thu, 20 Oct 2016 19:49:21 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 05E68160AE0; Thu, 20 Oct 2016 17:49:21 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 7ED69160ACC for ; Thu, 20 Oct 2016 19:49:19 +0200 (CEST) Received: (qmail 75519 invoked by uid 500); 20 Oct 2016 17:49:18 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 75510 invoked by uid 99); 20 Oct 2016 17:49:18 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 20 Oct 2016 17:49:18 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 93643E008F; Thu, 20 Oct 2016 17:49:18 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: cshannon@apache.org To: commits@activemq.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: activemq git commit: https://issues.apache.org/jira/browse/AMQ-6472 Date: Thu, 20 Oct 2016 17:49:18 +0000 (UTC) archived-at: Thu, 20 Oct 2016 17:49:21 -0000 Repository: activemq Updated Branches: refs/heads/activemq-5.14.x 77b827f46 -> 9b6b31ca2 https://issues.apache.org/jira/browse/AMQ-6472 Durable sync over a network bridge will now also sync non-durable subscriptions properly if the consumer belongs to a destination that is configured to force network durable subscriptions. 
(cherry picked from commit d206621a73e4731b00eff49518ea93ba9a4ee3c0) Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/9b6b31ca Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/9b6b31ca Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/9b6b31ca Branch: refs/heads/activemq-5.14.x Commit: 9b6b31ca2f16243ca3ffceac6ab09ac304f71c65 Parents: 77b827f Author: Christopher L. Shannon (cshannon) Authored: Thu Oct 20 13:46:27 2016 -0400 Committer: Christopher L. Shannon (cshannon) Committed: Thu Oct 20 13:49:05 2016 -0400 ---------------------------------------------------------------------- .../activemq/advisory/AdvisoryBroker.java | 4 + .../activemq/broker/TransportConnection.java | 30 +-- .../network/DemandForwardingBridgeSupport.java | 40 +--- .../activemq/network/DurableConduitBridge.java | 41 +---- .../activemq/util/NetworkBridgeUtils.java | 184 +++++++++++++++++++ .../network/DurableSyncNetworkBridgeTest.java | 138 +++++++++++++- 6 files changed, 344 insertions(+), 93 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/9b6b31ca/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index 5ac201e..556c149 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -893,6 +893,10 @@ public class AdvisoryBroker extends BrokerFilter { return destinations; } + public ConcurrentMap getVirtualDestinationConsumers() { + return virtualDestinationConsumers; + } + private class VirtualConsumerPair { private final 
VirtualDestination virtualDestination; http://git-wip-us.apache.org/repos/asf/activemq/blob/9b6b31ca/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java index a05ba8c..a32d4f6 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -20,14 +20,15 @@ import java.io.EOFException; import java.io.IOException; import java.net.SocketException; import java.net.URI; -import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -39,10 +40,12 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.transaction.xa.XAResource; +import org.apache.activemq.advisory.AdvisoryBroker; import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.region.ConnectionStatistics; import org.apache.activemq.broker.region.DurableTopicSubscription; import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.TopicRegion; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BrokerInfo; @@ -103,7 +106,7 @@ import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportDisposedIOException; import org.apache.activemq.util.IntrospectionSupport; import 
org.apache.activemq.util.MarshallingSupport; -import org.apache.activemq.util.StringToListOfActiveMQDestinationConverter; +import org.apache.activemq.util.NetworkBridgeUtils; import org.apache.activemq.util.SubscriptionKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1378,23 +1381,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor { this.pendingStop = pendingStop; } - public static BrokerSubscriptionInfo getBrokerSubscriptionInfo(final BrokerService brokerService) { - RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker(); - TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion(); - List subscriptionInfos = new ArrayList<>(); - for (SubscriptionKey key : topicRegion.getDurableSubscriptions().keySet()) { - DurableTopicSubscription sub = topicRegion.getDurableSubscriptions().get(key); - if (sub != null) { - ConsumerInfo ci = sub.getConsumerInfo().copy(); - ci.setClientId(key.getClientId()); - subscriptionInfos.add(ci); - } - } - BrokerSubscriptionInfo bsi = new BrokerSubscriptionInfo(brokerService.getBrokerName()); - bsi.setSubscriptionInfos(subscriptionInfos.toArray(new ConsumerInfo[0])); - return bsi; - } - private NetworkBridgeConfiguration getNetworkConfiguration(final BrokerInfo info) throws IOException { Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties()); Map props = createMap(properties); @@ -1412,7 +1398,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { NetworkBridgeConfiguration config = getNetworkConfiguration(info); if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) { LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo"); - dispatchSync(getBrokerSubscriptionInfo(this.broker.getBrokerService())); + dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config)); } } catch (Exception e) { 
LOG.error("Failed to respond to network bridge creation from broker {}", info.getBrokerId(), e); @@ -1425,9 +1411,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor { NetworkBridgeConfiguration config = getNetworkConfiguration(info); config.setBrokerName(broker.getBrokerName()); - if (config.isSyncDurableSubs() && protocolVersion.get() >= 12) { + if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) { LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo"); - dispatchSync(getBrokerSubscriptionInfo(this.broker.getBrokerService())); + dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config)); } // check for existing duplex connection hanging about http://git-wip-us.apache.org/repos/asf/activemq/blob/9b6b31ca/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index b58259d..a8c45b0 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -99,6 +99,7 @@ import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.LongSequenceGenerator; import org.apache.activemq.util.MarshallingSupport; +import org.apache.activemq.util.NetworkBridgeUtils; import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceSupport; import org.apache.activemq.util.StringToListOfActiveMQDestinationConverter; @@ -575,7 +576,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br 
remoteBroker.oneway(brokerInfo); if (configuration.isSyncDurableSubs() && remoteBroker.getWireFormat().getVersion() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) { - remoteBroker.oneway(TransportConnection.getBrokerSubscriptionInfo(brokerService)); + remoteBroker.oneway(NetworkBridgeUtils.getBrokerSubscriptionInfo(brokerService, + configuration)); } } if (remoteConnectionInfo != null) { @@ -656,8 +658,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br if (started.get()) { if (subInfo.getSubscriptionInfos() != null) { for (ConsumerInfo info : subInfo.getSubscriptionInfos()) { - if(!info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX) && - matchesDynamicallyIncludedDestinations(info.getDestination())) { + //re-add any process any non-NC consumers that match the + //dynamicallyIncludedDestinations list + if((info.getSubscriptionName() == null || !info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) && + NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, info.getDestination())) { serviceRemoteConsumerAdvisory(info); } } @@ -666,7 +670,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br //After re-added, clean up any empty durables for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) { DemandSubscription ds = i.next(); - if (matchesDynamicallyIncludedDestinations(ds.getLocalInfo().getDestination())) { + if (NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, ds.getLocalInfo().getDestination())) { cleanupDurableSub(ds, i); } } @@ -907,7 +911,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br Iterator i) throws IOException { if (ds != null && ds.getLocalDurableSubscriber() != null && ds.getDurableRemoteSubs().isEmpty() && ds.getForcedDurableConsumersSize() == 0) { - // deactivate subscriber RemoveInfo removeInfo = new RemoveInfo(ds.getLocalInfo().getConsumerId()); 
localBroker.oneway(removeInfo); @@ -1245,33 +1248,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br return true; } - private boolean matchesDynamicallyIncludedDestinations(ActiveMQDestination destination) { - ActiveMQDestination[] dests = dynamicallyIncludedDestinations; - if (dests != null && dests.length > 0) { - for (ActiveMQDestination dest : dests) { - DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest); - if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) { - return true; - } - } - } - - return false; - } - - protected ActiveMQDestination findMatchingDestination(ActiveMQDestination[] dests, ActiveMQDestination destination) { - if (dests != null && dests.length > 0) { - for (ActiveMQDestination dest : dests) { - DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest); - if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) { - return dest; - } - } - } - - return null; - } - /** * Subscriptions for these destinations are always created */ http://git-wip-us.apache.org/repos/asf/activemq/blob/9b6b31ca/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java index 969c386..50c9855 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java @@ -17,9 +17,7 @@ package org.apache.activemq.network; import java.io.IOException; -import java.util.Map; -import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.region.RegionBroker; 
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.TopicRegion; @@ -29,7 +27,7 @@ import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.filter.DestinationFilter; import org.apache.activemq.transport.Transport; -import org.apache.activemq.util.TypeConversionSupport; +import org.apache.activemq.util.NetworkBridgeUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -120,7 +118,8 @@ public class DurableConduitBridge extends ConduitBridge { @Override protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException { - boolean isForcedDurable = isForcedDurable(info); + boolean isForcedDurable = NetworkBridgeUtils.isForcedDurable(info, + dynamicallyIncludedDestinations, staticallyIncludedDestinations); if (addToAlreadyInterestedConsumers(info, isForcedDurable)) { return null; // don't want this subscription added @@ -146,40 +145,6 @@ public class DurableConduitBridge extends ConduitBridge { return demandSubscription; } - - private boolean isForcedDurable(ConsumerInfo info) { - if (info.isDurable()) { - return false; - } - - ActiveMQDestination destination = info.getDestination(); - if (AdvisorySupport.isAdvisoryTopic(destination) || destination.isTemporary() || - destination.isQueue()) { - return false; - } - - ActiveMQDestination matching = findMatchingDestination(dynamicallyIncludedDestinations, destination); - if (matching != null) { - return isDestForcedDurable(matching); - } - matching = findMatchingDestination(staticallyIncludedDestinations, destination); - if (matching != null) { - return isDestForcedDurable(matching); - } - return false; - } - - private boolean isDestForcedDurable(ActiveMQDestination destination) { - final Map options = destination.getOptions(); - - boolean isForceDurable = false; - if (options != null) { - isForceDurable = (boolean) 
TypeConversionSupport.convert(options.get("forceDurable"), boolean.class); - } - - return isForceDurable; - } - protected String getSubscriberName(ActiveMQDestination dest) { String subscriberName = DURABLE_SUB_PREFIX + configuration.getBrokerName() + "_" + dest.getPhysicalName(); return subscriberName; http://git-wip-us.apache.org/repos/asf/activemq/blob/9b6b31ca/activemq-broker/src/main/java/org/apache/activemq/util/NetworkBridgeUtils.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/util/NetworkBridgeUtils.java b/activemq-broker/src/main/java/org/apache/activemq/util/NetworkBridgeUtils.java new file mode 100644 index 0000000..700baf6 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/util/NetworkBridgeUtils.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.util; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.activemq.advisory.AdvisoryBroker; +import org.apache.activemq.advisory.AdvisorySupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.DurableTopicSubscription; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.TopicRegion; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.BrokerSubscriptionInfo; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.filter.DestinationFilter; +import org.apache.activemq.network.NetworkBridgeConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NetworkBridgeUtils { + + private static final Logger LOG = LoggerFactory.getLogger(NetworkBridgeUtils.class); + + /** + * Generate the BrokerSubscriptionInfo which is used to tell the broker on the other + * side of the network bridge which NC durable subscriptions are still needed for demand. 
+ * @param brokerService + * @param config + * @return + */ + public static BrokerSubscriptionInfo getBrokerSubscriptionInfo(final BrokerService brokerService, + final NetworkBridgeConfiguration config) { + + RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker(); + TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion(); + Set subscriptionInfos = new HashSet<>(); + + //Add all durable subscriptions to the set that match the network config + //which currently is just the dynamicallyIncludedDestinations list + for (SubscriptionKey key : topicRegion.getDurableSubscriptions().keySet()) { + DurableTopicSubscription sub = topicRegion.getDurableSubscriptions().get(key); + if (sub != null && NetworkBridgeUtils.matchesNetworkConfig(config, sub.getConsumerInfo().getDestination())) { + ConsumerInfo ci = sub.getConsumerInfo().copy(); + ci.setClientId(key.getClientId()); + subscriptionInfos.add(ci); + } + } + + //We also need to iterate over all normal subscriptions and check if they are part of + //any dynamicallyIncludedDestination that is configured with forceDurable to be true + //over the network bridge. 
If forceDurable is true then we want to add the consumer to the set + for (Subscription sub : topicRegion.getSubscriptions().values()) { + if (sub != null && NetworkBridgeUtils.isForcedDurable(sub.getConsumerInfo(), + config.getDynamicallyIncludedDestinations())) { + subscriptionInfos.add(sub.getConsumerInfo().copy()); + } + } + + try { + //Lastly, if isUseVirtualDestSubs is configured on this broker (to fire advisories) and + //configured on the network connector (to listen to advisories) then also add any virtual + //dest subscription to the set if forceDurable is true for its destination + AdvisoryBroker ab = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class); + if (ab != null && brokerService.isUseVirtualDestSubs() && config.isUseVirtualDestSubs()) { + for (ConsumerInfo info : ab.getVirtualDestinationConsumers().keySet()) { + if (NetworkBridgeUtils.isForcedDurable(info, config.getDynamicallyIncludedDestinations())) { + subscriptionInfos.add(info.copy()); + } + } + } + } catch (Exception e) { + LOG.warn("Error processing virtualDestinationSubs for BrokerSubscriptionInfo"); + LOG.debug("Error processing virtualDestinationSubs for BrokerSubscriptionInfo", e); + } + BrokerSubscriptionInfo bsi = new BrokerSubscriptionInfo(brokerService.getBrokerName()); + bsi.setSubscriptionInfos(subscriptionInfos.toArray(new ConsumerInfo[0])); + return bsi; + } + + public static boolean isForcedDurable(final ConsumerInfo info, + final List dynamicallyIncludedDestinations) { + return dynamicallyIncludedDestinations != null + ? 
isForcedDurable(info, + dynamicallyIncludedDestinations.toArray(new ActiveMQDestination[0]), null) : false; + } + + public static boolean isForcedDurable(final ConsumerInfo info, + final ActiveMQDestination[] dynamicallyIncludedDestinations, + final ActiveMQDestination[] staticallyIncludedDestinations) { + + if (info.isDurable() || info.getDestination().isQueue()) { + return false; + } + + ActiveMQDestination destination = info.getDestination(); + if (AdvisorySupport.isAdvisoryTopic(destination) || destination.isTemporary() || + destination.isQueue()) { + return false; + } + + ActiveMQDestination matching = findMatchingDestination(dynamicallyIncludedDestinations, destination); + if (matching != null) { + return isDestForcedDurable(matching); + } + matching = findMatchingDestination(staticallyIncludedDestinations, destination); + if (matching != null) { + return isDestForcedDurable(matching); + } + return false; + } + + public static boolean matchesNetworkConfig(final NetworkBridgeConfiguration config, + ActiveMQDestination destination) { + List includedDests = config.getDynamicallyIncludedDestinations(); + if (includedDests != null && includedDests.size() > 0) { + for (ActiveMQDestination dest : includedDests) { + DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest); + if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) { + return true; + } + } + } + + return false; + } + + public static boolean matchesDestinations(ActiveMQDestination[] dests, final ActiveMQDestination destination) { + if (dests != null && dests.length > 0) { + for (ActiveMQDestination dest : dests) { + DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest); + if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) { + return true; + } + } + } + + return false; + } + + public static ActiveMQDestination 
findMatchingDestination(ActiveMQDestination[] dests, ActiveMQDestination destination) { + if (dests != null && dests.length > 0) { + for (ActiveMQDestination dest : dests) { + DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest); + if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) { + return dest; + } + } + } + + return null; + } + + public static boolean isDestForcedDurable(final ActiveMQDestination destination) { + boolean isForceDurable = false; + if (destination != null) { + final Map options = destination.getOptions(); + + if (options != null) { + isForceDurable = (boolean) TypeConversionSupport.convert(options.get("forceDurable"), boolean.class); + } + } + + return isForceDurable; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/9b6b31ca/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java index 62b3dec..4a705f3 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java @@ -16,6 +16,9 @@ */ package org.apache.activemq.network; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + import java.io.File; import java.net.URI; import java.util.Arrays; @@ -23,17 +26,25 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.TimeUnit; +import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import 
org.apache.activemq.advisory.AdvisoryBroker; +import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.DestinationStatistics; +import org.apache.activemq.broker.region.virtual.CompositeTopic; +import org.apache.activemq.broker.region.virtual.VirtualDestination; import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.CommandTypes; +import org.apache.activemq.plugin.java.JavaRuntimeConfigurationBroker; +import org.apache.activemq.plugin.java.JavaRuntimeConfigurationPlugin; import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy; import org.apache.activemq.util.Wait; @@ -57,11 +68,13 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { protected static final Logger LOG = LoggerFactory.getLogger(DurableSyncNetworkBridgeTest.class); + protected JavaRuntimeConfigurationBroker remoteRuntimeBroker; protected String staticIncludeTopics = "include.static.test"; protected String includedTopics = "include.test.>"; protected String testTopicName2 = "include.test.bar2"; private boolean dynamicOnly = false; private boolean forceDurable = false; + private boolean useVirtualDestSubs = false; private byte remoteBrokerWireFormatVersion = CommandTypes.PROTOCOL_VERSION; public static enum FLOW {FORWARD, REVERSE}; @@ -107,6 +120,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { staticIncludeTopics = "include.static.test"; dynamicOnly = false; forceDurable = false; + useVirtualDestSubs = false; remoteBrokerWireFormatVersion = CommandTypes.PROTOCOL_VERSION; doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder()); } @@ -521,6 +535,116 @@ public class 
DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { } + @Test(timeout = 60 * 1000) + public void testVirtualDestSubForceDurableSync() throws Exception { + Assume.assumeTrue(flow == FLOW.FORWARD); + forceDurable = true; + useVirtualDestSubs = true; + this.restartBrokers(true); + + //configure a virtual destination that forwards messages from topic testQueueName + CompositeTopic compositeTopic = createCompositeTopic(testTopicName, + new ActiveMQQueue("include.test.bar.bridge")); + remoteRuntimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}, true); + + MessageProducer includedProducer = localSession.createProducer(included); + Message test = localSession.createTextMessage("test"); + + final DestinationStatistics destinationStatistics = localBroker.getDestination(included).getDestinationStatistics(); + final DestinationStatistics remoteDestStatistics = remoteBroker.getDestination( + new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics(); + + //Make sure that the NC durable is created because of the compositeTopic + waitForConsumerCount(destinationStatistics, 1); + assertNCDurableSubsCount(localBroker, included, 1); + + //Send message and make sure it is dispatched across the bridge + includedProducer.send(test); + waitForDispatchFromLocalBroker(destinationStatistics, 1); + assertLocalBrokerStatistics(destinationStatistics, 1); + assertEquals("remote dest messages", 1, remoteDestStatistics.getMessages().getCount()); + + //Stop the remote broker so the bridge stops and then send 500 messages so + //the messages build up on the NC durable + this.stopRemoteBroker(); + for (int i = 0; i < 500; i++) { + includedProducer.send(test); + } + this.stopLocalBroker(); + + //Restart the brokers + this.restartRemoteBroker(); + remoteRuntimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}, true); + this.restartLocalBroker(true); + + //We now need to verify that 501 messages made it to the queue on the 
remote side + //which means that the NC durable was not deleted and recreated during the sync + final DestinationStatistics remoteDestStatistics2 = remoteBroker.getDestination( + new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics(); + + assertTrue(Wait.waitFor(new Condition() { + + @Override + public boolean isSatisified() throws Exception { + return remoteDestStatistics2.getMessages().getCount() == 501; + } + })); + + } + + @Test(timeout = 60 * 1000) + public void testForceDurableTopicSubSync() throws Exception { + Assume.assumeTrue(flow == FLOW.FORWARD); + forceDurable = true; + this.restartBrokers(true); + + //configure a virtual destination that forwards messages from topic testQueueName + remoteSession.createConsumer(included); + + MessageProducer includedProducer = localSession.createProducer(included); + Message test = localSession.createTextMessage("test"); + + final DestinationStatistics destinationStatistics = localBroker.getDestination(included).getDestinationStatistics(); + + //Make sure that the NC durable is created because of the compositeTopic + waitForConsumerCount(destinationStatistics, 1); + assertNCDurableSubsCount(localBroker, included, 1); + + //Send message and make sure it is dispatched across the bridge + includedProducer.send(test); + waitForDispatchFromLocalBroker(destinationStatistics, 1); + assertLocalBrokerStatistics(destinationStatistics, 1); + + //Stop the network connector and send messages to the local broker so they build + //up on the durable + this.localBroker.getNetworkConnectorByName("networkConnector").stop(); + + for (int i = 0; i < 500; i++) { + includedProducer.send(test); + } + + //restart the local broker and bridge + this.stopLocalBroker(); + this.restartLocalBroker(true); + + //We now need to verify that the 500 messages on the NC durable are dispatched + //on bridge sync which shows that the durable wasn't destroyed/recreated + final DestinationStatistics destinationStatistics2 = + 
localBroker.getDestination(included).getDestinationStatistics(); + waitForDispatchFromLocalBroker(destinationStatistics2, 500); + assertLocalBrokerStatistics(destinationStatistics2, 500); + + } + + protected CompositeTopic createCompositeTopic(String name, ActiveMQDestination...forwardTo) { + CompositeTopic compositeTopic = new CompositeTopic(); + compositeTopic.setName(name); + compositeTopic.setForwardOnly(true); + compositeTopic.setForwardTo( Lists.newArrayList(forwardTo)); + + return compositeTopic; + } + protected void restartBroker(BrokerService broker, boolean startNetworkConnector) throws Exception { if (broker.getBrokerName().equals("localBroker")) { restartLocalBroker(startNetworkConnector); @@ -607,12 +731,15 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { if (flow.equals(FLOW.FORWARD)) { broker2 = remoteBroker; session2 = remoteSession; + remoteRuntimeBroker = (JavaRuntimeConfigurationBroker) + remoteBroker.getBroker().getAdaptor(JavaRuntimeConfigurationBroker.class); } else { broker1 = remoteBroker; session1 = remoteSession; } } + protected BrokerService createLocalBroker(File dataDir, boolean startNetworkConnector) throws Exception { BrokerService brokerService = new BrokerService(); brokerService.setMonitorConnectionSplits(true); @@ -622,6 +749,8 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { adapter.setDirectory(dataDir); adapter.setJournalDiskSyncStrategy(JournalDiskSyncStrategy.PERIODIC.name()); brokerService.setPersistenceAdapter(adapter); + brokerService.setUseVirtualDestSubs(useVirtualDestSubs); + brokerService.setUseVirtualDestSubsOnCreation(useVirtualDestSubs); if (startNetworkConnector) { brokerService.addNetworkConnector(configureLocalNetworkConnector()); @@ -645,10 +774,11 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { connector.setDuplex(true); connector.setStaticBridge(false); connector.setSyncDurableSubs(true); + 
connector.setUseVirtualDestSubs(useVirtualDestSubs); connector.setStaticallyIncludedDestinations( Lists.newArrayList(new ActiveMQTopic(staticIncludeTopics + "?forceDurable=" + forceDurable))); connector.setDynamicallyIncludedDestinations( - Lists.newArrayList(new ActiveMQTopic(includedTopics))); + Lists.newArrayList(new ActiveMQTopic(includedTopics + "?forceDurable=" + forceDurable))); connector.setExcludedDestinations( Lists.newArrayList(new ActiveMQTopic(excludeTopicName))); return connector; @@ -665,6 +795,12 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { adapter.setDirectory(dataDir); adapter.setJournalDiskSyncStrategy(JournalDiskSyncStrategy.PERIODIC.name()); brokerService.setPersistenceAdapter(adapter); + brokerService.setUseVirtualDestSubs(useVirtualDestSubs); + brokerService.setUseVirtualDestSubsOnCreation(useVirtualDestSubs); + + if (useVirtualDestSubs) { + brokerService.setPlugins(new BrokerPlugin[] {new JavaRuntimeConfigurationPlugin()}); + } remoteAdvisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);