zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r..@apache.org
Subject svn commit: r1685203 - in /zookeeper/branches/branch-3.5: CHANGES.txt src/java/main/org/apache/zookeeper/ClientCnxn.java src/java/test/org/apache/zookeeper/test/DisconnectedWatcherTest.java
Date Sat, 13 Jun 2015 00:44:00 GMT
Author: rgs
Date: Sat Jun 13 00:43:59 2015
New Revision: 1685203

URL: http://svn.apache.org/r1685203
Log:
ZOOKEEPER-706: Large numbers of watches can cause session re-establishment to fail
(Chris Thunes via rgs)

Modified:
    zookeeper/branches/branch-3.5/CHANGES.txt
    zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/ClientCnxn.java
    zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/DisconnectedWatcherTest.java

Modified: zookeeper/branches/branch-3.5/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/CHANGES.txt?rev=1685203&r1=1685202&r2=1685203&view=diff
==============================================================================
--- zookeeper/branches/branch-3.5/CHANGES.txt (original)
+++ zookeeper/branches/branch-3.5/CHANGES.txt Sat Jun 13 00:43:59 2015
@@ -125,6 +125,9 @@ BUGFIXES:
   ZOOKEEPER-2213: Empty path in Set crashes server and prevents restart
   (Hongchao Deng via rgs)
 
+  ZOOKEEPER-706: Large numbers of watches can cause session re-establishment to fail
+  (Chris Thunes via rgs)
+
 IMPROVEMENTS:
   ZOOKEEPER-1660 Documentation for Dynamic Reconfiguration (Reed Wanderman-Milne via shralex)
 

Modified: zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/ClientCnxn.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/ClientCnxn.java?rev=1685203&r1=1685202&r2=1685203&view=diff
==============================================================================
--- zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/ClientCnxn.java (original)
+++ zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/ClientCnxn.java Sat Jun 13 00:43:59 2015
@@ -28,6 +28,7 @@ import java.net.Socket;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.util.HashSet;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -99,6 +100,16 @@ public class ClientCnxn {
     private static final String ZK_SASL_CLIENT_USERNAME =
         "zookeeper.sasl.client.username";
 
+    /* ZOOKEEPER-706: If a session has a large number of watches set then
+     * attempting to re-establish those watches after a connection loss may
+     * fail due to the SetWatches request exceeding the server's configured
+     * jute.maxbuffer value. To avoid this we instead split the watch
+     * re-establishment across multiple SetWatches calls. This constant caps
+     * the summed path length (in characters) of each call. It is set to 128kB
+     * to be conservative with respect to the 1MB default for jute.maxbuffer.
+     */
+    private static final int SET_WATCHES_MAX_LENGTH = 128 * 1024;
+
     /** This controls whether automatic watch resetting is enabled.
      * Clients automatically reset watches during session reconnect, this
      * option allows the client to turn off this behavior by setting
@@ -983,15 +994,45 @@ public class ClientCnxn {
                 List<String> childWatches = zooKeeper.getChildWatches();
                 if (!dataWatches.isEmpty()
                         || !existWatches.isEmpty() || !childWatches.isEmpty()) {
-                    SetWatches sw = new SetWatches(lastZxid,
-                            prependChroot(dataWatches),
-                            prependChroot(existWatches),
-                            prependChroot(childWatches));
-                    RequestHeader h = new RequestHeader();
-                    h.setType(ZooDefs.OpCode.setWatches);
-                    h.setXid(-8);
-                    Packet packet = new Packet(h, new ReplyHeader(), sw, null, null);
-                    outgoingQueue.addFirst(packet);
+                    Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator();
+                    Iterator<String> existWatchesIter = prependChroot(existWatches).iterator();
+                    Iterator<String> childWatchesIter = prependChroot(childWatches).iterator();
+                    long setWatchesLastZxid = lastZxid;
+
+                    while (dataWatchesIter.hasNext()
+                           || existWatchesIter.hasNext() || childWatchesIter.hasNext()) {
+                        List<String> dataWatchesBatch = new ArrayList<String>();
+                        List<String> existWatchesBatch = new ArrayList<String>();
+                        List<String> childWatchesBatch = new ArrayList<String>();
+                        int batchLength = 0;
+
+                        // Note, we may exceed our max length by a bit when we add the last
+                        // watch in the batch. This isn't ideal, but it makes the code simpler.
+                        while (batchLength < SET_WATCHES_MAX_LENGTH) {
+                            final String watch;
+                            if (dataWatchesIter.hasNext()) {
+                                watch = dataWatchesIter.next();
+                                dataWatchesBatch.add(watch);
+                            } else if (existWatchesIter.hasNext()) {
+                                watch = existWatchesIter.next();
+                                existWatchesBatch.add(watch);
+                            } else if (childWatchesIter.hasNext()) {
+                                watch = childWatchesIter.next();
+                                childWatchesBatch.add(watch);
+                            } else {
+                                break;
+                            }
+                            batchLength += watch.length();
+                        }
+
+                        SetWatches sw = new SetWatches(setWatchesLastZxid,
+                                                       dataWatchesBatch,
+                                                       existWatchesBatch,
+                                                       childWatchesBatch);
+                        RequestHeader header = new RequestHeader(-8, OpCode.setWatches);
+                        Packet packet = new Packet(header, new ReplyHeader(), sw, null, null);
+                        outgoingQueue.addFirst(packet);
+                    }
                 }
             }
 

Modified: zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/DisconnectedWatcherTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/DisconnectedWatcherTest.java?rev=1685203&r1=1685202&r2=1685203&view=diff
==============================================================================
--- zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/DisconnectedWatcherTest.java (original)
+++ zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/DisconnectedWatcherTest.java Sat Jun 13 00:43:59 2015
@@ -18,6 +18,8 @@
 
 package org.apache.zookeeper.test;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
@@ -173,4 +175,80 @@ public class DisconnectedWatcherTest ext
         Assert.assertEquals(EventType.NodeChildrenChanged, e.getType());
         Assert.assertEquals("/are", e.getPath());
     }
+
+    // @see jira issue ZOOKEEPER-706. Test auto reset of a large number of
+    // watches which require multiple SetWatches calls.
+    @Test
+    public void testManyChildWatchersAutoReset() throws Exception {
+        ZooKeeper zk1 = createClient();
+
+        MyWatcher watcher = new MyWatcher();
+        ZooKeeper zk2 = createClient(watcher);
+
+        // 110 character base path
+        String pathBase = "/long-path-000000000-111111111-222222222-333333333-444444444-"
+                          + "555555555-666666666-777777777-888888888-999999999";
+
+        zk1.create(pathBase, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        // Create 10,000 nodes. The combined length of the watches set on
+        // them below should then exceed 1MB.
+        List<String> paths = new ArrayList<String>();
+        for (int i = 0; i < 10000; i++) {
+            String path = zk1.create(pathBase + "/ch-", null, Ids.OPEN_ACL_UNSAFE,
+                                     CreateMode.PERSISTENT_SEQUENTIAL);
+            paths.add(path);
+        }
+
+        MyWatcher childWatcher = new MyWatcher();
+
+        // Set a combination of child/exists/data watches
+        int i = 0;
+        for (String path : paths) {
+            if (i % 3 == 0) {
+                zk2.getChildren(path, childWatcher);
+            } else if (i % 3 == 1) {
+                zk2.exists(path + "/foo", childWatcher);
+            } else if (i % 3 == 2) {
+                zk2.getData(path, childWatcher, null);
+            }
+
+            i++;
+        }
+
+        stopServer();
+        watcher.waitForDisconnected(30000);
+        startServer();
+        watcher.waitForConnected(30000);
+
+        // Trigger the watches and ensure they properly propagate to the client
+        i = 0;
+        for (String path : paths) {
+            if (i % 3 == 0) {
+                zk1.create(path + "/ch", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+                WatchedEvent e = childWatcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS);
+                Assert.assertNotNull(e);
+                Assert.assertEquals(EventType.NodeChildrenChanged, e.getType());
+                Assert.assertEquals(path, e.getPath());
+            } else if (i % 3 == 1) {
+                zk1.create(path + "/foo", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+                WatchedEvent e = childWatcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS);
+                Assert.assertNotNull(e);
+                Assert.assertEquals(EventType.NodeCreated, e.getType());
+                Assert.assertEquals(path + "/foo", e.getPath());
+            } else if (i % 3 == 2) {
+                zk1.setData(path, new byte[]{1, 2, 3}, -1);
+
+                WatchedEvent e = childWatcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS);
+                Assert.assertNotNull(e);
+                Assert.assertEquals(EventType.NodeDataChanged, e.getType());
+                Assert.assertEquals(path, e.getPath());
+            }
+
+            i++;
+        }
+    }
+
 }



Mime
View raw message