activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-1036 Streaming huge messages would cause OME
Date Thu, 23 Mar 2017 14:44:10 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master f798178c6 -> 2bcc255f4


ARTEMIS-1036 Streaming huge messages would cause OME


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/759d3b78
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/759d3b78
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/759d3b78

Branch: refs/heads/master
Commit: 759d3b78d98001e3709fdca94676d021909eb328
Parents: f798178
Author: Francesco Nigro <nigro.fra@gmail.com>
Authored: Wed Mar 15 16:59:57 2017 +0100
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Thu Mar 23 10:42:44 2017 -0400

----------------------------------------------------------------------
 .../wireformat/SessionContinuationMessage.java  | 29 +++++++++++++++++++-
 .../SessionReceiveContinuationMessage.java      |  9 +++++-
 .../SessionSendContinuationMessage.java         |  8 +++++-
 3 files changed, 43 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/759d3b78/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
index fcdd943..a57cdb4 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
@@ -18,8 +18,11 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 
 import java.util.Arrays;
 
+import io.netty.buffer.Unpooled;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.utils.DataConstants;
 
 public abstract class SessionContinuationMessage extends PacketImpl {
@@ -61,6 +64,30 @@ public abstract class SessionContinuationMessage extends PacketImpl {
       return continues;
    }
 
+   /**
+    * Returns the exact expected encoded size of {@code this} packet.
+    * It will be used to allocate the proper encoding buffer in {@link #createPacket}, hence any
+    * wrong value will result in a thrown exception or a resize of the encoding
+    * buffer during the encoding process, depending to the implementation of {@link #createPacket}.
+    * Any child of {@code this} class are required to override this method if their encoded size is changed
+    * from the base class.
+    *
+    * @return the size in bytes of the expected encoded packet
+    */
+   protected int expectedEncodedSize() {
+      return SESSION_CONTINUATION_BASE_SIZE + (body == null ? 0 : body.length);
+   }
+
+   @Override
+   protected final ActiveMQBuffer createPacket(RemotingConnection connection, boolean usePooled) {
+      final int expectedEncodedSize = expectedEncodedSize();
+      if (connection == null) {
+         return new ChannelBufferWrapper(Unpooled.buffer(expectedEncodedSize));
+      } else {
+         return connection.createTransportBuffer(expectedEncodedSize, usePooled);
+      }
+   }
+
    @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       buffer.writeInt(body.length);
@@ -110,4 +137,4 @@ public abstract class SessionContinuationMessage extends PacketImpl {
       return true;
    }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/759d3b78/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java
index 9141ae1..44ad1bb 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java
@@ -67,6 +67,13 @@ public class SessionReceiveContinuationMessage extends SessionContinuationMessag
       return consumerID;
    }
 
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected final int expectedEncodedSize() {
+      return super.expectedEncodedSize() + DataConstants.SIZE_LONG;
+   }
+
    // Public --------------------------------------------------------
 
    @Override
@@ -121,4 +128,4 @@ public class SessionReceiveContinuationMessage extends SessionContinuationMessag
       return true;
    }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/759d3b78/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
index 0ecfe33..1c600e9 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
+import org.apache.activemq.artemis.utils.DataConstants;
 
 /**
  * A SessionSendContinuationMessage<br>
@@ -92,6 +93,11 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
    }
 
    @Override
+   protected final int expectedEncodedSize() {
+      return super.expectedEncodedSize() + (!continues ? DataConstants.SIZE_LONG : 0) + DataConstants.SIZE_BOOLEAN;
+   }
+
+   @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       super.encodeRest(buffer);
       if (!continues) {
@@ -154,4 +160,4 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
    public SendAcknowledgementHandler getHandler() {
       return handler;
    }
-}
+}
\ No newline at end of file


Mime
View raw message