curator-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (CURATOR-33) Recursive Node Cache
Date Thu, 07 Aug 2014 02:05:13 GMT

    [ https://issues.apache.org/jira/browse/CURATOR-33?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14088661#comment-14088661 ]

ASF GitHub Bot commented on CURATOR-33:
---------------------------------------

Github user Randgalt commented on a diff in the pull request:

    https://github.com/apache/curator/pull/17#discussion_r15915372
  
    --- Diff: curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java ---
    @@ -0,0 +1,648 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.curator.framework.recipes.cache;
    +
    +import com.google.common.base.Function;
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.ImmutableMap;
    +import com.google.common.collect.Maps;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.curator.framework.api.BackgroundCallback;
    +import org.apache.curator.framework.api.CuratorEvent;
    +import org.apache.curator.framework.listen.ListenerContainer;
    +import org.apache.curator.framework.state.ConnectionState;
    +import org.apache.curator.framework.state.ConnectionStateListener;
    +import org.apache.curator.utils.CloseableExecutorService;
    +import org.apache.curator.utils.ThreadUtils;
    +import org.apache.curator.utils.ZKPaths;
    +import org.apache.zookeeper.KeeperException;
    +import org.apache.zookeeper.WatchedEvent;
    +import org.apache.zookeeper.Watcher;
    +import org.apache.zookeeper.data.Stat;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import java.io.Closeable;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.ConcurrentMap;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ThreadFactory;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * <p>A utility that attempts to keep all data from all children of a ZK path locally
cached. This class
    + * will watch the ZK path, respond to update/create/delete events, pull down the data,
etc. You can
    + * register a listener that will get notified when changes occur.</p>
    + * <p></p>
    + * <p><b>IMPORTANT</b> - it's not possible to stay transactionally
in sync. Users of this class must
    + * be prepared for false-positives and false-negatives. Additionally, always use the
version number
    + * when updating data to avoid overwriting another process' change.</p>
    + */
    +public class TreeCache implements Closeable
    +{
    +    private static final Logger LOG = LoggerFactory.getLogger(TreeCache.class);
    +
    +    private enum NodeState
    +    {
    +        PENDING, LIVE, DEAD
    +    }
    +
    +    private final class TreeNode implements Watcher, BackgroundCallback
    +    {
    +        final AtomicReference<NodeState> nodeState = new AtomicReference<NodeState>(NodeState.PENDING);
    +        final TreeNode parent;
    +        final String path;
    +        final AtomicReference<Stat> stat = new AtomicReference<Stat>();
    +        final AtomicReference<byte[]> data = new AtomicReference<byte[]>();
    +        final AtomicReference<ConcurrentMap<String, TreeNode>> children =
new AtomicReference<ConcurrentMap<String, TreeNode>>();
    +
    +        TreeNode(String path, TreeNode parent)
    +        {
    +            this.path = path;
    +            this.parent = parent;
    +        }
    +
    +        private void refresh() throws Exception
    +        {
    +            outstandingOps.addAndGet(2);
    +            doRefreshData();
    +            doRefreshChildren();
    +        }
    +
    +        private void refreshChildren() throws Exception
    +        {
    +            outstandingOps.incrementAndGet();
    +            doRefreshChildren();
    +        }
    +
    +        private void refreshData() throws Exception
    +        {
    +            outstandingOps.incrementAndGet();
    +            doRefreshData();
    +        }
    +
    +        private void doRefreshChildren() throws Exception
    +        {
    +            client.getChildren().usingWatcher(this).inBackground(this).forPath(path);
    +        }
    +
    +        private void doRefreshData() throws Exception
    +        {
    +            if ( dataIsCompressed )
    +            {
    +                client.getData().decompressed().usingWatcher(this).inBackground(this).forPath(path);
    +            }
    +            else
    +            {
    +                client.getData().usingWatcher(this).inBackground(this).forPath(path);
    +            }
    +        }
    +
    +        void wasReconnected() throws Exception
    +        {
    +            refresh();
    +            ConcurrentMap<String, TreeNode> childMap = children.get();
    +            if ( childMap != null )
    +            {
    +                for ( TreeNode child : childMap.values() )
    +                {
    +                    child.wasReconnected();
    +                }
    +            }
    +        }
    +
    +        void wasCreated() throws Exception
    +        {
    +            refresh();
    +        }
    +
    +        void wasDeleted() throws Exception
    +        {
    +            stat.set(null);
    +            data.set(null);
    +            client.clearWatcherReferences(this);
    +            ConcurrentMap<String, TreeNode> childMap = children.getAndSet(null);
    +            if ( childMap != null )
    +            {
    +                ArrayList<TreeNode> childCopy = new ArrayList<TreeNode>(childMap.values());
    +                childMap.clear();
    +                for ( TreeNode child : childCopy )
    +                {
    +                    child.wasDeleted();
    +                }
    +            }
    +
    +            if ( treeState.get() == TreeState.CLOSED )
    +            {
    +                return;
    +            }
    +
    +            if ( nodeState.compareAndSet(NodeState.LIVE, NodeState.DEAD) )
    +            {
    +                publishEvent(TreeCacheEvent.Type.NODE_REMOVED, path);
    +            }
    +
    +            if ( parent == null )
    +            {
    +                // Root node; use an exist query to watch for existence.
    +                client.checkExists().usingWatcher(this).inBackground().forPath(path);
    +            }
    +            else
    +            {
    +                // Remove from parent if we're currently a child
    +                ConcurrentMap<String, TreeNode> parentChildMap = parent.children.get();
    +                if ( parentChildMap != null )
    +                {
    +                    parentChildMap.remove(ZKPaths.getNodeFromPath(path), this);
    +                }
    +            }
    +        }
    +
    +        @Override
    +        public void process(WatchedEvent event)
    +        {
    +            try
    +            {
    +                switch ( event.getType() )
    +                {
    +                case NodeCreated:
    +                    Preconditions.checkState(parent == null, "unexpected NodeCreated
on non-root node");
    +                    wasCreated();
    +                    break;
    +                case NodeChildrenChanged:
    +                    refreshChildren();
    +                    break;
    +                case NodeDataChanged:
    +                    refreshData();
    +                    break;
    +                case NodeDeleted:
    +                    wasDeleted();
    +                    break;
    +                }
    +            }
    +            catch ( Exception e )
    +            {
    +                handleException(e);
    +            }
    +        }
    +
    +        @Override
    +        public void processResult(CuratorFramework client, CuratorEvent event) throws
Exception
    +        {
    +            Stat newStat = event.getStat();
    +            switch ( event.getType() )
    +            {
    +            case EXISTS:
    +                Preconditions.checkState(parent == null, "unexpected EXISTS on non-root
node");
    +                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
    +                {
    +                    nodeState.compareAndSet(NodeState.DEAD, NodeState.PENDING);
    +                    wasCreated();
    +                }
    +                else if ( event.getResultCode() == KeeperException.Code.NONODE.intValue()
)
    +                {
    +                    wasDeleted();
    +                }
    +                break;
    +            case CHILDREN:
    +                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
    +                {
    +                    Stat oldStat = stat.get();
    +                    if (oldStat != null && oldStat.getMzxid() == newStat.getMzxid())
{
    +                        // Only update stat if mzxid is different, otherwise we might
obscure
    +                        // GET_DATA event updates.
    +                        stat.set(newStat);
    +                    }
    +
    +                    if ( event.getChildren().isEmpty() )
    +                    {
    +                        break;
    +                    }
    +
    +                    ConcurrentMap<String, TreeNode> childMap = children.get();
    +                    if ( childMap == null )
    +                    {
    +                        childMap = Maps.newConcurrentMap();
    +                        if ( !children.compareAndSet(null, childMap) )
    +                        {
    +                            childMap = children.get();
    +                        }
    +                    }
    +
    +                    // Present new children in sorted order for test determinism.
    +                    List<String> newChildren = new ArrayList<String>();
    +                    for ( String child : event.getChildren() )
    +                    {
    +                        if ( !childMap.containsKey(child) )
    +                        {
    +                            newChildren.add(child);
    +                        }
    +                    }
    +
    +                    Collections.sort(newChildren);
    +                    for ( String child : newChildren )
    +                    {
    +                        String fullPath = ZKPaths.makePath(path, child);
    +                        TreeNode node = new TreeNode(fullPath, this);
    +                        if ( childMap.putIfAbsent(child, node) == null )
    +                        {
    +                            node.wasCreated();
    +                        }
    +                    }
    +                }
    +                else if ( event.getResultCode() == KeeperException.Code.NONODE.intValue()
)
    +                {
    +                    wasDeleted();
    +                }
    +                break;
    +            case GET_DATA:
    +                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
    +                {
    +                    if ( cacheData )
    +                    {
    +                        data.set(event.getData());
    +                    }
    +
    +                    Stat oldStat = stat.getAndSet(newStat);
    +                    if ( nodeState.compareAndSet(NodeState.PENDING, NodeState.LIVE) )
    +                    {
    +                        publishEvent(TreeCacheEvent.Type.NODE_ADDED, new ChildData(event.getPath(),
newStat, event.getData()));
    +                    }
    +                    else
    +                    {
    +                        if ( oldStat == null || oldStat.getMzxid() != newStat.getMzxid()
)
    +                        {
    +                            publishEvent(TreeCacheEvent.Type.NODE_UPDATED, new ChildData(event.getPath(),
newStat, event.getData()));
    +                        }
    +                    }
    +                }
    +                else if ( event.getResultCode() == KeeperException.Code.NONODE.intValue()
)
    +                {
    +                    wasDeleted();
    +                }
    +                break;
    +            default:
    +                handleException(new Exception(String.format("Unknown event %s", event)));
    --- End diff --
    
    Please add a break statement at the end of this default case.


> Recursive Node Cache
> --------------------
>
>                 Key: CURATOR-33
>                 URL: https://issues.apache.org/jira/browse/CURATOR-33
>             Project: Apache Curator
>          Issue Type: Improvement
>          Components: Recipes
>            Reporter: John Vines
>            Assignee: Jordan Zimmerman
>             Fix For: TBD
>
>         Attachments: CURATOR-33.2.patch, CURATOR-33.patch
>
>
> Currently the PathChildrenCache will trigger listen events for all children at the given
> node. However, it would be useful to have a cache that would trigger listen events for the
> entire hierarchy below the given node.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Mime
View raw message