activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-1095 Netty's WriteBufferWaterMark configuration via TransportConstants
Date Thu, 06 Apr 2017 13:31:53 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 2edc972c5 -> d0ae3f25a


ARTEMIS-1095 Netty's WriteBufferWaterMark configuration via TransportConstants


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f53449b9
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f53449b9
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f53449b9

Branch: refs/heads/master
Commit: f53449b945d2c7ddbd185063cd1ece051ed93990
Parents: 2edc972
Author: Francesco Nigro <nigro.fra@gmail.com>
Authored: Thu Apr 6 11:33:29 2017 +0200
Committer: Francesco Nigro <nigro.fra@gmail.com>
Committed: Thu Apr 6 15:26:13 2017 +0200

----------------------------------------------------------------------
 .../core/remoting/impl/netty/NettyConnector.java        | 12 +++++++++++-
 .../core/remoting/impl/netty/TransportConstants.java    | 12 ++++++++++++
 .../artemis/core/remoting/impl/netty/NettyAcceptor.java | 12 +++++++++++-
 docs/user-manual/en/configuring-transports.md           | 11 +++++++++++
 4 files changed, 45 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f53449b9/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
index f6935be..d31bdb2 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
@@ -58,6 +58,7 @@ import io.netty.channel.ChannelPipeline;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.WriteBufferWaterMark;
 import io.netty.channel.epoll.Epoll;
 import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.epoll.EpollSocketChannel;
@@ -211,6 +212,10 @@ public class NettyConnector extends AbstractConnector {
 
    private int tcpReceiveBufferSize;
 
+   private final int writeBufferLowWaterMark;
+
+   private final int writeBufferHighWaterMark;
+
    private long batchDelay;
 
    private ConcurrentMap<Object, Connection> connections = new ConcurrentHashMap<>();
@@ -336,7 +341,8 @@ public class NettyConnector extends AbstractConnector {
       tcpNoDelay = ConfigurationHelper.getBooleanProperty(TransportConstants.TCP_NODELAY_PROPNAME,
TransportConstants.DEFAULT_TCP_NODELAY, configuration);
       tcpSendBufferSize = ConfigurationHelper.getIntProperty(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME,
TransportConstants.DEFAULT_TCP_SENDBUFFER_SIZE, configuration);
       tcpReceiveBufferSize = ConfigurationHelper.getIntProperty(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME,
TransportConstants.DEFAULT_TCP_RECEIVEBUFFER_SIZE, configuration);
-
+      this.writeBufferLowWaterMark = ConfigurationHelper.getIntProperty(TransportConstants.WRITE_BUFFER_LOW_WATER_MARK_PROPNAME,
TransportConstants.DEFAULT_WRITE_BUFFER_LOW_WATER_MARK, configuration);
+      this.writeBufferHighWaterMark = ConfigurationHelper.getIntProperty(TransportConstants.WRITE_BUFFER_HIGH_WATER_MARK_PROPNAME,
TransportConstants.DEFAULT_WRITE_BUFFER_HIGH_WATER_MARK, configuration);
       batchDelay = ConfigurationHelper.getLongProperty(TransportConstants.BATCH_DELAY, TransportConstants.DEFAULT_BATCH_DELAY,
configuration);
 
       connectTimeoutMillis = ConfigurationHelper.getIntProperty(TransportConstants.NETTY_CONNECT_TIMEOUT,
TransportConstants.DEFAULT_NETTY_CONNECT_TIMEOUT, configuration);
@@ -423,6 +429,10 @@ public class NettyConnector extends AbstractConnector {
       if (tcpSendBufferSize != -1) {
          bootstrap.option(ChannelOption.SO_SNDBUF, tcpSendBufferSize);
       }
+      final int writeBufferLowWaterMark = this.writeBufferLowWaterMark != -1 ? this.writeBufferLowWaterMark
: WriteBufferWaterMark.DEFAULT.low();
+      final int writeBufferHighWaterMark = this.writeBufferHighWaterMark != -1 ? this.writeBufferHighWaterMark
: WriteBufferWaterMark.DEFAULT.high();
+      final WriteBufferWaterMark writeBufferWaterMark = new WriteBufferWaterMark(writeBufferLowWaterMark,
writeBufferHighWaterMark);
+      bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, writeBufferWaterMark);
       bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
       bootstrap.option(ChannelOption.SO_REUSEADDR, true);
       channelGroup = new DefaultChannelGroup("activemq-connector", GlobalEventExecutor.INSTANCE);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f53449b9/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
index 4317a68..69eaa94 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
@@ -125,6 +125,10 @@ public class TransportConstants {
     */
    public static final String NIO_REMOTING_THREADS_PROPNAME = "nioRemotingThreads";
 
+   public static final String WRITE_BUFFER_LOW_WATER_MARK_PROPNAME = "writeBufferLowWaterMark";
+
+   public static final String WRITE_BUFFER_HIGH_WATER_MARK_PROPNAME = "writeBufferHighWaterMark";
+
    public static final String REMOTING_THREADS_PROPNAME = "RemotingThreads";
 
    public static final String BATCH_DELAY = "batchDelay";
@@ -183,6 +187,10 @@ public class TransportConstants {
 
    public static final int DEFAULT_TCP_RECEIVEBUFFER_SIZE = 1024 * 1024;
 
+   public static final int DEFAULT_WRITE_BUFFER_LOW_WATER_MARK = 32 * 1024;
+
+   public static final int DEFAULT_WRITE_BUFFER_HIGH_WATER_MARK = 128 * 1024;
+
    public static final boolean DEFAULT_HTTP_ENABLED = false;
 
    public static final long DEFAULT_HTTP_CLIENT_IDLE_TIME = 500;
@@ -253,6 +261,8 @@ public class TransportConstants {
       allowableAcceptorKeys.add(TransportConstants.TCP_NODELAY_PROPNAME);
       allowableAcceptorKeys.add(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME);
       allowableAcceptorKeys.add(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME);
+      allowableAcceptorKeys.add(TransportConstants.WRITE_BUFFER_HIGH_WATER_MARK_PROPNAME);
+      allowableAcceptorKeys.add(TransportConstants.WRITE_BUFFER_LOW_WATER_MARK_PROPNAME);
       allowableAcceptorKeys.add(TransportConstants.NIO_REMOTING_THREADS_PROPNAME);
       allowableAcceptorKeys.add(TransportConstants.REMOTING_THREADS_PROPNAME);
       allowableAcceptorKeys.add(TransportConstants.BATCH_DELAY);
@@ -303,6 +313,8 @@ public class TransportConstants {
       allowableConnectorKeys.add(TransportConstants.TCP_NODELAY_PROPNAME);
       allowableConnectorKeys.add(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME);
       allowableConnectorKeys.add(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME);
+      allowableConnectorKeys.add(TransportConstants.WRITE_BUFFER_HIGH_WATER_MARK_PROPNAME);
+      allowableConnectorKeys.add(TransportConstants.WRITE_BUFFER_LOW_WATER_MARK_PROPNAME);
       allowableConnectorKeys.add(TransportConstants.NIO_REMOTING_THREADS_PROPNAME);
       allowableConnectorKeys.add(TransportConstants.REMOTING_THREADS_PROPNAME);
       allowableConnectorKeys.add(TransportConstants.BATCH_DELAY);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f53449b9/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
index 7b17694..4f248f5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
@@ -44,6 +44,7 @@ import io.netty.channel.ChannelOption;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.ServerChannel;
+import io.netty.channel.WriteBufferWaterMark;
 import io.netty.channel.epoll.Epoll;
 import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.epoll.EpollServerSocketChannel;
@@ -161,6 +162,10 @@ public class NettyAcceptor extends AbstractAcceptor {
 
    private final int tcpReceiveBufferSize;
 
+   private final int writeBufferLowWaterMark;
+
+   private final int writeBufferHighWaterMark;
+
    private int remotingThreads;
 
    private final ConcurrentMap<Object, NettyServerConnection> connections = new ConcurrentHashMap<>();
@@ -260,7 +265,8 @@ public class NettyAcceptor extends AbstractAcceptor {
       tcpNoDelay = ConfigurationHelper.getBooleanProperty(TransportConstants.TCP_NODELAY_PROPNAME,
TransportConstants.DEFAULT_TCP_NODELAY, configuration);
       tcpSendBufferSize = ConfigurationHelper.getIntProperty(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME,
TransportConstants.DEFAULT_TCP_SENDBUFFER_SIZE, configuration);
       tcpReceiveBufferSize = ConfigurationHelper.getIntProperty(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME,
TransportConstants.DEFAULT_TCP_RECEIVEBUFFER_SIZE, configuration);
-
+      this.writeBufferLowWaterMark = ConfigurationHelper.getIntProperty(TransportConstants.WRITE_BUFFER_LOW_WATER_MARK_PROPNAME,
TransportConstants.DEFAULT_WRITE_BUFFER_LOW_WATER_MARK, configuration);
+      this.writeBufferHighWaterMark = ConfigurationHelper.getIntProperty(TransportConstants.WRITE_BUFFER_HIGH_WATER_MARK_PROPNAME,
TransportConstants.DEFAULT_WRITE_BUFFER_HIGH_WATER_MARK, configuration);
       this.scheduledThreadPool = scheduledThreadPool;
 
       batchDelay = ConfigurationHelper.getLongProperty(TransportConstants.BATCH_DELAY, TransportConstants.DEFAULT_BATCH_DELAY,
configuration);
@@ -341,6 +347,10 @@ public class NettyAcceptor extends AbstractAcceptor {
       if (tcpSendBufferSize != -1) {
          bootstrap.childOption(ChannelOption.SO_SNDBUF, tcpSendBufferSize);
       }
+      final int writeBufferLowWaterMark = this.writeBufferLowWaterMark != -1 ? this.writeBufferLowWaterMark
: WriteBufferWaterMark.DEFAULT.low();
+      final int writeBufferHighWaterMark = this.writeBufferHighWaterMark != -1 ? this.writeBufferHighWaterMark
: WriteBufferWaterMark.DEFAULT.high();
+      final WriteBufferWaterMark writeBufferWaterMark = new WriteBufferWaterMark(writeBufferLowWaterMark,
writeBufferHighWaterMark);
+      bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, writeBufferWaterMark);
       if (backlog != -1) {
          bootstrap.option(ChannelOption.SO_BACKLOG, backlog);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f53449b9/docs/user-manual/en/configuring-transports.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/configuring-transports.md b/docs/user-manual/en/configuring-transports.md
index eec4e09..d3adfef 100644
--- a/docs/user-manual/en/configuring-transports.md
+++ b/docs/user-manual/en/configuring-transports.md
@@ -239,6 +239,17 @@ Netty for simple TCP:
 -   `tcpReceiveBufferSize`. This parameter determines the size of the
     TCP receive buffer in bytes. The default value for this property is
     `32768` bytes (32KiB).
+    
+-   `writeBufferLowWaterMark`. This parameter determines the low water mark of
+    the Netty write buffer. Once the number of bytes queued in the write buffer has
+    exceeded the high water mark and then dropped below this value, Netty's channel
+    becomes writable again. The default value for this property is
+    `32768` bytes (32KiB).
+ 
+-   `writeBufferHighWaterMark`. This parameter determines the high water mark of
+    the Netty write buffer. If the number of bytes queued in the write buffer exceeds
+    this value, Netty's channel becomes non-writable. The default value for
+    this property is `131072` bytes (128KiB).
 
 -   `batchDelay`. Before writing packets to the transport, Apache ActiveMQ Artemis can
     be configured to batch up writes for a maximum of `batchDelay`


Mime
View raw message