activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [activemq-artemis] branch master updated: ARTEMIS-2372 Filtering on Message Annotations
Date Tue, 28 Apr 2020 15:23:30 GMT
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ebf3c8  ARTEMIS-2372 Filtering on Message Annotations
     new 1a83b13  This closes #2695
2ebf3c8 is described below

commit 2ebf3c8e1b90558987d7f169c11f4e7560cf72cf
Author: Clebert Suconic <clebertsuconic@apache.org>
AuthorDate: Wed Jun 5 16:10:38 2019 -0400

    ARTEMIS-2372 Filtering on Message Annotations
---
 .../apache/activemq/artemis/api/core/Message.java  |  4 ++
 .../artemis/protocol/amqp/broker/AMQPMessage.java  | 13 ++++
 .../artemis/core/filter/impl/FilterImpl.java       |  2 +-
 .../integration/amqp/AmqpExpiredMessageTest.java   | 69 +++++++++++++++++++++-
 .../integration/amqp/AmqpSendReceiveTest.java      | 39 ++++++++++++
 5 files changed, 123 insertions(+), 4 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index a8b54c7..568cdda 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -614,6 +614,10 @@ public interface Message {
 
    Long getLongProperty(SimpleString key) throws ActiveMQPropertyConversionException;
 
+   default Object getObjectPropertyForFilter(SimpleString key) {
+      return getObjectProperty(key);
+   }
+
    Object getObjectProperty(SimpleString key);
 
    default Object removeAnnotation(SimpleString key) {
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 64fec6e..e0cd94b 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -273,6 +273,19 @@ public abstract class AMQPMessage extends RefCountMessage implements
org.apache.
       return protonMessage;
    }
 
+   @Override
+   public Object getObjectPropertyForFilter(SimpleString key) {
+      Object value = getObjectProperty(key);
+      if (value == null) {
+         value = getMessageAnnotation(key.toString());
+      }
+      if (value == null) {
+         value = getExtraBytesProperty(key);
+      }
+
+      return value;
+   }
+
    /**
     * Returns a copy of the message Header if one is present, changes to the returned
     * Header instance do not affect the original Message.
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java
index da36d6f..288a91c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java
@@ -239,7 +239,7 @@ public class FilterImpl implements Filter {
             result = bytes == null ? null : ByteUtil.bytesToInt(bytes);
          }
          if (result == null) {
-            result = message.getObjectProperty(id);
+            result = message.getObjectPropertyForFilter(id);
          }
          if (result != null) {
             if (result.getClass() == SimpleString.class) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
index 759a854..54458b2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
@@ -27,6 +27,7 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
 import org.apache.activemq.transport.amqp.client.AmqpReceiver;
 import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
@@ -120,6 +121,68 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
       connection.close();
    }
 
+   /** This test is validating a broker feature where the message copy through the DLQ will
receive an annotation.
+    *  It is also testing filter on that annotation. */
+   @Test(timeout = 60000)
+   public void testExpiryThroughTTLValidateAnnotation() throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpSender sender = session.createSender(getQueueName());
+
+      // Get the Queue View early to avoid racing the delivery.
+      final Queue queueView = getProxyToQueue(getQueueName());
+      assertNotNull(queueView);
+
+      AmqpMessage message = new AmqpMessage();
+      message.setTimeToLive(1);
+      message.setText("Test-Message");
+      message.setDurable(true);
+      message.setApplicationProperty("key1", "Value1");
+      sender.send(message);
+      sender.close();
+
+      Thread.sleep(100);
+
+      // Now try and get the message
+      AmqpReceiver receiver = session.createReceiver(getQueueName());
+      receiver.flow(1);
+      AmqpMessage received = receiver.receiveNoWait();
+      assertNull(received);
+
+      Wait.assertEquals(1, queueView::getMessagesExpired);
+
+      connection.close();
+
+      // This will stop and start the server
+      // to make sure the message is decoded again from its binary format
+      // avoiding any parsing cached at the server.
+      server.stop();
+      server.start();
+
+      final Queue dlqView = getProxyToQueue(getDeadLetterAddress());
+      assertNotNull(dlqView);
+      Wait.assertEquals(1, dlqView::getMessageCount);
+
+      client = createAmqpClient();
+      connection = addConnection(client.connect());
+      session = connection.createSession();
+
+      AmqpReceiver receiverDLQ = session.createReceiver(getDeadLetterAddress(), "_AMQ_ORIG_ADDRESS='"
+ getQueueName() + "'");
+      receiverDLQ.flow(1);
+      received = receiverDLQ.receive(5, TimeUnit.SECONDS);
+      Assert.assertNotNull(received);
+      received.accept();
+
+      assertNotNull("Should have read message from DLQ", received);
+      assertEquals(0, received.getTimeToLive());
+      assertNotNull(received);
+      assertEquals("Value1", received.getApplicationProperty("key1"));
+
+      connection.close();
+   }
+
    @Test(timeout = 60000)
    public void testSendMessageThatIsNotExpiredUsingAbsoluteTime() throws Exception {
       AmqpClient client = createAmqpClient();
@@ -272,7 +335,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
       sender.send(message);
       sender.close();
 
-      assertEquals(1, queueView.getMessageCount());
+      Wait.assertEquals(1, queueView::getMessageCount);
 
       // Now try and get the message
       AmqpReceiver receiver = session.createReceiver(getQueueName());
@@ -280,7 +343,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
       AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
       assertNotNull(received);
 
-      assertEquals(0, queueView.getMessagesExpired());
+      Wait.assertEquals(0, queueView::getMessagesExpired);
 
       connection.close();
    }
@@ -305,7 +368,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
 
       Thread.sleep(50);
 
-      assertEquals(1, queueView.getMessageCount());
+      Wait.assertEquals(1, queueView::getMessageCount);
 
       // Now try and get the message
       AmqpReceiver receiver = session.createReceiver(getQueueName());
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
index cb1db92..85c304c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
@@ -385,6 +385,45 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
    }
 
    @Test(timeout = 60000)
+   public void testSendFilterAnnotation() throws Exception {
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpSender sender = session.createSender(getQueueName());
+
+      AmqpMessage message = new AmqpMessage();
+
+      message.setMessageId("msg" + 1);
+      message.setMessageAnnotation("serialNo", 1);
+      message.setText("Test-Message");
+      sender.send(message);
+
+      message = new AmqpMessage();
+      message.setMessageId("msg" + 2);
+      message.setMessageAnnotation("serialNo", 2);
+      message.setText("Test-Message 2");
+      sender.send(message);
+      sender.close();
+
+      LOG.debug("Attempting to read message with receiver");
+      AmqpReceiver receiver = session.createReceiver(getQueueName(), "serialNo=2");
+      receiver.flow(2);
+      AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
+      assertNotNull("Should have read message", received);
+      assertEquals("msg2", received.getMessageId());
+      received.accept();
+
+      Assert.assertNull(receiver.receiveNoWait());
+
+      receiver.close();
+
+      connection.close();
+   }
+
+
+   @Test(timeout = 60000)
    public void testCloseBusyReceiver() throws Exception {
       final int MSG_COUNT = 20;
 


Mime
View raw message