activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5799
Date Tue, 26 May 2015 22:30:49 GMT
Repository: activemq
Updated Branches:
  refs/heads/master b5c626478 -> ffcd99ac8


https://issues.apache.org/jira/browse/AMQ-5799

Add a unit test for the case of a JMS AMQP client changing properties on a
durable sub when resubscribing; this is not yet working with the current client version.

fix: Use existing filters map to avoid creating a new instance.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ffcd99ac
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ffcd99ac
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ffcd99ac

Branch: refs/heads/master
Commit: ffcd99ac853c0f5f8b31ffb41b507cec9c60ea0d
Parents: b5c6264
Author: Timothy Bish <tabish121@gmail.com>
Authored: Tue May 26 18:28:28 2015 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Tue May 26 18:28:28 2015 -0400

----------------------------------------------------------------------
 .../transport/amqp/protocol/AmqpSession.java    |   9 +-
 .../amqp/JMSDurableSubNoLocalChangedTest.java   | 200 +++++++++++++++++++
 2 files changed, 202 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/ffcd99ac/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
index cdef850..ff106fd 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
@@ -270,17 +270,12 @@ public class AmqpSession implements AmqpResource {
                     source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
                     source.setDistributionMode(COPY);
 
-                    Map<Symbol, DescribedType> filters = new HashMap<Symbol, DescribedType>();
                     if (storedInfo.isNoLocal()) {
-                        filters.put(NO_LOCAL_NAME, AmqpJmsNoLocalType.NO_LOCAL);
+                        supportedFilters.put(NO_LOCAL_NAME, AmqpJmsNoLocalType.NO_LOCAL);
                     }
 
                     if (storedInfo.getSelector() != null && !storedInfo.getSelector().trim().equals(""))
{
-                        filters.put(JMS_SELECTOR_NAME, new AmqpJmsSelectorType(storedInfo.getSelector()));
-                    }
-
-                    if (!filters.isEmpty()) {
-                        source.setFilter(filters);
+                        supportedFilters.put(JMS_SELECTOR_NAME, new AmqpJmsSelectorType(storedInfo.getSelector()));
                     }
                 } else {
                     sender.close(new ErrorCondition(AmqpError.NOT_FOUND, "Unknown subscription
link: " + protonSender.getName()));

http://git-wip-us.apache.org/repos/asf/activemq/blob/ffcd99ac/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSDurableSubNoLocalChangedTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSDurableSubNoLocalChangedTest.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSDurableSubNoLocalChangedTest.java
new file mode 100644
index 0000000..823f4e7
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSDurableSubNoLocalChangedTest.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test for behavior of durable subscriber that changes noLocal setting
+ * on reconnect.
+ */
+public class JMSDurableSubNoLocalChangedTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JMSDurableSubNoLocalChangedTest.class);
+
+    private final int MSG_COUNT = 10;
+
+    private BrokerService brokerService;
+    private URI connectionUri;
+
+    private String clientId;
+    private String subscriptionName;
+    private String topicName;
+
+    private final List<TopicConnection> connections = new ArrayList<TopicConnection>();
+
+    @Rule public TestName name = new TestName();
+
+    protected TopicConnection createConnection() throws JMSException {
+        TopicConnection connection = JMSClientContext.INSTANCE.createTopicConnection(connectionUri,
null, null, clientId, true);
+        connection.start();
+
+        connections.add(connection);
+
+        return connection;
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setUseJmx(true);
+        brokerService.getManagementContext().setCreateMBeanServer(false);
+        brokerService.setPersistent(false);
+        brokerService.setAdvisorySupport(false);
+        brokerService.setSchedulerSupport(false);
+        brokerService.setKeepDurableSubsActive(false);
+        brokerService.addConnector("amqp://0.0.0.0:0");
+        brokerService.start();
+
+        connectionUri = new URI("amqp://localhost:" +
+            brokerService.getTransportConnectorByScheme("amqp").getPublishableConnectURI().getPort());
+
+        clientId = name.getMethodName() + "-ClientId";
+        subscriptionName = name.getMethodName() + "-Subscription";
+        topicName = name.getMethodName();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        for (TopicConnection connection : connections) {
+            try {
+                connection.close();
+            } catch (Exception e) {}
+        }
+
+        connections.clear();
+
+        brokerService.stop();
+        brokerService.waitUntilStopped();
+    }
+
+    @Ignore("Not yet working with current QPid JMS client")
+    @Test(timeout = 60000)
+    public void testDurableResubscribeWithNewNoLocalValue() throws Exception {
+        TopicConnection connection = createConnection();
+        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        Topic topic = session.createTopic(topicName);
+
+        // Create a Durable Topic Subscription with noLocal set to true.
+        TopicSubscriber durableSubscriber = session.createDurableSubscriber(topic, subscriptionName,
null, true);
+
+        // Create a standard (non-durable) Topic Subscription on the same topic.
+        TopicSubscriber nonDurableSubscriber = session.createSubscriber(topic);
+
+        // Publish first set, only the non durable sub should get these.
+        publishToTopic(session, topic);
+
+        LOG.debug("Testing that noLocal=true subscription doesn't get any messages.");
+
+        // Standard subscriber should receive them
+        for (int i = 0; i < MSG_COUNT; ++i) {
+            Message message = nonDurableSubscriber.receive(5000);
+            assertNotNull(message);
+        }
+
+        // Durable noLocal=true subscription should not receive them
+        {
+            Message message = durableSubscriber.receive(2000);
+            assertNull(message);
+        }
+
+        // Publish second set for testing durable sub changed.
+        publishToTopic(session, topic);
+
+        assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        // Durable now goes inactive.
+        durableSubscriber.close();
+
+        assertTrue("Should have no durables.", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerService.getAdminView().getDurableTopicSubscribers().length ==
0;
+            }
+        }));
+        assertTrue("Should have an inactive sub.", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerService.getAdminView().getInactiveDurableTopicSubscribers().length
== 1;
+            }
+        }));
+
+        LOG.debug("Testing that updated noLocal=false subscription does get any messages.");
+
+        // Recreate a Durable Topic Subscription with noLocal set to false.
+        durableSubscriber = session.createDurableSubscriber(topic, subscriptionName, null,
false);
+
+        assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        // Durable noLocal=false subscription should not receive them as the subscriptions should
+        // have been removed and recreated to update the noLocal flag.
+        {
+            Message message = durableSubscriber.receive(2000);
+            assertNull(message);
+        }
+
+        // Publish third set which should get queued for the durable sub with noLocal=false
+        publishToTopic(session, topic);
+
+        // Durable subscriber should receive them
+        for (int i = 0; i < MSG_COUNT; ++i) {
+            Message message = durableSubscriber.receive(5000);
+            assertNotNull("Should get local messages now", message);
+        }
+    }
+
+    private void publishToTopic(TopicSession session, Topic destination) throws Exception
{
+        TopicPublisher publisher = session.createPublisher(destination);
+        for (int i = 0; i < MSG_COUNT; ++i) {
+            publisher.send(session.createMessage());
+        }
+
+        publisher.close();
+    }
+}


Mime
View raw message