curator-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (CURATOR-33) Recursive Node Cache
Date Thu, 31 Jul 2014 22:50:39 GMT

    [ https://issues.apache.org/jira/browse/CURATOR-33?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14081629#comment-14081629
] 

ASF GitHub Bot commented on CURATOR-33:
---------------------------------------

Github user dragonsinth commented on a diff in the pull request:

    https://github.com/apache/curator/pull/17#discussion_r15674624
  
    --- Diff: curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
---
    @@ -0,0 +1,600 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.curator.framework.recipes.cache;
    +
    +import com.google.common.base.Function;
    +import com.google.common.collect.ImmutableSortedSet;
    +import com.google.common.collect.Maps;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.curator.framework.api.BackgroundCallback;
    +import org.apache.curator.framework.api.CuratorEvent;
    +import org.apache.curator.framework.listen.ListenerContainer;
    +import org.apache.curator.framework.state.ConnectionState;
    +import org.apache.curator.framework.state.ConnectionStateListener;
    +import org.apache.curator.utils.CloseableExecutorService;
    +import org.apache.curator.utils.ThreadUtils;
    +import org.apache.curator.utils.ZKPaths;
    +import org.apache.zookeeper.KeeperException;
    +import org.apache.zookeeper.WatchedEvent;
    +import org.apache.zookeeper.Watcher;
    +import org.apache.zookeeper.data.Stat;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import java.io.Closeable;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.SortedSet;
    +import java.util.concurrent.ConcurrentMap;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ThreadFactory;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * <p>A utility that attempts to keep all data from all children of a ZK path locally
cached. This class
    + * will watch the ZK path, respond to update/create/delete events, pull down the data,
etc. You can
    + * register a listener that will get notified when changes occur.</p>
    + * <p></p>
    + * <p><b>IMPORTANT</b> - it's not possible to stay transactionally
in sync. Users of this class must
    + * be prepared for false-positives and false-negatives. Additionally, always use the
version number
    + * when updating data to avoid overwriting another process' change.</p>
    + */
    +public class TreeCache implements Closeable
    +{
    +    private static final Logger LOG = LoggerFactory.getLogger(TreeCache.class);
    +
    +    private enum NodeState
    +    {
    +        PENDING, LIVE, DEAD
    +    }
    +
    +    final class TreeNode implements Watcher, BackgroundCallback
    +    {
    +        private final AtomicReference<NodeState> nodeState = new AtomicReference<NodeState>(NodeState.PENDING);
    +        private final String path;
    +        private final TreeNode parent;
    +        private final AtomicReference<Stat> stat = new AtomicReference<Stat>();
    +        private final AtomicReference<byte[]> data = new AtomicReference<byte[]>();
    +        private final AtomicReference<ConcurrentMap<String, TreeNode>> children
= new AtomicReference<ConcurrentMap<String, TreeNode>>();
    +
    +        TreeNode(String path, TreeNode parent)
    +        {
    +            this.path = path;
    +            this.parent = parent;
    +        }
    +
    +        private void refreshChildren() throws Exception
    +        {
    +            outstandingOps.incrementAndGet();
    +            client.getChildren().usingWatcher(this).inBackground(this).forPath(path);
    +        }
    +
    +        private void refreshData() throws Exception
    +        {
    +            outstandingOps.incrementAndGet();
    +            if ( dataIsCompressed )
    +            {
    +                client.getData().decompressed().usingWatcher(this).inBackground(this).forPath(path);
    +            }
    +            else
    +            {
    +                client.getData().usingWatcher(this).inBackground(this).forPath(path);
    +            }
    +        }
    +
    +        private void wasReconnected() throws Exception
    +        {
    +            refreshData();
    +            refreshChildren();
    +            ConcurrentMap<String, TreeNode> childMap = children.get();
    +            if ( childMap != null )
    +            {
    +                for ( TreeNode child : childMap.values() )
    +                {
    +                    child.wasReconnected();
    +                }
    +            }
    +        }
    +
    +        private void wasCreated() throws Exception
    +        {
    +            refreshData();
    +            refreshChildren();
    +        }
    +
    +        private void wasDeleted() throws Exception
    +        {
    +            stat.set(null);
    +            data.set(null);
    +            client.clearWatcherReferences(this);
    +            ConcurrentMap<String, TreeNode> childMap = children.getAndSet(null);
    +            if ( childMap != null )
    +            {
    +                ArrayList<TreeNode> childCopy = new ArrayList<TreeNode>(childMap.values());
    +                childMap.clear();
    +                for ( TreeNode child : childCopy )
    +                {
    +                    child.wasDeleted();
    +                }
    +            }
    +
    +            if ( treeState.get() == TreeState.CLOSED )
    +            {
    +                return;
    +            }
    +
    +            if ( nodeState.compareAndSet(NodeState.LIVE, NodeState.DEAD) )
    +            {
    +                publishEvent(TreeCacheEvent.Type.NODE_REMOVED, path);
    +            }
    +
    +            if ( parent == null )
    +            {
    +                // Root node; use an exist query to watch for existence.
    +                client.checkExists().usingWatcher(this).inBackground().forPath(path);
    +            }
    +            else
    +            {
    +                // Remove from parent if we're currently a child
    +                ConcurrentMap<String, TreeNode> parentChildMap = parent.children.get();
    +                if ( parentChildMap != null )
    +                {
    +                    parentChildMap.remove(ZKPaths.getNodeFromPath(path), this);
    +                }
    +            }
    +        }
    +
    +        @Override
    +        public void process(WatchedEvent event)
    +        {
    +            System.out.println(event);
    +            try
    +            {
    +                switch ( event.getType() )
    +                {
    +                case NodeCreated:
    +                    assert parent == null;
    +                    wasCreated();
    +                    break;
    +                case NodeChildrenChanged:
    +                    refreshChildren();
    +                    break;
    +                case NodeDataChanged:
    +                    refreshData();
    +                    break;
    +                case NodeDeleted:
    +                    wasDeleted();
    +                    break;
    +                }
    +            }
    +            catch ( Exception e )
    +            {
    +                handleException(e);
    +            }
    +        }
    +
    +        @Override
    +        public void processResult(CuratorFramework client, CuratorEvent event) throws
Exception
    +        {
    +            System.out.println(event);
    +            switch ( event.getType() )
    +            {
    +            case EXISTS:
    +                // TODO: should only happen for root node
    +                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
    +                {
    +                    nodeState.compareAndSet(NodeState.DEAD, NodeState.PENDING);
    +                    wasCreated();
    +                }
    +                else if ( event.getResultCode() == KeeperException.Code.NONODE.intValue()
)
    +                {
    +                    wasDeleted();
    +                }
    +                break;
    +            case CHILDREN:
    +                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
    +                {
    +                    stat.set(event.getStat());
    +
    +                    if ( event.getChildren().isEmpty() )
    +                    {
    +                        break;
    +                    }
    +
    +                    ConcurrentMap<String, TreeNode> childMap = children.get();
    +                    if ( childMap == null )
    +                    {
    +                        childMap = Maps.newConcurrentMap();
    +                        if ( !children.compareAndSet(null, childMap) )
    +                        {
    +                            childMap = children.get();
    +                        }
    +                    }
    +
    +                    for ( String child : event.getChildren() )
    +                    {
    +                        String fullPath = ZKPaths.makePath(path, child);
    +                        if ( !childMap.containsKey(child) )
    +                        {
    +                            TreeNode node = new TreeNode(fullPath, this);
    +                            if ( childMap.putIfAbsent(child, node) == null )
    +                            {
    +                                node.wasCreated();
    +                            }
    +                        }
    +                    }
    +                }
    +                else if ( event.getResultCode() == KeeperException.Code.NONODE.intValue()
)
    +                {
    +                    wasDeleted();
    +                }
    +                break;
    +            case GET_DATA:
    +                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
    +                {
    +                    Stat oldStat = stat.getAndSet(event.getStat());
    +                    if ( cacheData )
    +                    {
    +                        data.set(event.getData());
    +                    }
    +
    +                    if ( nodeState.compareAndSet(NodeState.PENDING, NodeState.LIVE) )
    +                    {
    +                        publishEvent(TreeCacheEvent.Type.NODE_ADDED, new ChildData(event.getPath(),
event.getStat(), event.getData()));
    +                    }
    +                    else if ( oldStat.getMzxid() != event.getStat().getMzxid() )
    +                    {
    +                        publishEvent(TreeCacheEvent.Type.NODE_UPDATED, new ChildData(event.getPath(),
event.getStat(), event.getData()));
    +                    }
    +                }
    +                else if ( event.getResultCode() == KeeperException.Code.NONODE.intValue()
)
    +                {
    +                    wasDeleted();
    +                }
    +                break;
    +            default:
    +                handleException(new Exception(String.format("Unknown event %s", event)));
    +            }
    +
    +            if ( outstandingOps.decrementAndGet() == 0 )
    +            {
    +                if ( treeState.compareAndSet(TreeState.LATENT, TreeState.STARTED) )
    +                {
    +                    publishEvent(TreeCacheEvent.Type.INITIALIZED);
    +                }
    +            }
    +        }
    +    }
    +
    +    private enum TreeState
    +    {
    +        LATENT,
    +        STARTED,
    +        CLOSED
    +    }
    +
    +    /**
    +     * Determines when to publish the initialized event.
    +     */
    +    private final AtomicLong outstandingOps = new AtomicLong(0);
    +
    +    private final TreeNode root;
    +    private final CuratorFramework client;
    +    private final CloseableExecutorService executorService;
    +    private final boolean cacheData;
    +    private final boolean dataIsCompressed;
    +    private final ListenerContainer<TreeCacheListener> listeners = new ListenerContainer<TreeCacheListener>();
    +    private final AtomicReference<TreeState> treeState = new AtomicReference<TreeState>(TreeState.LATENT);
    +
    +    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
    +    {
    +        @Override
    +        public void stateChanged(CuratorFramework client, ConnectionState newState)
    +        {
    +            handleStateChange(newState);
    +        }
    +    };
    +
    +    private static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("TreeCache");
    +
    +    /**
    +     * @param client    the client
    +     * @param path      path to watch
    +     * @param cacheData if true, node contents are cached in addition to the stat
    +     */
    +    public TreeCache(CuratorFramework client, String path, boolean cacheData)
    +    {
    +        this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory),
true));
    +    }
    +
    +    /**
    +     * @param client        the client
    +     * @param path          path to watch
    +     * @param cacheData     if true, node contents are cached in addition to the stat
    +     * @param threadFactory factory to use when creating internal threads
    +     */
    +    public TreeCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory
threadFactory)
    +    {
    +        this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory),
true));
    +    }
    +
    +    /**
    +     * @param client           the client
    +     * @param path             path to watch
    +     * @param cacheData        if true, node contents are cached in addition to the stat
    +     * @param dataIsCompressed if true, data in the path is compressed
    +     * @param threadFactory    factory to use when creating internal threads
    +     */
    +    public TreeCache(CuratorFramework client, String path, boolean cacheData, boolean
dataIsCompressed, ThreadFactory threadFactory)
    +    {
    +        this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory),
true));
    +    }
    +
    +    /**
    +     * @param client           the client
    +     * @param path             path to watch
    +     * @param cacheData        if true, node contents are cached in addition to the stat
    +     * @param dataIsCompressed if true, data in the path is compressed
    +     * @param executorService  ExecutorService to use for the TreeCache's background
thread
    +     */
    +    public TreeCache(CuratorFramework client, String path, boolean cacheData, boolean
dataIsCompressed, final ExecutorService executorService)
    +    {
    +        this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(executorService));
    +    }
    +
    +    /**
    +     * @param client           the client
    +     * @param path             path to watch
    +     * @param cacheData        if true, node contents are cached in addition to the stat
    +     * @param dataIsCompressed if true, data in the path is compressed
    +     * @param executorService  Closeable ExecutorService to use for the TreeCache's background
thread
    +     */
    +    public TreeCache(CuratorFramework client, String path, boolean cacheData, boolean
dataIsCompressed, final CloseableExecutorService executorService)
    +    {
    +        this.root = new TreeNode(path, null);
    +        this.client = client;
    +        this.cacheData = cacheData;
    +        this.dataIsCompressed = dataIsCompressed;
    +        this.executorService = executorService;
    +    }
    +
    +    /**
    +     * Start the cache. The cache is not started automatically. You must call this method.
    +     *
    +     * @throws Exception errors
    +     */
    +    public void start() throws Exception
    +    {
    +        client.getConnectionStateListenable().addListener(connectionStateListener);
    +        root.wasCreated();
    +    }
    +
    +    /**
    +     * Close/end the cache
    +     *
    +     * @throws java.io.IOException errors
    +     */
    +    @Override
    +    public void close() throws IOException
    +    {
    +        if ( treeState.compareAndSet(TreeState.STARTED, TreeState.CLOSED) )
    +        {
    +            client.getConnectionStateListenable().removeListener(connectionStateListener);
    +            listeners.clear();
    +            executorService.close();
    +            try
    +            {
    +                root.wasDeleted();
    +            }
    +            catch ( Exception e )
    +            {
    +                handleException(e);
    +            }
    +        }
    +    }
    +
    +    /**
    +     * Return the cache listenable
    +     *
    +     * @return listenable
    +     */
    +    public ListenerContainer<TreeCacheListener> getListenable()
    +    {
    +        return listeners;
    +    }
    +
    +    private TreeNode find(String fullPath)
    +    {
    +        if ( !fullPath.startsWith(root.path) )
    +        {
    +            return null;
    +        }
    +
    +        TreeNode current = root;
    +        if ( fullPath.length() > root.path.length() )
    +        {
    +            List<String> split = ZKPaths.split(fullPath.substring(root.path.length()));
    +            for ( String part : split )
    +            {
    +                ConcurrentMap<String, TreeNode> map = current.children.get();
    +                if ( map == null )
    +                {
    +                    return null;
    +                }
    +                current = map.get(part);
    +                if ( current == null )
    +                {
    +                    return null;
    +                }
    +            }
    +        }
    +        return current;
    +    }
    +
    +    /**
    +     * Return the current set of children. There are no guarantees of accuracy. This
is
    +     * merely the most recent view of the data. The data is returned in sorted order.
If there is
    --- End diff --
    
    I went ahead and changed this to return `SortedMap<String, ChildData>`, so that it provides
both the names and data of all current children.  I will add another accessor that returns an
entire subtree, so that I can document that it might be expensive.


> Recursive Node Cache
> --------------------
>
>                 Key: CURATOR-33
>                 URL: https://issues.apache.org/jira/browse/CURATOR-33
>             Project: Apache Curator
>          Issue Type: Improvement
>          Components: Recipes
>            Reporter: John Vines
>            Assignee: Jordan Zimmerman
>             Fix For: TBD
>
>         Attachments: CURATOR-33.2.patch, CURATOR-33.patch
>
>
> Currently the PathChildrenCache will trigger listen events for all children at the given
node. However, it would be useful to have a cache that would trigger listen events for the
entire hierarchy below the given node.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Mime
View raw message