Return-Path: X-Original-To: apmail-zookeeper-commits-archive@www.apache.org Delivered-To: apmail-zookeeper-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 871781885C for ; Sat, 13 Jun 2015 00:44:05 +0000 (UTC) Received: (qmail 45261 invoked by uid 500); 13 Jun 2015 00:44:00 -0000 Delivered-To: apmail-zookeeper-commits-archive@zookeeper.apache.org Received: (qmail 45229 invoked by uid 500); 13 Jun 2015 00:44:00 -0000 Mailing-List: contact commits-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ Delivered-To: mailing list commits@zookeeper.apache.org Received: (qmail 45205 invoked by uid 99); 13 Jun 2015 00:44:00 -0000 Received: from eris.apache.org (HELO hades.apache.org) (140.211.11.105) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 13 Jun 2015 00:44:00 +0000 Received: from hades.apache.org (localhost [127.0.0.1]) by hades.apache.org (ASF Mail Server at hades.apache.org) with ESMTP id 503F8AC06A3 for ; Sat, 13 Jun 2015 00:44:00 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1685203 - in /zookeeper/branches/branch-3.5: CHANGES.txt src/java/main/org/apache/zookeeper/ClientCnxn.java src/java/test/org/apache/zookeeper/test/DisconnectedWatcherTest.java Date: Sat, 13 Jun 2015 00:44:00 -0000 To: commits@zookeeper.apache.org From: rgs@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20150613004400.503F8AC06A3@hades.apache.org> Author: rgs Date: Sat Jun 13 00:43:59 2015 New Revision: 1685203 URL: http://svn.apache.org/r1685203 Log: ZOOKEEPER-706: Large numbers of watches can cause session re-establishment to fail (Chris Thunes via rgs) Modified: zookeeper/branches/branch-3.5/CHANGES.txt zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/ClientCnxn.java 
zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/DisconnectedWatcherTest.java Modified: zookeeper/branches/branch-3.5/CHANGES.txt URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/CHANGES.txt?rev=1685203&r1=1685202&r2=1685203&view=diff ============================================================================== --- zookeeper/branches/branch-3.5/CHANGES.txt (original) +++ zookeeper/branches/branch-3.5/CHANGES.txt Sat Jun 13 00:43:59 2015 @@ -125,6 +125,9 @@ BUGFIXES: ZOOKEEPER-2213: Empty path in Set crashes server and prevents restart (Hongchao Deng via rgs) + ZOOKEEPER-706: Large numbers of watches can cause session re-establishment to fail + (Chris Thunes via rgs) + IMPROVEMENTS: ZOOKEEPER-1660 Documentation for Dynamic Reconfiguration (Reed Wanderman-Milne via shralex) Modified: zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/ClientCnxn.java URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/ClientCnxn.java?rev=1685203&r1=1685202&r2=1685203&view=diff ============================================================================== --- zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/ClientCnxn.java (original) +++ zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/ClientCnxn.java Sat Jun 13 00:43:59 2015 @@ -28,6 +28,7 @@ import java.net.Socket; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.util.HashSet; +import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -99,6 +100,16 @@ public class ClientCnxn { private static final String ZK_SASL_CLIENT_USERNAME = "zookeeper.sasl.client.username"; + /* ZOOKEEPER-706: If a session has a large number of watches set then + * attempting to re-establish those watches after a connection loss may + * fail due to the SetWatches request exceeding the server's configured + * jute.maxBuffer value. 
To avoid this we instead split the watch + * re-establishment across multiple SetWatches calls. This constant + * controls the size of each call. It is set to 128kB to be conservative + * with respect to the server's 1MB default for jute.maxBuffer. + */ + private static final int SET_WATCHES_MAX_LENGTH = 128 * 1024; + /** This controls whether automatic watch resetting is enabled. * Clients automatically reset watches during session reconnect, this * option allows the client to turn off this behavior by setting @@ -983,15 +994,45 @@ public class ClientCnxn { List childWatches = zooKeeper.getChildWatches(); if (!dataWatches.isEmpty() || !existWatches.isEmpty() || !childWatches.isEmpty()) { - SetWatches sw = new SetWatches(lastZxid, - prependChroot(dataWatches), - prependChroot(existWatches), - prependChroot(childWatches)); - RequestHeader h = new RequestHeader(); - h.setType(ZooDefs.OpCode.setWatches); - h.setXid(-8); - Packet packet = new Packet(h, new ReplyHeader(), sw, null, null); - outgoingQueue.addFirst(packet); + Iterator dataWatchesIter = prependChroot(dataWatches).iterator(); + Iterator existWatchesIter = prependChroot(existWatches).iterator(); + Iterator childWatchesIter = prependChroot(childWatches).iterator(); + long setWatchesLastZxid = lastZxid; + + while (dataWatchesIter.hasNext() + || existWatchesIter.hasNext() || childWatchesIter.hasNext()) { + List dataWatchesBatch = new ArrayList(); + List existWatchesBatch = new ArrayList(); + List childWatchesBatch = new ArrayList(); + int batchLength = 0; + + // Note, we may exceed our max length by a bit when we add the last + // watch in the batch. This isn't ideal, but it makes the code simpler. 
+ while (batchLength < SET_WATCHES_MAX_LENGTH) { + final String watch; + if (dataWatchesIter.hasNext()) { + watch = dataWatchesIter.next(); + dataWatchesBatch.add(watch); + } else if (existWatchesIter.hasNext()) { + watch = existWatchesIter.next(); + existWatchesBatch.add(watch); + } else if (childWatchesIter.hasNext()) { + watch = childWatchesIter.next(); + childWatchesBatch.add(watch); + } else { + break; + } + batchLength += watch.length(); + } + + SetWatches sw = new SetWatches(setWatchesLastZxid, + dataWatchesBatch, + existWatchesBatch, + childWatchesBatch); + RequestHeader header = new RequestHeader(-8, OpCode.setWatches); + Packet packet = new Packet(header, new ReplyHeader(), sw, null, null); + outgoingQueue.addFirst(packet); + } } } Modified: zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/DisconnectedWatcherTest.java URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/DisconnectedWatcherTest.java?rev=1685203&r1=1685202&r2=1685203&view=diff ============================================================================== --- zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/DisconnectedWatcherTest.java (original) +++ zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/DisconnectedWatcherTest.java Sat Jun 13 00:43:59 2015 @@ -18,6 +18,8 @@ package org.apache.zookeeper.test; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -173,4 +175,80 @@ public class DisconnectedWatcherTest ext Assert.assertEquals(EventType.NodeChildrenChanged, e.getType()); Assert.assertEquals("/are", e.getPath()); } + + // @see jira issue ZOOKEEPER-706. Test auto reset of a large number of + // watches which require multiple SetWatches calls. 
+ @Test + public void testManyChildWatchersAutoReset() throws Exception { + ZooKeeper zk1 = createClient(); + + MyWatcher watcher = new MyWatcher(); + ZooKeeper zk2 = createClient(watcher); + + // 110 character base path + String pathBase = "/long-path-000000000-111111111-222222222-333333333-444444444-" + + "555555555-666666666-777777777-888888888-999999999"; + + zk1.create(pathBase, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + // Create 10,000 nodes. This should ensure the length of our + // watches set below exceeds 1MB. + List paths = new ArrayList(); + for (int i = 0; i < 10000; i++) { + String path = zk1.create(pathBase + "/ch-", null, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT_SEQUENTIAL); + paths.add(path); + } + + MyWatcher childWatcher = new MyWatcher(); + + // Set a combination of child/exists/data watches + int i = 0; + for (String path : paths) { + if (i % 3 == 0) { + zk2.getChildren(path, childWatcher); + } else if (i % 3 == 1) { + zk2.exists(path + "/foo", childWatcher); + } else if (i % 3 == 2) { + zk2.getData(path, childWatcher, null); + } + + i++; + } + + stopServer(); + watcher.waitForDisconnected(30000); + startServer(); + watcher.waitForConnected(30000); + + // Trigger the watches and ensure they properly propagate to the client + i = 0; + for (String path : paths) { + if (i % 3 == 0) { + zk1.create(path + "/ch", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + WatchedEvent e = childWatcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS); + Assert.assertNotNull(e); + Assert.assertEquals(EventType.NodeChildrenChanged, e.getType()); + Assert.assertEquals(path, e.getPath()); + } else if (i % 3 == 1) { + zk1.create(path + "/foo", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + WatchedEvent e = childWatcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS); + Assert.assertNotNull(e); + Assert.assertEquals(EventType.NodeCreated, e.getType()); + Assert.assertEquals(path + "/foo", e.getPath()); + } else if (i % 3 == 2) { + 
zk1.setData(path, new byte[]{1, 2, 3}, -1); + + WatchedEvent e = childWatcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS); + Assert.assertNotNull(e); + Assert.assertEquals(EventType.NodeDataChanged, e.getType()); + Assert.assertEquals(path, e.getPath()); + } + + i++; + } + } + }