activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [2/2] activemq-artemis git commit: ARTEMIS-799 Fix issues with the AMQP Durable Topic Subscription model
Date Fri, 14 Oct 2016 01:54:43 GMT
ARTEMIS-799 Fix issues with the AMQP Durable Topic Subscription model

Fixes several issues found in the handling of durable topic
subscriptions (test cases added).


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/226f28ab
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/226f28ab
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/226f28ab

Branch: refs/heads/master
Commit: 226f28abf5468148c7dcbc397b90c383e72b3a3e
Parents: 9743043
Author: Timothy Bish <tabish121@gmail.com>
Authored: Wed Oct 12 17:33:08 2016 -0400
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Fri Oct 14 03:54:35 2016 +0200

----------------------------------------------------------------------
 .../amqp/proton/AMQPConnectionContext.java      |  13 +-
 .../amqp/proton/AmqpJmsSelectorFilter.java      |  46 +++
 .../protocol/amqp/proton/AmqpNoLocalFilter.java |  43 +++
 .../amqp/proton/ProtonServerSenderContext.java  |  41 ++-
 .../amqp/proton/handler/EventHandler.java       |   2 +-
 .../protocol/amqp/proton/handler/Events.java    |   5 +-
 .../amqp/AmqpDurableReceiverTest.java           | 350 +++++++++++++++++++
 7 files changed, 483 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/226f28ab/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index 70e4fd0..1f193eb 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -24,7 +24,6 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
@@ -46,6 +45,8 @@ import org.apache.qpid.proton.engine.Session;
 import org.apache.qpid.proton.engine.Transport;
 import org.jboss.logging.Logger;
 
+import io.netty.buffer.ByteBuf;
+
 public class AMQPConnectionContext extends ProtonInitializable {
 
    private static final Logger log = Logger.getLogger(AMQPConnectionContext.class);
@@ -181,7 +182,7 @@ public class AMQPConnectionContext extends ProtonInitializable {
 
    protected void remoteLinkOpened(Link link) throws Exception {
 
-      AMQPSessionContext protonSession = (AMQPSessionContext) getSessionExtension(link.getSession());
+      AMQPSessionContext protonSession = getSessionExtension(link.getSession());
 
       link.setSource(link.getRemoteSource());
       link.setTarget(link.getRemoteTarget());
@@ -321,6 +322,7 @@ public class AMQPConnectionContext extends ProtonInitializable {
       public void onRemoteClose(Connection connection) {
          synchronized (getLock()) {
             connection.close();
+            connection.free();
             for (AMQPSessionContext protonSession : sessions.values()) {
                protonSession.close();
             }
@@ -352,6 +354,7 @@ public class AMQPConnectionContext extends ProtonInitializable {
       public void onRemoteClose(Session session) throws Exception {
          synchronized (getLock()) {
             session.close();
+            session.free();
          }
 
          AMQPSessionContext sessionContext = (AMQPSessionContext) session.getContext();
@@ -375,6 +378,7 @@ public class AMQPConnectionContext extends ProtonInitializable {
       @Override
       public void onRemoteClose(Link link) throws Exception {
          link.close();
+         link.free();
          ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext();
          if (linkContext != null) {
             linkContext.close(true);
@@ -384,10 +388,11 @@ public class AMQPConnectionContext extends ProtonInitializable {
       @Override
       public void onRemoteDetach(Link link) throws Exception {
          link.detach();
+         link.free();
       }
 
       @Override
-      public void onDetach(Link link) throws Exception {
+      public void onLocalDetach(Link link) throws Exception {
          Object context = link.getContext();
          if (context instanceof ProtonServerSenderContext) {
             ProtonServerSenderContext senderContext = (ProtonServerSenderContext) context;
@@ -402,10 +407,8 @@ public class AMQPConnectionContext extends ProtonInitializable {
             handler.onMessage(delivery);
          } else {
             // TODO: logs
-
             System.err.println("Handler is null, can't delivery " + delivery);
          }
       }
    }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/226f28ab/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpJmsSelectorFilter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpJmsSelectorFilter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpJmsSelectorFilter.java
new file mode 100644
index 0000000..2e6ec2f
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpJmsSelectorFilter.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.proton;
+
+import org.apache.qpid.proton.amqp.DescribedType;
+
+/**
+ * A Described Type wrapper for JMS selector values.
+ */
+public class AmqpJmsSelectorFilter implements DescribedType {
+
+   private final String selector;
+
+   public AmqpJmsSelectorFilter(String selector) {
+      this.selector = selector;
+   }
+
+   @Override
+   public Object getDescriptor() {
+      return AmqpSupport.JMS_SELECTOR_CODE;
+   }
+
+   @Override
+   public Object getDescribed() {
+      return this.selector;
+   }
+
+   @Override
+   public String toString() {
+      return "AmqpJmsSelectorType{" + selector + "}";
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/226f28ab/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpNoLocalFilter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpNoLocalFilter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpNoLocalFilter.java
new file mode 100644
index 0000000..24f3ead
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpNoLocalFilter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.proton;
+
+import org.apache.qpid.proton.amqp.DescribedType;
+
+/**
+ * A Described Type wrapper for JMS no local option for MessageConsumer.
+ */
+public class AmqpNoLocalFilter implements DescribedType {
+
+   public static final AmqpNoLocalFilter NO_LOCAL = new AmqpNoLocalFilter();
+
+   private final String noLocal;
+
+   public AmqpNoLocalFilter() {
+      this.noLocal = "NoLocalFilter{}";
+   }
+
+   @Override
+   public Object getDescriptor() {
+      return AmqpSupport.NO_LOCAL_CODE;
+   }
+
+   @Override
+   public Object getDescribed() {
+      return this.noLocal;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/226f28ab/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 1ed5745..76279c5 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -170,21 +170,46 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          String clientId = connection.getRemoteContainer();
          String pubId = sender.getName();
          queue = createQueueName(clientId, pubId);
-         boolean exists = sessionSPI.queueQuery(queue, false).isExists();
+         QueueQueryResult result = sessionSPI.queueQuery(queue, false);
 
          // Once confirmed that the address exists we need to return a Source that reflects
          // the lifetime policy and capabilities of the new subscription.
-         //
-         // TODO we are not applying selector or noLocal filters to the source we just
-         // looked up which would violate expectations if the client checked that they
-         // are present on subscription recovery (JMS Durable Re-subscribe) etc
-         if (exists) {
+         if (result.isExists()) {
             source = new org.apache.qpid.proton.amqp.messaging.Source();
             source.setAddress(queue);
             source.setDurable(TerminusDurability.UNSETTLED_STATE);
             source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
             source.setDistributionMode(COPY);
             source.setCapabilities(TOPIC);
+
+            SimpleString filterString = result.getFilterString();
+            if (filterString != null) {
+               selector = filterString.toString();
+               boolean noLocal = false;
+
+               String remoteContainerId = sender.getSession().getConnection().getRemoteContainer();
+               String noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
+
+               if (selector.endsWith(noLocalFilter)) {
+                  if (selector.length() > noLocalFilter.length()) {
+                     noLocalFilter = " AND " + noLocalFilter;
+                     selector = selector.substring(0, selector.length() - noLocalFilter.length());
+                  } else {
+                     selector = null;
+                  }
+
+                  noLocal = true;
+               }
+
+               if (noLocal) {
+                  supportedFilters.put(AmqpSupport.NO_LOCAL_NAME, AmqpNoLocalFilter.NO_LOCAL);
+               }
+
+               if (selector != null && !selector.trim().isEmpty()) {
+                  supportedFilters.put(AmqpSupport.JMS_SELECTOR_NAME, new AmqpJmsSelectorFilter(selector));
+               }
+            }
+
             sender.setSource(source);
          } else {
             throw new ActiveMQAMQPNotFoundException("Unknown subscription link: " + sender.getName());
@@ -228,7 +253,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                } else {
                   sessionSPI.createDurableQueue(source.getAddress(), queue, selector);
                }
-               source.setAddress(queue);
             } else {
                // otherwise we are a volatile subscription
                queue = java.util.UUID.randomUUID().toString();
@@ -237,7 +261,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                } catch (Exception e) {
                   throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
                }
-               source.setAddress(queue);
             }
          } else {
             queue = source.getAddress();
@@ -308,7 +331,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          // any durable resources for say pub subs
          if (remoteLinkClose) {
             Source source = (Source) sender.getSource();
-            if (source != null && source.getAddress() != null && hasCapabilities(TOPIC, source)) {
+            if (source != null && source.getAddress() != null && (hasCapabilities(TOPIC, source) || isPubSub(source))) {
                String queueName = source.getAddress();
                QueueQueryResult result = sessionSPI.queueQuery(queueName, false);
                if (result.isExists() && source.getDynamic()) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/226f28ab/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java
index 91c9a67..00bd27a 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java
@@ -69,7 +69,7 @@ public interface EventHandler {
 
    void onRemoteDetach(Link link) throws Exception;
 
-   void onDetach(Link link) throws Exception;
+   void onLocalDetach(Link link) throws Exception;
 
    void onDelivery(Delivery delivery) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/226f28ab/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/Events.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/Events.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/Events.java
index 6552f64..405491a 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/Events.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/Events.java
@@ -85,7 +85,7 @@ public final class Events {
             handler.onFinal(event.getLink());
             break;
          case LINK_LOCAL_DETACH:
-            handler.onDetach(event.getLink());
+            handler.onLocalDetach(event.getLink());
             break;
          case LINK_REMOTE_DETACH:
             handler.onRemoteDetach(event.getLink());
@@ -96,7 +96,8 @@ public final class Events {
          case DELIVERY:
             handler.onDelivery(event.getDelivery());
             break;
+         default:
+            break;
       }
    }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/226f28ab/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java
new file mode 100644
index 0000000..e0c6b6c
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import static org.apache.activemq.transport.amqp.AmqpSupport.COPY;
+import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_NAME;
+import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_NAME;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
+import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.proton.engine.Receiver;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for broker side support of the Durable Subscription mapping for JMS.
+ */
+public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
+
+   private static final Logger LOG = LoggerFactory.getLogger(AmqpDurableReceiverTest.class);
+
+   private final String SELECTOR_STRING = "color = red";
+
+   @Override
+   public void setUp() throws Exception {
+      super.setUp();
+      server.createQueue(new SimpleString(getTopicName()), new SimpleString(getTopicName()), null, true, false);
+   }
+
+   @Test(timeout = 60000)
+   public void testCreateDurableReceiver() throws Exception {
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.createConnection());
+      connection.setContainerId(getContainerID());
+      connection.connect();
+
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName());
+      receiver.flow(1);
+
+      assertEquals(getTopicName(), lookupSubscription());
+
+      AmqpSender sender = session.createSender(getTopicName());
+      AmqpMessage message = new AmqpMessage();
+      message.setMessageId("message:1");
+      sender.send(message);
+
+      message = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(message);
+
+      connection.close();
+
+      assertEquals(getTopicName(), lookupSubscription());
+   }
+
+   @Test(timeout = 60000)
+   public void testDetachedDurableReceiverRemainsActive() throws Exception {
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.createConnection());
+      connection.setContainerId(getContainerID());
+      connection.connect();
+
+      AmqpSession session = connection.createSession();
+      AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName());
+
+      assertEquals(getTopicName(), lookupSubscription());
+
+      receiver.detach();
+
+      assertEquals(getTopicName(), lookupSubscription());
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testCloseDurableReceiverRemovesSubscription() throws Exception {
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.createConnection());
+      connection.setContainerId(getContainerID());
+      connection.connect();
+
+      AmqpSession session = connection.createSession();
+      AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName());
+
+      assertEquals(getTopicName(), lookupSubscription());
+
+      receiver.close();
+
+      assertNull(lookupSubscription());
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testReattachToDurableNode() throws Exception {
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.createConnection());
+      connection.setContainerId(getContainerID());
+      connection.connect();
+
+      AmqpSession session = connection.createSession();
+      AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName());
+
+      receiver.detach();
+
+      receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName());
+
+      receiver.close();
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testLookupExistingSubscription() throws Exception {
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.createConnection());
+      connection.setContainerId(getContainerID());
+      connection.connect();
+
+      AmqpSession session = connection.createSession();
+      AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName());
+
+      receiver.detach();
+
+      receiver = session.lookupSubscription(getSubscriptionName());
+
+      assertNotNull(receiver);
+
+      Receiver protonReceiver = receiver.getReceiver();
+      assertNotNull(protonReceiver.getRemoteSource());
+      Source remoteSource = (Source) protonReceiver.getRemoteSource();
+
+      if (remoteSource.getFilter() != null) {
+         assertFalse(remoteSource.getFilter().containsKey(NO_LOCAL_NAME));
+         assertFalse(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME));
+      }
+
+      assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy());
+      assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable());
+      assertEquals(COPY, remoteSource.getDistributionMode());
+
+      receiver.close();
+
+      try {
+         receiver = session.lookupSubscription(getSubscriptionName());
+         fail("Should not be able to lookup the subscription");
+      } catch (Exception e) {
+      }
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testLookupExistingSubscriptionWithSelector() throws Exception {
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.createConnection());
+      connection.setContainerId(getContainerID());
+      connection.connect();
+
+      AmqpSession session = connection.createSession();
+      AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName(), SELECTOR_STRING, false);
+
+      receiver.detach();
+
+      receiver = session.lookupSubscription(getSubscriptionName());
+
+      assertNotNull(receiver);
+
+      Receiver protonReceiver = receiver.getReceiver();
+      assertNotNull(protonReceiver.getRemoteSource());
+      Source remoteSource = (Source) protonReceiver.getRemoteSource();
+
+      assertNotNull(remoteSource.getFilter());
+      assertFalse(remoteSource.getFilter().containsKey(NO_LOCAL_NAME));
+      assertTrue(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME));
+      String selector = (String) ((DescribedType) remoteSource.getFilter().get(JMS_SELECTOR_NAME)).getDescribed();
+      assertEquals(SELECTOR_STRING, selector);
+
+      assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy());
+      assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable());
+      assertEquals(COPY, remoteSource.getDistributionMode());
+
+      receiver.close();
+
+      try {
+         receiver = session.lookupSubscription(getSubscriptionName());
+         fail("Should not be able to lookup the subscription");
+      } catch (Exception e) {
+      }
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testLookupExistingSubscriptionWithNoLocal() throws Exception {
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.createConnection());
+      connection.setContainerId(getContainerID());
+      connection.connect();
+
+      AmqpSession session = connection.createSession();
+      AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName(), null, true);
+
+      receiver.detach();
+
+      receiver = session.lookupSubscription(getSubscriptionName());
+
+      assertNotNull(receiver);
+
+      Receiver protonReceiver = receiver.getReceiver();
+      assertNotNull(protonReceiver.getRemoteSource());
+      Source remoteSource = (Source) protonReceiver.getRemoteSource();
+
+      assertNotNull(remoteSource.getFilter());
+      assertTrue(remoteSource.getFilter().containsKey(NO_LOCAL_NAME));
+      assertFalse(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME));
+
+      assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy());
+      assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable());
+      assertEquals(COPY, remoteSource.getDistributionMode());
+
+      receiver.close();
+
+      try {
+         receiver = session.lookupSubscription(getSubscriptionName());
+         fail("Should not be able to lookup the subscription");
+      } catch (Exception e) {
+      }
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testLookupExistingSubscriptionWithSelectorAndNoLocal() throws Exception {
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.createConnection());
+      connection.setContainerId(getContainerID());
+      connection.connect();
+
+      AmqpSession session = connection.createSession();
+      AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName(), SELECTOR_STRING, true);
+
+      receiver.detach();
+
+      receiver = session.lookupSubscription(getSubscriptionName());
+
+      assertNotNull(receiver);
+
+      Receiver protonReceiver = receiver.getReceiver();
+      assertNotNull(protonReceiver.getRemoteSource());
+      Source remoteSource = (Source) protonReceiver.getRemoteSource();
+
+      assertNotNull(remoteSource.getFilter());
+      assertTrue(remoteSource.getFilter().containsKey(NO_LOCAL_NAME));
+      assertTrue(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME));
+      String selector = (String) ((DescribedType) remoteSource.getFilter().get(JMS_SELECTOR_NAME)).getDescribed();
+      assertEquals(SELECTOR_STRING, selector);
+
+      assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy());
+      assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable());
+      assertEquals(COPY, remoteSource.getDistributionMode());
+
+      receiver.close();
+
+      try {
+         receiver = session.lookupSubscription(getSubscriptionName());
+         fail("Should not be able to lookup the subscription");
+      } catch (Exception e) {
+      }
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testLookupNonExistingSubscription() throws Exception {
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.createConnection());
+      connection.setContainerId(getContainerID());
+      connection.connect();
+
+      AmqpSession session = connection.createSession();
+
+      try {
+         session.lookupSubscription(getSubscriptionName());
+         fail("Should throw an exception since there is not subscription");
+      } catch (Exception e) {
+         LOG.info("Error on lookup: {}", e.getMessage());
+      }
+
+      connection.close();
+   }
+
+   public String lookupSubscription() {
+      Binding binding = server.getPostOffice().getBinding(new SimpleString(getContainerID() + "." + getSubscriptionName()));
+      if (binding != null) {
+         return binding.getAddress().toString();
+      }
+
+      return null;
+   }
+
+   public String getContainerID() {
+      return "myContainerID";
+   }
+
+   public String getSubscriptionName() {
+      return "mySubscription";
+   }
+
+   public String getTopicName() {
+      return "jms.topic.myTopic";
+   }
+}


Mime
View raw message