activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-888 - AMQP headers aren't always set
Date Thu, 15 Dec 2016 13:16:08 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 88f4a8cce -> ceb2b38c8


ARTEMIS-888 - AMQP headers aren't always set

https://issues.apache.org/jira/browse/ARTEMIS-888


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3b75c954
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3b75c954
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3b75c954

Branch: refs/heads/master
Commit: 3b75c954135fdc58b172a381403cb7daa223b602
Parents: 88f4a8c
Author: Andy Taylor <andy.tayls67@gmail.com>
Authored: Wed Dec 14 16:09:36 2016 +0000
Committer: Andy Taylor <andy.tayls67@gmail.com>
Committed: Thu Dec 15 07:07:04 2016 +0000

----------------------------------------------------------------------
 .../converter/message/AMQPMessageSupport.java   |   4 +
 .../converter/message/InboundTransformer.java   |   4 +
 .../message/JMSMappingOutboundTransformer.java  |  14 ++
 .../transport/amqp/client/AmqpMessage.java      |  19 +++
 .../integration/amqp/AmqpSendReceiveTest.java   | 157 +++++++++++++++++++
 5 files changed, 198 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3b75c954/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java
index 9eab737..8c4612d 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java
@@ -69,12 +69,16 @@ public final class AMQPMessageSupport {
    public static final String CONTENT_TYPE = "ContentType";
    public static final String CONTENT_ENCODING = "ContentEncoding";
    public static final String REPLYTO_GROUP_ID = "ReplyToGroupID";
+   public static final String DURABLE = "DURABLE";
+   public static final String PRIORITY = "PRIORITY";
 
    public static final String DELIVERY_ANNOTATION_PREFIX = "DA_";
    public static final String MESSAGE_ANNOTATION_PREFIX = "MA_";
    public static final String FOOTER_PREFIX = "FT_";
 
    public static final String JMS_AMQP_HEADER = JMS_AMQP_PREFIX + HEADER;
+   public static final String JMS_AMQP_HEADER_DURABLE = JMS_AMQP_PREFIX + HEADER + DURABLE;
+   public static final String JMS_AMQP_HEADER_PRIORITY = JMS_AMQP_PREFIX + HEADER + PRIORITY;
    public static final String JMS_AMQP_PROPERTIES = JMS_AMQP_PREFIX + PROPERTIES;
    public static final String JMS_AMQP_ORIGINAL_ENCODING = JMS_AMQP_PREFIX + ORIGINAL_ENCODING;
    public static final String JMS_AMQP_MESSAGE_FORMAT = JMS_AMQP_PREFIX + MESSAGE_FORMAT;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3b75c954/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java
index 5094af5..9c40cd8 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java
@@ -22,6 +22,8 @@ import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMe
 import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER;
 import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX;
 import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_DURABLE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_PRIORITY;
 import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX;
 import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID;
 
@@ -76,12 +78,14 @@ public abstract class InboundTransformer {
          jms.setBooleanProperty(JMS_AMQP_HEADER, true);
 
          if (header.getDurable() != null) {
+            jms.setBooleanProperty(JMS_AMQP_HEADER_DURABLE, true);
             jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
          } else {
             jms.setJMSDeliveryMode(Message.DEFAULT_DELIVERY_MODE);
          }
 
          if (header.getPriority() != null) {
+            jms.setBooleanProperty(JMS_AMQP_HEADER_PRIORITY, true);
             jms.setJMSPriority(header.getPriority().intValue());
          } else {
             jms.setJMSPriority(Message.DEFAULT_PRIORITY);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3b75c954/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
index 2fa7145..7dbc6d4 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
@@ -32,6 +32,8 @@ import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMe
 import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER;
 import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX;
 import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_DURABLE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_PRIORITY;
 import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX;
 import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_FORMAT;
 import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_NATIVE;
@@ -287,6 +289,18 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
                   header = new Header();
                }
                continue;
+            } else if (key.equals(JMS_AMQP_HEADER_DURABLE)) {
+               if (header == null) {
+                  header = new Header();
+               }
+               header.setDurable(message.getInnerMessage().isDurable());
+               continue;
+            } else if (key.equals(JMS_AMQP_HEADER_PRIORITY)) {
+               if (header == null) {
+                  header = new Header();
+               }
+               header.setPriority(UnsignedByte.valueOf(priority));
+               continue;
             } else if (key.startsWith(JMS_AMQP_PROPERTIES)) {
                if (properties == null) {
                   properties = new Properties();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3b75c954/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index f7a9364..5cf2c0a 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -402,6 +402,25 @@ public class AmqpMessage {
    }
 
    /**
+    * Sets the priority header on the outgoing message.
+    *
+    * @param priority the priority value to set.
+    */
+   public void setPriority(short priority) {
+      checkReadOnly();
+      lazyCreateHeader();
+      getWrappedMessage().setPriority(priority);
+   }
+
+   /**
+    * Returns the priority value of the wrapped message.
+    */
+   public short getPriority() {
+      return getWrappedMessage().getPriority();
+   }
+
+
+   /**
     * Sets a given application property on an outbound message.
     *
     * @param key   the name to assign the new property.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3b75c954/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
index e102c77..b9d5504 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
@@ -239,6 +239,126 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
    }
 
    @Test(timeout = 60000)
+   public void testMessageDurableFalse() throws Exception {
+      sendMessages(getTestName(), 1, false);
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createReceiver(getTestName());
+
+      Queue queueView = getProxyToQueue(getTestName());
+      assertEquals(1, queueView.getMessageCount());
+
+      receiver.flow(1);
+      AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(receive);
+      assertFalse(receive.isDurable());
+      receiver.close();
+
+      assertEquals(1, queueView.getMessageCount());
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testMessageDurableTrue() throws Exception {
+      sendMessages(getTestName(), 1, true);
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createReceiver(getTestName());
+
+      Queue queueView = getProxyToQueue(getTestName());
+      assertEquals(1, queueView.getMessageCount());
+
+      receiver.flow(1);
+      AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(receive);
+      assertTrue(receive.isDurable());
+      receiver.close();
+
+      assertEquals(1, queueView.getMessageCount());
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testMessageDefaultPriority() throws Exception {
+      sendMessages(getTestName(), 1, (short) 4);
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createReceiver(getTestName());
+
+      Queue queueView = getProxyToQueue(getTestName());
+      assertEquals(1, queueView.getMessageCount());
+
+      receiver.flow(1);
+      AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(receive);
+      assertEquals((short) 4, receive.getPriority());
+      receiver.close();
+
+      assertEquals(1, queueView.getMessageCount());
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testMessageNonDefaultPriority() throws Exception {
+      sendMessages(getTestName(), 1, (short) 0);
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createReceiver(getTestName());
+
+      Queue queueView = getProxyToQueue(getTestName());
+      assertEquals(1, queueView.getMessageCount());
+
+      receiver.flow(1);
+      AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(receive);
+      assertEquals((short) 0, receive.getPriority());
+      receiver.close();
+
+      assertEquals(1, queueView.getMessageCount());
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testMessageNoPriority() throws Exception {
+      sendMessages(getTestName(), 1);
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createReceiver(getTestName());
+
+      Queue queueView = getProxyToQueue(getTestName());
+      assertEquals(1, queueView.getMessageCount());
+
+      receiver.flow(1);
+      AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(receive);
+      assertEquals((short) 4, receive.getPriority());
+      receiver.close();
+
+      assertEquals(1, queueView.getMessageCount());
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
    public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception {
       int MSG_COUNT = 4;
       sendMessages(getTestName(), MSG_COUNT);
@@ -940,4 +1060,41 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
          connection.close();
       }
    }
+
+
+   public void sendMessages(String destinationName, int count, boolean durable) throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      try {
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(destinationName);
+
+         for (int i = 0; i < count; ++i) {
+            AmqpMessage message = new AmqpMessage();
+            message.setMessageId("MessageID:" + i);
+            message.setDurable(durable);
+            sender.send(message);
+         }
+      } finally {
+         connection.close();
+      }
+   }
+
+   public void sendMessages(String destinationName, int count, short priority) throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      try {
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(destinationName);
+
+         for (int i = 0; i < count; ++i) {
+            AmqpMessage message = new AmqpMessage();
+            message.setMessageId("MessageID:" + i);
+            message.setPriority(priority);
+            sender.send(message);
+         }
+      } finally {
+         connection.close();
+      }
+   }
 }


Mime
View raw message