Repository: zookeeper
Updated Branches:
refs/heads/master 4ebb847bc -> fdde8b006
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/watch/WatcherOrBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/watch/WatcherOrBitSet.java b/src/java/main/org/apache/zookeeper/server/watch/WatcherOrBitSet.java
new file mode 100644
index 0000000..83b186f
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/watch/WatcherOrBitSet.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import java.util.Set;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.server.util.BitHashSet;
+
+public class WatcherOrBitSet {
+
+ private Set<Watcher> watchers;
+ private BitHashSet watcherBits;
+
+ public WatcherOrBitSet(final Set<Watcher> watchers) {
+ this.watchers = watchers;
+ }
+
+ public WatcherOrBitSet(final BitHashSet watcherBits) {
+ this.watcherBits = watcherBits;
+ }
+
+ public boolean contains(Watcher watcher) {
+ if (watchers == null) {
+ return false;
+ }
+ return watchers.contains(watcher);
+ }
+
+ public boolean contains(int watcherBit) {
+ if (watcherBits == null) {
+ return false;
+ }
+ return watcherBits.contains(watcherBit);
+ }
+
+ public int size() {
+ if (watchers != null) {
+ return watchers.size();
+ }
+ if (watcherBits != null) {
+ return watcherBits.size();
+ }
+ return 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/watch/WatchesPathReport.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/watch/WatchesPathReport.java b/src/java/main/org/apache/zookeeper/server/watch/WatchesPathReport.java
new file mode 100644
index 0000000..38f02de
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/watch/WatchesPathReport.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A watch report, essentially a mapping of path to session IDs of sessions that
+ * have set a watch on that path. This class is immutable.
+ */
+public class WatchesPathReport {
+
+ private final Map<String, Set<Long>> path2Ids;
+
+ /**
+ * Creates a new report.
+ *
+ * @param path2Ids map of paths to session IDs of sessions that have set a
+ * watch on that path
+ */
+ WatchesPathReport(Map<String, Set<Long>> path2Ids) {
+ this.path2Ids = Collections.unmodifiableMap(deepCopy(path2Ids));
+ }
+
+ private static Map<String, Set<Long>> deepCopy(Map<String, Set<Long>> m) {
+ Map<String, Set<Long>> m2 = new HashMap<String, Set<Long>>();
+ for (Map.Entry<String, Set<Long>> e : m.entrySet()) {
+ m2.put(e.getKey(), new HashSet<Long>(e.getValue()));
+ }
+ return m2;
+ }
+
+ /**
+ * Checks if the given path has watches set.
+ *
+ * @param path path
+ * @return true if path has watch set
+ */
+ public boolean hasSessions(String path) {
+ return path2Ids.containsKey(path);
+ }
+ /**
+ * Gets the session IDs of sessions that have set watches on the given path.
+ * The returned set is immutable.
+ *
+ * @param path session ID
+ * @return session IDs of sessions that have set watches on the path, or
+ * null if none
+ */
+ public Set<Long> getSessions(String path) {
+ Set<Long> s = path2Ids.get(path);
+ return s != null ? Collections.unmodifiableSet(s) : null;
+ }
+
+ /**
+ * Converts this report to a map. The returned map is mutable, and changes
+ * to it do not reflect back into this report.
+ *
+ * @return map representation of report
+ */
+ public Map<String, Set<Long>> toMap() {
+ return deepCopy(path2Ids);
+ }
+}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/watch/WatchesReport.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/watch/WatchesReport.java b/src/java/main/org/apache/zookeeper/server/watch/WatchesReport.java
new file mode 100644
index 0000000..ac888d3
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/watch/WatchesReport.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A watch report, essentially a mapping of session ID to paths that the session
+ * has set a watch on. This class is immutable.
+ */
+public class WatchesReport {
+
+ private final Map<Long, Set<String>> id2paths;
+
+ /**
+ * Creates a new report.
+ *
+ * @param id2paths map of session IDs to paths that each session has set
+ * a watch on
+ */
+ WatchesReport(Map<Long, Set<String>> id2paths) {
+ this.id2paths = Collections.unmodifiableMap(deepCopy(id2paths));
+ }
+
+ private static Map<Long, Set<String>> deepCopy(Map<Long, Set<String>> m) {
+ Map<Long, Set<String>> m2 = new HashMap<Long, Set<String>>();
+ for (Map.Entry<Long, Set<String>> e : m.entrySet()) {
+ m2.put(e.getKey(), new HashSet<String>(e.getValue()));
+ }
+ return m2;
+ }
+
+ /**
+ * Checks if the given session has watches set.
+ *
+ * @param sessionId session ID
+ * @return true if session has paths with watches set
+ */
+ public boolean hasPaths(long sessionId) {
+ return id2paths.containsKey(sessionId);
+ }
+
+ /**
+ * Gets the paths that the given session has set watches on. The returned
+ * set is immutable.
+ *
+ * @param sessionId session ID
+ * @return paths that have watches set by the session, or null if none
+ */
+ public Set<String> getPaths(long sessionId) {
+ Set<String> s = id2paths.get(sessionId);
+ return s != null ? Collections.unmodifiableSet(s) : null;
+ }
+
+ /**
+ * Converts this report to a map. The returned map is mutable, and changes
+ * to it do not reflect back into this report.
+ *
+ * @return map representation of report
+ */
+ public Map<Long, Set<String>> toMap() {
+ return deepCopy(id2paths);
+ }
+}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/watch/WatchesSummary.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/watch/WatchesSummary.java b/src/java/main/org/apache/zookeeper/server/watch/WatchesSummary.java
new file mode 100644
index 0000000..b2449ba
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/watch/WatchesSummary.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * A summary of watch information. This class is immutable.
+ */
+public class WatchesSummary {
+
+ /**
+ * The key in the map returned by {@link #toMap()} for the number of
+ * connections.
+ */
+ public static final String KEY_NUM_CONNECTIONS = "num_connections";
+ /**
+ * The key in the map returned by {@link #toMap()} for the number of paths.
+ */
+ public static final String KEY_NUM_PATHS = "num_paths";
+ /**
+ * The key in the map returned by {@link #toMap()} for the total number of
+ * watches.
+ */
+ public static final String KEY_NUM_TOTAL_WATCHES = "num_total_watches";
+
+ private final int numConnections;
+ private final int numPaths;
+ private final int totalWatches;
+
+ /**
+ * Creates a new summary.
+ *
+ * @param numConnections the number of sessions that have set watches
+ * @param numPaths the number of paths that have watches set on them
+ * @param totalWatches the total number of watches set
+ */
+ WatchesSummary(int numConnections, int numPaths, int totalWatches) {
+ this.numConnections = numConnections;
+ this.numPaths = numPaths;
+ this.totalWatches = totalWatches;
+ }
+
+ /**
+ * Gets the number of connections (sessions) that have set watches.
+ *
+ * @return number of connections
+ */
+ public int getNumConnections() {
+ return numConnections;
+ }
+ /**
+ * Gets the number of paths that have watches set on them.
+ *
+ * @return number of paths
+ */
+ public int getNumPaths() {
+ return numPaths;
+ }
+ /**
+ * Gets the total number of watches set.
+ *
+ * @return total watches
+ */
+ public int getTotalWatches() {
+ return totalWatches;
+ }
+
+ /**
+ * Converts this summary to a map. The returned map is mutable, and changes
+ * to it do not reflect back into this summary.
+ *
+ * @return map representation of summary
+ */
+ public Map<String, Object> toMap() {
+ Map<String, Object> summary = new LinkedHashMap<String, Object>();
+ summary.put(KEY_NUM_CONNECTIONS, numConnections);
+ summary.put(KEY_NUM_PATHS, numPaths);
+ summary.put(KEY_NUM_TOTAL_WATCHES, totalWatches);
+ return summary;
+ }
+}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/config/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/src/java/test/config/findbugsExcludeFile.xml b/src/java/test/config/findbugsExcludeFile.xml
index 4ab5a5e..a3f45a6 100644
--- a/src/java/test/config/findbugsExcludeFile.xml
+++ b/src/java/test/config/findbugsExcludeFile.xml
@@ -53,7 +53,13 @@
<Method name="run" />
<Bug pattern="DM_EXIT" />
</Match>
-
+
+ <!-- Failed to create watch manager is a unrecoverable error -->
+ <Match>
+ <Class name="org.apache.zookeeper.server.DataTree" />
+ <Bug pattern="DM_EXIT" />
+ </Match>
+
<Match>
<Package name="org.apache.jute.compiler.generated" />
</Match>
@@ -85,7 +91,7 @@
<Match>
<Class name="org.apache.zookeeper.server.DataNode"/>
- <Field name="children"/>
+ <Field name="children"/>
<Bug code="IS"/>
</Match>
<Match>
@@ -98,6 +104,15 @@
<Field name="serverStats"/>
<Bug code="IS"/>
</Match>
+
+ <!-- The iterate function is non-thread safe, the caller will synchronize
+ on the BitHHashSet object -->
+ <Match>
+ <Class name="org.apache.zookeeper.server.util.BitHashSet" />
+ <Field name="elementCount" />
+ <Bug code="IS" />
+ </Match>
+
<Match>
<Class name="org.apache.zookeeper.server.quorum.LearnerSessionTracker"/>
<Bug code="UrF"/>
@@ -111,7 +126,7 @@
<!-- these are old classes just for upgrading and should go away -->
<Match>
<Class name="org.apache.zookeeper.server.upgrade.DataNodeV1"/>
- </Match>
+ </Match>
<Match>
<Class name="org.apache.zookeeper.server.upgrade.DataTreeV1"/>
@@ -134,6 +149,23 @@
</Or>
</Match>
+ <!-- Synchronize on the AtomicInteger to do wait/notify, but not relying
+ on the synchronization to control the AtomicInteger value update,
+ so it's not a problem -->
+ <Match>
+ <Class name="org.apache.zookeeper.server.watch.WatcherCleaner" />
+ <Bug code="JLM" />
+ <Method name="addDeadWatcher" />
+ </Match>
+
+ <Match>
+ <Class name="org.apache.zookeeper.server.watch.WatcherCleaner$1" />
+ <Bug code="JLM" />
+ <Method name="doWork" />
+ </Match>
+
+
+
<Match>
<Class name="org.apache.zookeeper.server.quorum.QuorumPeer"/>
<Bug pattern="OS_OPEN_STREAM" />
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/WatchesPathReportTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/WatchesPathReportTest.java b/src/java/test/org/apache/zookeeper/server/WatchesPathReportTest.java
deleted file mode 100644
index c0b107d..0000000
--- a/src/java/test/org/apache/zookeeper/server/WatchesPathReportTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zookeeper.server;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import org.apache.zookeeper.ZKTestCase;
-import org.junit.Before;
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-public class WatchesPathReportTest extends ZKTestCase {
- private Map<String, Set<Long>> m;
- private WatchesPathReport r;
- @Before public void setUp() {
- m = new HashMap<String, Set<Long>>();
- Set<Long> s = new HashSet<Long>();
- s.add(101L);
- s.add(102L);
- m.put("path1", s);
- s = new HashSet<Long>();
- s.add(201L);
- m.put("path2", s);
- r = new WatchesPathReport(m);
- }
- @Test public void testHasSessions() {
- assertTrue(r.hasSessions("path1"));
- assertTrue(r.hasSessions("path2"));
- assertFalse(r.hasSessions("path3"));
- }
- @Test public void testGetSessions() {
- Set<Long> s = r.getSessions("path1");
- assertEquals(2, s.size());
- assertTrue(s.contains(101L));
- assertTrue(s.contains(102L));
- s = r.getSessions("path2");
- assertEquals(1, s.size());
- assertTrue(s.contains(201L));
- assertNull(r.getSessions("path3"));
- }
- @Test public void testToMap() {
- assertEquals(m, r.toMap());
- }
-}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/WatchesReportTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/WatchesReportTest.java b/src/java/test/org/apache/zookeeper/server/WatchesReportTest.java
deleted file mode 100644
index 7f0343b..0000000
--- a/src/java/test/org/apache/zookeeper/server/WatchesReportTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zookeeper.server;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import org.apache.zookeeper.ZKTestCase;
-import org.junit.Before;
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-public class WatchesReportTest extends ZKTestCase {
- private Map<Long, Set<String>> m;
- private WatchesReport r;
- @Before public void setUp() {
- m = new HashMap<Long, Set<String>>();
- Set<String> s = new HashSet<String>();
- s.add("path1a");
- s.add("path1b");
- m.put(1L, s);
- s = new HashSet<String>();
- s.add("path2a");
- m.put(2L, s);
- r = new WatchesReport(m);
- }
- @Test public void testHasPaths() {
- assertTrue(r.hasPaths(1L));
- assertTrue(r.hasPaths(2L));
- assertFalse(r.hasPaths(3L));
- }
- @Test public void testGetPaths() {
- Set<String> s = r.getPaths(1L);
- assertEquals(2, s.size());
- assertTrue(s.contains("path1a"));
- assertTrue(s.contains("path1b"));
- s = r.getPaths(2L);
- assertEquals(1, s.size());
- assertTrue(s.contains("path2a"));
- assertNull(r.getPaths(3L));
- }
- @Test public void testToMap() {
- assertEquals(m, r.toMap());
- }
-}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/WatchesSummaryTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/WatchesSummaryTest.java b/src/java/test/org/apache/zookeeper/server/WatchesSummaryTest.java
deleted file mode 100644
index d679065..0000000
--- a/src/java/test/org/apache/zookeeper/server/WatchesSummaryTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zookeeper.server;
-
-import java.util.Map;
-import org.apache.zookeeper.ZKTestCase;
-import org.junit.Before;
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-public class WatchesSummaryTest extends ZKTestCase {
- private WatchesSummary s;
- @Before public void setUp() {
- s = new WatchesSummary(1, 2, 3);
- }
- @Test public void testGetters() {
- assertEquals(1, s.getNumConnections());
- assertEquals(2, s.getNumPaths());
- assertEquals(3, s.getTotalWatches());
- }
- @Test public void testToMap() {
- Map<String, Object> m = s.toMap();
- assertEquals(3, m.size());
- assertEquals(Integer.valueOf(1), m.get(WatchesSummary.KEY_NUM_CONNECTIONS));
- assertEquals(Integer.valueOf(2), m.get(WatchesSummary.KEY_NUM_PATHS));
- assertEquals(Integer.valueOf(3), m.get(WatchesSummary.KEY_NUM_TOTAL_WATCHES));
- }
-}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/util/BitHashSetTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/util/BitHashSetTest.java b/src/java/test/org/apache/zookeeper/server/util/BitHashSetTest.java
new file mode 100644
index 0000000..a70eaa5
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/util/BitHashSetTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.util;
+
+import java.util.Set;
+import java.util.HashSet;
+import java.util.List;
+import java.util.ArrayList;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.junit.Test;
+import org.junit.Assert;
+
+public class BitHashSetTest extends ZKTestCase {
+
+ @Test
+ public void testAddWatchBit() {
+ int watcherCacheSize = 1;
+ BitHashSet ws = new BitHashSet(watcherCacheSize);
+ Assert.assertTrue(ws.add(1));
+ Assert.assertEquals(1, ws.size());
+ Assert.assertEquals(1, ws.cachedSize());
+
+ List<Integer> actualBits = new ArrayList<Integer>();
+
+ for (int bit: ws) {
+ actualBits.add(bit);
+ }
+ Assert.assertArrayEquals(
+ new Integer[] {1},
+ actualBits.toArray(new Integer[actualBits.size()]));
+
+ // add the same bit again
+ Assert.assertFalse(ws.add(1));
+ Assert.assertEquals(1, ws.size());
+ Assert.assertEquals(1, ws.cachedSize());
+
+ // add another bit, make sure there there is only 1 bit cached
+ Assert.assertTrue(ws.add(2));
+ Assert.assertEquals(2, ws.size());
+ Assert.assertEquals(1, ws.cachedSize());
+
+ Assert.assertTrue(ws.contains(1));
+
+ actualBits.clear();
+ for (int bit: ws) {
+ actualBits.add(bit);
+ }
+ Assert.assertArrayEquals(
+ new Integer[] {1, 2},
+ actualBits.toArray(new Integer[actualBits.size()]));
+ }
+
+ @Test
+ public void testRemoveWatchBit() {
+ int watcherCacheSize = 1;
+ BitHashSet ws = new BitHashSet(watcherCacheSize);
+ ws.add(1);
+ ws.add(2);
+
+ Assert.assertTrue(ws.contains(1));
+ Assert.assertTrue(ws.contains(2));
+
+ ws.remove(1);
+ Assert.assertFalse(ws.contains(1));
+ Assert.assertEquals(1, ws.size());
+ Assert.assertEquals(0, ws.cachedSize());
+
+ List<Integer> actualBits = new ArrayList<Integer>();
+
+ for (int bit: ws) {
+ actualBits.add(bit);
+ }
+ Assert.assertArrayEquals(
+ new Integer[] {2},
+ actualBits.toArray(new Integer[actualBits.size()]));
+
+ ws.add(3);
+ Assert.assertEquals(2, ws.size());
+ Assert.assertEquals(1, ws.cachedSize());
+
+ actualBits.clear();
+ for (int bit: ws) {
+ actualBits.add(bit);
+ }
+ Assert.assertArrayEquals(
+ new Integer[] {2, 3},
+ actualBits.toArray(new Integer[actualBits.size()]));
+
+ ws.remove(2);
+ ws.remove(3);
+
+ Assert.assertEquals(0, ws.size());
+ Assert.assertEquals(0, ws.cachedSize());
+ }
+}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/util/BitMapTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/util/BitMapTest.java b/src/java/test/org/apache/zookeeper/server/util/BitMapTest.java
new file mode 100644
index 0000000..eca0f2d
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/util/BitMapTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.util;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.junit.Test;
+import org.junit.Assert;
+
+public class BitMapTest extends ZKTestCase {
+
+ @Test
+ public void testAddAndRemove() {
+ BitMap<String> bitMap = new BitMap<String>();
+ String v1 = new String("v1");
+ Integer bit = bitMap.add(v1);
+
+ Assert.assertEquals(1, bitMap.size());
+ Assert.assertTrue(bit >= 0);
+ Assert.assertEquals(v1, bitMap.get(bit));
+ Assert.assertEquals(bit, bitMap.getBit(v1));
+
+ // add the same value again
+ Integer newBit = bitMap.add(v1);
+ Assert.assertEquals(bit, newBit);
+ Assert.assertEquals(1, bitMap.size());
+
+ String v2 = new String("v2");
+ Integer v2Bit = bitMap.add(v2);
+ Assert.assertEquals(2, bitMap.size());
+ Assert.assertNotEquals(v2Bit, bit);
+
+ // remove by value
+ bitMap.remove(v1);
+ Assert.assertEquals(1, bitMap.size());
+ Assert.assertNull(bitMap.get(bit));
+ Assert.assertNull(bitMap.getBit(v1));
+
+ // remove by bit
+ bitMap.remove(v2Bit);
+ Assert.assertEquals(0, bitMap.size());
+ Assert.assertNull(bitMap.get(v2Bit));
+ Assert.assertNull(bitMap.getBit(v2));
+ }
+
+ @Test
+ public void testBitReuse() {
+ BitMap<String> bitMap = new BitMap<String>();
+ int v1Bit = bitMap.add("v1");
+ int v2Bit = bitMap.add("v2");
+ int v3Bit = bitMap.add("v3");
+ bitMap.remove(v2Bit);
+
+ int v4Bit = bitMap.add("v4");
+
+ Assert.assertEquals(v4Bit, v2Bit);
+ }
+}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/watch/WatchManagerTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/watch/WatchManagerTest.java b/src/java/test/org/apache/zookeeper/server/watch/WatchManagerTest.java
new file mode 100644
index 0000000..f6a229b
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/watch/WatchManagerTest.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.watch;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.server.DumbWatcher;
+import org.apache.zookeeper.server.ServerCnxn;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(Parameterized.class)
+public class WatchManagerTest extends ZKTestCase {
+ protected static final Logger LOG = LoggerFactory.getLogger(WatchManagerTest.class);
+
+ private static final String PATH_PREFIX = "path";
+
+ private ConcurrentHashMap<Integer, DumbWatcher> watchers;
+ private Random r;
+ private String className;
+
+ public WatchManagerTest(String className) {
+ this.className = className;
+ }
+
+ @Parameterized.Parameters
+ public static List<Object []> data() {
+ return Arrays.asList(new Object [][] {
+ {WatchManager.class.getName()},
+ {WatchManagerOptimized.class.getName()}
+ });
+ }
+
+ @Before
+ public void setUp() {
+ watchers = new ConcurrentHashMap<Integer, DumbWatcher>();
+ r = new Random(System.nanoTime());
+ }
+
+ public IWatchManager getWatchManager() throws IOException {
+ System.setProperty(WatchManagerFactory.ZOOKEEPER_WATCH_MANAGER_NAME, className);
+ return WatchManagerFactory.createWatchManager();
+ }
+
+ public DumbWatcher createOrGetWatcher(int watcherId) {
+ if (!watchers.containsKey(watcherId)) {
+ DumbWatcher watcher = new DumbWatcher(watcherId);
+ watchers.putIfAbsent(watcherId, watcher);
+ }
+ return watchers.get(watcherId);
+ }
+
+ public class AddWatcherWorker extends Thread {
+
+ private final IWatchManager manager;
+ private final int paths;
+ private final int watchers;
+ private final AtomicInteger watchesAdded;
+ private volatile boolean stopped = false;
+
+ public AddWatcherWorker(IWatchManager manager,
+ int paths, int watchers, AtomicInteger watchesAdded) {
+ this.manager = manager;
+ this.paths = paths;
+ this.watchers = watchers;
+ this.watchesAdded = watchesAdded;
+ }
+
+ @Override
+ public void run() {
+ while (!stopped) {
+ String path = PATH_PREFIX + r.nextInt(paths);
+ Watcher watcher = createOrGetWatcher(r.nextInt(watchers));
+ if (manager.addWatch(path, watcher)) {
+ watchesAdded.addAndGet(1);
+ }
+ }
+ }
+
+ public void shutdown() {
+ stopped = true;
+ }
+ }
+
+ public class WatcherTriggerWorker extends Thread {
+
+ private final IWatchManager manager;
+ private final int paths;
+ private final AtomicInteger triggeredCount;
+ private volatile boolean stopped = false;
+
+ public WatcherTriggerWorker(IWatchManager manager,
+ int paths, AtomicInteger triggeredCount) {
+ this.manager = manager;
+ this.paths = paths;
+ this.triggeredCount = triggeredCount;
+ }
+
+ @Override
+ public void run() {
+ while (!stopped) {
+ String path = PATH_PREFIX + r.nextInt(paths);
+ WatcherOrBitSet s = manager.triggerWatch(
+ path, EventType.NodeDeleted);
+ if (s != null) {
+ triggeredCount.addAndGet(s.size());
+ }
+ try {
+ Thread.sleep(r.nextInt(10));
+ } catch (InterruptedException e) {}
+ }
+ }
+
+ public void shutdown() {
+ stopped = true;
+ }
+ }
+
+ public class RemoveWatcherWorker extends Thread {
+
+ private final IWatchManager manager;
+ private final int paths;
+ private final int watchers;
+ private final AtomicInteger watchesRemoved;
+ private volatile boolean stopped = false;
+
+ public RemoveWatcherWorker(IWatchManager manager,
+ int paths, int watchers, AtomicInteger watchesRemoved) {
+ this.manager = manager;
+ this.paths = paths;
+ this.watchers = watchers;
+ this.watchesRemoved = watchesRemoved;
+ }
+
+ @Override
+ public void run() {
+ while (!stopped) {
+ String path = PATH_PREFIX + r.nextInt(paths);
+ Watcher watcher = createOrGetWatcher(r.nextInt(watchers));
+ if (manager.removeWatcher(path, watcher)) {
+ watchesRemoved.addAndGet(1);
+ }
+ try {
+ Thread.sleep(r.nextInt(10));
+ } catch (InterruptedException e) {}
+ }
+ }
+
+ public void shutdown() {
+ stopped = true;
+ }
+
+ }
+
+ public class CreateDeadWatchersWorker extends Thread {
+
+ private final IWatchManager manager;
+ private final int watchers;
+ private final Set<Watcher> removedWatchers;
+ private volatile boolean stopped = false;
+
+ public CreateDeadWatchersWorker(IWatchManager manager,
+ int watchers, Set<Watcher> removedWatchers) {
+ this.manager = manager;
+ this.watchers = watchers;
+ this.removedWatchers = removedWatchers;
+ }
+
+ @Override
+ public void run() {
+ while (!stopped) {
+ DumbWatcher watcher = createOrGetWatcher(r.nextInt(watchers));
+ watcher.setStale();
+ manager.removeWatcher(watcher);
+ synchronized (removedWatchers) {
+ removedWatchers.add(watcher);
+ }
+ try {
+ Thread.sleep(r.nextInt(10));
+ } catch (InterruptedException e) {}
+ }
+ }
+
+ public void shutdown() {
+ stopped = true;
+ }
+
+ }
+
+ /**
+ * Concurrently add and trigger watch, make sure the watches triggered
+ * are the same as the number added.
+ */
+ @Test(timeout = 90000)
+ public void testAddAndTriggerWatcher() throws IOException {
+ IWatchManager manager = getWatchManager();
+ int paths = 1;
+ int watchers = 10000;
+
+ // 1. start 5 workers to trigger watchers on that path
+ // count all the watchers have been fired
+ AtomicInteger watchTriggered = new AtomicInteger();
+ List<WatcherTriggerWorker> triggerWorkers =
+ new ArrayList<WatcherTriggerWorker>();
+ for (int i = 0; i < 5; i++) {
+ WatcherTriggerWorker worker =
+ new WatcherTriggerWorker(manager, paths, watchTriggered);
+ triggerWorkers.add(worker);
+ worker.start();
+ }
+
+ // 2. start 5 workers to add different watchers on the same path
+ // count all the watchers being added
+ AtomicInteger watchesAdded = new AtomicInteger();
+ List<AddWatcherWorker> addWorkers = new ArrayList<AddWatcherWorker>();
+ for (int i = 0; i < 5; i++) {
+ AddWatcherWorker worker = new AddWatcherWorker(
+ manager, paths, watchers, watchesAdded);
+ addWorkers.add(worker);
+ worker.start();
+ }
+
+ while (watchesAdded.get() < 100000) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {}
+ }
+
+ // 3. stop all the addWorkers
+ for (AddWatcherWorker worker: addWorkers) {
+ worker.shutdown();
+ }
+
+ // 4. running the trigger worker a bit longer to make sure
+ // all watchers added are fired
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {}
+
+ // 5. stop all triggerWorkers
+ for (WatcherTriggerWorker worker: triggerWorkers) {
+ worker.shutdown();
+ }
+
+ // 6. make sure the total watch triggered is same as added
+ Assert.assertTrue(watchesAdded.get() > 0);
+ Assert.assertEquals(watchesAdded.get(), watchTriggered.get());
+ }
+
+ /**
+ * Concurrently add and remove watch, make sure the watches left +
+ * the watches removed are equal to the total added watches.
+ */
+ @Test(timeout = 90000)
+ public void testRemoveWatcherOnPath() throws IOException {
+ IWatchManager manager = getWatchManager();
+ int paths = 10;
+ int watchers = 10000;
+
+ // 1. start 5 workers to remove watchers on those path
+ // record the watchers have been removed
+ AtomicInteger watchesRemoved = new AtomicInteger();
+ List<RemoveWatcherWorker> removeWorkers =
+ new ArrayList<RemoveWatcherWorker>();
+ for (int i = 0; i < 5; i++) {
+ RemoveWatcherWorker worker =
+ new RemoveWatcherWorker(manager, paths, watchers, watchesRemoved);
+ removeWorkers.add(worker);
+ worker.start();
+ }
+
+ // 2. start 5 workers to add different watchers on different path
+ // record the watchers have been added
+ AtomicInteger watchesAdded = new AtomicInteger();
+ List<AddWatcherWorker> addWorkers = new ArrayList<AddWatcherWorker>();
+ for (int i = 0; i < 5; i++) {
+ AddWatcherWorker worker = new AddWatcherWorker(
+ manager, paths, watchers, watchesAdded);
+ addWorkers.add(worker);
+ worker.start();
+ }
+
+ while (watchesAdded.get() < 100000) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {}
+ }
+
+ // 3. stop all workers
+ for (RemoveWatcherWorker worker: removeWorkers) {
+ worker.shutdown();
+ }
+ for (AddWatcherWorker worker: addWorkers) {
+ worker.shutdown();
+ }
+
+ // 4. sleep for a while to make sure all the thread exited
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {}
+
+ // 5. make sure left watches + removed watches = added watches
+ Assert.assertTrue(watchesAdded.get() > 0);
+ Assert.assertTrue(watchesRemoved.get() > 0);
+ Assert.assertTrue(manager.size() > 0);
+ Assert.assertEquals(
+ watchesAdded.get(), watchesRemoved.get() + manager.size());
+ }
+
+ /**
+ * Concurrently add watch while close the watcher to simulate the
+ * client connections closed on prod.
+ */
+ @Test(timeout = 90000)
+ public void testDeadWatchers() throws IOException {
+ System.setProperty("zookeeper.watcherCleanThreshold", "10");
+ System.setProperty("zookeeper.watcherCleanIntervalInSeconds", "1");
+
+ IWatchManager manager = getWatchManager();
+ int paths = 1;
+ int watchers = 100000;
+
+ // 1. start 5 workers to randomly mark those watcher as dead
+ // and remove them from watch manager
+ Set<Watcher> deadWatchers = new HashSet<Watcher>();
+ List<CreateDeadWatchersWorker> deadWorkers =
+ new ArrayList<CreateDeadWatchersWorker>();
+ for (int i = 0; i < 5; i++) {
+ CreateDeadWatchersWorker worker = new CreateDeadWatchersWorker(
+ manager, watchers, deadWatchers);
+ deadWorkers.add(worker);
+ worker.start();
+ }
+
+ // 2. start 5 workers to add different watchers on the same path
+ AtomicInteger watchesAdded = new AtomicInteger();
+ List<AddWatcherWorker> addWorkers = new ArrayList<AddWatcherWorker>();
+ for (int i = 0; i < 5; i++) {
+ AddWatcherWorker worker = new AddWatcherWorker(
+ manager, paths, watchers, watchesAdded);
+ addWorkers.add(worker);
+ worker.start();
+ }
+
+ while (watchesAdded.get() < 50000) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {}
+ }
+
+ // 3. stop all workers
+ for (CreateDeadWatchersWorker worker: deadWorkers) {
+ worker.shutdown();
+ }
+ for (AddWatcherWorker worker: addWorkers) {
+ worker.shutdown();
+ }
+
+ // 4. sleep for a while to make sure all the thread exited
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {}
+
+ // 5. make sure the dead watchers are not in the existing watchers
+ WatchesReport existingWatchers = manager.getWatches();
+ for (Watcher w: deadWatchers) {
+ Assert.assertFalse(
+ existingWatchers.hasPaths(((ServerCnxn) w).getSessionId()));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/watch/WatcherCleanerTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/watch/WatcherCleanerTest.java b/src/java/test/org/apache/zookeeper/server/watch/WatcherCleanerTest.java
new file mode 100644
index 0000000..d315232
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/watch/WatcherCleanerTest.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.watch;
+
+import java.util.Set;
+import java.util.HashSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.common.Time;
+import org.junit.Test;
+import org.junit.Assert;
+
+public class WatcherCleanerTest extends ZKTestCase {
+
+ public static class MyDeadWatcherListener implements IDeadWatcherListener {
+
+ private CountDownLatch latch;
+ private int delayMs;
+ private Set<Integer> deadWatchers = new HashSet<Integer>();
+
+ public void setCountDownLatch(CountDownLatch latch) {
+ this.latch = latch;
+ }
+
+ public void setDelayMs(int delayMs) {
+ this.delayMs = delayMs;
+ }
+
+ @Override
+ public void processDeadWatchers(Set<Integer> deadWatchers) {
+ if (delayMs > 0) {
+ try {
+ Thread.sleep(delayMs);
+ } catch (InterruptedException e) {}
+ }
+ this.deadWatchers.clear();
+ this.deadWatchers.addAll(deadWatchers);
+ latch.countDown();
+ }
+
+ public Set<Integer> getDeadWatchers() {
+ return deadWatchers;
+ }
+
+ public boolean wait(int maxWaitMs) {
+ try {
+ return latch.await(maxWaitMs, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {}
+ return false;
+ }
+ }
+
+ @Test
+ public void testProcessDeadWatchersBasedOnThreshold() {
+ MyDeadWatcherListener listener = new MyDeadWatcherListener();
+ int threshold = 3;
+ WatcherCleaner cleaner = new WatcherCleaner(listener, threshold, 60, 1, 10);
+ cleaner.start();
+
+ int i = 0;
+ while (i++ < threshold - 1) {
+ cleaner.addDeadWatcher(i);
+ }
+ // not trigger processDeadWatchers yet
+ Assert.assertEquals(0, listener.getDeadWatchers().size());
+
+ listener.setCountDownLatch(new CountDownLatch(1));
+ // add another dead watcher to trigger the process
+ cleaner.addDeadWatcher(i);
+ Assert.assertTrue(listener.wait(1000));
+ Assert.assertEquals(threshold, listener.getDeadWatchers().size());
+ }
+
+ @Test
+ public void testProcessDeadWatchersBasedOnTime() {
+ MyDeadWatcherListener listener = new MyDeadWatcherListener();
+ WatcherCleaner cleaner = new WatcherCleaner(listener, 10, 1, 1, 10);
+ cleaner.start();
+
+ cleaner.addDeadWatcher(1);
+ // not trigger processDeadWatchers yet
+ Assert.assertEquals(0, listener.getDeadWatchers().size());
+
+ listener.setCountDownLatch(new CountDownLatch(1));
+ Assert.assertTrue(listener.wait(2000));
+ Assert.assertEquals(1, listener.getDeadWatchers().size());
+
+ // won't trigger event if there is no dead watchers
+ listener.setCountDownLatch(new CountDownLatch(1));
+ Assert.assertFalse(listener.wait(2000));
+ }
+
+ @Test
+ public void testMaxInProcessingDeadWatchers() {
+ MyDeadWatcherListener listener = new MyDeadWatcherListener();
+ int delayMs = 1000;
+ listener.setDelayMs(delayMs);
+ WatcherCleaner cleaner = new WatcherCleaner(listener, 1, 60, 1, 1);
+ cleaner.start();
+
+ listener.setCountDownLatch(new CountDownLatch(2));
+
+ long startTime = Time.currentElapsedTime();
+ cleaner.addDeadWatcher(1);
+ cleaner.addDeadWatcher(2);
+ long time = Time.currentElapsedTime() - startTime;
+ System.out.println("time used " + time);
+ Assert.assertTrue(Time.currentElapsedTime() - startTime >= delayMs);
+ Assert.assertTrue(listener.wait(5000));
+ }
+}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/watch/WatcherOrBitSetTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/watch/WatcherOrBitSetTest.java b/src/java/test/org/apache/zookeeper/server/watch/WatcherOrBitSetTest.java
new file mode 100644
index 0000000..4b7fbd5
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/watch/WatcherOrBitSetTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.watch;
+
+import java.util.Set;
+import java.util.HashSet;
+
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.server.DumbWatcher;
+import org.apache.zookeeper.server.util.BitHashSet;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.junit.Test;
+import org.junit.Assert;
+
+public class WatcherOrBitSetTest extends ZKTestCase {
+
+ @Test
+ public void testWatcherSet() {
+ Set<Watcher> wset = new HashSet<Watcher>();
+ WatcherOrBitSet hashSet = new WatcherOrBitSet(wset);
+ Assert.assertEquals(0, hashSet.size());
+
+ DumbWatcher w1 = new DumbWatcher();
+ Assert.assertFalse(hashSet.contains(w1));
+ wset.add(w1);
+ Assert.assertTrue(hashSet.contains(w1));
+ Assert.assertEquals(1, hashSet.size());
+ Assert.assertFalse(hashSet.contains(1));
+ }
+
+ @Test
+ public void testBitSet() {
+ BitHashSet bset = new BitHashSet(0);
+ WatcherOrBitSet bitSet = new WatcherOrBitSet(bset);
+ Assert.assertEquals(0, bitSet.size());
+
+ Integer bit = new Integer(1);
+ Assert.assertFalse(bitSet.contains(1));
+ Assert.assertFalse(bitSet.contains(bit));
+
+ bset.add(bit);
+ Assert.assertTrue(bitSet.contains(1));
+ Assert.assertTrue(bitSet.contains(bit));
+ Assert.assertEquals(1, bitSet.size());
+ }
+}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/watch/WatchesPathReportTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/watch/WatchesPathReportTest.java b/src/java/test/org/apache/zookeeper/server/watch/WatchesPathReportTest.java
new file mode 100644
index 0000000..34e3789
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/watch/WatchesPathReportTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.watch;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.zookeeper.ZKTestCase;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class WatchesPathReportTest extends ZKTestCase {
+ private Map<String, Set<Long>> m;
+ private WatchesPathReport r;
+ @Before public void setUp() {
+ m = new HashMap<String, Set<Long>>();
+ Set<Long> s = new HashSet<Long>();
+ s.add(101L);
+ s.add(102L);
+ m.put("path1", s);
+ s = new HashSet<Long>();
+ s.add(201L);
+ m.put("path2", s);
+ r = new WatchesPathReport(m);
+ }
+ @Test public void testHasSessions() {
+ assertTrue(r.hasSessions("path1"));
+ assertTrue(r.hasSessions("path2"));
+ assertFalse(r.hasSessions("path3"));
+ }
+ @Test public void testGetSessions() {
+ Set<Long> s = r.getSessions("path1");
+ assertEquals(2, s.size());
+ assertTrue(s.contains(101L));
+ assertTrue(s.contains(102L));
+ s = r.getSessions("path2");
+ assertEquals(1, s.size());
+ assertTrue(s.contains(201L));
+ assertNull(r.getSessions("path3"));
+ }
+ @Test public void testToMap() {
+ assertEquals(m, r.toMap());
+ }
+}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/watch/WatchesReportTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/watch/WatchesReportTest.java b/src/java/test/org/apache/zookeeper/server/watch/WatchesReportTest.java
new file mode 100644
index 0000000..237583a
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/watch/WatchesReportTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.watch;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.zookeeper.ZKTestCase;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class WatchesReportTest extends ZKTestCase {
+ private Map<Long, Set<String>> m;
+ private WatchesReport r;
+ @Before public void setUp() {
+ m = new HashMap<Long, Set<String>>();
+ Set<String> s = new HashSet<String>();
+ s.add("path1a");
+ s.add("path1b");
+ m.put(1L, s);
+ s = new HashSet<String>();
+ s.add("path2a");
+ m.put(2L, s);
+ r = new WatchesReport(m);
+ }
+ @Test public void testHasPaths() {
+ assertTrue(r.hasPaths(1L));
+ assertTrue(r.hasPaths(2L));
+ assertFalse(r.hasPaths(3L));
+ }
+ @Test public void testGetPaths() {
+ Set<String> s = r.getPaths(1L);
+ assertEquals(2, s.size());
+ assertTrue(s.contains("path1a"));
+ assertTrue(s.contains("path1b"));
+ s = r.getPaths(2L);
+ assertEquals(1, s.size());
+ assertTrue(s.contains("path2a"));
+ assertNull(r.getPaths(3L));
+ }
+ @Test public void testToMap() {
+ assertEquals(m, r.toMap());
+ }
+}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/watch/WatchesSummaryTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/watch/WatchesSummaryTest.java b/src/java/test/org/apache/zookeeper/server/watch/WatchesSummaryTest.java
new file mode 100644
index 0000000..35956f1
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/watch/WatchesSummaryTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.watch;
+
+import java.util.Map;
+import org.apache.zookeeper.ZKTestCase;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class WatchesSummaryTest extends ZKTestCase {
+ private WatchesSummary s;
+ @Before public void setUp() {
+ s = new WatchesSummary(1, 2, 3);
+ }
+ @Test public void testGetters() {
+ assertEquals(1, s.getNumConnections());
+ assertEquals(2, s.getNumPaths());
+ assertEquals(3, s.getTotalWatches());
+ }
+ @Test public void testToMap() {
+ Map<String, Object> m = s.toMap();
+ assertEquals(3, m.size());
+ assertEquals(Integer.valueOf(1), m.get(WatchesSummary.KEY_NUM_CONNECTIONS));
+ assertEquals(Integer.valueOf(2), m.get(WatchesSummary.KEY_NUM_PATHS));
+ assertEquals(Integer.valueOf(3), m.get(WatchesSummary.KEY_NUM_TOTAL_WATCHES));
+ }
+}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/test/java/bench/org/apache/zookeeper/BenchMain.java
----------------------------------------------------------------------
diff --git a/src/test/java/bench/org/apache/zookeeper/BenchMain.java b/src/test/java/bench/org/apache/zookeeper/BenchMain.java
new file mode 100644
index 0000000..8e370c0
--- /dev/null
+++ b/src/test/java/bench/org/apache/zookeeper/BenchMain.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import java.io.File;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+public class BenchMain {
+ public static void main(String args[]) throws Exception {
+ org.openjdk.jmh.Main.main(args);
+ }
+}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/test/java/bench/org/apache/zookeeper/server/watch/WatchBench.java
----------------------------------------------------------------------
diff --git a/src/test/java/bench/org/apache/zookeeper/server/watch/WatchBench.java b/src/test/java/bench/org/apache/zookeeper/server/watch/WatchBench.java
new file mode 100644
index 0000000..0510df7
--- /dev/null
+++ b/src/test/java/bench/org/apache/zookeeper/server/watch/WatchBench.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.server.watch.IWatchManager;
+import org.apache.zookeeper.server.DumbWatcher;
+
+import org.openjdk.jmh.annotations.*;
+
+import java.util.concurrent.TimeUnit;
+
+@Fork(3)
+public class WatchBench {
+
+ static final String pathPrefix = "/reasonably/long/path/";
+ static final EventType event = EventType.NodeDataChanged;
+
+ static IWatchManager createWatchManager(String className) throws Exception {
+ Class clazz = Class.forName(
+ "org.apache.zookeeper.server.watch." + className);
+ return (IWatchManager) clazz.newInstance();
+ }
+
+ static void forceGC() {
+ int gcTimes = 3;
+ for (int i = 0; i < gcTimes; i++) {
+ try {
+ System.gc();
+ Thread.currentThread().sleep(1000);
+
+ System.runFinalization();
+ Thread.currentThread().sleep(1000);
+ } catch (InterruptedException ex) { /* ignore */ }
+ }
+ }
+
+ static long getMemoryUse() {
+ forceGC();
+ long totalMem = Runtime.getRuntime().totalMemory();
+
+ forceGC();
+ long freeMem = Runtime.getRuntime().freeMemory();
+ return totalMem - freeMem;
+ }
+
+ @State(Scope.Benchmark)
+ public static class IterationState {
+
+ @Param({"WatchManager", "WatchManagerOptimized"})
+ public String watchManagerClass;
+
+ @Param({"10000"})
+ public int pathCount;
+
+ String[] paths;
+
+ long watchesAdded = 0;
+ IWatchManager watchManager;
+
+ long memWhenSetup = 0;
+
+ @Setup(Level.Iteration)
+ public void setup() throws Exception {
+ paths = new String[pathCount];
+ for (int i = 0; i < paths.length; i++) {
+ paths[i] = pathPrefix + i;
+ }
+
+ watchesAdded = 0;
+ watchManager = createWatchManager(watchManagerClass);
+
+ memWhenSetup = getMemoryUse();
+ }
+
+ @TearDown(Level.Iteration)
+ public void tearDown() {
+ long memUsed = getMemoryUse() - memWhenSetup;
+ System.out.println("Memory used: " + watchesAdded + " " + memUsed);
+
+ double memPerMillionWatchesMB = memUsed * 1.0 / watchesAdded ;
+ System.out.println(
+ "Memory used per million watches " +
+ String.format("%.2f", memPerMillionWatchesMB) + "MB");
+ }
+ }
+
+ /**
+ * Test concenrate watch case where the watcher watches all paths.
+ *
+ * The output of this test will be the average time used to add the
+ * watch to all paths.
+ */
+ @Benchmark
+ @BenchmarkMode(Mode.AverageTime)
+ @OutputTimeUnit(TimeUnit.MILLISECONDS)
+ @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS)
+ @Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
+ public void testAddConcentrateWatch(IterationState state) throws Exception {
+ Watcher watcher = new DumbWatcher();
+
+ // watch all paths
+ for (String path : state.paths) {
+ if (state.watchManager.addWatch(path, watcher)) {
+ state.watchesAdded++;
+ }
+ }
+ }
+
+ @State(Scope.Benchmark)
+ public static class InvocationState {
+
+ @Param({"WatchManager", "WatchManagerOptimized"})
+ public String watchManagerClass;
+
+ @Param({"1", "1000"})
+ public int pathCount;
+
+ @Param({"1", "1000"})
+ public int watcherCount;
+
+ String[] paths;
+ Watcher[] watchers;
+
+ IWatchManager watchManager;
+
+ @Setup(Level.Invocation)
+ public void setup() throws Exception {
+ initialize();
+ prepare();
+ }
+
+ void initialize() throws Exception {
+ if (paths == null || paths.length != pathCount) {
+ paths = new String[pathCount];
+ for (int i = 0; i < pathCount; i++) {
+ paths[i] = pathPrefix + i;
+ }
+ }
+
+ if (watchers == null || watchers.length != watcherCount) {
+ watchers = new Watcher[watcherCount];
+ for (int i = 0; i < watcherCount; i++) {
+ watchers[i] = new DumbWatcher();
+ }
+ }
+ if (watchManager == null ||
+ !watchManager.getClass().getSimpleName().contains(
+ watchManagerClass)) {
+ watchManager = createWatchManager(watchManagerClass);
+ }
+ }
+
+ void prepare() {
+ for (String path : paths) {
+ for (Watcher watcher : watchers) {
+ watchManager.addWatch(path, watcher);
+ }
+ }
+ }
+ }
+
+ /**
+ * Test trigger watches in concenrate case.
+ *
+ * The output of this test is the time used to trigger those watches on
+ * all paths.
+ */
+ @Benchmark
+ @BenchmarkMode(Mode.AverageTime)
+ @OutputTimeUnit(TimeUnit.MILLISECONDS)
+ @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS)
+ @Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
+ public void testTriggerConcentrateWatch(InvocationState state) throws Exception {
+ for (String path : state.paths) {
+ state.watchManager.triggerWatch(path, event);
+ }
+ }
+
+ @State(Scope.Benchmark)
+ public static class AddSparseWatchState extends InvocationState {
+
+ @Param({"10000"})
+ public int pathCount;
+
+ @Param({"10000"})
+ public int watcherCount;
+
+ long watchesAdded = 0;
+ long memWhenSetup = 0;
+
+ @Override
+ public void prepare() {
+ watchesAdded = 0;
+ memWhenSetup = getMemoryUse();
+ }
+
+ @TearDown(Level.Invocation)
+ public void tearDown() {
+ long memUsed = getMemoryUse() - memWhenSetup;
+ System.out.println("Memory used: " + watchesAdded + " " + memUsed);
+
+ double memPerMillionWatchesMB = memUsed * 1.0 / watchesAdded ;
+ System.out.println(
+ "Memory used per million sparse watches " +
+ String.format("%.2f", memPerMillionWatchesMB) + "MB");
+
+ // clear all the watches
+ for (String path : paths) {
+ watchManager.triggerWatch(path, event);
+ }
+ }
+ }
+
+ /**
+ * Test sparse watch case where only one watcher watches all paths, and
+ * only one path being watched by all watchers.
+ *
+ * The output of this test will be the average time used to add those
+ * sparse watches.
+ */
+ @Benchmark
+ @BenchmarkMode(Mode.AverageTime)
+ @OutputTimeUnit(TimeUnit.MILLISECONDS)
+ @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS)
+ @Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
+ public void testAddSparseWatch(AddSparseWatchState state) throws Exception {
+ // All watchers are watching the 1st path
+ for (Watcher watcher : state.watchers) {
+ if (state.watchManager.addWatch(state.paths[0], watcher)) {
+ state.watchesAdded++;
+ }
+ }
+ // The 1st watcher is watching all paths
+ for (String path : state.paths) {
+ if (state.watchManager.addWatch(path, state.watchers[0])) {
+ state.watchesAdded++;
+ }
+ }
+ }
+
+ @State(Scope.Benchmark)
+ public static class TriggerSparseWatchState extends InvocationState {
+
+ @Param({"10000"})
+ public int pathCount;
+
+ @Param({"10000"})
+ public int watcherCount;
+
+ @Override
+ public void prepare() {
+ // All watchers are watching the 1st path
+ for (Watcher watcher : watchers) {
+ watchManager.addWatch(paths[0], watcher);
+ }
+
+ // The 1st watcher is watching all paths
+ for (String path : paths) {
+ watchManager.addWatch(path, watchers[0]);
+ }
+ }
+ }
+
+
+ /**
+ * Test trigger watches in sparse case.
+ *
+ * The output of this test is the time used to trigger those watches on
+ * all paths.
+ */
+ @Benchmark
+ @BenchmarkMode(Mode.AverageTime)
+ @OutputTimeUnit(TimeUnit.MILLISECONDS)
+ @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS)
+ @Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
+ public void testTriggerSparseWatch(TriggerSparseWatchState state) throws Exception {
+ for (String path : state.paths) {
+ state.watchManager.triggerWatch(path, event);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/zookeeper-contrib/zookeeper-contrib-fatjar/src/main/resources/mainClasses
----------------------------------------------------------------------
diff --git a/zookeeper-contrib/zookeeper-contrib-fatjar/src/main/resources/mainClasses b/zookeeper-contrib/zookeeper-contrib-fatjar/src/main/resources/mainClasses
index 2b0fc83..ba29e89 100644
--- a/zookeeper-contrib/zookeeper-contrib-fatjar/src/main/resources/mainClasses
+++ b/zookeeper-contrib/zookeeper-contrib-fatjar/src/main/resources/mainClasses
@@ -8,3 +8,4 @@ quorumBench:org.apache.zookeeper.server.QuorumBenchmark:A benchmark of just the
abBench:org.apache.zookeeper.server.quorum.AtomicBroadcastBenchmark:A benchmark of just the atomic broadcast
ic:org.apache.zookeeper.test.system.InstanceContainer:A container that will instantiate classes as directed by an instance manager
systest:org.apache.zookeeper.test.system.BaseSysTest:Start system test
+jmh:org.apache.zookeeper.BenchMain:Run jmh micro benchmarks
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/zookeeper-docs/src/documentation/content/xdocs/zookeeperAdmin.xml
----------------------------------------------------------------------
diff --git a/zookeeper-docs/src/documentation/content/xdocs/zookeeperAdmin.xml b/zookeeper-docs/src/documentation/content/xdocs/zookeeperAdmin.xml
index 50143d4..b7cd21b 100644
--- a/zookeeper-docs/src/documentation/content/xdocs/zookeeperAdmin.xml
+++ b/zookeeper-docs/src/documentation/content/xdocs/zookeeperAdmin.xml
@@ -1025,6 +1025,102 @@ server.3=zoo3:2888:3888</programlisting>
</listitem>
</varlistentry>
+
+ <varlistentry>
+ <term>watchManaggerName</term>
+
+ <listitem>
+ <para>(Java system property only: <emphasis
+ role="bold">zookeeper.watchManagerName</emphasis>)</para>
+
+ <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
+ <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> New watcher
+ manager WatchManagerOptimized is added to optimize the memory overhead in heavy watch use cases. This
+ config is used to define which watcher manager to be used. Currently, we only support WatchManager and
+ WatchManagerOptimized.</para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>watcherCleanThreadsNum</term>
+
+ <listitem>
+ <para>(Java system property only: <emphasis
+ role="bold">zookeeper.watcherCleanThreadsNum</emphasis>)</para>
+
+ <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
+ <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> The new watcher
+ manager WatchManagerOptimized will clean up the dead watchers lazily, this config is used to decide how
+ many thread is used in the WatcherCleaner. More thread usually means larger clean up throughput. The
+ default value is 2, which is good enough even for heavy and continuous session closing/recreating cases.</para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>watcherCleanThreshold</term>
+
+ <listitem>
+ <para>(Java system property only: <emphasis
+ role="bold">zookeeper.watcherCleanThreshold</emphasis>)</para>
+
+ <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
+ <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> The new watcher
+ manager WatchManagerOptimized will clean up the dead watchers lazily, the clean up process is relatively
+ heavy, batch processing will reduce the cost and improve the performance. This setting is used to decide
+ the batch size. The default one is 1000, we don't need to change it if there is no memory or clean up
+ speed issue.</para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>watcherCleanIntervalInSeconds</term>
+
+ <listitem>
+ <para>(Java system property only: <emphasis
+ role="bold">zookeeper.watcherCleanIntervalInSeconds</emphasis>)</para>
+
+ <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
+ <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> The new watcher
+ manager WatchManagerOptimized will clean up the dead watchers lazily, the clean up process is relatively
+ heavy, batch processing will reduce the cost and improve the performance. Besides watcherCleanThreshold,
+ this setting is used to clean up the dead watchers after certain time even the dead watchers are not larger
+ than watcherCleanThreshold, so that we won't leave the dead watchers there for too long. The default setting
+ is 10 minutes, which usually don't need to be changed.</para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>maxInProcessingDeadWatchers</term>
+
+ <listitem>
+ <para>(Java system property only: <emphasis
+ role="bold">zookeeper.maxInProcessingDeadWatchers</emphasis>)</para>
+
+ <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
+ <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> This is used
+ to control how many backlog can we have in the WatcherCleaner, when it reaches this number, it will
+ slow down adding the dead watcher to WatcherCleaner, which will in turn slow down adding and closing
+ watchers, so that we can avoid OOM issue. By default there is no limit, you can set it to values like
+ watcherCleanThreshold * 1000.</para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>bitHashCacheSize</term>
+
+ <listitem>
+ <para>(Java system property only: <emphasis
+ role="bold">zookeeper.bitHashCacheSize</emphasis>)</para>
+
+ <para><emphasis role="bold">New 3.6.0:</emphasis> Added in
+ <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> This is the
+ setting used to decide the HashSet cache size in the BitHashSet implementation. Without HashSet, we
+ need to use O(N) time to get the elements, N is the bit numbers in elementBits. But we need to
+ keep the size small to make sure it doesn't cost too much in memory, there is a trade off between memory
+ and time complexity. The default value is 10, which seems a relatively reasonable cache size.</para>
+ </listitem>
+ </varlistentry>
+
</variablelist>
</section>
|