activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-494 - implement filters correctly
Date Wed, 20 Apr 2016 12:56:09 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 17fe4707f -> d142c2b52


ARTEMIS-494 - implement filters correctly

took this code from ActiveMQ which was over hawled in this area

https://issues.apache.org/jira/browse/ARTEMIS-494


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f47bb6cc
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f47bb6cc
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f47bb6cc

Branch: refs/heads/master
Commit: f47bb6cc4b79cdf4f1538966fb58872d9c34e449
Parents: 17fe470
Author: Andy Taylor <andy.tayls67@gmail.com>
Authored: Wed Apr 20 12:11:49 2016 +0100
Committer: Andy Taylor <andy.tayls67@gmail.com>
Committed: Wed Apr 20 12:14:04 2016 +0100

----------------------------------------------------------------------
 .../main/java/org/proton/plug/AmqpSupport.java  | 129 +++++++++++++++++++
 .../context/AbstractProtonContextSender.java    |  11 ++
 .../context/AbstractProtonReceiverContext.java  |   7 +
 .../plug/context/ProtonDeliveryHandler.java     |   3 +
 .../plug/context/ProtonTransactionHandler.java  |   5 +
 .../server/ProtonServerSenderContext.java       |  45 ++++++-
 6 files changed, 196 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f47bb6cc/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java
new file mode 100644
index 0000000..f57fd81
--- /dev/null
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.proton.plug;
+
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedLong;
+
+import java.util.AbstractMap;
+import java.util.Map;
+
+/**
+ * Set of useful methods and definitions used in the AMQP protocol handling
+ */
+public class AmqpSupport {
+
+   // Identification values used to locating JMS selector types.
+   public static final UnsignedLong JMS_SELECTOR_CODE = UnsignedLong.valueOf(0x0000468C00000004L);
+   public static final Symbol JMS_SELECTOR_NAME = Symbol.valueOf("apache.org:selector-filter:string");
+   public static final Object[] JMS_SELECTOR_FILTER_IDS = new Object[]{JMS_SELECTOR_CODE,
JMS_SELECTOR_NAME};
+   public static final UnsignedLong NO_LOCAL_CODE = UnsignedLong.valueOf(0x0000468C00000003L);
+   public static final Symbol NO_LOCAL_NAME = Symbol.valueOf("apache.org:no-local-filter:list");
+   public static final Object[] NO_LOCAL_FILTER_IDS = new Object[]{NO_LOCAL_CODE, NO_LOCAL_NAME};
+
+   // Capabilities used to identify destination type in some requests.
+   public static final Symbol TEMP_QUEUE_CAPABILITY = Symbol.valueOf("temporary-queue");
+   public static final Symbol TEMP_TOPIC_CAPABILITY = Symbol.valueOf("temporary-topic");
+
+   // Symbols used to announce connection information to remote peer.
+   public static final Symbol INVALID_FIELD = Symbol.valueOf("invalid-field");
+   public static final Symbol CONTAINER_ID = Symbol.valueOf("container-id");
+
+   // Symbols used to announce connection information to remote peer.
+   public static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
+   public static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix");
+   public static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix");
+   public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed");
+   public static final Symbol PRODUCT = Symbol.valueOf("product");
+   public static final Symbol VERSION = Symbol.valueOf("version");
+   public static final Symbol PLATFORM = Symbol.valueOf("platform");
+
+   // Symbols used in configuration of newly opened links.
+   public static final Symbol COPY = Symbol.getSymbol("copy");
+
+   // Lifetime policy symbols
+   public static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
+
+   /**
+    * Search for a given Symbol in a given array of Symbol object.
+    *
+    * @param symbols
+    *        the set of Symbols to search.
+    * @param key
+    *        the value to try and find in the Symbol array.
+    *
+    * @return true if the key is found in the given Symbol array.
+    */
+   public static boolean contains(Symbol[] symbols, Symbol key) {
+      if (symbols == null || symbols.length == 0) {
+         return false;
+      }
+
+      for (Symbol symbol : symbols) {
+         if (symbol.equals(key)) {
+            return true;
+         }
+      }
+
+      return false;
+   }
+
+   /**
+    * Search for a particular filter using a set of known indentification values
+    * in the Map of filters.
+    *
+    * @param filters
+    *        The filters map that should be searched.
+    * @param filterIds
+    *        The aliases for the target filter to be located.
+    *
+    * @return the filter if found in the mapping or null if not found.
+    */
+   public static Map.Entry<Symbol, DescribedType> findFilter(Map<Symbol, Object>
filters, Object[] filterIds) {
+
+      if (filterIds == null || filterIds.length == 0) {
+         StringBuilder ids = new StringBuilder();
+         if (filterIds != null) {
+            for (Object filterId : filterIds) {
+               ids.append(filterId).append(" ");
+            }
+         }
+         throw new IllegalArgumentException("Invalid Filter Ids array passed: " + ids);
+      }
+
+      if (filters == null || filters.isEmpty()) {
+         return null;
+      }
+
+      for (Map.Entry<Symbol, Object> filter : filters.entrySet()) {
+         if (filter.getValue() instanceof DescribedType) {
+            DescribedType describedType = ((DescribedType) filter.getValue());
+            Object descriptor = describedType.getDescriptor();
+
+            for (Object filterId : filterIds) {
+               if (descriptor.equals(filterId)) {
+                  return new AbstractMap.SimpleImmutableEntry<>(filter.getKey(), describedType);
+               }
+            }
+         }
+      }
+
+      return null;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f47bb6cc/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java
index baf0710..7a4d295 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java
@@ -18,6 +18,7 @@ package org.proton.plug.context;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Sender;
@@ -76,6 +77,16 @@ public abstract class AbstractProtonContextSender extends ProtonInitializable
im
       connection.flush();
    }
 
+   /*
+   * close the session
+   * */
+   @Override
+   public void close(ErrorCondition condition) throws ActiveMQAMQPException {
+      closed = true;
+      sender.setCondition(condition);
+      close();
+   }
+
    @Override
    /*
    * handle an incoming Ack from Proton, basically pass to ActiveMQ Artemis to handle

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f47bb6cc/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
index 2cb38b3..8481853 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
@@ -16,6 +16,7 @@
  */
 package org.proton.plug.context;
 
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.engine.Receiver;
 import org.proton.plug.AMQPSessionCallback;
 import org.proton.plug.exceptions.ActiveMQAMQPException;
@@ -56,6 +57,12 @@ public abstract class AbstractProtonReceiverContext extends ProtonInitializable
       protonSession.removeReceiver(receiver);
    }
 
+   @Override
+   public void close(ErrorCondition condition) throws ActiveMQAMQPException {
+      receiver.setCondition(condition);
+      close();
+   }
+
    public void flow(int credits) {
       synchronized (connection.getLock()) {
          receiver.flow(credits);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f47bb6cc/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java
index 63bc277..128ea65 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java
@@ -16,6 +16,7 @@
  */
 package org.proton.plug.context;
 
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.engine.Delivery;
 import org.proton.plug.exceptions.ActiveMQAMQPException;
 
@@ -29,4 +30,6 @@ public interface ProtonDeliveryHandler {
    void onMessage(Delivery delivery) throws ActiveMQAMQPException;
 
    void close() throws ActiveMQAMQPException;
+
+   void close(ErrorCondition condition) throws ActiveMQAMQPException;
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f47bb6cc/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
index 5e5115f..6a9ad6a 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
@@ -119,4 +119,9 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler
{
    public void close() throws ActiveMQAMQPException {
       //noop
    }
+
+   @Override
+   public void close(ErrorCondition condition) throws ActiveMQAMQPException {
+      //noop
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f47bb6cc/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
index db8b409..dfc69df 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
@@ -18,13 +18,17 @@ package org.proton.plug.context.server;
 
 import java.util.Map;
 
+import org.apache.activemq.artemis.selector.filter.FilterException;
+import org.apache.activemq.artemis.selector.impl.SelectorParser;
 import org.apache.qpid.proton.amqp.DescribedType;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.Modified;
 import org.apache.qpid.proton.amqp.messaging.Rejected;
 import org.apache.qpid.proton.amqp.messaging.Released;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
 import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Sender;
@@ -39,6 +43,9 @@ import org.proton.plug.logger.ActiveMQAMQPProtocolMessageBundle;
 import org.proton.plug.context.ProtonPlugSender;
 import org.apache.qpid.proton.amqp.messaging.Source;
 
+import static org.proton.plug.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
+import static org.proton.plug.AmqpSupport.findFilter;
+
 public class ProtonServerSenderContext extends AbstractProtonContextSender implements ProtonPlugSender
{
 
    private static final Symbol SELECTOR = Symbol.getSymbol("jms-selector");
@@ -94,14 +101,29 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender
imple
       String queue;
 
       String selector = null;
-      Map filter = source == null ? null : source.getFilter();
+
+      /*
+      * even tho the filter is a map it will only return a single filter unless a nolocal
is also provided
+      * */
+      Map.Entry<Symbol, DescribedType> filter = findFilter(source.getFilter(), JMS_SELECTOR_FILTER_IDS);
       if (filter != null) {
-         DescribedType value = (DescribedType) filter.get(SELECTOR);
-         if (value != null) {
-            selector = value.getDescribed().toString();
+         selector = filter.getValue().getDescribed().toString();
+         // Validate the Selector.
+         try {
+            SelectorParser.parse(selector);
+         }
+         catch (FilterException e) {
+            close(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage()));
+            return;
          }
       }
 
+      //filter = findFilter(source.getFilter(), NO_LOCAL_FILTER_IDS);
+
+      //if (filter != null) {
+         //todo implement nolocal filter
+      //}
+
       if (source != null) {
          if (source.getDynamic()) {
             //if dynamic we have to create the node (queue) and set the address on the target,
the node is temporary and
@@ -148,6 +170,21 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender
imple
    * close the session
    * */
    @Override
+   public void close(ErrorCondition condition) throws ActiveMQAMQPException {
+      super.close(condition);
+      try {
+         sessionSPI.closeSender(brokerConsumer);
+      }
+      catch (Exception e) {
+         e.printStackTrace();
+         throw new ActiveMQAMQPInternalErrorException(e.getMessage());
+      }
+   }
+
+   /*
+   * close the session
+   * */
+   @Override
    public void close() throws ActiveMQAMQPException {
       super.close();
       try {


Mime
View raw message