activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-1359 Skip re-encode Headers on messages if not needed
Date Wed, 23 Aug 2017 02:07:38 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 47485ce80 -> 8d3f48438


ARTEMIS-1359 Skip re-encode Headers on messages if not needed

Only re-encode the Header on a Message when redelivering the Message,
to avoid overhead and unneeded modification to the original encoding of
the Header.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/cb9482d9
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/cb9482d9
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/cb9482d9

Branch: refs/heads/master
Commit: cb9482d9fafbc2f554752a636573d7dad19a408b
Parents: 47485ce
Author: Timothy Bish <tabish121@gmail.com>
Authored: Mon Aug 21 17:35:51 2017 -0400
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Tue Aug 22 22:02:41 2017 -0400

----------------------------------------------------------------------
 .../protocol/amqp/broker/AMQPMessage.java       | 66 +++++++++++++-------
 1 file changed, 45 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cb9482d9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index b475208..93b6145 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -75,8 +75,13 @@ public class AMQPMessage extends RefCountMessage {
    MessageImpl protonMessage;
    private volatile int memoryEstimate = -1;
    private long expiration = 0;
-   // this is to store where to start sending bytes, ignoring header and delivery annotations.
-   private int sendFrom = 0;
+
+   // Records where the Header section ends if present.
+   private int headerEnds = 0;
+
+   // Records where the message payload starts, ignoring DeliveryAnnotations if present
+   private int messagePaylodStart = 0;
+
    private boolean parsedHeaders = false;
    private Header _header;
    private DeliveryAnnotations _deliveryAnnotations;
@@ -130,14 +135,15 @@ public class AMQPMessage extends RefCountMessage {
    private void initalizeObjects() {
       if (protonMessage == null) {
          if (data == null) {
-            this.sendFrom = 0;
+            headerEnds = 0;
+            messagePaylodStart = 0;
             _header = new Header();
             _deliveryAnnotations = new DeliveryAnnotations(new HashMap<>());
             _properties = new Properties();
-            this.applicationProperties = new ApplicationProperties(new HashMap<>());
-            this.protonMessage = (MessageImpl) Message.Factory.create();
-            this.protonMessage.setApplicationProperties(applicationProperties);
-            this.protonMessage.setDeliveryAnnotations(_deliveryAnnotations);
+            applicationProperties = new ApplicationProperties(new HashMap<>());
+            protonMessage = (MessageImpl) Message.Factory.create();
+            protonMessage.setApplicationProperties(applicationProperties);
+            protonMessage.setDeliveryAnnotations(_deliveryAnnotations);
          }
       }
    }
@@ -364,8 +370,9 @@ public class AMQPMessage extends RefCountMessage {
          }
 
          if (section instanceof Header) {
-            sendFrom = buffer.position();
             _header = (Header) section;
+            headerEnds = buffer.position();
+            messagePaylodStart = headerEnds;
 
             if (_header.getTtl() != null) {
                this.expiration = System.currentTimeMillis() + _header.getTtl().intValue();
@@ -376,13 +383,17 @@ public class AMQPMessage extends RefCountMessage {
             } else {
                section = null;
             }
+
          } else {
             // meaning there is no header
-            sendFrom = 0;
+            headerEnds = 0;
          }
          if (section instanceof DeliveryAnnotations) {
             _deliveryAnnotations = (DeliveryAnnotations) section;
-            sendFrom = buffer.position();
+
+            // Advance the start beyond the delivery annotations so they are not written
+            // out on send of the message.
+            messagePaylodStart = buffer.position();
 
             if (buffer.hasRemaining()) {
                section = (Section) decoder.readObject();
@@ -463,10 +474,14 @@ public class AMQPMessage extends RefCountMessage {
       checkBuffer();
 
       byte[] origin = data.array();
-      byte[] newData = new byte[data.array().length - sendFrom];
-      for (int i = 0; i < newData.length; i++) {
-         newData[i] = origin[i + sendFrom];
-      }
+      byte[] newData = new byte[data.array().length - (messagePaylodStart - headerEnds)];
+
+      // Copy the original header
+      System.arraycopy(origin, 0, newData, 0, headerEnds);
+
+      // Copy the body following the delivery annotations if present
+      System.arraycopy(origin, messagePaylodStart, newData, headerEnds, data.array().length - messagePaylodStart);
+
       AMQPMessage newEncode = new AMQPMessage(this.messageFormat, newData);
       newEncode.setDurable(isDurable());
       return newEncode;
@@ -550,7 +565,7 @@ public class AMQPMessage extends RefCountMessage {
       }
 
       if (getHeader() != null && getHeader().getDurable() != null) {
-         durable =  getHeader().getDurable().booleanValue();
+         durable = getHeader().getDurable();
          return durable;
       } else {
          return durable != null ? durable : false;
@@ -652,26 +667,35 @@ public class AMQPMessage extends RefCountMessage {
    public int getEncodeSize() {
       checkBuffer();
       // + 20checkBuffer is an estimate for the Header with the deliveryCount
-      return data.array().length - sendFrom + 20;
+      return data.array().length - messagePaylodStart + 20;
    }
 
    @Override
    public void sendBuffer(ByteBuf buffer, int deliveryCount) {
       checkBuffer();
+
+      int amqpDeliveryCount = deliveryCount - 1;
+
       Header header = getHeader();
-      if (header == null && deliveryCount > 0) {
+      if (header == null && (amqpDeliveryCount > 0)) {
          header = new Header();
          header.setDurable(durable);
       }
-      if (header != null) {
+
+      // If the re-delivering the message then the header must be re-encoded
+      // otherwise we want to write the original header if present.
+      if (amqpDeliveryCount > 0) {
          synchronized (header) {
-            header.setDeliveryCount(UnsignedInteger.valueOf(deliveryCount - 1));
+            header.setDeliveryCount(UnsignedInteger.valueOf(amqpDeliveryCount));
             TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buffer));
             TLSEncode.getEncoder().writeObject(header);
             TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
          }
+      } else if (headerEnds > 0) {
+         buffer.writeBytes(data, 0, headerEnds);
       }
-      buffer.writeBytes(data, sendFrom, data.writerIndex() - sendFrom);
+
+      buffer.writeBytes(data, messagePaylodStart, data.writerIndex() - messagePaylodStart);
    }
 
    @Override
@@ -1045,7 +1069,7 @@ public class AMQPMessage extends RefCountMessage {
       int size = record.readInt();
       byte[] recordArray = new byte[size];
       record.readBytes(recordArray);
-      this.sendFrom = 0; // whatever was persisted will be sent
+      this.messagePaylodStart = 0; // whatever was persisted will be sent
       this.data = Unpooled.wrappedBuffer(recordArray);
       this.bufferValid = true;
       this.durable = true; // it's coming from the journal, so it's durable


Mime
View raw message