activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5996
Date Thu, 01 Oct 2015 23:25:00 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 7c7c50505 -> 81b4b9ae3


https://issues.apache.org/jira/browse/AMQ-5996

When possible check for noLocl changes on durable subscription
reactivation and recreate the durable sub if it changes.  For both
selector change and noLocal change also update the AbstractSubscription
selectorExpression so it matches with what was requested. 

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/81b4b9ae
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/81b4b9ae
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/81b4b9ae

Branch: refs/heads/master
Commit: 81b4b9ae3d17c83c42c17290348e2aed6f17d28b
Parents: 7c7c505
Author: Timothy Bish <tabish121@gmail.com>
Authored: Thu Oct 1 19:23:48 2015 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Thu Oct 1 19:23:58 2015 -0400

----------------------------------------------------------------------
 .../amqp/JMSDurableSubNoLocalChangedTest.java   | 177 +++++++--
 .../broker/region/AbstractSubscription.java     |  11 +-
 .../broker/region/DurableTopicSubscription.java |   7 +-
 .../apache/activemq/broker/region/Topic.java    |  24 ++
 .../DurableSubscriptionUpdatesTest.java         | 388 +++++++++++++++++++
 5 files changed, 564 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/81b4b9ae/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSDurableSubNoLocalChangedTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSDurableSubNoLocalChangedTest.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSDurableSubNoLocalChangedTest.java
index 823f4e7..a398934 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSDurableSubNoLocalChangedTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSDurableSubNoLocalChangedTest.java
@@ -22,8 +22,6 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
 
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -38,7 +36,6 @@ import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.util.Wait;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
@@ -53,7 +50,7 @@ public class JMSDurableSubNoLocalChangedTest {
 
     private static final Logger LOG = LoggerFactory.getLogger(JMSDurableSubNoLocalChangedTest.class);
 
-    private final int MSG_COUNT = 10;
+    private final int MSG_COUNT = 5;
 
     private BrokerService brokerService;
     private URI connectionUri;
@@ -62,7 +59,7 @@ public class JMSDurableSubNoLocalChangedTest {
     private String subscriptionName;
     private String topicName;
 
-    private final List<TopicConnection> connections = new ArrayList<TopicConnection>();
+    private TopicConnection connection;
 
     @Rule public TestName name = new TestName();
 
@@ -70,49 +67,27 @@ public class JMSDurableSubNoLocalChangedTest {
         TopicConnection connection = JMSClientContext.INSTANCE.createTopicConnection(connectionUri,
null, null, clientId, true);
         connection.start();
 
-        connections.add(connection);
-
         return connection;
     }
 
     @Before
     public void setUp() throws Exception {
-        brokerService = new BrokerService();
-        brokerService.setUseJmx(true);
-        brokerService.getManagementContext().setCreateMBeanServer(false);
-        brokerService.setPersistent(false);
-        brokerService.setAdvisorySupport(false);
-        brokerService.setSchedulerSupport(false);
-        brokerService.setKeepDurableSubsActive(false);
-        brokerService.addConnector("amqp://0.0.0.0:0");
-        brokerService.start();
-
-        connectionUri = new URI("amqp://localhost:" +
-            brokerService.getTransportConnectorByScheme("amqp").getPublishableConnectURI().getPort());
-
-        clientId = name.getMethodName() + "-ClientId";
-        subscriptionName = name.getMethodName() + "-Subscription";
-        topicName = name.getMethodName();
+        startBroker();
     }
 
     @After
     public void tearDown() throws Exception {
-        for (TopicConnection connection : connections) {
-            try {
-                connection.close();
-            } catch (Exception e) {}
+        try {
+            connection.close();
+        } catch (Exception e) {
         }
 
-        connections.clear();
-
-        brokerService.stop();
-        brokerService.waitUntilStopped();
+        stopBroker();
     }
 
-    @Ignore("Not yet working with current QPid JMS client")
     @Test(timeout = 60000)
-    public void testDurableResubscribeWithNewNoLocalValue() throws Exception {
-        TopicConnection connection = createConnection();
+    public void testResubscribeWithNewNoLocalValueNoBrokerRestart() throws Exception {
+        connection = createConnection();
         TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
 
         Topic topic = session.createTopic(topicName);
@@ -130,13 +105,13 @@ public class JMSDurableSubNoLocalChangedTest {
 
         // Standard subscriber should receive them
         for (int i = 0; i < MSG_COUNT; ++i) {
-            Message message = nonDurableSubscriber.receive(5000);
+            Message message = nonDurableSubscriber.receive(2000);
             assertNotNull(message);
         }
 
         // Durable noLocal=true subscription should not receive them
         {
-            Message message = durableSubscriber.receive(2000);
+            Message message = durableSubscriber.receive(500);
             assertNull(message);
         }
 
@@ -175,7 +150,98 @@ public class JMSDurableSubNoLocalChangedTest {
         // Durable noLocal=false subscription should not receive them as the subscriptions
should
         // have been removed and recreated to update the noLocal flag.
         {
+            Message message = durableSubscriber.receive(500);
+            assertNull(message);
+        }
+
+        // Public third set which should get queued for the durable sub with noLocal=false
+        publishToTopic(session, topic);
+
+        // Durable subscriber should receive them
+        for (int i = 0; i < MSG_COUNT; ++i) {
             Message message = durableSubscriber.receive(2000);
+            assertNotNull("Should get local messages now", message);
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testDurableResubscribeWithNewNoLocalValueWithBrokerRestart() throws Exception
{
+        connection = createConnection();
+        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        Topic topic = session.createTopic(topicName);
+
+        // Create a Durable Topic Subscription with noLocal set to true.
+        TopicSubscriber durableSubscriber = session.createDurableSubscriber(topic, subscriptionName,
null, true);
+
+        // Create a Durable Topic Subscription with noLocal set to true.
+        TopicSubscriber nonDurableSubscriber = session.createSubscriber(topic);
+
+        // Public first set, only the non durable sub should get these.
+        publishToTopic(session, topic);
+
+        LOG.debug("Testing that noLocal=true subscription doesn't get any messages.");
+
+        // Standard subscriber should receive them
+        for (int i = 0; i < MSG_COUNT; ++i) {
+            Message message = nonDurableSubscriber.receive(2000);
+            assertNotNull(message);
+        }
+
+        // Durable noLocal=true subscription should not receive them
+        {
+            Message message = durableSubscriber.receive(500);
+            assertNull(message);
+        }
+
+        // Public second set for testing durable sub changed.
+        publishToTopic(session, topic);
+
+        assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        // Durable now goes inactive.
+        durableSubscriber.close();
+
+        assertTrue("Should have no durables.", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerService.getAdminView().getDurableTopicSubscribers().length ==
0;
+            }
+        }));
+        assertTrue("Should have an inactive sub.", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerService.getAdminView().getInactiveDurableTopicSubscribers().length
== 1;
+            }
+        }));
+
+        LOG.debug("Testing that updated noLocal=false subscription does get any messages.");
+
+        connection.close();
+
+        restartBroker();
+
+        connection = createConnection();
+
+        session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        // The previous subscription should be restored as an offline subscription.
+        assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        // Recreate a Durable Topic Subscription with noLocal set to false.
+        durableSubscriber = session.createDurableSubscriber(topic, subscriptionName, null,
false);
+
+        assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        // Durable noLocal=false subscription should not receive them as the subscriptions
should
+        // have been removed and recreated to update the noLocal flag.
+        {
+            Message message = durableSubscriber.receive(500);
             assertNull(message);
         }
 
@@ -184,7 +250,7 @@ public class JMSDurableSubNoLocalChangedTest {
 
         // Durable subscriber should receive them
         for (int i = 0; i < MSG_COUNT; ++i) {
-            Message message = durableSubscriber.receive(5000);
+            Message message = durableSubscriber.receive(2000);
             assertNotNull("Should get local messages now", message);
         }
     }
@@ -197,4 +263,41 @@ public class JMSDurableSubNoLocalChangedTest {
 
         publisher.close();
     }
+
+    private void startBroker() throws Exception {
+        createBroker(true);
+    }
+
+    private void restartBroker() throws Exception {
+        stopBroker();
+        createBroker(false);
+    }
+
+    private void stopBroker() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+            brokerService.waitUntilStopped();
+            brokerService = null;
+        }
+    }
+
+    private void createBroker(boolean deleteMessages) throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setUseJmx(true);
+        brokerService.getManagementContext().setCreateMBeanServer(false);
+        brokerService.setPersistent(true);
+        brokerService.setDeleteAllMessagesOnStartup(deleteMessages);
+        brokerService.setAdvisorySupport(false);
+        brokerService.setSchedulerSupport(false);
+        brokerService.setKeepDurableSubsActive(false);
+        brokerService.addConnector("amqp://0.0.0.0:0");
+        brokerService.start();
+
+        connectionUri = new URI("amqp://localhost:" +
+            brokerService.getTransportConnectorByScheme("amqp").getPublishableConnectURI().getPort());
+
+        clientId = name.getMethodName() + "-ClientId";
+        subscriptionName = name.getMethodName() + "-Subscription";
+        topicName = name.getMethodName();
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/81b4b9ae/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
index d22801c..e03ca32 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
@@ -20,7 +20,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicLong;
+
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 import javax.management.ObjectName;
@@ -43,11 +43,13 @@ import org.slf4j.LoggerFactory;
 public abstract class AbstractSubscription implements Subscription {
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractSubscription.class);
+
     protected Broker broker;
     protected ConnectionContext context;
     protected ConsumerInfo info;
     protected final DestinationFilter destinationFilter;
     protected final CopyOnWriteArrayList<Destination> destinations = new CopyOnWriteArrayList<Destination>();
+
     private BooleanExpression selectorExpression;
     private ObjectName objectName;
     private int cursorMemoryHighWaterMark = 70;
@@ -180,6 +182,7 @@ public abstract class AbstractSubscription implements Subscription {
     public int getPrefetchSize() {
         return info.getPrefetchSize();
     }
+
     public void setPrefetchSize(int newSize) {
         info.setPrefetchSize(newSize);
     }
@@ -210,7 +213,6 @@ public abstract class AbstractSubscription implements Subscription {
             if (result) {
                 doAddRecoveredMessage(message);
             }
-
         } finally {
             msgContext.clear();
         }
@@ -245,7 +247,6 @@ public abstract class AbstractSubscription implements Subscription {
      * @param destination
      */
     public void addDestination(Destination destination) {
-
     }
 
     /**
@@ -253,7 +254,6 @@ public abstract class AbstractSubscription implements Subscription {
      * @param destination
      */
     public void removeDestination(Destination destination) {
-
     }
 
     @Override
@@ -289,14 +289,17 @@ public abstract class AbstractSubscription implements Subscription {
         this.lastAckTime = value;
     }
 
+    @Override
     public long getConsumedCount(){
         return subscriptionStatistics.getConsumedCount().getCount();
     }
 
+    @Override
     public void incrementConsumedCount(){
         subscriptionStatistics.getConsumedCount().increment();
     }
 
+    @Override
     public void resetConsumedCount(){
         subscriptionStatistics.getConsumedCount().reset();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/81b4b9ae/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
index cf60fdf..64cdd4e 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
@@ -307,7 +307,11 @@ public class DurableTopicSubscription extends PrefetchSubscription implements
Us
 
     @Override
     public void setSelector(String selector) throws InvalidSelectorException {
-        throw new UnsupportedOperationException("You cannot dynamically change the selector
for durable topic subscriptions");
+        if (active.get()) {
+            throw new UnsupportedOperationException("You cannot dynamically change the selector
for durable topic subscriptions");
+        } else {
+            super.setSelector(getSelector());
+        }
     }
 
     @Override
@@ -348,7 +352,6 @@ public class DurableTopicSubscription extends PrefetchSubscription implements
Us
                     MessageReference node = pending.next();
                     node.decrementReferenceCount();
                 }
-
             } finally {
                 pending.release();
                 pending.clear();

http://git-wip-us.apache.org/repos/asf/activemq/blob/81b4b9ae/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index 61c62ce..02c5fbe 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -212,9 +212,29 @@ public class Topic extends BaseDestination implements Task {
     }
 
     private boolean hasDurableSubChanged(SubscriptionInfo info1, ConsumerInfo info2) {
+        if (hasSelectorChanged(info1, info2)) {
+            return true;
+        }
+
+        return hasNoLocalChanged(info1, info2);
+    }
+
+    private boolean hasNoLocalChanged(SubscriptionInfo info1, ConsumerInfo info2) {
+        // Prior to V11 the broker did not store the noLocal value for durable subs.
+        if (brokerService.getStoreOpenWireVersion() >= 11) {
+            if (info1.isNoLocal() ^ info2.isNoLocal()) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    private boolean hasSelectorChanged(SubscriptionInfo info1, ConsumerInfo info2) {
         if (info1.getSelector() != null ^ info2.getSelector() != null) {
             return true;
         }
+
         if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector()))
{
             return true;
         }
@@ -242,6 +262,10 @@ public class Topic extends BaseDestination implements Task {
                     // Need to delete the subscription
                     topicStore.deleteSubscription(clientId, subscriptionName);
                     info = null;
+                    // Force a rebuild of the selector chain for the subscription otherwise
+                    // the stored subscription is updated but the selector expression is
not
+                    // and the subscription will not behave according to the new configuration.
+                    subscription.setSelector(subscription.getConsumerInfo().getSelector());
                     synchronized (consumers) {
                         consumers.remove(subscription);
                     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/81b4b9ae/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUpdatesTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUpdatesTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUpdatesTest.java
new file mode 100644
index 0000000..73d5099
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUpdatesTest.java
@@ -0,0 +1,388 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.net.URI;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test that the durable sub updates when the offline sub is reactivated with new values.
+ */
+public class DurableSubscriptionUpdatesTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionUpdatesTest.class);
+
+    private final int MSG_COUNT = 5;
+
+    private BrokerService brokerService;
+    private URI connectionUri;
+
+    private String clientId;
+    private String subscriptionName;
+    private String topicName;
+
+    private TopicConnection connection;
+
+    @Rule public TestName name = new TestName();
+
+    protected TopicConnection createConnection() throws JMSException {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
+        factory.setUseAsyncSend(true);
+
+        TopicConnection connection = factory.createTopicConnection();
+        connection.setClientID(clientId);
+        connection.start();
+
+        return connection;
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        startBroker();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        try {
+            connection.close();
+        } catch (Exception e) {
+        }
+
+        stopBroker();
+    }
+
+    @Test(timeout = 60000)
+    public void testSelectorChange() throws Exception {
+        connection = createConnection();
+        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        Topic topic = session.createTopic(topicName);
+
+        // Create a Durable Topic Subscription with noLocal set to true.
+        TopicSubscriber durableSubscriber = session.createDurableSubscriber(topic, subscriptionName,
"JMSPriority > 8", false);
+
+        // Public first set, only the non durable sub should get these.
+        publishToTopic(session, topic, 9);
+        publishToTopic(session, topic, 8);
+
+        // Standard subscriber should receive them
+        for (int i = 0; i < MSG_COUNT; ++i) {
+            Message message = durableSubscriber.receive(2000);
+            assertNotNull(message);
+            assertEquals(9, message.getJMSPriority());
+        }
+
+        // Subscriber should not receive the others.
+        {
+            Message message = durableSubscriber.receive(500);
+            assertNull(message);
+        }
+
+        // Public second set for testing durable sub changed.
+        publishToTopic(session, topic, 9);
+
+        assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        // Durable now goes inactive.
+        durableSubscriber.close();
+
+        assertTrue("Should have no durables.", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerService.getAdminView().getDurableTopicSubscribers().length ==
0;
+            }
+        }));
+        assertTrue("Should have an inactive sub.", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerService.getAdminView().getInactiveDurableTopicSubscribers().length
== 1;
+            }
+        }));
+
+        LOG.debug("Testing that updated selector subscription does get any messages.");
+
+        // Recreate a Durable Topic Subscription with noLocal set to false.
+        durableSubscriber = session.createDurableSubscriber(topic, subscriptionName, "JMSPriority
> 7", false);
+
+        assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        // Durable subscription should not receive them as the subscriptions should
+        // have been removed and recreated to update the noLocal flag.
+        {
+            Message message = durableSubscriber.receive(500);
+            assertNull(message);
+        }
+
+        // Public third set which should get queued for the durable sub with noLocal=false
+        publishToTopic(session, topic, 8);
+
+        // Durable subscriber should receive them
+        for (int i = 0; i < MSG_COUNT; ++i) {
+            Message message = durableSubscriber.receive(5000);
+            assertNotNull("Should get messages now", message);
+            assertEquals(8, message.getJMSPriority());
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testResubscribeWithNewNoLocalValueNoBrokerRestart() throws Exception {
+        connection = createConnection();
+        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        Topic topic = session.createTopic(topicName);
+
+        // Create a Durable Topic Subscription with noLocal set to true.
+        TopicSubscriber durableSubscriber = session.createDurableSubscriber(topic, subscriptionName,
null, true);
+
+        // Create a Durable Topic Subscription with noLocal set to true.
+        TopicSubscriber nonDurableSubscriber = session.createSubscriber(topic);
+
+        // Public first set, only the non durable sub should get these.
+        publishToTopic(session, topic);
+
+        LOG.debug("Testing that noLocal=true subscription doesn't get any messages.");
+
+        // Standard subscriber should receive them
+        for (int i = 0; i < MSG_COUNT; ++i) {
+            Message message = nonDurableSubscriber.receive(2000);
+            assertNotNull(message);
+        }
+
+        // Durable noLocal=true subscription should not receive them
+        {
+            Message message = durableSubscriber.receive(500);
+            assertNull(message);
+        }
+
+        // Public second set for testing durable sub changed.
+        publishToTopic(session, topic);
+
+        assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        // Durable now goes inactive.
+        durableSubscriber.close();
+
+        assertTrue("Should have no durables.", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerService.getAdminView().getDurableTopicSubscribers().length ==
0;
+            }
+        }));
+        assertTrue("Should have an inactive sub.", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerService.getAdminView().getInactiveDurableTopicSubscribers().length
== 1;
+            }
+        }));
+
+        LOG.debug("Testing that updated noLocal=false subscription does get any messages.");
+
+        // Recreate a Durable Topic Subscription with noLocal set to false.
+        durableSubscriber = session.createDurableSubscriber(topic, subscriptionName, null,
false);
+
+        assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        // Durable noLocal=false subscription should not receive them as the subscriptions
should
+        // have been removed and recreated to update the noLocal flag.
+        {
+            Message message = durableSubscriber.receive(500);
+            assertNull(message);
+        }
+
+        // Public third set which should get queued for the durable sub with noLocal=false
+        publishToTopic(session, topic);
+
+        // Durable subscriber should receive them
+        for (int i = 0; i < MSG_COUNT; ++i) {
+            Message message = durableSubscriber.receive(5000);
+            assertNotNull("Should get local messages now", message);
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testDurableResubscribeWithNewNoLocalValueWithBrokerRestart() throws Exception
{
+        connection = createConnection();
+        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        Topic topic = session.createTopic(topicName);
+
+        // Create a Durable Topic Subscription with noLocal set to true.
+        TopicSubscriber durableSubscriber = session.createDurableSubscriber(topic, subscriptionName,
null, true);
+
+        // Create a Durable Topic Subscription with noLocal set to true.
+        TopicSubscriber nonDurableSubscriber = session.createSubscriber(topic);
+
+        // Public first set, only the non durable sub should get these.
+        publishToTopic(session, topic);
+
+        LOG.debug("Testing that noLocal=true subscription doesn't get any messages.");
+
+        // Standard subscriber should receive them
+        for (int i = 0; i < MSG_COUNT; ++i) {
+            Message message = nonDurableSubscriber.receive(2000);
+            assertNotNull(message);
+        }
+
+        // Durable noLocal=true subscription should not receive them
+        {
+            Message message = durableSubscriber.receive(500);
+            assertNull(message);
+        }
+
+        // Public second set for testing durable sub changed.
+        publishToTopic(session, topic);
+
+        assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        // Durable now goes inactive.
+        durableSubscriber.close();
+
+        assertTrue("Should have no durables.", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerService.getAdminView().getDurableTopicSubscribers().length ==
0;
+            }
+        }));
+        assertTrue("Should have an inactive sub.", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerService.getAdminView().getInactiveDurableTopicSubscribers().length
== 1;
+            }
+        }));
+
+        LOG.debug("Testing that updated noLocal=false subscription does get any messages.");
+
+        connection.close();
+
+        restartBroker();
+
+        connection = createConnection();
+
+        session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        // The previous subscription should be restored as an offline subscription.
+        assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        // Recreate a Durable Topic Subscription with noLocal set to false.
+        durableSubscriber = session.createDurableSubscriber(topic, subscriptionName, null,
false);
+
+        assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        // Durable noLocal=false subscription should not receive them as the subscriptions
should
+        // have been removed and recreated to update the noLocal flag.
+        {
+            Message message = durableSubscriber.receive(500);
+            assertNull(message);
+        }
+
+        // Public third set which should get queued for the durable sub with noLocal=false
+        publishToTopic(session, topic);
+
+        // Durable subscriber should receive them
+        for (int i = 0; i < MSG_COUNT; ++i) {
+            Message message = durableSubscriber.receive(2000);
+            assertNotNull("Should get local messages now", message);
+        }
+    }
+
+    private void publishToTopic(TopicSession session, Topic destination) throws Exception
{
+        publishToTopic(session, destination, Message.DEFAULT_PRIORITY);
+    }
+
+    private void publishToTopic(TopicSession session, Topic destination, int priority) throws
Exception {
+        TopicPublisher publisher = session.createPublisher(destination);
+        for (int i = 0; i < MSG_COUNT; ++i) {
+            publisher.send(session.createMessage(), Message.DEFAULT_DELIVERY_MODE, priority,
Message.DEFAULT_TIME_TO_LIVE);
+        }
+
+        publisher.close();
+    }
+
+    private void startBroker() throws Exception {
+        createBroker(true);
+    }
+
+    private void restartBroker() throws Exception {
+        stopBroker();
+        createBroker(false);
+    }
+
+    private void stopBroker() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+            brokerService.waitUntilStopped();
+            brokerService = null;
+        }
+    }
+
+    private void createBroker(boolean deleteMessages) throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setUseJmx(true);
+        brokerService.getManagementContext().setCreateMBeanServer(false);
+        brokerService.setPersistent(true);
+        brokerService.setDeleteAllMessagesOnStartup(deleteMessages);
+        brokerService.setAdvisorySupport(false);
+        brokerService.setSchedulerSupport(false);
+        brokerService.setKeepDurableSubsActive(false);
+        TransportConnector connector = brokerService.addConnector("tcp://0.0.0.0:0");
+        brokerService.start();
+
+        connectionUri = connector.getPublishableConnectURI();
+
+        clientId = name.getMethodName() + "-ClientId";
+        subscriptionName = name.getMethodName() + "-Subscription";
+        topicName = name.getMethodName();
+    }
+}


Mime
View raw message