activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cshan...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6430
Date Wed, 21 Sep 2016 13:35:14 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 6c01b641b -> 7c293b661


https://issues.apache.org/jira/browse/AMQ-6430

When a nolocal durable consumer reconnects the new connectionId is properly captured for
the NoLocal expression so that nolocal works on reconnect.  Also fixed
the detection of the nolocal value changing on consumer connect.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7c293b66
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7c293b66
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7c293b66

Branch: refs/heads/master
Commit: 7c293b661f22245ce21bf2b5aa1c5bf4192cb8c5
Parents: 6c01b64
Author: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Authored: Wed Sep 21 09:32:37 2016 -0400
Committer: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Committed: Wed Sep 21 09:34:46 2016 -0400

----------------------------------------------------------------------
 .../activemq/broker/region/TopicRegion.java     |  15 +-
 .../DurableSubscriptionWithNoLocalTest.java     | 143 ++++++++++++++++++-
 2 files changed, 153 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/7c293b66/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
index 51c9beb..eca3449 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
@@ -162,6 +162,12 @@ public class TopicRegion extends AbstractRegion {
                         sub.context = context;
                         sub.deactivate(keepDurableSubsActive, info.getLastDeliveredSequenceId());
                     }
+                    //If NoLocal we need to update the NoLocal selector with the new connectionId
+                    //Simply setting the selector with the current one will trigger a
+                    //refresh of of the connectionId for the NoLocal expression
+                    if (info.isNoLocal()) {
+                        sub.setSelector(sub.getSelector());
+                    }
                     subscriptions.put(info.getConsumerId(), sub);
                 }
             } else {
@@ -189,8 +195,9 @@ public class TopicRegion extends AbstractRegion {
                 // deactivate only if given context is same
                 // as what is in the sub. otherwise, during linksteal
                 // sub will get new context, but will be removed here
-                if (sub.getContext() == context)
+                if (sub.getContext() == context) {
                     sub.deactivate(keepDurableSubsActive, info.getLastDeliveredSequenceId());
+                }
             }
         } else {
             super.removeConsumer(context, info);
@@ -373,6 +380,12 @@ public class TopicRegion extends AbstractRegion {
         if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector()))
{
             return true;
         }
+        // Prior to V11 the broker did not store the noLocal value for durable subs.
+        if (broker.getBrokerService().getStoreOpenWireVersion() >= 11) {
+            if (info1.isNoLocal() ^ info2.isNoLocal()) {
+                return true;
+            }
+        }
         return !info1.getDestination().equals(info2.getDestination());
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/7c293b66/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionWithNoLocalTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionWithNoLocalTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionWithNoLocalTest.java
index 4ecf811..ecbfac1 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionWithNoLocalTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionWithNoLocalTest.java
@@ -20,18 +20,29 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
 import java.io.File;
-
+import java.util.Arrays;
+import java.util.Collection;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
 import javax.jms.Session;
+import javax.jms.TextMessage;
 import javax.jms.Topic;
 import javax.jms.TopicConnection;
 import javax.jms.TopicPublisher;
 import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
 
+import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.CommandTypes;
+import org.apache.activemq.network.DurableSyncNetworkBridgeTest.FLOW;
 import org.apache.activemq.store.kahadb.KahaDBStore;
 import org.junit.After;
 import org.junit.Before;
@@ -39,12 +50,16 @@ import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Test for spec compliance for durable subscriptions that change the noLocal flag.
  */
+@RunWith(Parameterized.class)
 public class DurableSubscriptionWithNoLocalTest {
 
     private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionWithNoLocalTest.class);
@@ -57,6 +72,19 @@ public class DurableSubscriptionWithNoLocalTest {
     private BrokerService brokerService;
     private String connectionUri;
     private ActiveMQConnectionFactory factory;
+    private final boolean keepDurableSubsActive;
+
+    @Parameters(name="keepDurableSubsActive={0}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                {true},
+                {false}
+        });
+    }
+
+    public DurableSubscriptionWithNoLocalTest(final boolean keepDurableSubsActive) {
+        this.keepDurableSubsActive = keepDurableSubsActive;
+    }
 
     @Before
     public void setUp() throws Exception {
@@ -69,7 +97,115 @@ public class DurableSubscriptionWithNoLocalTest {
         brokerService.waitUntilStopped();
     }
 
-    @Ignore("Requires Broker be able to remove and recreate on noLocal change")
+    /**
+     * Make sure that NoLocal works for connection started/stopped
+     *
+     * @throws JMSException
+     */
+    @Test(timeout = 60000)
+    public void testNoLocalStillWorkWithConnectionRestart() throws Exception {
+        ActiveMQConnection connection = null;
+        try {
+            connection = (ActiveMQConnection) factory.createConnection();
+            connection.setClientID("test-client");
+            connection.start();
+            test(connection, "test message 1");
+            connection.stop();
+            connection.start();
+            test(connection, "test message 2");
+        } finally {
+            if (connection != null) {
+                connection.close();
+            }
+        }
+    }
+
+    /**
+     * Make sure that NoLocal works for multiple connections to the same subscription
+     *
+     * @throws JMSException
+     */
+    @Test(timeout = 60000)
+    public void testNoLocalStillWorksNewConnection() throws Exception {
+        ActiveMQConnection connection = null;
+        try {
+            connection = (ActiveMQConnection) factory.createConnection();
+            connection.setClientID("test-client");
+            connection.start();
+            test(connection, "test message 1");
+        } finally {
+            if (connection != null) {
+                connection.close();
+            }
+        }
+
+        try {
+            connection = (ActiveMQConnection) factory.createConnection();
+            connection.setClientID("test-client");
+            connection.start();
+            test(connection, "test message 2");
+        } finally {
+            if (connection != null) {
+                connection.close();
+            }
+        }
+    }
+
+    /**
+     * Make sure that NoLocal works after restart
+     *
+     * @throws JMSException
+     */
+    @Test(timeout = 60000)
+    public void testNoLocalStillWorksRestartBroker() throws Exception {
+        ActiveMQConnection connection = null;
+        try {
+            connection = (ActiveMQConnection) factory.createConnection();
+            connection.setClientID("test-client");
+            connection.start();
+            test(connection, "test message 1");
+        } finally {
+            if (connection != null) {
+                connection.close();
+            }
+        }
+
+        tearDown();
+        createBroker(false);
+
+        try {
+            connection = (ActiveMQConnection) factory.createConnection();
+            connection.setClientID("test-client");
+            connection.start();
+            test(connection, "test message 2");
+        } finally {
+            if (connection != null) {
+                connection.close();
+            }
+        }
+    }
+
+    void test(final ActiveMQConnection connection, final String body) throws Exception {
+
+        Session incomingMessagesSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Topic topic = incomingMessagesSession.createTopic("test.topic");
+        TopicSubscriber consumer = incomingMessagesSession.createDurableSubscriber(topic,
"test-subscription", null, true);
+
+        Session outgoingMessagesSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = outgoingMessagesSession.createTopic("test.topic");
+        MessageProducer producer = outgoingMessagesSession.createProducer(destination);
+        TextMessage textMessage = outgoingMessagesSession.createTextMessage(body);
+        producer.send(textMessage);
+        producer.close();
+        System.out.println("message sent: " + textMessage.getJMSMessageID() + "; body: "
+ textMessage.getText());
+        outgoingMessagesSession.close();
+
+        assertNull(consumer.receive(2000));
+
+        consumer.close();
+        incomingMessagesSession.close();
+    }
+
     @Test(timeout = 60000)
     public void testDurableSubWithNoLocalChange() throws Exception {
         TopicConnection connection = factory.createTopicConnection();
@@ -126,7 +262,6 @@ public class DurableSubscriptionWithNoLocalTest {
         assertNull(durableSub.receive(100));
     }
 
-    @Ignore("Requires Broker be able to remove and recreate on noLocal change")
     @Test(timeout = 60000)
     public void testInvertedDurableSubWithNoLocalChange() throws Exception {
         TopicConnection connection = factory.createTopicConnection();
@@ -247,7 +382,6 @@ public class DurableSubscriptionWithNoLocalTest {
         assertNull(durableSub.receive(100));
     }
 
-    @Ignore("Requires Broker be able to remove and recreate on noLocal change")
     @Test(timeout = 60000)
     public void testInvertedDurableSubWithNoLocalChangeAfterRestart() throws Exception {
         TopicConnection connection = factory.createTopicConnection();
@@ -322,6 +456,7 @@ public class DurableSubscriptionWithNoLocalTest {
         brokerService.setStoreOpenWireVersion(CommandTypes.PROTOCOL_VERSION);
         brokerService.setUseJmx(false);
         brokerService.setDeleteAllMessagesOnStartup(deleteAllMessages);
+        brokerService.setKeepDurableSubsActive(keepDurableSubsActive);
         TransportConnector connector = brokerService.addConnector("tcp://0.0.0.0:0");
 
         brokerService.start();


Mime
View raw message