activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-1036 Streaming huge messages between cluster nodes causes java.lang.OutOfMemoryError: Direct buffer memory
Date Wed, 29 Mar 2017 13:59:16 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/1.x 20f5554e8 -> ea01aeb65


ARTEMIS-1036 Streaming huge messages between cluster nodes causes java.lang.OutOfMemoryError:
Direct buffer memory

(cherry picked from commit 1686b3545d14cdf591e00e6d04228b48b2b74a9f)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ec3ed04d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ec3ed04d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ec3ed04d

Branch: refs/heads/1.x
Commit: ec3ed04d0868ab9123926cbff7f3baab5280e5ff
Parents: 20f5554
Author: Francesco Nigro <nigro.fra@gmail.com>
Authored: Wed Mar 15 16:59:57 2017 +0100
Committer: Francesco Nigro <nigro.fra@gmail.com>
Committed: Wed Mar 29 15:09:25 2017 +0200

----------------------------------------------------------------------
 .../wireformat/SessionContinuationMessage.java  | 50 +++++++++++++++++++-
 .../SessionReceiveContinuationMessage.java      |  9 +++-
 .../SessionSendContinuationMessage.java         |  6 +++
 3 files changed, 63 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec3ed04d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
index fcdd943..faeed08 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
@@ -18,8 +18,11 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 
 import java.util.Arrays;
 
+import io.netty.buffer.Unpooled;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.utils.DataConstants;
 
 public abstract class SessionContinuationMessage extends PacketImpl {
@@ -61,6 +64,51 @@ public abstract class SessionContinuationMessage extends PacketImpl {
       return continues;
    }
 
+   /**
+    * Returns the exact expected encoded size of {@code this} packet.
+    * It will be used to allocate the proper encoding buffer in {@link #createPacket}, hence any
+    * wrong value will result in a thrown exception or a resize of the encoding
+    * buffer during the encoding process, depending on the implementation of {@link #createPacket}.
+    * Any child of {@code this} class is required to override this method if its encoded size is changed
+    * from the base class.
+    *
+    * @return the size in bytes of the expected encoded packet
+    */
+   protected int expectedEncodedSize() {
+      return SESSION_CONTINUATION_BASE_SIZE + (body == null ? 0 : body.length);
+   }
+
+   @Override
+   public final ActiveMQBuffer encode(final RemotingConnection connection, boolean usePooled) {
+      final ActiveMQBuffer buffer = createPacket(connection, usePooled);
+
+      // The standard header fields
+
+      buffer.writeInt(0); // The length gets filled in at the end
+      buffer.writeByte(getType());
+      buffer.writeLong(channelID);
+
+      encodeRest(buffer);
+
+      size = buffer.writerIndex();
+
+      // The length doesn't include the actual length byte
+      int len = size - DataConstants.SIZE_INT;
+
+      buffer.setInt(0, len);
+
+      return buffer;
+   }
+
+   protected final ActiveMQBuffer createPacket(RemotingConnection connection, boolean usePooled) {
+      final int expectedEncodedSize = expectedEncodedSize();
+      if (connection == null) {
+         return new ChannelBufferWrapper(Unpooled.buffer(expectedEncodedSize));
+      } else {
+         return connection.createTransportBuffer(expectedEncodedSize, usePooled);
+      }
+   }
+
    @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       buffer.writeInt(body.length);
@@ -110,4 +158,4 @@ public abstract class SessionContinuationMessage extends PacketImpl {
       return true;
    }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec3ed04d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java
index 9141ae1..44ad1bb 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java
@@ -67,6 +67,13 @@ public class SessionReceiveContinuationMessage extends SessionContinuationMessag
       return consumerID;
    }
 
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected final int expectedEncodedSize() {
+      return super.expectedEncodedSize() + DataConstants.SIZE_LONG;
+   }
+
    // Public --------------------------------------------------------
 
    @Override
@@ -121,4 +128,4 @@ public class SessionReceiveContinuationMessage extends SessionContinuationMessag
       return true;
    }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec3ed04d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
index b4ec027..e718b3d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
 import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.utils.DataConstants;
 
 /**
  * A SessionSendContinuationMessage<br>
@@ -92,6 +93,11 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
    }
 
    @Override
+   protected final int expectedEncodedSize() {
+      return super.expectedEncodedSize() + (!continues ? DataConstants.SIZE_LONG : 0) + DataConstants.SIZE_BOOLEAN;
+   }
+
+   @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       super.encodeRest(buffer);
       if (!continues) {


Mime
View raw message