zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ph...@apache.org
Subject svn commit: r1404289 - in /zookeeper/branches/branch-3.4/src/java: main/org/apache/zookeeper/ClientCnxnSocketNIO.java test/org/apache/zookeeper/test/ClientTest.java
Date Wed, 31 Oct 2012 18:42:19 GMT
Author: phunt
Date: Wed Oct 31 18:42:18 2012
New Revision: 1404289

URL: http://svn.apache.org/viewvc?rev=1404289&view=rev
Log:
ZOOKEEPER-1560 Zookeeper client hangs on creation of large nodes (Skye Wanderman-Milne via phunt)

Modified:
    zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java
    zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/ClientTest.java

Modified: zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java?rev=1404289&r1=1404288&r2=1404289&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java (original)
+++ zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java Wed Oct 31 18:42:18 2012
@@ -25,16 +25,16 @@ import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.Set;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.ClientCnxn.EndOfStreamException;
 import org.apache.zookeeper.ClientCnxn.Packet;
 import org.apache.zookeeper.ZooDefs.OpCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ClientCnxnSocketNIO extends ClientCnxnSocket {
     private static final Logger LOG = LoggerFactory
@@ -99,83 +99,85 @@ public class ClientCnxnSocketNIO extends
             }
         }
         if (sockKey.isWritable()) {
-            LinkedList<Packet> pending = new LinkedList<Packet>();
-            Packet p = null;
             synchronized(outgoingQueue) {
-                p = findSendablePacket(outgoingQueue,
+                Packet p = findSendablePacket(outgoingQueue,
                         cnxn.sendThread.clientTunneledAuthenticationInProgress());
 
                 if (p != null) {
-                    outgoingQueue.removeFirstOccurrence(p);
                     updateLastSend();
-                    if ((p.requestHeader != null) &&
-                            (p.requestHeader.getType() != OpCode.ping) &&
-                            (p.requestHeader.getType() != OpCode.auth)) {
-                        p.requestHeader.setXid(cnxn.getXid());
+                    // If we already started writing p, p.bb will already exist
+                    if (p.bb == null) {
+                        if ((p.requestHeader != null) &&
+                                (p.requestHeader.getType() != OpCode.ping) &&
+                                (p.requestHeader.getType() != OpCode.auth)) {
+                            p.requestHeader.setXid(cnxn.getXid());
+                        }
+                        p.createBB();
                     }
-                    p.createBB();
-                    ByteBuffer pbb = p.bb;
-                    sock.write(pbb);
-                    if (!pbb.hasRemaining()) {
+                    sock.write(p.bb);
+                    if (!p.bb.hasRemaining()) {
                         sentCount++;
+                        outgoingQueue.removeFirstOccurrence(p);
                         if (p.requestHeader != null
                                 && p.requestHeader.getType() != OpCode.ping
                                 && p.requestHeader.getType() != OpCode.auth) {
-                            pending.add(p);
+                            synchronized (pendingQueue) {
+                                pendingQueue.add(p);
+                            }
                         }
                     }
-                } else {
-                    // No suitable packet to send: turn off write interest flag.
+                }
+                if (outgoingQueue.isEmpty()) {
+                    // No more packets to send: turn off write interest flag.
                     // Will be turned on later by a later call to enableWrite(),
                     // from within ZooKeeperSaslClient (if client is configured
                     // to attempt SASL authentication), or in either doIO() or
                     // in doTransport() if not.
                     disableWrite();
+                } else {
+                    // Just in case
+                    enableWrite();
                 }
             }
-            synchronized(pendingQueue) {
-                pendingQueue.addAll(pending);
-            }
-
         }
     }
 
     private Packet findSendablePacket(LinkedList<Packet> outgoingQueue,
                                       boolean clientTunneledAuthenticationInProgress) {
         synchronized (outgoingQueue) {
-            if (!outgoingQueue.isEmpty()) {
-                if (clientTunneledAuthenticationInProgress) {
-                    Packet p = null;
-                    // Since client's authentication with server is in progress,
-                    // send only the null-header packet queued by primeConnection().
-                    // This packet must be sent so that the SASL authentication process
-                    // can proceed, but all other packets should wait until
-                    // SASL authentication completes.
-                    Iterator<Packet> iter = outgoingQueue.listIterator();
-                    while(iter.hasNext()) {
-                        p = iter.next();
-                        if (p.requestHeader == null) {
-                            // We've found the priming-packet.
-                            return p;
-                        } else {
-                            // Non-priming packet: defer it until later, leaving it in the queue
-                            // until authentication completes.
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("deferring non-priming packet: " + p +
-                                        "until SASL authentication completes.");
-                            }
-                        }
-                    }
-                    // no sendable packet found.
-                    return null;
+            if (outgoingQueue.isEmpty()) {
+                return null;
+            }
+            if (outgoingQueue.getFirst().bb != null // If we've already starting sending the first packet, we better finish
+                || !clientTunneledAuthenticationInProgress) {
+                return outgoingQueue.getFirst();
+            }
+
+            // Since client's authentication with server is in progress,
+            // send only the null-header packet queued by primeConnection().
+            // This packet must be sent so that the SASL authentication process
+            // can proceed, but all other packets should wait until
+            // SASL authentication completes.
+            ListIterator<Packet> iter = outgoingQueue.listIterator();
+            while (iter.hasNext()) {
+                Packet p = iter.next();
+                if (p.requestHeader == null) {
+                    // We've found the priming-packet. Move it to the beginning of the queue.
+                    iter.remove();
+                    outgoingQueue.add(0, p);
+                    return p;
                 } else {
-                    // Tunnelled authentication is not in progress: just
-                    // send the first packet in the queue.
-                    return outgoingQueue.getFirst();
+                    // Non-priming packet: defer it until later, leaving it in the queue
+                    // until authentication completes.
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("deferring non-priming packet: " + p +
+                                "until SASL authentication completes.");
+                    }
                 }
             }
+            // no sendable packet found.
+            return null;
         }
-        return null;
     }
 
     @Override

Modified: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/ClientTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/ClientTest.java?rev=1404289&r1=1404288&r2=1404289&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/ClientTest.java (original)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/ClientTest.java Wed Oct 31 18:42:18 2012
@@ -523,6 +523,22 @@ public class ClientTest extends ClientBa
 
     }
 
+    @Test
+    public void testLargeNodeData() throws Exception {
+        ZooKeeper zk= null;
+        String queue_handle = "/large";
+        try {
+            zk = createClient();
+
+            zk.create(queue_handle, new byte[500000], Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+        } finally {
+            if (zk != null) {
+                zk.close();
+            }
+        }
+
+    }
 
     private void verifyCreateFails(String path, ZooKeeper zk) throws Exception {
         try {



Mime
View raw message