Repository: zookeeper
Updated Branches:
refs/heads/master 31501b8ab -> e104175bb
ZOOKEEPER-2786: Flaky test: org.apache.zookeeper.test.ClientTest.testNonExistingOpCode
This is the third time this failure has popped up. This time it seemed to only impact tests
run using NettyServerCnxnFactory (so it only impacts 3.5 and master) and I was able to get
it to pop up with sufficient frequency when running the tests locally.
The issue is caused by improper handling of netty's futures. When we call `channel.write(wrappedBuffer(sendBuffer));`
the write is completed asynchronously. The close call `channel.close();` is also asynchronous.
So we can run into the case where the close occurs before a pending write has completed.
This patch changes our close call to be a callback for the completion of an empty write. This
way we are guaranteed that the channel has "drained" before a close.
My primary concern with this patch is the channel being used while it is closing (between
the write of an empty buffer and the execution of the close callback). I have added a `closingChannel`
boolean to track that, which I believe is sufficient. Let me know if anyone finds a situation
where that is not the case.
Author: Abraham Fine <afine@apache.org>
Reviewers: Michael Han <hanm@apache.org>
Closes #327 from afine/ZOOKEEPER-2786_third_times_a_charm
(cherry picked from commit 23962f12395ada67e689b8ff57573fc1398a54eb)
Signed-off-by: Michael Han <hanm@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/e104175b
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/e104175b
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/e104175b
Branch: refs/heads/master
Commit: e104175bb47baeb800354078c015e78bfcb7c953
Parents: 31501b8
Author: Abraham Fine <afine@apache.org>
Authored: Wed Aug 9 11:31:06 2017 -0700
Committer: Michael Han <hanm@apache.org>
Committed: Wed Aug 9 11:31:21 2017 -0700
----------------------------------------------------------------------
.../org/apache/zookeeper/server/NettyServerCnxn.java | 12 ++++++++++--
1 file changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/e104175b/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java b/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
index 142e916..9ff12e9 100644
--- a/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
+++ b/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
@@ -47,6 +47,7 @@ import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.MessageEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,6 +63,7 @@ public class NettyServerCnxn extends ServerCnxn {
int sessionTimeout;
AtomicLong outstandingCount = new AtomicLong();
Certificate[] clientChain;
+ volatile boolean closingChannel;
/** The ZooKeeperServer for this connection. May be null if the server
* is not currently serving requests (for example if the server is not
@@ -74,6 +76,7 @@ public class NettyServerCnxn extends ServerCnxn {
NettyServerCnxn(Channel channel, ZooKeeperServer zks, NettyServerCnxnFactory factory)
{
this.channel = channel;
+ this.closingChannel = false;
this.zkServer = zks;
this.factory = factory;
if (this.factory.login != null) {
@@ -83,6 +86,8 @@ public class NettyServerCnxn extends ServerCnxn {
@Override
public void close() {
+ closingChannel = true;
+
if (LOG.isDebugEnabled()) {
LOG.debug("close called for sessionid:0x"
+ Long.toHexString(sessionId));
@@ -115,7 +120,10 @@ public class NettyServerCnxn extends ServerCnxn {
}
if (channel.isOpen()) {
- channel.close();
+ // Since we don't check on the futures created by write calls to the channel complete we need to make sure
+ // that all writes have been completed before closing the channel or we risk data loss
+ // See: http://lists.jboss.org/pipermail/netty-users/2009-August/001122.html
+ channel.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
}
@@ -171,7 +179,7 @@ public class NettyServerCnxn extends ServerCnxn {
@Override
public void sendResponse(ReplyHeader h, Record r, String tag)
throws IOException {
- if (!channel.isOpen()) {
+ if (closingChannel || !channel.isOpen()) {
return;
}
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|