activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-357 Avoiding possible races on encoding messages
Date Sat, 23 Jan 2016 06:01:57 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 66c1c210e -> 4a33d2d48


ARTEMIS-357 Avoiding possible races on encoding messages

https://issues.apache.org/jira/browse/ARTEMIS-357


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f5ec1521
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f5ec1521
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f5ec1521

Branch: refs/heads/master
Commit: f5ec15216e15e1fe26e1db2934abda928da7a55f
Parents: 66c1c21
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Sat Jan 23 00:06:05 2016 -0500
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Sat Jan 23 00:51:03 2016 -0500

----------------------------------------------------------------------
 .../artemis/core/message/impl/MessageImpl.java  | 37 +++-----------------
 .../impl/wireformat/SessionReceiveMessage.java  | 22 +++++++-----
 .../impl/wireformat/SessionSendMessage.java     | 27 +++++++++-----
 3 files changed, 37 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f5ec1521/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java
index 783cf00..0c66aef 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java
@@ -86,8 +86,6 @@ public abstract class MessageImpl implements MessageInternal {
 
    private boolean copied = true;
 
-   private boolean bufferUsed;
-
    private UUID userID;
 
    // Constructors --------------------------------------------------
@@ -157,8 +155,6 @@ public abstract class MessageImpl implements MessageInternal {
          copied = other.copied;
 
          if (other.buffer != null) {
-            other.bufferUsed = true;
-
             // We need to copy the underlying buffer too, since the different messsages thereafter might have different
             // properties set on them, making their encoding different
             buffer = other.buffer.copy(0, other.buffer.writerIndex());
@@ -507,21 +503,7 @@ public abstract class MessageImpl implements MessageInternal {
    @Override
    public synchronized ActiveMQBuffer getEncodedBuffer() {
       ActiveMQBuffer buff = encodeToBuffer();
-
-      if (bufferUsed) {
-         ActiveMQBuffer copied = buff.copy(0, buff.capacity());
-
-         copied.setIndex(0, endOfMessagePosition);
-
-         return copied;
-      }
-      else {
-         buffer.setIndex(0, endOfMessagePosition);
-
-         bufferUsed = true;
-
-         return buffer;
-      }
+      return buff.duplicate();
    }
 
    @Override
@@ -935,9 +917,12 @@ public abstract class MessageImpl implements MessageInternal {
          buffer2 = new byte[bodyBuffer.writerIndex() - bodyBuffer.readerIndex()];
          bodyBuffer.readBytes(buffer2);
          bodyBuffer.readerIndex(readerIndex2);
+         return "ServerMessage@" + Integer.toHexString(System.identityHashCode(this)) + "[writerIndex=" + buffer.writerIndex() + ",capacity=" + buffer.capacity() + ",bodyStart=" + getEndOfBodyPosition() + " buffer=" + ByteUtil.bytesToHex(buffer1, 1) + ", bodyBuffer=" + ByteUtil.bytesToHex(buffer2, 1);
+      }
+      else {
+         return "ServerMessage@" + Integer.toHexString(System.identityHashCode(this)) + "[writerIndex=" + buffer.writerIndex() + ",capacity=" + buffer.capacity() + ",bodyStart=" + getEndOfBodyPosition() + " buffer=" + ByteUtil.bytesToHex(buffer1, 1);
       }
 
-      return "ServerMessage@" + Integer.toHexString(System.identityHashCode(this)) + "[" + ",bodyStart=" + getEndOfBodyPosition() + " buffer=" + ByteUtil.bytesToHex(buffer1, 1) + ", bodyBuffer=" + ByteUtil.bytesToHex(buffer2, 1);
    }
 
    @Override
@@ -962,18 +947,8 @@ public abstract class MessageImpl implements MessageInternal {
    // many queues - the first caller in this case will actually encode it
    private synchronized ActiveMQBuffer encodeToBuffer() {
       if (!bufferValid) {
-         if (bufferUsed) {
-            // Cannot use same buffer - must copy
-
-            forceCopy();
-         }
-
          int bodySize = getEndOfBodyPosition();
 
-         // Clebert: I've started sending this on encoding due to conversions between protocols
-         //          and making sure we are not losing the buffer start position between protocols
-         this.endOfBodyPosition = bodySize;
-
          // write it
          buffer.setInt(BUFFER_HEADER_SPACE, bodySize);
 
@@ -1032,8 +1007,6 @@ public abstract class MessageImpl implements MessageInternal {
       if (bodyBuffer != null) {
          bodyBuffer.setBuffer(buffer);
       }
-
-      bufferUsed = false;
    }
 
    // Inner classes -------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f5ec1521/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
index 83fe33c..bccc9df 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
@@ -56,27 +56,31 @@ public class SessionReceiveMessage extends MessagePacket {
    public ActiveMQBuffer encode(final RemotingConnection connection) {
       ActiveMQBuffer buffer = message.getEncodedBuffer();
 
+      ActiveMQBuffer bufferWrite = connection.createTransportBuffer(buffer.writerIndex());
+      bufferWrite.writeBytes(buffer, 0, bufferWrite.capacity());
+      bufferWrite.setIndex(buffer.readerIndex(), buffer.writerIndex());
+
       // Sanity check
-      if (buffer.writerIndex() != message.getEndOfMessagePosition()) {
+      if (bufferWrite.writerIndex() != message.getEndOfMessagePosition()) {
          throw new IllegalStateException("Wrong encode position");
       }
 
-      buffer.writeLong(consumerID);
-      buffer.writeInt(deliveryCount);
+      bufferWrite.writeLong(consumerID);
+      bufferWrite.writeInt(deliveryCount);
 
-      size = buffer.writerIndex();
+      size = bufferWrite.writerIndex();
 
       // Write standard headers
 
       int len = size - DataConstants.SIZE_INT;
-      buffer.setInt(0, len);
-      buffer.setByte(DataConstants.SIZE_INT, getType());
-      buffer.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID);
+      bufferWrite.setInt(0, len);
+      bufferWrite.setByte(DataConstants.SIZE_INT, getType());
+      bufferWrite.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID);
 
       // Position reader for reading by Netty
-      buffer.setIndex(0, size);
+      bufferWrite.setIndex(0, size);
 
-      return buffer;
+      return bufferWrite;
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f5ec1521/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
index 525f00a..300f8ed 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
@@ -63,28 +63,39 @@ public class SessionSendMessage extends MessagePacket {
    public ActiveMQBuffer encode(final RemotingConnection connection) {
       ActiveMQBuffer buffer = message.getEncodedBuffer();
 
+      ActiveMQBuffer bufferWrite;
+      if (connection == null) {
+         // this is for unit tests only
+         bufferWrite = buffer.copy(0, buffer.capacity());
+      }
+      else {
+         bufferWrite = connection.createTransportBuffer(buffer.writerIndex() + 1); // 1 for the requireResponse
+      }
+      bufferWrite.writeBytes(buffer, 0, buffer.writerIndex());
+      bufferWrite.setIndex(buffer.readerIndex(), buffer.writerIndex());
+
       // Sanity check
-      if (buffer.writerIndex() != message.getEndOfMessagePosition()) {
+      if (bufferWrite.writerIndex() != message.getEndOfMessagePosition()) {
          throw new IllegalStateException("Wrong encode position");
       }
 
-      buffer.writeBoolean(requiresResponse);
+      bufferWrite.writeBoolean(requiresResponse);
 
-      size = buffer.writerIndex();
+      size = bufferWrite.writerIndex();
 
       // Write standard headers
 
       int len = size - DataConstants.SIZE_INT;
-      buffer.setInt(0, len);
-      buffer.setByte(DataConstants.SIZE_INT, getType());
-      buffer.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID);
+      bufferWrite.setInt(0, len);
+      bufferWrite.setByte(DataConstants.SIZE_INT, getType());
+      bufferWrite.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID);
 
       // Position reader for reading by Netty
-      buffer.readerIndex(0);
+      bufferWrite.readerIndex(0);
 
       message.resetCopied();
 
-      return buffer;
+      return bufferWrite;
    }
 
    @Override


Mime
View raw message