activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5799
Date Tue, 26 May 2015 21:36:40 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 8e183db03 -> b5c626478


https://issues.apache.org/jira/browse/AMQ-5799

Return the noLocal filter and set selector if one exists for the
existing durable subscription when a lookup is requested.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b5c62647
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b5c62647
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b5c62647

Branch: refs/heads/master
Commit: b5c62647897b119364794d2ef0027b8d84b216d3
Parents: 8e183db
Author: Timothy Bish <tabish121@gmail.com>
Authored: Tue May 26 17:30:54 2015 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Tue May 26 17:30:54 2015 -0400

----------------------------------------------------------------------
 .../transport/amqp/protocol/AmqpConnection.java |  7 +-
 .../transport/amqp/protocol/AmqpSession.java    | 23 ++++++-
 .../amqp/interop/AmqpDurableReceiverTest.java   | 67 +++++++++++++++++++-
 3 files changed, 90 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/b5c62647/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
index 365c0fc..711156a 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
@@ -49,6 +49,7 @@ import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerControl;
 import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.DestinationInfo;
 import org.apache.activemq.command.ExceptionResponse;
 import org.apache.activemq.command.MessageDispatch;
@@ -630,8 +631,8 @@ public class AmqpConnection implements AmqpProtocolConverter {
         subscriptionsByConsumerId.remove(consumerId);
     }
 
-    ActiveMQDestination lookupSubscription(String subscriptionName) throws AmqpProtocolException
{
-        ActiveMQDestination result = null;
+    ConsumerInfo lookupSubscription(String subscriptionName) throws AmqpProtocolException
{
+        ConsumerInfo result = null;
         RegionBroker regionBroker;
 
         try {
@@ -643,7 +644,7 @@ public class AmqpConnection implements AmqpProtocolConverter {
         final TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
         DurableTopicSubscription subscription = topicRegion.lookupSubscription(subscriptionName,
connectionInfo.getClientId());
         if (subscription != null) {
-            result = subscription.getActiveMQDestination();
+            result = subscription.getConsumerInfo();
         }
 
         return result;

http://git-wip-us.apache.org/repos/asf/activemq/blob/b5c62647/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
index abc680b..cdef850 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
@@ -18,7 +18,9 @@ package org.apache.activemq.transport.amqp.protocol;
 
 import static org.apache.activemq.transport.amqp.AmqpSupport.COPY;
 import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
+import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_NAME;
 import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS;
+import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_NAME;
 import static org.apache.activemq.transport.amqp.AmqpSupport.createDestination;
 import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
 
@@ -43,6 +45,8 @@ import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
 import org.apache.activemq.transport.amqp.AmqpProtocolException;
 import org.apache.activemq.transport.amqp.ResponseHandler;
+import org.apache.qpid.jms.provider.amqp.AmqpJmsNoLocalType;
+import org.apache.qpid.jms.provider.amqp.AmqpJmsSelectorType;
 import org.apache.qpid.proton.amqp.DescribedType;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Target;
@@ -255,14 +259,29 @@ public class AmqpSession implements AmqpResource {
             ActiveMQDestination destination;
             if (source == null) {
                 // Attempt to recover previous subscription
-                destination = connection.lookupSubscription(protonSender.getName());
+                ConsumerInfo storedInfo = connection.lookupSubscription(protonSender.getName());
+
+                if (storedInfo != null) {
+                    destination = storedInfo.getDestination();
 
-                if (destination != null) {
                     source = new org.apache.qpid.proton.amqp.messaging.Source();
                     source.setAddress(destination.getQualifiedName());
                     source.setDurable(TerminusDurability.UNSETTLED_STATE);
                     source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
                     source.setDistributionMode(COPY);
+
+                    Map<Symbol, DescribedType> filters = new HashMap<Symbol, DescribedType>();
+                    if (storedInfo.isNoLocal()) {
+                        filters.put(NO_LOCAL_NAME, AmqpJmsNoLocalType.NO_LOCAL);
+                    }
+
+                    if (storedInfo.getSelector() != null && !storedInfo.getSelector().trim().equals(""))
{
+                        filters.put(JMS_SELECTOR_NAME, new AmqpJmsSelectorType(storedInfo.getSelector()));
+                    }
+
+                    if (!filters.isEmpty()) {
+                        source.setFilter(filters);
+                    }
                 } else {
                     sender.close(new ErrorCondition(AmqpError.NOT_FOUND, "Unknown subscription
link: " + protonSender.getName()));
                     return;

http://git-wip-us.apache.org/repos/asf/activemq/blob/b5c62647/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java
index 31c8961..e2d2495 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java
@@ -17,8 +17,12 @@
 package org.apache.activemq.transport.amqp.interop;
 
 import static org.apache.activemq.transport.amqp.AmqpSupport.COPY;
+import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_NAME;
+import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_NAME;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import org.apache.activemq.broker.jmx.BrokerViewMBean;
@@ -219,6 +223,11 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
         assertNotNull(protonReceiver.getRemoteSource());
         Source remoteSource = (Source) protonReceiver.getRemoteSource();
 
+        if (remoteSource.getFilter() != null) {
+            assertFalse(remoteSource.getFilter().containsKey(NO_LOCAL_NAME));
+            assertFalse(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME));
+        }
+
         assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy());
         assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable());
         assertEquals(COPY, remoteSource.getDistributionMode());
@@ -235,7 +244,7 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
     }
 
     @Test(timeout = 60000)
-    public void testLookupExistingSubscriptionAfterRestart() throws Exception {
+    public void testLookupExistingSubscriptionWithSelectorAndNoLocal() throws Exception {
 
         final BrokerViewMBean brokerView = getProxyToBroker();
 
@@ -245,7 +254,56 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
         connection.connect();
 
         AmqpSession session = connection.createSession();
-        AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(),
getTestName());
+        AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(),
getTestName(), "color = red", true);
+
+        assertEquals(1, brokerView.getDurableTopicSubscribers().length);
+        assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
+
+        receiver.detach();
+
+        assertEquals(0, brokerView.getDurableTopicSubscribers().length);
+        assertEquals(1, brokerView.getInactiveDurableTopicSubscribers().length);
+
+        receiver = session.lookupSubscription(getTestName());
+
+        assertNotNull(receiver);
+
+        Receiver protonReceiver = receiver.getReceiver();
+        assertNotNull(protonReceiver.getRemoteSource());
+        Source remoteSource = (Source) protonReceiver.getRemoteSource();
+
+        if (remoteSource.getFilter() != null) {
+            assertTrue(remoteSource.getFilter().containsKey(NO_LOCAL_NAME));
+            assertTrue(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME));
+        }
+
+        assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy());
+        assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable());
+        assertEquals(COPY, remoteSource.getDistributionMode());
+
+        assertEquals(1, brokerView.getDurableTopicSubscribers().length);
+        assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
+
+        receiver.close();
+
+        assertEquals(0, brokerView.getDurableTopicSubscribers().length);
+        assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testLookupExistingSubscriptionAfterRestartWithSelectorAndNoLocal() throws
Exception {
+
+        final BrokerViewMBean brokerView = getProxyToBroker();
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.createConnection();
+        connection.setContainerId(getTestName());
+        connection.connect();
+
+        AmqpSession session = connection.createSession();
+        AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(),
getTestName(), "color = red", true);
 
         assertEquals(1, brokerView.getDurableTopicSubscribers().length);
         assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
@@ -270,6 +328,11 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
         assertNotNull(protonReceiver.getRemoteSource());
         Source remoteSource = (Source) protonReceiver.getRemoteSource();
 
+        if (remoteSource.getFilter() != null) {
+            assertTrue(remoteSource.getFilter().containsKey(NO_LOCAL_NAME));
+            assertTrue(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME));
+        }
+
         assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy());
         assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable());
         assertEquals(COPY, remoteSource.getDistributionMode());


Mime
View raw message