Github user anmolnar commented on a diff in the pull request:
https://github.com/apache/zookeeper/pull/590#discussion_r216973199
--- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java ---
@@ -0,0 +1,355 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.BitSet;
+import java.util.HashSet;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.Iterator;
+import java.lang.Iterable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.util.BitMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Optimized in memory and time complexity compared to WatchManager: both the
+ * memory consumption and the time complexity are improved a lot, but it cannot
+ * efficiently remove the watcher when the session or socket is closed. For
+ * the majority of use cases this is not a problem.
+ *
+ * Changes made compared to WatchManager:
+ *
+ * - Use HashSet and BitSet to store the watchers to find a balance between
+ * memory usage and time complexity
+ * - Use ReadWriteLock instead of synchronized to reduce lock contention
+ * - Lazily clean up the closed watchers
+ */
+public class WatchManagerOptimized
+ implements IWatchManager, DeadWatcherListener {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(WatchManagerOptimized.class);
+
+ private final ConcurrentHashMap<String, BitHashSet> pathWatches =
+ new ConcurrentHashMap<String, BitHashSet>();
+
+ // watcher to bit id mapping
+ private final BitMap<Watcher> watcherBitIdMap = new BitMap<Watcher>();
+
+ // used to lazily remove the dead watchers
+ private final WatcherCleaner watcherCleaner;
+
+ private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock();
+
+ public WatchManagerOptimized() { // Builds the manager together with the cleaner it uses for dead watchers.
+ watcherCleaner = new WatcherCleaner(this); // this manager is handed to the cleaner as its constructor argument
+ watcherCleaner.start(); // started before the constructor returns; NOTE(review): presumably launches a background thread - confirm in WatcherCleaner
+ }
+
+ /**
+  * Registers {@code watcher} on {@code path}.
+  *
+  * The whole operation runs while holding the read side of
+  * {@code addRemovePathRWLock}. A watcher that {@code isDeadWatcher}
+  * reports as dead is not registered.
+  *
+  * @param path the path to watch
+  * @param watcher the watcher to register
+  * @return {@code false} when the watcher is dead, otherwise the result
+  *         of adding the watcher's bit to the set kept for {@code path}
+  */
+ @Override
+ public boolean addWatch(String path, Watcher watcher) {
+     addRemovePathRWLock.readLock().lock();
+     try {
+         // Guard against the race of registering a watcher that is
+         // already dead but has not been cleaned up yet.
+         if (isDeadWatcher(watcher)) {
+             LOG.debug("Ignoring addWatch with closed cnxn");
+             return false;
+         }
+
+         Integer watcherBit = watcherBitIdMap.add(watcher);
+         BitHashSet pathWatchers = pathWatches.get(path);
+         if (pathWatchers == null) {
+             // Another thread may install a set for this path first;
+             // if so, use the one that is actually in the map.
+             BitHashSet fresh = new BitHashSet();
+             BitHashSet raced = pathWatches.putIfAbsent(path, fresh);
+             pathWatchers = (raced != null) ? raced : fresh;
+         }
+         return pathWatchers.add(watcherBit);
+     } finally {
+         addRemovePathRWLock.readLock().unlock();
+     }
+ }
+
+ /**
+  * Tells whether {@code watcher} is currently recorded for {@code path}.
+  *
+  * No lock is taken here; the lookup goes straight to the
+  * {@code pathWatches} map.
+  *
+  * @param path the path to look up
+  * @param watcher the watcher to look for
+  * @return {@code true} only when a watcher set exists for {@code path}
+  *         and it contains the bit mapped to {@code watcher}
+  */
+ @Override
+ public boolean containsWatcher(String path, Watcher watcher) {
+     BitHashSet pathWatchers = pathWatches.get(path);
+     // Short-circuit keeps the bit lookup from running when the path
+     // has no watchers at all.
+     return pathWatchers != null
+             && pathWatchers.contains(watcherBitIdMap.getBit(watcher));
+ }
+
+ @Override
+ public boolean removeWatcher(String path, Watcher watcher) {
+ addRemovePathRWLock.writeLock().lock();
+ try {
+ BitHashSet list = pathWatches.get(path);
+ if (list == null || !list.remove(watcherBitIdMap.getBit(watcher))) {
+ return false;
+ }
+ if (list.isEmpty()) {
+ pathWatches.remove(path);
+ }
+ return true;
+ } finally {
+ addRemovePathRWLock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public void removeWatcher(Watcher watcher) {
+ Integer watcherBit;
+ addRemovePathRWLock.writeLock().lock();
--- End diff --
Where do you mark the watcher as stale inside the critical block?
It only calls a getter on the `BitIdMap`, right?
---
|