activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cshan...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6423
Date Thu, 08 Sep 2016 12:29:12 GMT
Repository: activemq
Updated Branches:
  refs/heads/activemq-5.14.x 4dbe61dd5 -> 5956bdc1f


https://issues.apache.org/jira/browse/AMQ-6423

Fixing durable sync over a network bridge so that network subscriptions
that are no longer permissible are also cleaned up

(cherry picked from commit a038655605e8fa1de279b37989ba69a68f83c601)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/5956bdc1
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/5956bdc1
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/5956bdc1

Branch: refs/heads/activemq-5.14.x
Commit: 5956bdc1f5d6b26bbf610e2e1bdd4f8f83d317c7
Parents: 4dbe61d
Author: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Authored: Thu Sep 8 08:27:49 2016 -0400
Committer: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Committed: Thu Sep 8 08:29:03 2016 -0400

----------------------------------------------------------------------
 .../network/DemandForwardingBridgeSupport.java  |  12 +-
 .../activemq/network/DurableConduitBridge.java  |  25 ++++
 .../network/DurableSyncNetworkBridgeTest.java   | 129 ++++++++++++++++++-
 .../network/DynamicNetworkTestSupport.java      |  12 ++
 4 files changed, 170 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/5956bdc1/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 8a3a56a..70449f0 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -652,12 +652,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge,
Br
                             this.brokerService.getBrokerName(), subInfo.getBrokerName());
 
                     if (configuration.isSyncDurableSubs() && configuration.isConduitSubscriptions()
-                            && !configuration.isDynamicOnly() && subInfo.getSubscriptionInfos()
!= null) {
+                            && !configuration.isDynamicOnly()) {
                         if (started.get()) {
-                            for (ConsumerInfo info : subInfo.getSubscriptionInfos()) {
-                                if(!info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)
&&
-                                        matchesDynamicallyIncludedDestinations(info.getDestination()))
{
-                                    serviceRemoteConsumerAdvisory(info);
+                            if (subInfo.getSubscriptionInfos() != null) {
+                                for (ConsumerInfo info : subInfo.getSubscriptionInfos())
{
+                                    if(!info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)
&&
+                                            matchesDynamicallyIncludedDestinations(info.getDestination()))
{
+                                        serviceRemoteConsumerAdvisory(info);
+                                    }
                                 }
                             }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/5956bdc1/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
index e699272..969c386 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
@@ -26,6 +26,7 @@ import org.apache.activemq.broker.region.TopicRegion;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.filter.DestinationFilter;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.util.TypeConversionSupport;
@@ -88,6 +89,30 @@ public class DurableConduitBridge extends ConduitBridge {
                         LOG.error("Failed to add static destination {}", dest, e);
                     }
                     LOG.trace("Forwarding messages for durable destination: {}", dest);
+                } else if (configuration.isSyncDurableSubs() && !isPermissableDestination(dest))
{
+                    if (dest.isTopic()) {
+                        RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
+                        TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
+
+                        String candidateSubName = getSubscriberName(dest);
+                        for (Subscription subscription : topicRegion.getDurableSubscriptions().values())
{
+                            String subName = subscription.getConsumerInfo().getSubscriptionName();
+                            if (subName != null && subName.equals(candidateSubName))
{
+                               try {
+                                    // remove the NC subscription as it is no longer for
a permissable dest
+                                    RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo();
+                                    sending.setClientId(localClientId);
+                                    sending.setSubscriptionName(subName);
+                                    sending.setConnectionId(this.localConnectionInfo.getConnectionId());
+                                    localBroker.oneway(sending);
+                                } catch (IOException e) {
+                                    LOG.debug("Exception removing NC durable subscription:
{}", subName, e);
+                                    serviceRemoteException(e);
+                                }
+                                break;
+                            }
+                        }
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/5956bdc1/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
index 67e9e24..62b3dec 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
 import javax.jms.Session;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
@@ -56,14 +57,18 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport
{
 
     protected static final Logger LOG = LoggerFactory.getLogger(DurableSyncNetworkBridgeTest.class);
 
+    protected String staticIncludeTopics = "include.static.test";
+    protected String includedTopics = "include.test.>";
     protected String testTopicName2 = "include.test.bar2";
     private boolean dynamicOnly = false;
+    private boolean forceDurable = false;
     private byte remoteBrokerWireFormatVersion = CommandTypes.PROTOCOL_VERSION;
     public static enum FLOW {FORWARD, REVERSE};
 
     private BrokerService broker1;
     private BrokerService broker2;
     private Session session1;
+    private Session session2;
     private final FLOW flow;
 
     @Rule
@@ -98,7 +103,10 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport
{
 
     @Before
     public void setUp() throws Exception {
+        includedTopics = "include.test.>";
+        staticIncludeTopics = "include.static.test";
         dynamicOnly = false;
+        forceDurable = false;
         remoteBrokerWireFormatVersion = CommandTypes.PROTOCOL_VERSION;
         doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder());
     }
@@ -135,6 +143,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport
{
         assertNCDurableSubsCount(broker2, topic, 1);
 
         restartBrokers(true);
+        assertBridgeStarted();
 
         assertSubscriptionsCount(broker1, topic, 1);
         assertNCDurableSubsCount(broker2, topic, 1);
@@ -157,6 +166,43 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport
{
 
         doTearDown();
         restartBroker(broker1, false);
+        restartBroker(broker2, false);
+
+        //Send some messages to the NC sub and make sure it can still be deleted
+        MessageProducer prod = session2.createProducer(topic);
+        for (int i = 0; i < 10; i++) {
+            prod.send(session2.createTextMessage("test"));
+        }
+
+        assertSubscriptionsCount(broker1, topic, 1);
+        removeSubscription(broker1, topic, subName);
+        assertSubscriptionsCount(broker1, topic, 0);
+        doTearDown();
+
+        //Test that on successful reconnection of the bridge that
+        //the NC sub will be removed
+        restartBroker(broker2, true);
+        assertNCDurableSubsCount(broker2, topic, 1);
+        restartBroker(broker1, true);
+        assertBridgeStarted();
+        assertNCDurableSubsCount(broker2, topic, 0);
+
+    }
+
+    @Test
+    public void testRemoveSubscriptionWithBridgeOfflineIncludedChanged() throws Exception
{
+        final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
+        MessageConsumer sub1 = session1.createDurableSubscriber(topic, subName);
+        sub1.close();
+
+        assertSubscriptionsCount(broker1, topic, 1);
+        assertNCDurableSubsCount(broker2, topic, 1);
+
+        doTearDown();
+
+        //change the included topics to make sure we still cleanup non-matching NC durables
+        includedTopics = "different.topic";
+        restartBroker(broker1, false);
         assertSubscriptionsCount(broker1, topic, 1);
         removeSubscription(broker1, topic, subName);
         assertSubscriptionsCount(broker1, topic, 0);
@@ -166,8 +212,74 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport
{
         restartBroker(broker2, true);
         assertNCDurableSubsCount(broker2, topic, 1);
         restartBroker(broker1, true);
+        assertBridgeStarted();
+        assertNCDurableSubsCount(broker2, topic, 0);
+
+    }
+
+    @Test
+    public void testSubscriptionRemovedAfterIncludedChanged() throws Exception {
+        final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
+        MessageConsumer sub1 = session1.createDurableSubscriber(topic, subName);
+        sub1.close();
+
+        assertSubscriptionsCount(broker1, topic, 1);
+        assertNCDurableSubsCount(broker2, topic, 1);
+
+        doTearDown();
+
+        //change the included topics to make sure we still cleanup non-matching NC durables
+        includedTopics = "different.topic";
+        restartBroker(broker1, false);
+        assertSubscriptionsCount(broker1, topic, 1);
+
+        //Test that on successful reconnection of the bridge that
+        //the NC sub will be removed because even though the local subscription exists,
+        //it no longer matches the included filter
+        restartBroker(broker2, true);
+        assertNCDurableSubsCount(broker2, topic, 1);
+        restartBroker(broker1, true);
+        assertBridgeStarted();
         assertNCDurableSubsCount(broker2, topic, 0);
+        assertSubscriptionsCount(broker1, topic, 1);
+
+    }
+
+    @Test
+    public void testSubscriptionRemovedAfterStaticChanged() throws Exception {
+        forceDurable = true;
+        this.restartBrokers(true);
+
+        final ActiveMQTopic topic = new ActiveMQTopic(this.staticIncludeTopics);
+        MessageConsumer sub1 = session1.createDurableSubscriber(topic, subName);
+        sub1.close();
+
+        assertSubscriptionsCount(broker1, topic, 1);
+        assertNCDurableSubsCount(broker2, topic, 1);
+
+        doTearDown();
 
+        //change the included topics to make sure we still cleanup non-matching NC durables
+        staticIncludeTopics = "different.topic";
+        this.restartBrokers(false);
+        assertSubscriptionsCount(broker1, topic, 1);
+        assertNCDurableSubsCount(broker2, topic, 1);
+
+        //Send some messages to the NC sub and make sure it can still be deleted
+        MessageProducer prod = session2.createProducer(topic);
+        for (int i = 0; i < 10; i++) {
+            prod.send(session2.createTextMessage("test"));
+        }
+
+        //Test that on successful reconnection of the bridge that
+        //the NC sub will be removed because even though the local subscription exists,
+        //it no longer matches the included static filter
+        restartBroker(broker2, true);
+        assertNCDurableSubsCount(broker2, topic, 1);
+        restartBroker(broker1, true);
+        assertBridgeStarted();
+        assertNCDurableSubsCount(broker2, topic, 0);
+        assertSubscriptionsCount(broker1, topic, 1);
     }
 
     @Test
@@ -199,9 +311,9 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport
{
 
         //After sync, remove old NC and create one for topic 2
         restartBroker(broker1, true);
+        assertBridgeStarted();
         assertNCDurableSubsCount(broker2, topic, 0);
         assertNCDurableSubsCount(broker2, topic2, 1);
-
     }
 
     @Test
@@ -225,6 +337,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport
{
         assertSubscriptionsCount(broker1, topic2, 1);
 
         restartBrokers(true);
+        assertBridgeStarted();
         assertNCDurableSubsCount(broker2, topic, 1);
         assertNCDurableSubsCount(broker2, topic2, 1);
         assertNCDurableSubsCount(broker2, excludeTopic, 0);
@@ -265,6 +378,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport
{
             assertNCDurableSubsCount(broker2, new ActiveMQTopic("include.test." + i), 1);
         }
 
+        assertBridgeStarted();
     }
 
 
@@ -291,6 +405,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport
{
         //not be added
         restartBrokers(true);
         assertNCDurableSubsCount(broker2, topic, 0);
+        assertBridgeStarted();
 
     }
 
@@ -312,6 +427,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport
{
 
         restartBrokers(true);
         assertNCDurableSubsCount(broker2, topic, 0);
+        assertBridgeStarted();
     }
 
     @Test
@@ -335,6 +451,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport
{
         //bring online again
         session1.createDurableSubscriber(topic, subName);
         assertNCDurableSubsCount(broker2, topic, 1);
+        assertBridgeStarted();
 
     }
 
@@ -358,6 +475,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport
{
         restartBrokers(true);
         assertNCDurableSubsCount(broker2, topic, 1);
         assertNCDurableSubsCount(broker2, excludeTopic, 0);
+        assertBridgeStarted();
 
     }
 
@@ -389,6 +507,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport
{
         //between the sync command and the online durables that are added over
         //the consumer advisory
         restartBrokers(true);
+        assertBridgeStarted();
 
         //Re-create
         session1.createDurableSubscriber(topic, subName);
@@ -460,7 +579,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport
{
                 public boolean isSatisified() throws Exception {
                     return localBroker.getNetworkConnectors().get(0).activeBridges().size()
== 1;
                 }
-            }, 10000, 500);
+            }, 5000, 500);
         }
         localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
@@ -469,6 +588,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport
{
             session1 = localSession;
         } else {
             broker2 = localBroker;
+            session2 = localSession;
         }
     }
 
@@ -486,6 +606,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport
{
 
         if (flow.equals(FLOW.FORWARD)) {
             broker2 = remoteBroker;
+            session2 = remoteSession;
         } else {
             broker1 = remoteBroker;
             session1 = remoteSession;
@@ -524,8 +645,10 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport
{
         connector.setDuplex(true);
         connector.setStaticBridge(false);
         connector.setSyncDurableSubs(true);
+        connector.setStaticallyIncludedDestinations(
+                Lists.<ActiveMQDestination>newArrayList(new ActiveMQTopic(staticIncludeTopics
+ "?forceDurable=" + forceDurable)));
         connector.setDynamicallyIncludedDestinations(
-                Lists.<ActiveMQDestination>newArrayList(new ActiveMQTopic("include.test.>")));
+                Lists.<ActiveMQDestination>newArrayList(new ActiveMQTopic(includedTopics)));
         connector.setExcludedDestinations(
                 Lists.<ActiveMQDestination>newArrayList(new ActiveMQTopic(excludeTopicName)));
         return connector;

http://git-wip-us.apache.org/repos/asf/activemq/blob/5956bdc1/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
index 0b388cc..4b8942b 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
@@ -88,6 +88,16 @@ public abstract class DynamicNetworkTestSupport {
         }
     }
 
+
+    protected void assertBridgeStarted() throws Exception {
+        assertTrue(Wait.waitFor(new Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return localBroker.getNetworkConnectors().get(0).activeBridges().size() ==
1;
+            }
+        }, 10000, 500));
+    }
+
     protected RemoveSubscriptionInfo getRemoveSubscriptionInfo(final ConnectionContext context,
             final BrokerService brokerService) throws Exception {
         RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
@@ -181,6 +191,7 @@ public abstract class DynamicNetworkTestSupport {
             destination = (Topic) d;
         }
 
+
         for (SubscriptionKey key : destination.getDurableTopicSubs().keySet()) {
             if (key.getSubscriptionName().startsWith(DemandForwardingBridge.DURABLE_SUB_PREFIX))
{
                 DurableTopicSubscription sub = destination.getDurableTopicSubs().get(key);
@@ -189,6 +200,7 @@ public abstract class DynamicNetworkTestSupport {
                 }
             }
         }
+
         return subs;
     }
 


Mime
View raw message