zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h...@apache.org
Subject zookeeper git commit: ZOOKEEPER-2786: Flaky test: org.apache.zookeeper.test.ClientTest.testNonExistingOpCode
Date Wed, 09 Aug 2017 18:31:11 GMT
Repository: zookeeper
Updated Branches:
  refs/heads/branch-3.5 d7c26e33f -> 23962f123


ZOOKEEPER-2786: Flaky test: org.apache.zookeeper.test.ClientTest.testNonExistingOpCode

This is the third time this failure has popped up. This time it seemed to only impact tests
run using NettyServerCnxnFactory (so it only impacts 3.5 and master) and I was able to get
it to pop up with sufficient frequency when running the tests locally.

The issue is caused by improper handling of netty's futures. When we call `channel.write(wrappedBuffer(sendBuffer));`
the write is completed asynchronously. The close call `channel.close();` is also asynchronous.
So we can run into the case where the close takes effect before a pending write has completed.

This patch changes our close call to be a callback for the completion of an empty write. This
way we are guaranteed that the channel has "drained" before a close.

My primary concern with this patch is the channel being used while it is closing (between
the write of an empty buffer and the execution of the close callback). I have added a `closingChannel`
boolean to track that, which I believe is sufficient. Let me know if anyone finds a situation
where that is not the case.

Author: Abraham Fine <afine@apache.org>

Reviewers: Michael Han <hanm@apache.org>

Closes #327 from afine/ZOOKEEPER-2786_third_times_a_charm


Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/23962f12
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/23962f12
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/23962f12

Branch: refs/heads/branch-3.5
Commit: 23962f12395ada67e689b8ff57573fc1398a54eb
Parents: d7c26e3
Author: Abraham Fine <afine@apache.org>
Authored: Wed Aug 9 11:31:06 2017 -0700
Committer: Michael Han <hanm@apache.org>
Committed: Wed Aug 9 11:31:06 2017 -0700

----------------------------------------------------------------------
 .../org/apache/zookeeper/server/NettyServerCnxn.java    | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zookeeper/blob/23962f12/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java b/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
index 142e916..9ff12e9 100644
--- a/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
+++ b/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
@@ -47,6 +47,7 @@ import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
 import org.jboss.netty.channel.MessageEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,6 +63,7 @@ public class NettyServerCnxn extends ServerCnxn {
     int sessionTimeout;
     AtomicLong outstandingCount = new AtomicLong();
     Certificate[] clientChain;
+    volatile boolean closingChannel;
 
     /** The ZooKeeperServer for this connection. May be null if the server
      * is not currently serving requests (for example if the server is not
@@ -74,6 +76,7 @@ public class NettyServerCnxn extends ServerCnxn {
     
     NettyServerCnxn(Channel channel, ZooKeeperServer zks, NettyServerCnxnFactory factory) {
         this.channel = channel;
+        this.closingChannel = false;
         this.zkServer = zks;
         this.factory = factory;
         if (this.factory.login != null) {
@@ -83,6 +86,8 @@ public class NettyServerCnxn extends ServerCnxn {
     
     @Override
     public void close() {
+        closingChannel = true;
+        
         if (LOG.isDebugEnabled()) {
             LOG.debug("close called for sessionid:0x"
                     + Long.toHexString(sessionId));
@@ -115,7 +120,10 @@ public class NettyServerCnxn extends ServerCnxn {
         }
 
         if (channel.isOpen()) {
-            channel.close();
+            // Since we don't check on the futures created by write calls to the channel complete we need to make sure
+            // that all writes have been completed before closing the channel or we risk data loss
+            // See: http://lists.jboss.org/pipermail/netty-users/2009-August/001122.html
+            channel.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
         }
     }
 
@@ -171,7 +179,7 @@ public class NettyServerCnxn extends ServerCnxn {
     @Override
     public void sendResponse(ReplyHeader h, Record r, String tag)
             throws IOException {
-        if (!channel.isOpen()) {
+        if (closingChannel || !channel.isOpen()) {
             return;
         }
         ByteArrayOutputStream baos = new ByteArrayOutputStream();


Mime
View raw message