Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 2D83D20049D for ; Wed, 9 Aug 2017 20:31:26 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 2C11D16890C; Wed, 9 Aug 2017 18:31:26 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 709F9162702 for ; Wed, 9 Aug 2017 20:31:25 +0200 (CEST) Received: (qmail 37318 invoked by uid 500); 9 Aug 2017 18:31:24 -0000 Mailing-List: contact commits-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@zookeeper.apache.org Delivered-To: mailing list commits@zookeeper.apache.org Received: (qmail 37307 invoked by uid 99); 9 Aug 2017 18:31:24 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 09 Aug 2017 18:31:24 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 83BB4E10F8; Wed, 9 Aug 2017 18:31:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hanm@apache.org To: commits@zookeeper.apache.org Message-Id: <14b2c2f80cb540458346cf76261d09e9@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: zookeeper git commit: ZOOKEEPER-2786: Flaky test: org.apache.zookeeper.test.ClientTest.testNonExistingOpCode Date: Wed, 9 Aug 2017 18:31:24 +0000 (UTC) archived-at: Wed, 09 Aug 2017 18:31:26 -0000 Repository: zookeeper Updated Branches: refs/heads/master 31501b8ab -> e104175bb ZOOKEEPER-2786: Flaky test: org.apache.zookeeper.test.ClientTest.testNonExistingOpCode This is the third time this failure has popped up. 
This time it seemed to only impact tests run using NettyServerCnxnFactory (so it only impacts 3.5 and master) and I was able to get it to pop up with sufficient frequency when running the tests locally. The issue is caused by improper handling of netty's futures. When we call `channel.write(wrappedBuffer(sendBuffer));` the write is completed asynchronously. The close call `channel.close();` is also asynchronous. So we can run into the case where the close completes before a previously issued write has been flushed. This patch changes our close call to be a callback for the completion of an empty write. This way we are guaranteed that the channel has "drained" before a close. My primary concern with this patch is the channel being used while it is closing (between the write of an empty buffer and the execution of the close callback). I have added a `closingChannel` boolean to track that, which I believe is sufficient. Let me know if anyone finds a situation where that is not the case. Author: Abraham Fine Reviewers: Michael Han Closes #327 from afine/ZOOKEEPER-2786_third_times_a_charm (cherry picked from commit 23962f12395ada67e689b8ff57573fc1398a54eb) Signed-off-by: Michael Han Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/e104175b Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/e104175b Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/e104175b Branch: refs/heads/master Commit: e104175bb47baeb800354078c015e78bfcb7c953 Parents: 31501b8 Author: Abraham Fine Authored: Wed Aug 9 11:31:06 2017 -0700 Committer: Michael Han Committed: Wed Aug 9 11:31:21 2017 -0700 ---------------------------------------------------------------------- .../org/apache/zookeeper/server/NettyServerCnxn.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/e104175b/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java b/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java index 142e916..9ff12e9 100644 --- a/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java +++ b/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java @@ -47,6 +47,7 @@ import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; import org.jboss.netty.channel.MessageEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,6 +63,7 @@ public class NettyServerCnxn extends ServerCnxn { int sessionTimeout; AtomicLong outstandingCount = new AtomicLong(); Certificate[] clientChain; + volatile boolean closingChannel; /** The ZooKeeperServer for this connection. 
May be null if the server * is not currently serving requests (for example if the server is not @@ -74,6 +76,7 @@ public class NettyServerCnxn extends ServerCnxn { NettyServerCnxn(Channel channel, ZooKeeperServer zks, NettyServerCnxnFactory factory) { this.channel = channel; + this.closingChannel = false; this.zkServer = zks; this.factory = factory; if (this.factory.login != null) { @@ -83,6 +86,8 @@ public class NettyServerCnxn extends ServerCnxn { @Override public void close() { + closingChannel = true; + if (LOG.isDebugEnabled()) { LOG.debug("close called for sessionid:0x" + Long.toHexString(sessionId)); @@ -115,7 +120,10 @@ public class NettyServerCnxn extends ServerCnxn { } if (channel.isOpen()) { - channel.close(); + // Since we don't check on the futures created by write calls to the channel complete we need to make sure + // that all writes have been completed before closing the channel or we risk data loss + // See: http://lists.jboss.org/pipermail/netty-users/2009-August/001122.html + channel.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); } } @@ -171,7 +179,7 @@ public class NettyServerCnxn extends ServerCnxn { @Override public void sendResponse(ReplyHeader h, Record r, String tag) throws IOException { - if (!channel.isOpen()) { + if (closingChannel || !channel.isOpen()) { return; } ByteArrayOutputStream baos = new ByteArrayOutputStream();