From commits-return-7881-archive-asf-public=cust-asf.ponee.io@zookeeper.apache.org Tue Jul 23 13:56:02 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id C28341802C7 for ; Tue, 23 Jul 2019 15:56:01 +0200 (CEST) Received: (qmail 59648 invoked by uid 500); 23 Jul 2019 13:56:00 -0000 Mailing-List: contact commits-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@zookeeper.apache.org Delivered-To: mailing list commits@zookeeper.apache.org Received: (qmail 59633 invoked by uid 99); 23 Jul 2019 13:56:00 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 23 Jul 2019 13:56:00 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 9426E85E6B; Tue, 23 Jul 2019 13:56:00 +0000 (UTC) Date: Tue, 23 Jul 2019 13:56:00 +0000 To: "commits@zookeeper.apache.org" Subject: [zookeeper] branch master updated: ZOOKEEPER-3356: Implement advanced Netty flow control based on feedback from ZK MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <156389016042.1074.5693318440027115192@gitbox.apache.org> From: eolivelli@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: zookeeper X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 4212f865d67b9c7860c1da071f3997b0a5b5387e X-Git-Newrev: ce33b7faed60ea0b7c6f2eb1edbf56eec20a8bc2 X-Git-Rev: ce33b7faed60ea0b7c6f2eb1edbf56eec20a8bc2 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. 
eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zookeeper.git The following commit(s) were added to refs/heads/master by this push: new ce33b7f ZOOKEEPER-3356: Implement advanced Netty flow control based on feedback from ZK ce33b7f is described below commit ce33b7faed60ea0b7c6f2eb1edbf56eec20a8bc2 Author: Fangmin Lyu AuthorDate: Tue Jul 23 15:55:47 2019 +0200 ZOOKEEPER-3356: Implement advanced Netty flow control based on feedback from ZK The current implementation of the enable/disable recv logic may cause a direct buffer OOM, because we may enable reads, read a large chunk, and then disable reads again after consuming only a single ZK request. This implementation disables AUTO_READ and controls READ depending on whether the SslHandler has already issued a READ and on the status of the queuedBuffer. With this implementation, the maximum Netty queued buffer size (direct memory usage) will be 2 * the recv buffer size. It is not bounded by the per-message size, because in EPoll ET (edge-triggered) mode Netty tries to read until the socket is empty, and because the SslHandler will trigger another read when it has not yet received a full encrypted packet and has not emitted any decrypted message. 
Author: Fangmin Lyu Reviewers: Enrico Olivelli , Andor Molnar Closes #919 from lvfangmin/ZOOKEEPER-3356 --- .../src/main/resources/markdown/zookeeperAdmin.md | 6 + .../apache/zookeeper/server/NettyServerCnxn.java | 19 ++- .../zookeeper/server/NettyServerCnxnFactory.java | 86 ++++++++++++- .../org/apache/zookeeper/server/ServerMetrics.java | 4 + .../zookeeper/server/NettyServerCnxnTest.java | 140 ++++++++++++++++++++- .../org/apache/zookeeper/test/SSLAuthTest.java | 51 ++++---- 6 files changed, 274 insertions(+), 32 deletions(-) diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md index 7d3be6a..c887f36 100644 --- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md +++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md @@ -887,6 +887,12 @@ property, when available, is noted below. **New in 3.6.0:** The time (in milliseconds) the RequestThrottler waits for the request queue to drain during shutdown before it shuts down forcefully. The default is 10000. +* *advancedFlowControlEnabled* : + (Java system property: **zookeeper.netty.advancedFlowControl.enabled**) + Using accurate flow control in netty based on the status of ZooKeeper + pipeline to avoid direct buffer OOM. It will disable the AUTO_READ in + Netty. 
+ #### Cluster Options diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java index 3a3442f..33161e4 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java @@ -68,6 +68,8 @@ public class NettyServerCnxn extends ServerCnxn { private final NettyServerCnxnFactory factory; private boolean initialized; + public int readIssuedAfterReadComplete; + NettyServerCnxn(Channel channel, ZooKeeperServer zks, NettyServerCnxnFactory factory) { super(zks); this.channel = channel; @@ -321,6 +323,7 @@ public class NettyServerCnxn extends ServerCnxn { queuedBuffer.consolidate(); } queuedBuffer.addComponent(true, buf); + ServerMetrics.getMetrics().NETTY_QUEUED_BUFFER.add(queuedBuffer.capacity()); } /** @@ -553,11 +556,11 @@ public class NettyServerCnxn extends ServerCnxn { } /** - * An event that triggers a change in the channel's "Auto Read" setting. + * An event that triggers a change in the channel's read setting. * Used for throttling. By using an enum we can treat the two values as * singletons and compare with ==. 
*/ - enum AutoReadEvent { + enum ReadEvent { DISABLE, ENABLE } @@ -573,7 +576,7 @@ public class NettyServerCnxn extends ServerCnxn { if (LOG.isDebugEnabled()) { LOG.debug("Throttling - disabling recv {}", this); } - channel.pipeline().fireUserEventTriggered(AutoReadEvent.DISABLE); + channel.pipeline().fireUserEventTriggered(ReadEvent.DISABLE); } } @@ -583,7 +586,7 @@ public class NettyServerCnxn extends ServerCnxn { if (LOG.isDebugEnabled()) { LOG.debug("Sending unthrottle event {}", this); } - channel.pipeline().fireUserEventTriggered(AutoReadEvent.ENABLE); + channel.pipeline().fireUserEventTriggered(ReadEvent.ENABLE); } } @@ -659,4 +662,12 @@ public class NettyServerCnxn extends ServerCnxn { Channel getChannel() { return channel; } + + public int getQueuedReadableBytes() { + checkIsInEventLoop("getQueuedReadableBytes"); + if (queuedBuffer != null) { + return queuedBuffer.readableBytes(); + } + return 0; + } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java index 9a05df6..e3ae959 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java @@ -104,6 +104,9 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory { int listenBacklog = -1; private final ClientX509Util x509Util; + public static final String NETTY_ADVANCED_FLOW_CONTROL = "zookeeper.netty.advancedFlowControl.enabled"; + private boolean advancedFlowControlEnabled = false; + private static final AttributeKey CONNECTION_ATTRIBUTE = AttributeKey.valueOf("NettyServerCnxn"); @@ -239,18 +242,31 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { try { - if (evt == NettyServerCnxn.AutoReadEvent.ENABLE) { - 
LOG.debug("Received AutoReadEvent.ENABLE"); + if (evt == NettyServerCnxn.ReadEvent.ENABLE) { + LOG.debug("Received ReadEvent.ENABLE"); NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get(); // TODO(ilyam): Not sure if cnxn can be null here. It becomes null if channelInactive() // or exceptionCaught() trigger, but it's unclear to me if userEventTriggered() can run // after either of those. Check for null just to be safe ... if (cnxn != null) { - cnxn.processQueuedBuffer(); + if (cnxn.getQueuedReadableBytes() > 0) { + cnxn.processQueuedBuffer(); + if (advancedFlowControlEnabled && + cnxn.getQueuedReadableBytes() == 0) { + // trigger a read if we have consumed all + // backlog + ctx.read(); + if (LOG.isDebugEnabled()) { + LOG.debug("Issued a read after queuedBuffer drained"); + } + } + } + } + if (!advancedFlowControlEnabled) { + ctx.channel().config().setAutoRead(true); } - ctx.channel().config().setAutoRead(true); - } else if (evt == NettyServerCnxn.AutoReadEvent.DISABLE) { - LOG.debug("Received AutoReadEvent.DISABLE"); + } else if (evt == NettyServerCnxn.ReadEvent.DISABLE) { + LOG.debug("Received ReadEvent.DISABLE"); ctx.channel().config().setAutoRead(false); } } finally { @@ -283,6 +299,23 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory { } } + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + if (advancedFlowControlEnabled) { + NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get(); + if (cnxn != null && cnxn.getQueuedReadableBytes() == 0 && + cnxn.readIssuedAfterReadComplete == 0) { + ctx.read(); + if (LOG.isDebugEnabled()) { + LOG.debug("Issued a read since we don't have " + + "anything to consume after channelReadComplete"); + } + } + } + + ctx.fireChannelReadComplete(); + } + // Use a single listener instance to reduce GC // Note: this listener is only added when LOG.isTraceEnabled() is true, // so it should not do any work other than trace logging. 
@@ -375,8 +408,33 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory { } } } + + @Sharable + static class ReadIssuedTrackingHandler extends ChannelDuplexHandler { + + @Override + public void read(ChannelHandlerContext ctx) throws Exception { + NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get(); + if (cnxn != null) { + cnxn.readIssuedAfterReadComplete++; + } + + ctx.read(); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get(); + if (cnxn != null) { + cnxn.readIssuedAfterReadComplete = 0; + } + + ctx.fireChannelReadComplete(); + } + } CnxnChannelHandler channelHandler = new CnxnChannelHandler(); + ReadIssuedTrackingHandler readIssuedTrackingHandler = new ReadIssuedTrackingHandler(); private ServerBootstrap configureBootstrapAllocator(ServerBootstrap bootstrap) { ByteBufAllocator testAllocator = TEST_ALLOCATOR.get(); @@ -404,6 +462,9 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory { } this.shouldUsePortUnification = usePortUnification; + this.advancedFlowControlEnabled = Boolean.getBoolean(NETTY_ADVANCED_FLOW_CONTROL); + LOG.info("{} = {}", NETTY_ADVANCED_FLOW_CONTROL, this.advancedFlowControlEnabled); + EventLoopGroup bossGroup = NettyUtils.newNioOrEpollEventLoopGroup( NettyUtils.getClientReachableLocalInetAddressCount()); EventLoopGroup workerGroup = NettyUtils.newNioOrEpollEventLoopGroup(); @@ -419,6 +480,9 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); + if (advancedFlowControlEnabled) { + pipeline.addLast(readIssuedTrackingHandler); + } if (secure) { initSSL(pipeline, false); } else if (shouldUsePortUnification) { @@ -700,4 +764,14 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory { static void clearTestAllocator() { TEST_ALLOCATOR.set(null); } + + // 
VisibleForTest + public void setAdvancedFlowControlEnabled(boolean advancedFlowControlEnabled) { + this.advancedFlowControlEnabled = advancedFlowControlEnabled; + } + + // VisibleForTest + public void setSecure(boolean secure) { + this.secure = secure; + } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java index ca05f94..9573a4a 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java @@ -225,6 +225,8 @@ public final class ServerMetrics { STALE_REQUESTS_DROPPED = metricsContext.getCounter("stale_requests_dropped"); STALE_REPLIES = metricsContext.getCounter("stale_replies"); REQUEST_THROTTLE_WAIT_COUNT = metricsContext.getCounter("request_throttle_wait_count"); + + NETTY_QUEUED_BUFFER = metricsContext.getSummary("netty_queued_buffer_capacity", DetailLevel.BASIC); } /** @@ -424,6 +426,8 @@ public final class ServerMetrics { public final Counter STALE_REPLIES; public final Counter REQUEST_THROTTLE_WAIT_COUNT; + public final Summary NETTY_QUEUED_BUFFER; + private final MetricsProvider metricsProvider; public void resetAll() { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java index 068cb29..1ce7e05 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java @@ -18,14 +18,17 @@ package org.apache.zookeeper.server; - +import org.apache.zookeeper.AsyncCallback.DataCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; +import 
org.apache.zookeeper.common.ClientX509Util; +import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.test.TestByteBufAllocator; import org.apache.zookeeper.server.quorum.BufferStats; import org.apache.zookeeper.test.ClientBase; +import org.apache.zookeeper.test.SSLAuthTest; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; @@ -34,7 +37,11 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.ProtocolException; import java.nio.charset.StandardCharsets; +import java.util.Random; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -203,4 +210,135 @@ public class NettyServerCnxnTest extends ClientBase { assertArrayEquals("unexpected data", "test".getBytes(StandardCharsets.UTF_8), contents); } } + + @Test + public void testEnableDisableThrottling_secure_random() throws Exception { + runEnableDisableThrottling(true, true); + } + + @Test + public void testEnableDisableThrottling_secure_sequentially() throws Exception { + runEnableDisableThrottling(true, false); + } + + @Test + public void testEnableDisableThrottling_nonSecure_random() throws Exception { + runEnableDisableThrottling(false, true); + } + + @Test + public void testEnableDisableThrottling_nonSecure_sequentially() throws Exception { + runEnableDisableThrottling(false, false); + } + + private void runEnableDisableThrottling(boolean secure, boolean randomDisableEnable) throws Exception { + ClientX509Util x509Util = null; + if (secure) { + x509Util = SSLAuthTest.setUpSecure(); + } + try { + NettyServerCnxnFactory factory = (NettyServerCnxnFactory) serverFactory; + factory.setAdvancedFlowControlEnabled(true); + if (secure) { + factory.setSecure(true); + } + + final String path = "/testEnableDisableThrottling"; + try (ZooKeeper zk = 
createClient()) { + zk.create(path, new byte[1], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + // meanwhile start another thread to enable and disable recv + AtomicBoolean stopped = new AtomicBoolean(false); + Random random = new Random(); + + Thread enableDisableThread = null; + if (randomDisableEnable) { + enableDisableThread = new Thread() { + @Override + public void run() { + while (!stopped.get()) { + for (final ServerCnxn cnxn : serverFactory.cnxns) { + boolean shouldDisableEnable = random.nextBoolean(); + if (shouldDisableEnable) { + cnxn.disableRecv(); + } else { + cnxn.enableRecv(); + } + } + try { + Thread.sleep(10); + } catch (InterruptedException e) { /* ignore */ } + } + // always enable the recv at end + for (final ServerCnxn cnxn : serverFactory.cnxns) { + cnxn.enableRecv(); + } + } + }; + } else { + enableDisableThread = new Thread() { + @Override + public void run() { + while (!stopped.get()) { + for (final ServerCnxn cnxn : serverFactory.cnxns) { + try { + cnxn.disableRecv(); + Thread.sleep(10); + cnxn.enableRecv(); + Thread.sleep(10); + } catch (InterruptedException e) { /* ignore */ } + } + } + } + }; + } + enableDisableThread.start(); + LOG.info("started thread to enable and disable recv"); + + // start a thread to keep sending requests + int totalRequestsNum = 100000; + AtomicInteger successResponse = new AtomicInteger(); + CountDownLatch responseReceivedLatch = new CountDownLatch(totalRequestsNum); + Thread clientThread = new Thread() { + @Override + public void run() { + int requestIssued = 0; + while (requestIssued++ < totalRequestsNum) { + zk.getData(path, null, new DataCallback() { + @Override + public void processResult(int rc, String path, + Object ctx, byte data[], Stat stat) { + if (rc == 0) { + successResponse.addAndGet(1); + } else { + LOG.info("failed response is {}", rc); + } + responseReceivedLatch.countDown(); + } + }, null); + } + } + }; + clientThread.start(); + LOG.info("started thread to issue {} async requests", 
totalRequestsNum); + + // and verify the response received is same as what we issued + Assert.assertTrue(responseReceivedLatch.await(60, TimeUnit.SECONDS)); + LOG.info("received all {} responses", totalRequestsNum); + + stopped.set(true); + enableDisableThread.join(); + LOG.info("enable and disable recv thread exited"); + + // wait another second for the left requests to finish + LOG.info("waiting another 1s for the requests to go through"); + Thread.sleep(1000); + Assert.assertEquals(successResponse.get(), totalRequestsNum); + } + } finally { + if (secure) { + SSLAuthTest.clearSecureSetting(x509Util); + } + } + } } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/SSLAuthTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SSLAuthTest.java index b696a42..cd360c1 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/SSLAuthTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SSLAuthTest.java @@ -33,34 +33,24 @@ import org.junit.Test; public class SSLAuthTest extends ClientBase { private ClientX509Util clientX509Util; - - @Before - public void setUp() throws Exception { - clientX509Util = new ClientX509Util(); + + public static ClientX509Util setUpSecure() throws Exception{ + ClientX509Util x509Util = new ClientX509Util(); String testDataPath = System.getProperty("test.data.dir", "src/test/resources/data"); System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY, "org.apache.zookeeper.server.NettyServerCnxnFactory"); System.setProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET, "org.apache.zookeeper.ClientCnxnSocketNetty"); System.setProperty(ZKClientConfig.SECURE_CLIENT, "true"); - System.setProperty(clientX509Util.getSslAuthProviderProperty(), "x509"); - System.setProperty(clientX509Util.getSslKeystoreLocationProperty(), testDataPath + "/ssl/testKeyStore.jks"); - System.setProperty(clientX509Util.getSslKeystorePasswdProperty(), "testpass"); - 
System.setProperty(clientX509Util.getSslTruststoreLocationProperty(), testDataPath + "/ssl/testTrustStore.jks"); - System.setProperty(clientX509Util.getSslTruststorePasswdProperty(), "testpass"); + System.setProperty(x509Util.getSslAuthProviderProperty(), "x509"); + System.setProperty(x509Util.getSslKeystoreLocationProperty(), testDataPath + "/ssl/testKeyStore.jks"); + System.setProperty(x509Util.getSslKeystorePasswdProperty(), "testpass"); + System.setProperty(x509Util.getSslTruststoreLocationProperty(), testDataPath + "/ssl/testTrustStore.jks"); + System.setProperty(x509Util.getSslTruststorePasswdProperty(), "testpass"); System.setProperty("javax.net.debug", "ssl"); System.setProperty("zookeeper.authProvider.x509", "org.apache.zookeeper.server.auth.X509AuthenticationProvider"); - - String host = "localhost"; - int port = PortAssignment.unique(); - hostPort = host + ":" + port; - - serverFactory = ServerCnxnFactory.createFactory(); - serverFactory.configure(new InetSocketAddress(host, port), maxCnxns, -1, true); - - super.setUp(); + return x509Util; } - @After - public void teardown() throws Exception { + public static void clearSecureSetting(ClientX509Util clientX509Util) { System.clearProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY); System.clearProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET); System.clearProperty(ZKClientConfig.SECURE_CLIENT); @@ -73,6 +63,25 @@ public class SSLAuthTest extends ClientBase { System.clearProperty("zookeeper.authProvider.x509"); clientX509Util.close(); } + + @Before + public void setUp() throws Exception { + clientX509Util = setUpSecure(); + + String host = "localhost"; + int port = PortAssignment.unique(); + hostPort = host + ":" + port; + + serverFactory = ServerCnxnFactory.createFactory(); + serverFactory.configure(new InetSocketAddress(host, port), maxCnxns, -1, true); + + super.setUp(); + } + + @After + public void teardown() throws Exception { + clearSecureSetting(clientX509Util); + } @Test public void 
testRejection() throws Exception { @@ -103,4 +112,4 @@ public class SSLAuthTest extends ClientBase { Assert.assertFalse("Missing SSL configuration should not result in successful connection", watcher.clientConnected.await(1000, TimeUnit.MILLISECONDS)); } -} \ No newline at end of file +}