activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-1098 Improve flow control while streaming large messages
Date Mon, 10 Apr 2017 18:00:12 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 73c79de8a -> 359592cf5


ARTEMIS-1098 Improve flow control while streaming large messages


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/da6b851c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/da6b851c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/da6b851c

Branch: refs/heads/master
Commit: da6b851c60329538f5f65ae83c9548c9bd0e40f9
Parents: 73c79de
Author: Francesco Nigro <nigro.fra@gmail.com>
Authored: Mon Apr 10 13:47:54 2017 +0200
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Mon Apr 10 13:58:33 2017 -0400

----------------------------------------------------------------------
 .../core/client/ActiveMQClientLogger.java       |  5 ++
 .../core/impl/ActiveMQSessionContext.java       | 57 ++++++++++++--------
 2 files changed, 40 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/da6b851c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
index 0fe4a5a..748e508 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
@@ -310,6 +310,11 @@ public interface ActiveMQClientLogger extends BasicLogger {
       format = Message.Format.MESSAGE_FORMAT)
    void broadcastGroupBindError(String hostAndPort);
 
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 212057, value = "Large Message Streaming is taking too long to flush on back pressure.",
+      format = Message.Format.MESSAGE_FORMAT)
+   void timeoutStreamingLargeMessage();
+
    @LogMessage(level = Logger.Level.ERROR)
    @Message(id = 214000, value = "Failed to call onMessage", format = Message.Format.MESSAGE_FORMAT)
    void onMessageError(@Cause Throwable e);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/da6b851c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index 6f92330..7799395 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@@ -458,17 +459,7 @@ public class ActiveMQSessionContext extends SessionContext {
                                     byte[] chunk,
                                     int reconnectID,
                                     SendAcknowledgementHandler messageHandler) throws ActiveMQException {
-      final boolean requiresResponse = lastChunk && sendBlocking;
-      final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
-
-      if (requiresResponse) {
-         // When sending it blocking, only the last chunk will be blocking.
-         sessionChannel.sendBlocking(chunkPacket, reconnectID, PacketImpl.NULL_RESPONSE);
-      } else {
-         sessionChannel.send(chunkPacket, reconnectID);
-      }
-
-      return chunkPacket.getPacketSize();
+      return sendSessionSendContinuationMessage(this.sessionChannel, msgI, messageBodySize, sendBlocking, lastChunk, chunk, messageHandler);
    }
 
    @Override
@@ -478,17 +469,7 @@ public class ActiveMQSessionContext extends SessionContext {
                                           boolean lastChunk,
                                           byte[] chunk,
                                           SendAcknowledgementHandler messageHandler) throws ActiveMQException {
-      final boolean requiresResponse = lastChunk && sendBlocking;
-      final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
-
-      if (requiresResponse) {
-         // When sending it blocking, only the last chunk will be blocking.
-         sessionChannel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
-      } else {
-         sessionChannel.send(chunkPacket);
-      }
-
-      return chunkPacket.getPacketSize();
+      return sendSessionSendContinuationMessage(this.sessionChannel, msgI, messageBodySize, sendBlocking, lastChunk, chunk, messageHandler);
    }
 
    @Override
@@ -813,6 +794,38 @@ public class ActiveMQSessionContext extends SessionContext {
       }
    }
 
+   private static int sendSessionSendContinuationMessage(Channel channel,
+                                                         Message msgI,
+                                                         long messageBodySize,
+                                                         boolean sendBlocking,
+                                                         boolean lastChunk,
+                                                         byte[] chunk,
+                                                         SendAcknowledgementHandler messageHandler) throws ActiveMQException {
+      final boolean requiresResponse = lastChunk && sendBlocking;
+      final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
+      final int expectedEncodeSize = chunkPacket.expectedEncodeSize();
+      //perform a weak form of flow control to avoid OOM on tight loops
+      final CoreRemotingConnection connection = channel.getConnection();
+      final long blockingCallTimeoutMillis = Math.max(0, connection.getBlockingCallTimeout());
+      final long startFlowControl = System.nanoTime();
+      final boolean isWritable = connection.blockUntilWritable(expectedEncodeSize, blockingCallTimeoutMillis);
+      if (!isWritable) {
+         final long endFlowControl = System.nanoTime();
+         final long elapsedFlowControl = endFlowControl - startFlowControl;
+         final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedFlowControl);
+         ActiveMQClientLogger.LOGGER.timeoutStreamingLargeMessage();
+         logger.debug("try to write " + expectedEncodeSize + " bytes after blocked " + elapsedMillis + " ms on a not writable connection: [" + connection.getID() + "]");
+      }
+      if (requiresResponse) {
+         // When sending it blocking, only the last chunk will be blocking.
+         channel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
+      } else {
+         channel.send(chunkPacket);
+      }
+      return chunkPacket.getPacketSize();
+   }
+
+
    class ClientSessionPacketHandler implements ChannelHandler {
 
       @Override


Mime
View raw message