zookeeper-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hanm <...@git.apache.org>
Subject [GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Date Tue, 25 Sep 2018 04:26:27 GMT
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r220055378
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java ---
    @@ -0,0 +1,355 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.io.PrintWriter;
    +import java.util.HashMap;
    +import java.util.BitSet;
    +import java.util.HashSet;
    +import java.util.ArrayList;
    +import java.util.LinkedHashMap;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Random;
    +import java.util.Set;
    +import java.util.Iterator;
    +import java.lang.Iterable;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import org.apache.zookeeper.WatchedEvent;
    +import org.apache.zookeeper.Watcher;
    +import org.apache.zookeeper.Watcher.Event.EventType;
    +import org.apache.zookeeper.Watcher.Event.KeeperState;
    +import org.apache.zookeeper.server.ServerCnxn;
    +import org.apache.zookeeper.server.util.BitMap;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Optimized in memory and time complexity, compared to WatchManager, both the
    + * memory consumption and time complexity improved a lot, but it cannot
    + * efficiently remove the watcher when the session or socket is closed, for
    + * majority usecase this is not a problem.
    + *
    + * Changed made compared to WatchManager:
    + *
    + * - Use HashSet and BitSet to store the watchers to find a balance between
    + *   memory usage and time complexity
    + * - Use ReadWriteLock instead of synchronized to reduce lock retention
    + * - Lazily clean up the closed watchers
    + */
    +public class WatchManagerOptimized
    +        implements IWatchManager, DeadWatcherListener {
    +
    +    private static final Logger LOG =
    +            LoggerFactory.getLogger(WatchManagerOptimized.class);
    +
    +    private final ConcurrentHashMap<String, BitHashSet> pathWatches =
    +            new ConcurrentHashMap<String, BitHashSet>();
    +
    +    // watcher to bit id mapping
    +    private final BitMap<Watcher> watcherBitIdMap = new BitMap<Watcher>();
    +
    +    // used to lazily remove the dead watchers
    +    private final WatcherCleaner watcherCleaner;
    +
    +    private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock();
    +
    +    public WatchManagerOptimized() {
    +        watcherCleaner = new WatcherCleaner(this);
    +        watcherCleaner.start();
    +    }
    +
    +    @Override
    +    public boolean addWatch(String path, Watcher watcher) {
    +        boolean result = false;
    +        addRemovePathRWLock.readLock().lock();
    +        try {
    +            // avoid race condition of adding a on flying dead watcher
    +            if (isDeadWatcher(watcher)) {
    +                LOG.debug("Ignoring addWatch with closed cnxn");
    +            } else {
    +                Integer bit = watcherBitIdMap.add(watcher);
    +                BitHashSet watchers = pathWatches.get(path);
    +                if (watchers == null) {
    +                    watchers = new BitHashSet();
    +                    BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers);
    +                    if (existingWatchers != null) {
    +                        watchers = existingWatchers;
    +                    }
    +                }
    +                result = watchers.add(bit);
    +            }
    +        } finally {
    +            addRemovePathRWLock.readLock().unlock();
    +        }
    +        return result;
    +    }
    +
    +    @Override
    +    public boolean containsWatcher(String path, Watcher watcher) {
    +        BitHashSet watchers = pathWatches.get(path);
    +        if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher)))
{
    +            return false;
    +        }
    +        return true;
    +    }
    +
    +    @Override
    +    public boolean removeWatcher(String path, Watcher watcher) {
    +        addRemovePathRWLock.writeLock().lock();
    +        try {
    +            BitHashSet list = pathWatches.get(path);
    +            if (list == null || !list.remove(watcherBitIdMap.getBit(watcher))) {
    +                return false;
    +            }
    +            if (list.isEmpty()) {
    +                pathWatches.remove(path);
    +            }
    +            return true;
    +        } finally {
    +            addRemovePathRWLock.writeLock().unlock();
    +        }
    +    }
    +
    +    @Override
    +    public void removeWatcher(Watcher watcher) {
    +        Integer watcherBit;
    +        addRemovePathRWLock.writeLock().lock();
    +        try {
    +            // do nothing if the watcher is not tracked
    +            watcherBit = watcherBitIdMap.getBit(watcher);
    +            if (watcherBit == null) {
    +                return;
    +            }
    +        } finally {
    +            addRemovePathRWLock.writeLock().unlock();
    +        }
    +        watcherCleaner.addDeadWatcher(watcherBit);
    +    }
    +
    +    /**
    +     * Entry for WatcherCleaner to remove dead watchers
    +     *
    +     * @param deadWatchers the watchers need to be removed
    +     */
    +    @Override
    +    public void processDeadWatchers(Set<Integer> deadWatchers) {
    +        BitSet bits = new BitSet();
    +        for (int dw: deadWatchers) {
    +            bits.set(dw);
    +        }
    +        // The value iterator will reflect the state when it was
    +        // created, don't need to synchronize.
    +        for (BitHashSet watchers: pathWatches.values()) {
    +            watchers.remove(deadWatchers, bits);
    +        }
    +        // Better to remove the empty path from pathWatches, but it will add
    +        // lot of lock contention and affect the throughput of addWatch,
    +        // let's rely on the triggerWatch to delete it.
    +        for (Integer wbit: deadWatchers) {
    +            watcherBitIdMap.remove(wbit);
    +        }
    +    }
    +
    +    @Override
    +    public WatcherOrBitSet triggerWatch(String path, EventType type) {
    +        return triggerWatch(path, type, null);
    +    }
    +
    +    @Override
    +    public WatcherOrBitSet triggerWatch(
    +            String path, EventType type, WatcherOrBitSet supress) {
    +        WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
    +
    +        BitHashSet watchers = remove(path);
    +        if (watchers == null) {
    +            return null;
    +        }
    +
    +        int triggeredWatches = 0;
    --- End diff --
    
    I knew it's the metrics. It's fine to leave this variable and we can add metrics in another
patch, since this patch is already big enough and almost ready to land.


---

Mime
View raw message