activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-776 Attach response should only list supported filters
Date Fri, 07 Oct 2016 22:39:20 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 736886fc1 -> 1bb3c1536


ARTEMIS-776 Attach response should only list supported filters

The broker needs to return only the filters that it supports on a
receiver attach; otherwise, the remote is not aware that the broker is
not able to honor the requested configuration of the receiver.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/398da40f
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/398da40f
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/398da40f

Branch: refs/heads/master
Commit: 398da40f7dd17b97838f4a2b377d7b19b82c6c7a
Parents: 736886f
Author: Timothy Bish <tabish121@gmail.com>
Authored: Fri Oct 7 17:29:46 2016 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Fri Oct 7 17:35:27 2016 -0400

----------------------------------------------------------------------
 .../amqp/proton/ProtonServerSenderContext.java  | 206 ++++++++++---------
 .../integration/amqp/AmqpClientTestSupport.java |   5 +-
 .../amqp/AmqpReceiverWithFiltersTest.java       | 117 +++++++++++
 3 files changed, 228 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/398da40f/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 94e6a47..1ed5745 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.proton;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
@@ -59,7 +60,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
 
    private static final Logger log = Logger.getLogger(ProtonServerSenderContext.class);
 
-   private static final Symbol SELECTOR = Symbol.getSymbol("jms-selector");
    private static final Symbol COPY = Symbol.valueOf("copy");
    private static final Symbol TOPIC = Symbol.valueOf("topic");
 
@@ -72,10 +72,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements
Pr
    protected final AMQPSessionCallback sessionSPI;
    protected CreditsSemaphore creditsSemaphore = new CreditsSemaphore(0);
 
-   public ProtonServerSenderContext(AMQPConnectionContext connection,
-                                    Sender sender,
-                                    AMQPSessionContext protonSession,
-                                    AMQPSessionCallback server) {
+   public ProtonServerSenderContext(AMQPConnectionContext connection, Sender sender, AMQPSessionContext protonSession, AMQPSessionCallback server) {
       super();
       this.connection = connection;
       this.sender = sender;
@@ -98,20 +95,20 @@ public class ProtonServerSenderContext extends ProtonInitializable implements
Pr
    }
 
    /*
-* start the session
-* */
+    * start the session
+    */
    public void start() throws ActiveMQAMQPException {
       sessionSPI.start();
       // protonSession.getServerSession().start();
 
-      //todo add flow control
+      // todo add flow control
       try {
          // to do whatever you need to make the broker start sending messages to the consumer
-         //this could be null if a link reattach has happened
+         // this could be null if a link reattach has happened
          if (brokerConsumer != null) {
             sessionSPI.startSender(brokerConsumer);
          }
-         //protonSession.getServerSession().receiveConsumerCredits(consumerID, -1);
+         // protonSession.getServerSession().receiveConsumerCredits(consumerID, -1);
       } catch (Exception e) {
          throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorStartingConsumer(e.getMessage());
       }
@@ -120,20 +117,19 @@ public class ProtonServerSenderContext extends ProtonInitializable implements
Pr
    /**
     * create the actual underlying ActiveMQ Artemis Server Consumer
     */
+   @SuppressWarnings("unchecked")
    @Override
    public void initialise() throws Exception {
       super.initialise();
 
       Source source = (Source) sender.getRemoteSource();
-
       String queue;
-
       String selector = null;
+      final Map<Symbol, Object> supportedFilters = new HashMap<>();
 
-      /*
-      * even tho the filter is a map it will only return a single filter unless a nolocal
is also provided
-      * */
       if (source != null) {
+         // We look for message selectors on every receiver, while in other cases we might only
+         // consume the filter depending on the subscription type.
          Map.Entry<Symbol, DescribedType> filter = AmqpSupport.findFilter(source.getFilter(),
AmqpSupport.JMS_SELECTOR_FILTER_IDS);
          if (filter != null) {
             selector = filter.getValue().getDescribed().toString();
@@ -144,17 +140,18 @@ public class ProtonServerSenderContext extends ProtonInitializable implements
Pr
                close(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage()));
                return;
             }
+
+            supportedFilters.put(filter.getKey(), filter.getValue());
          }
       }
 
-      /*
-      * if we have a capability for a topic (qpid-jms) or we are configured on this address
to act like a topic then act
-      * like a subscription.
-      * */
+      // if we have a capability for a Topic (AMQP -> JMS Mapping) or we are configured
on this
+      // address to act like a topic then act like a subscription.
       boolean isPubSub = hasCapabilities(TOPIC, source) || isPubSub(source);
 
       if (isPubSub) {
-         if (AmqpSupport.findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS)
!= null) {
+         Map.Entry<Symbol, DescribedType> filter = AmqpSupport.findFilter(source.getFilter(),
AmqpSupport.NO_LOCAL_FILTER_IDS);
+         if (filter != null) {
             String remoteContainerId = sender.getSession().getConnection().getRemoteContainer();
             String noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString()
+ "<>'" + remoteContainerId + "'";
             if (selector != null) {
@@ -162,20 +159,25 @@ public class ProtonServerSenderContext extends ProtonInitializable implements
Pr
             } else {
                selector = noLocalFilter;
             }
+
+            supportedFilters.put(filter.getKey(), filter.getValue());
          }
       }
 
       if (source == null) {
-         // Attempt to recover a previous subscription happens when a link reattach happens
on a subscription queue
+         // Attempt to recover a previous subscription happens when a link reattach happens
on a
+         // subscription queue
          String clientId = connection.getRemoteContainer();
          String pubId = sender.getName();
          queue = createQueueName(clientId, pubId);
          boolean exists = sessionSPI.queueQuery(queue, false).isExists();
 
-         /*
-         * If it exists then we know it is a subscription so we set the capabilities on the
source so we can delete on a
-         * link remote close.
-         * */
+         // Once confirmed that the address exists we need to return a Source that reflects
+         // the lifetime policy and capabilities of the new subscription.
+         //
+         // TODO we are not applying selector or noLocal filters to the source we just
+         // looked up which would violate expectations if the client checked that they
+         // are present on subscription recovery (JMS Durable Re-subscribe) etc
          if (exists) {
             source = new org.apache.qpid.proton.amqp.messaging.Source();
             source.setAddress(queue);
@@ -187,80 +189,87 @@ public class ProtonServerSenderContext extends ProtonInitializable implements
Pr
          } else {
             throw new ActiveMQAMQPNotFoundException("Unknown subscription link: " + sender.getName());
          }
+      } else if (source.getDynamic()) {
+         // if dynamic we have to create the node (queue) and set the address on the target,
the
+         // node is temporary and  will be deleted on closing of the session
+         queue = java.util.UUID.randomUUID().toString();
+         try {
+            sessionSPI.createTemporaryQueue(queue);
+            // protonSession.getServerSession().createQueue(queue, queue, null, true, false);
+         } catch (Exception e) {
+            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
+         }
+         source.setAddress(queue);
       } else {
-         if (source.getDynamic()) {
-            //if dynamic we have to create the node (queue) and set the address on the target,
the node is temporary and
-            // will be deleted on closing of the session
-            queue = java.util.UUID.randomUUID().toString();
-            try {
-               sessionSPI.createTemporaryQueue(queue);
-               //protonSession.getServerSession().createQueue(queue, queue, null, true, false);
-            } catch (Exception e) {
-               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
-            }
-            source.setAddress(queue);
-         } else {
-            //if not dynamic then we use the targets address as the address to forward the
messages to, however there has to
-            //be a queue bound to it so we nee to check this.
-            if (isPubSub) {
-               // if we are a subscription and durable create a durable queue using the container
id and link name
-               if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable()))
{
-                  String clientId = connection.getRemoteContainer();
-                  String pubId = sender.getName();
-                  queue = createQueueName(clientId, pubId);
-                  QueueQueryResult result = sessionSPI.queueQuery(queue, false);
-
-                  if (result.isExists()) {
-                     // If a client reattaches to a durable subscription with a different
no-local filter value, selector
-                     // or address then we must recreate the queue (JMS semantics).
-
-                     if (!Objects.equals(result.getFilterString(), SimpleString.toSimpleString(selector))
|| (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString())))
{
-                        if (result.getConsumerCount() == 0) {
-                           sessionSPI.deleteQueue(queue);
-                           sessionSPI.createDurableQueue(source.getAddress(), queue, selector);
-                        } else {
-                           throw new ActiveMQAMQPIllegalStateException("Unable to recreate
subscription, consumers already exist");
-                        }
+         // if not dynamic then we use the target's address as the address to forward the
+         // messages to, however there has to be a queue bound to it so we need to check
this.
+         if (isPubSub) {
+            // if we are a subscription and durable create a durable queue using the container
+            // id and link name
+            if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable()))
{
+               String clientId = connection.getRemoteContainer();
+               String pubId = sender.getName();
+               queue = createQueueName(clientId, pubId);
+               QueueQueryResult result = sessionSPI.queueQuery(queue, false);
+
+               if (result.isExists()) {
+                  // If a client reattaches to a durable subscription with a different no-local
+                  // filter value, selector or address then we must recreate the queue (JMS
semantics).
+                  if (!Objects.equals(result.getFilterString(), SimpleString.toSimpleString(selector))
||
+                     (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString())))
{
+
+                     if (result.getConsumerCount() == 0) {
+                        sessionSPI.deleteQueue(queue);
+                        sessionSPI.createDurableQueue(source.getAddress(), queue, selector);
+                     } else {
+                        throw new ActiveMQAMQPIllegalStateException("Unable to recreate subscription, consumers already exist");
                      }
-                  } else {
-                     sessionSPI.createDurableQueue(source.getAddress(), queue, selector);
                   }
-                  source.setAddress(queue);
                } else {
-                  //otherwise we are a volatile subscription
-                  queue = java.util.UUID.randomUUID().toString();
-                  try {
-                     sessionSPI.createTemporaryQueue(source.getAddress(), queue, selector);
-                  } catch (Exception e) {
-                     throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
-                  }
-                  source.setAddress(queue);
+                  sessionSPI.createDurableQueue(source.getAddress(), queue, selector);
                }
+               source.setAddress(queue);
             } else {
-               queue = source.getAddress();
-            }
-            if (queue == null) {
-               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressNotSet();
-            }
-
-            try {
-               if (!sessionSPI.queueQuery(queue, !isPubSub).isExists()) {
-                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
+               // otherwise we are a volatile subscription
+               queue = java.util.UUID.randomUUID().toString();
+               try {
+                  sessionSPI.createTemporaryQueue(source.getAddress(), queue, selector);
+               } catch (Exception e) {
+                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
                }
-            } catch (ActiveMQAMQPNotFoundException e) {
-               throw e;
-            } catch (Exception e) {
-               throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
+               source.setAddress(queue);
             }
+         } else {
+            queue = source.getAddress();
+         }
+
+         if (queue == null) {
+            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressNotSet();
          }
 
-         boolean browseOnly = !isPubSub && source.getDistributionMode() != null &&
source.getDistributionMode().equals(COPY);
          try {
-            brokerConsumer = sessionSPI.createSender(this, queue, isPubSub ? null : selector,
browseOnly);
+            if (!sessionSPI.queueQuery(queue, !isPubSub).isExists()) {
+               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
+            }
+         } catch (ActiveMQAMQPNotFoundException e) {
+            throw e;
          } catch (Exception e) {
-            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(e.getMessage());
+            throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
          }
       }
+
+      // We need to update the source with any filters we support otherwise the client
+      // is free to consider the attach as having failed if we don't send back what we
+      // do support or if we send something we don't support the client won't know we
+      // have not honored what it asked for.
+      source.setFilter(supportedFilters.isEmpty() ? null : supportedFilters);
+
+      boolean browseOnly = !isPubSub && source.getDistributionMode() != null &&
source.getDistributionMode().equals(COPY);
+      try {
+         brokerConsumer = sessionSPI.createSender(this, queue, isPubSub ? null : selector,
browseOnly);
+      } catch (Exception e) {
+         throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(e.getMessage());
+      }
    }
 
    private boolean isPubSub(Source source) {
@@ -269,8 +278,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements
Pr
    }
 
    /*
-   * close the session
-   * */
+    * close the session
+    */
    @Override
    public void close(ErrorCondition condition) throws ActiveMQAMQPException {
       closed = true;
@@ -289,14 +298,14 @@ public class ProtonServerSenderContext extends ProtonInitializable implements
Pr
    }
 
    /*
-   * close the session
-   * */
+    * close the session
+    */
    @Override
    public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
       try {
          sessionSPI.closeSender(brokerConsumer);
-         //if this is a link close rather than a connection close or detach, we need to delete
any durable resources for
-         // say pub subs
+         // if this is a link close rather than a connection close or detach, we need to
delete
+         // any durable resources for say pub subs
          if (remoteLinkClose) {
             Source source = (Source) sender.getSource();
             if (source != null && source.getAddress() != null && hasCapabilities(TOPIC,
source)) {
@@ -324,7 +333,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements
Pr
       }
    }
 
-
    @Override
    public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
       Object message = delivery.getContext();
@@ -349,7 +357,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements
Pr
 
                      delivery.disposition(txAccepted);
                   }
-                  //we have to individual ack as we can't guarantee we will get the delivery
updates (including acks) in order
+                  // we have to individual ack as we can't guarantee we will get the delivery
+                  // updates (including acks) in order
                   // from dealer, a perf hit but a must
                   try {
                      sessionSPI.ack(tx, brokerConsumer, message);
@@ -359,7 +368,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements
Pr
                }
             }
          } else if (remoteState instanceof Accepted) {
-            //we have to individual ack as we can't guarantee we will get the delivery updates
(including acks) in order
+            // we have to individual ack as we can't guarantee we will get the delivery updates
+            // (including acks) in order
             // from dealer, a perf hit but a must
             try {
                sessionSPI.ack(null, brokerConsumer, message);
@@ -379,7 +389,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements
Pr
                throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(),
e.getMessage());
             }
          }
-         //todo add tag caching
+         // todo add tag caching
          if (!preSettle) {
             protonSession.replaceTag(delivery.getTag());
          }
@@ -390,7 +400,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements
Pr
          }
 
       } else {
-         //todo not sure if we need to do anything here
+         // todo not sure if we need to do anything here
       }
    }
 
@@ -417,7 +427,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements
Pr
          }
       }
 
-      // presettle means we can settle the message on the dealer side before we send it,
i.e. for browsers
+      // presettle means we can settle the message on the dealer side before we send it,
i.e.
+      // for browsers
       boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
 
       // we only need a tag if we are going to settle later
@@ -478,5 +489,4 @@ public class ProtonServerSenderContext extends ProtonInitializable implements
Pr
    private static String createQueueName(String clientId, String pubId) {
       return clientId + "." + pubId;
    }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/398da40f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
index f0f52ba..2c7ce6f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
@@ -48,6 +48,7 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
    @Override
    public void setUp() throws Exception {
       super.setUp();
+
       server = createServer(true, true);
       server.start();
    }
@@ -55,8 +56,6 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
    @After
    @Override
    public void tearDown() throws Exception {
-      super.tearDown();
-
       for (AmqpConnection conn : connections) {
          try {
             conn.close();
@@ -65,6 +64,8 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
          }
       }
       server.stop();
+
+      super.tearDown();
    }
 
    public Queue getProxyToQueue(String queueName) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/398da40f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverWithFiltersTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverWithFiltersTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverWithFiltersTest.java
new file mode 100644
index 0000000..2c24382
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverWithFiltersTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.activemq.transport.amqp.client.AmqpUnknownFilterType;
+import org.apache.activemq.transport.amqp.client.AmqpValidator;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
+import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.proton.engine.Receiver;
+import org.junit.Test;
+
+/**
+ * Test various behaviors of AMQP receivers with the broker.
+ */
+public class AmqpReceiverWithFiltersTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 60000)
+   public void testUnsupportedFiltersAreNotListedAsSupported() throws Exception {
+      AmqpClient client = createAmqpClient();
+
+      client.setValidator(new AmqpValidator() {
+
+         @SuppressWarnings("unchecked")
+         @Override
+         public void inspectOpenedResource(Receiver receiver) {
+
+            if (receiver.getRemoteSource() == null) {
+               markAsInvalid("Link opened with null source.");
+            }
+
+            Source source = (Source) receiver.getRemoteSource();
+            Map<Symbol, Object> filters = source.getFilter();
+
+            if (findFilter(filters, AmqpUnknownFilterType.UNKNOWN_FILTER_IDS) != null) {
+               markAsInvalid("Broker should not return unsupported filter on attach.");
+            }
+         }
+      });
+
+      Map<Symbol, DescribedType> filters = new HashMap<>();
+      filters.put(AmqpUnknownFilterType.UNKNOWN_FILTER_NAME, AmqpUnknownFilterType.UNKNOWN_FILTER);
+
+      Source source = new Source();
+      source.setAddress(getTestName());
+      source.setFilter(filters);
+      source.setDurable(TerminusDurability.NONE);
+      source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      session.createReceiver(source);
+
+      assertEquals(1, server.getTotalConsumerCount());
+
+      connection.getStateInspector().assertValid();
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testSupportedFiltersAreListedAsSupported() throws Exception {
+      AmqpClient client = createAmqpClient();
+
+      client.setValidator(new AmqpValidator() {
+
+         @SuppressWarnings("unchecked")
+         @Override
+         public void inspectOpenedResource(Receiver receiver) {
+
+            if (receiver.getRemoteSource() == null) {
+               markAsInvalid("Link opened with null source.");
+            }
+
+            Source source = (Source) receiver.getRemoteSource();
+            Map<Symbol, Object> filters = source.getFilter();
+
+            if (findFilter(filters, AmqpSupport.JMS_SELECTOR_FILTER_IDS) == null) {
+               markAsInvalid("Broker should return selector filter on attach.");
+            }
+         }
+      });
+
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      session.createReceiver(getTestName(), "color = red");
+
+      connection.getStateInspector().assertValid();
+      connection.close();
+   }
+}


Mime
View raw message