activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-1101 Read the correct values for timestamp and user-id
Date Fri, 07 Apr 2017 20:37:21 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master a41a1316d -> bb6a41837


ARTEMIS-1101 Read the correct values for timestamp and user-id

Fix the getUserID and getTimestamp methods in AMQPMessage to read and
return the correct values.  Adds some tests to cover these cases and
cleans up some others.  

Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3b45261f
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3b45261f
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3b45261f

Branch: refs/heads/master
Commit: 3b45261f1970871b63cd79ba9c9f92cadf599ce3
Parents: a41a131
Author: Timothy Bish <tabish121@gmail.com>
Authored: Fri Apr 7 16:12:58 2017 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Fri Apr 7 16:12:58 2017 -0400

----------------------------------------------------------------------
 .../protocol/amqp/broker/AMQPMessage.java       |  30 ++--
 .../protocol/amqp/message/AMQPMessageTest.java  | 178 +++++++++++++++++--
 .../amqp/AmqpMessagePriorityTest.java           |   4 +-
 3 files changed, 179 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3b45261f/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index ffc3783..d076b21 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.protocol.amqp.broker;
 
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
@@ -37,6 +38,7 @@ import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
 import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
 import org.apache.activemq.artemis.reader.MessageUtil;
 import org.apache.activemq.artemis.utils.DataConstants;
+import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.UnsignedByte;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
@@ -60,8 +62,8 @@ import io.netty.buffer.Unpooled;
 // see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
 public class AMQPMessage extends RefCountMessage {
 
-   private static final int DEFAULT_MESSAGE_PRIORITY = 4;
-   private static final int MAX_MESSAGE_PRIORITY = 9;
+   public static final int DEFAULT_MESSAGE_PRIORITY = 4;
+   public static final int MAX_MESSAGE_PRIORITY = 9;
 
    final long messageFormat;
    ByteBuf data;
@@ -91,21 +93,18 @@ public class AMQPMessage extends RefCountMessage {
       this.messageFormat = messageFormat;
       this.bufferValid = true;
       parseHeaders();
-
    }
 
    /** for persistence reload */
    public AMQPMessage(long messageFormat) {
       this.messageFormat = messageFormat;
       this.bufferValid = false;
-
    }
 
    public AMQPMessage(long messageFormat, Message message) {
       this.messageFormat = messageFormat;
       this.protonMessage = (MessageImpl) message;
       this.bufferValid = false;
-
    }
 
    public AMQPMessage(Message message) {
@@ -171,7 +170,6 @@ public class AMQPMessage extends RefCountMessage {
          }
          this.appLocation = -1;
          TLSEncode.getDecoder().setByteBuffer(null);
-
       }
 
       return applicationProperties;
@@ -238,7 +236,6 @@ public class AMQPMessage extends RefCountMessage {
       return null;
    }
 
-
    private void setSymbol(String symbol, Object value) {
       setSymbol(Symbol.getSymbol(symbol), value);
    }
@@ -331,7 +328,6 @@ public class AMQPMessage extends RefCountMessage {
 
    @Override
    public synchronized boolean acceptsConsumer(long consumer) {
-
       if (rejectedConsumers == null) {
          return true;
       } else {
@@ -348,7 +344,6 @@ public class AMQPMessage extends RefCountMessage {
       rejectedConsumers.add(consumer);
    }
 
-
    private synchronized void partialDecode(ByteBuffer buffer) {
       DecoderImpl decoder = TLSEncode.getDecoder();
       decoder.setByteBuffer(buffer);
@@ -516,10 +511,11 @@ public class AMQPMessage extends RefCountMessage {
    @Override
    public Object getUserID() {
       Properties properties = getProperties();
-      if (properties != null && properties.getMessageId() != null) {
-         return properties.getMessageId();
+      if (properties != null && properties.getUserId() != null) {
+         Binary binary = properties.getUserId();
+         return new String(binary.getArray(), binary.getArrayOffset(), binary.getLength(),
StandardCharsets.UTF_8);
       } else {
-         return this;
+         return null;
       }
    }
 
@@ -585,8 +581,8 @@ public class AMQPMessage extends RefCountMessage {
 
    @Override
    public long getTimestamp() {
-      if (getHeader() != null && getHeader().getTtl() != null) {
-         return getHeader().getTtl().longValue();
+      if (getProperties() != null && getProperties().getCreationTime() != null) {
+         return getProperties().getCreationTime().getTime();
       } else {
          return 0L;
       }
@@ -594,7 +590,7 @@ public class AMQPMessage extends RefCountMessage {
 
    @Override
    public org.apache.activemq.artemis.api.core.Message setTimestamp(long timestamp) {
-      getHeader().setTtl(UnsignedInteger.valueOf(timestamp));
+      getProperties().setCreationTime(new Date(timestamp));
       return this;
    }
 
@@ -868,7 +864,6 @@ public class AMQPMessage extends RefCountMessage {
       return this;
    }
 
-
    @Override
    public void reencode() {
       if (_deliveryAnnotations != null) getProtonMessage().setDeliveryAnnotations(_deliveryAnnotations);
@@ -879,8 +874,6 @@ public class AMQPMessage extends RefCountMessage {
       checkBuffer();
    }
 
-
-
    @Override
    public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException
{
       return SimpleString.toSimpleString((String) getApplicationPropertiesMap().get(key));
@@ -995,7 +988,6 @@ public class AMQPMessage extends RefCountMessage {
       } else {
          return null;
       }
-
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3b45261f/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
index db40a8e..496454b 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
@@ -17,8 +17,14 @@
 
 package org.apache.activemq.artemis.protocol.amqp.message;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Date;
+
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
 import org.apache.commons.collections.map.HashedMap;
@@ -28,9 +34,11 @@ import org.apache.qpid.proton.amqp.messaging.Header;
 import org.apache.qpid.proton.amqp.messaging.Properties;
 import org.apache.qpid.proton.message.Message;
 import org.apache.qpid.proton.message.impl.MessageImpl;
-import org.junit.Assert;
 import org.junit.Test;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
 public class AMQPMessageTest {
 
    @Test
@@ -44,20 +52,168 @@ public class AMQPMessageTest {
       protonMessage.getHeader().setDurable(Boolean.TRUE);
       protonMessage.setApplicationProperties(new ApplicationProperties(new HashedMap()));
 
-      ByteBuf nettyBuffer = Unpooled.buffer(1500);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
 
-      protonMessage.encode(new NettyWritable(nettyBuffer));
+      assertEquals(7, decoded.getHeader().getDeliveryCount().intValue());
+      assertEquals(true, decoded.getHeader().getDurable());
+      assertEquals("someNiceLocal", decoded.getAddress());
+   }
 
-      byte[] bytes = new byte[nettyBuffer.writerIndex()];
+   @Test
+   public void testGetAddressFromMessage() {
+      final String ADDRESS = "myQueue";
 
-      nettyBuffer.readBytes(bytes);
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setHeader(new Header());
+      protonMessage.setAddress(ADDRESS);
+
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(ADDRESS, decoded.getAddress());
+   }
+
+   @Test
+   public void testGetAddressSimpleStringFromMessage() {
+      final String ADDRESS = "myQueue";
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setHeader(new Header());
+      protonMessage.setAddress(ADDRESS);
+
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(ADDRESS, decoded.getAddressSimpleString().toString());
+   }
+
+   @Test
+   public void testGetAddressFromMessageWithNoValueSet() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertNull(decoded.getAddress());
+      assertNull(decoded.getAddressSimpleString());
+   }
+
+   @Test
+   public void testIsDurableFromMessage() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setHeader(new Header());
+      protonMessage.setDurable(true);
+
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertTrue(decoded.isDurable());
+   }
+
+   @Test
+   public void testIsDurableFromMessageWithNoValueSet() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertFalse(decoded.isDurable());
+   }
+
+   @Test
+   public void testGetGroupIDFromMessage() {
+      final String GROUP_ID = "group-1";
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setHeader(new Header());
+      protonMessage.setGroupId(GROUP_ID);
+
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(GROUP_ID, decoded.getGroupID().toString());
+   }
+
+   @Test
+   public void testGetGroupIDFromMessageWithNoGroupId() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertNull(decoded.getUserID());
+   }
+
+   @Test
+   public void testGetUserIDFromMessage() {
+      final String USER_NAME = "foo";
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setHeader(new Header());
+      protonMessage.setUserId(USER_NAME.getBytes(StandardCharsets.UTF_8));
+
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(USER_NAME, decoded.getUserID());
+   }
+
+   @Test
+   public void testGetUserIDFromMessageWithNoUserID() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertNull(decoded.getUserID());
+   }
+
+   @Test
+   public void testGetPriorityFromMessage() {
+      final short PRIORITY = 7;
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setHeader(new Header());
+      protonMessage.setPriority(PRIORITY);
+
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(PRIORITY, decoded.getPriority());
+   }
+
+   @Test
+   public void testGetPriorityFromMessageWithNoPrioritySet() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
 
-      AMQPMessage encode = new AMQPMessage(0, bytes);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
 
-      Assert.assertEquals(7, encode.getHeader().getDeliveryCount().intValue());
-      Assert.assertEquals(true, encode.getHeader().getDurable());
-      Assert.assertEquals("someNiceLocal", encode.getAddress());
+      assertEquals(AMQPMessage.DEFAULT_MESSAGE_PRIORITY, decoded.getPriority());
+   }
 
+   @Test
+   public void testGetTimestampFromMessage() {
+      Date timestamp = new Date(System.currentTimeMillis());
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setHeader( new Header());
+      Properties properties = new Properties();
+      properties.setCreationTime(timestamp);
+
+      protonMessage.setProperties(properties);
+
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(timestamp.getTime(), decoded.getTimestamp());
+   }
+
+   @Test
+   public void testGetTimestampFromMessageWithNoCreateTimeSet() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setHeader( new Header());
+
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(0L, decoded.getTimestamp());
+   }
+
+   private AMQPMessage encodeAndDecodeMessage(MessageImpl message) {
+      ByteBuf nettyBuffer = Unpooled.buffer(1500);
+
+      message.encode(new NettyWritable(nettyBuffer));
+      byte[] bytes = new byte[nettyBuffer.writerIndex()];
+      nettyBuffer.readBytes(bytes);
 
+      return new AMQPMessage(0, bytes);
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3b45261f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessagePriorityTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessagePriorityTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessagePriorityTest.java
index d1467b1..2b57354 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessagePriorityTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessagePriorityTest.java
@@ -67,9 +67,8 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
       connection.close();
    }
 
-
    @Test(timeout = 60000)
-   public void testRestartServer() throws Exception {
+   public void testMessagePriorityPreservedAfterServerRestart() throws Exception {
       AmqpClient client = createAmqpClient();
       AmqpConnection connection = addConnection(client.connect());
       AmqpSession session = connection.createSession();
@@ -81,7 +80,6 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
       message.setMessageId("MessageID:1");
       message.setPriority((short) 7);
 
-
       sender.send(message);
       sender.close();
       connection.close();


Mime
View raw message