Return-Path: X-Original-To: apmail-curator-dev-archive@minotaur.apache.org Delivered-To: apmail-curator-dev-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0B28B11D65 for ; Wed, 23 Jul 2014 23:44:40 +0000 (UTC) Received: (qmail 60204 invoked by uid 500); 23 Jul 2014 23:44:39 -0000 Delivered-To: apmail-curator-dev-archive@curator.apache.org Received: (qmail 60158 invoked by uid 500); 23 Jul 2014 23:44:39 -0000 Mailing-List: contact dev-help@curator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@curator.apache.org Delivered-To: mailing list dev@curator.apache.org Received: (qmail 60146 invoked by uid 99); 23 Jul 2014 23:44:39 -0000 Received: from arcas.apache.org (HELO arcas.apache.org) (140.211.11.28) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 23 Jul 2014 23:44:39 +0000 Date: Wed, 23 Jul 2014 23:44:39 +0000 (UTC) From: "ASF GitHub Bot (JIRA)" To: dev@curator.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Commented] (CURATOR-33) Recursive Node Cache MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 [ https://issues.apache.org/jira/browse/CURATOR-33?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14072560#comment-14072560 ] ASF GitHub Bot commented on CURATOR-33: --------------------------------------- Github user cammckenzie commented on a diff in the pull request: https://github.com/apache/curator/pull/17#discussion_r15323038 --- Diff: curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java --- @@ -0,0 +1,600 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.framework.recipes.cache; + +import com.google.common.base.Function; +import com.google.common.collect.ImmutableSortedSet; +import com.google.common.collect.Maps; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.listen.ListenerContainer; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.utils.CloseableExecutorService; +import org.apache.curator.utils.ThreadUtils; +import org.apache.curator.utils.ZKPaths; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.SortedSet; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +/** + *

A utility that attempts to keep all data from all children of a ZK path locally cached. This class + * will watch the ZK path, respond to update/create/delete events, pull down the data, etc. You can + * register a listener that will get notified when changes occur.

+ *

+ *

IMPORTANT - it's not possible to stay transactionally in sync. Users of this class must + * be prepared for false-positives and false-negatives. Additionally, always use the version number + * when updating data to avoid overwriting another process's change.

+ */ +public class TreeCache implements Closeable +{ + private static final Logger LOG = LoggerFactory.getLogger(TreeCache.class); + + private enum NodeState + { + PENDING, LIVE, DEAD + } + + final class TreeNode implements Watcher, BackgroundCallback + { + private final AtomicReference nodeState = new AtomicReference(NodeState.PENDING); + private final String path; + private final TreeNode parent; + private final AtomicReference stat = new AtomicReference(); + private final AtomicReference data = new AtomicReference(); + private final AtomicReference> children = new AtomicReference>(); + + TreeNode(String path, TreeNode parent) + { + this.path = path; + this.parent = parent; + } + + private void refreshChildren() throws Exception + { + outstandingOps.incrementAndGet(); + client.getChildren().usingWatcher(this).inBackground(this).forPath(path); + } + + private void refreshData() throws Exception + { + outstandingOps.incrementAndGet(); + if ( dataIsCompressed ) + { + client.getData().decompressed().usingWatcher(this).inBackground(this).forPath(path); + } + else + { + client.getData().usingWatcher(this).inBackground(this).forPath(path); + } + } + + private void wasReconnected() throws Exception + { + refreshData(); + refreshChildren(); + ConcurrentMap childMap = children.get(); + if ( childMap != null ) + { + for ( TreeNode child : childMap.values() ) + { + child.wasReconnected(); + } + } + } + + private void wasCreated() throws Exception + { + refreshData(); + refreshChildren(); + } + + private void wasDeleted() throws Exception + { + stat.set(null); + data.set(null); + client.clearWatcherReferences(this); + ConcurrentMap childMap = children.getAndSet(null); + if ( childMap != null ) + { + ArrayList childCopy = new ArrayList(childMap.values()); + childMap.clear(); + for ( TreeNode child : childCopy ) + { + child.wasDeleted(); + } + } + + if ( treeState.get() == TreeState.CLOSED ) + { + return; + } + + if ( nodeState.compareAndSet(NodeState.LIVE, NodeState.DEAD) ) + 
{ + publishEvent(TreeCacheEvent.Type.NODE_REMOVED, path); + } + + if ( parent == null ) + { + // Root node; use an exist query to watch for existence. + client.checkExists().usingWatcher(this).inBackground().forPath(path); + } + else + { + // Remove from parent if we're currently a child + ConcurrentMap parentChildMap = parent.children.get(); + if ( parentChildMap != null ) + { + parentChildMap.remove(ZKPaths.getNodeFromPath(path), this); + } + } + } + + @Override + public void process(WatchedEvent event) + { + System.out.println(event); + try + { + switch ( event.getType() ) + { + case NodeCreated: + assert parent == null; + wasCreated(); + break; + case NodeChildrenChanged: + refreshChildren(); + break; + case NodeDataChanged: + refreshData(); + break; + case NodeDeleted: + wasDeleted(); + break; + } + } + catch ( Exception e ) + { + handleException(e); + } + } + + @Override + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception + { + System.out.println(event); + switch ( event.getType() ) + { + case EXISTS: + // TODO: should only happen for root node + if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) + { + nodeState.compareAndSet(NodeState.DEAD, NodeState.PENDING); + wasCreated(); + } + else if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() ) + { + wasDeleted(); + } + break; + case CHILDREN: + if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) + { + stat.set(event.getStat()); + + if ( event.getChildren().isEmpty() ) + { + break; + } + + ConcurrentMap childMap = children.get(); + if ( childMap == null ) + { + childMap = Maps.newConcurrentMap(); + if ( !children.compareAndSet(null, childMap) ) + { + childMap = children.get(); + } + } + + for ( String child : event.getChildren() ) + { + String fullPath = ZKPaths.makePath(path, child); + if ( !childMap.containsKey(child) ) + { + TreeNode node = new TreeNode(fullPath, this); + if ( childMap.putIfAbsent(child, node) == null ) + { + 
node.wasCreated(); + } + } + } + } + else if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() ) + { + wasDeleted(); + } + break; + case GET_DATA: + if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) + { + Stat oldStat = stat.getAndSet(event.getStat()); + if ( cacheData ) + { + data.set(event.getData()); + } + + if ( nodeState.compareAndSet(NodeState.PENDING, NodeState.LIVE) ) + { + publishEvent(TreeCacheEvent.Type.NODE_ADDED, new ChildData(event.getPath(), event.getStat(), event.getData())); + } + else if ( oldStat.getMzxid() != event.getStat().getMzxid() ) + { + publishEvent(TreeCacheEvent.Type.NODE_UPDATED, new ChildData(event.getPath(), event.getStat(), event.getData())); + } + } + else if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() ) + { + wasDeleted(); + } + break; + default: + handleException(new Exception(String.format("Unknown event %s", event))); + } + + if ( outstandingOps.decrementAndGet() == 0 ) + { + if ( treeState.compareAndSet(TreeState.LATENT, TreeState.STARTED) ) + { + publishEvent(TreeCacheEvent.Type.INITIALIZED); + } + } + } + } + + private enum TreeState + { + LATENT, + STARTED, + CLOSED + } + + /** + * Determines when to publish the initialized event. 
+ */ + private final AtomicLong outstandingOps = new AtomicLong(0); + + private final TreeNode root; + private final CuratorFramework client; + private final CloseableExecutorService executorService; + private final boolean cacheData; + private final boolean dataIsCompressed; + private final ListenerContainer listeners = new ListenerContainer(); + private final AtomicReference treeState = new AtomicReference(TreeState.LATENT); + + private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + handleStateChange(newState); + } + }; + + private static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("TreeCache"); + + /** + * @param client the client + * @param path path to watch + * @param cacheData if true, node contents are cached in addition to the stat + */ + public TreeCache(CuratorFramework client, String path, boolean cacheData) + { + this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true)); + } + + /** + * @param client the client + * @param path path to watch + * @param cacheData if true, node contents are cached in addition to the stat + * @param threadFactory factory to use when creating internal threads + */ + public TreeCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory) + { + this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true)); + } + + /** + * @param client the client + * @param path path to watch + * @param cacheData if true, node contents are cached in addition to the stat + * @param dataIsCompressed if true, data in the path is compressed + * @param threadFactory factory to use when creating internal threads + */ + public TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, 
ThreadFactory threadFactory) + { + this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true)); + } + + /** + * @param client the client + * @param path path to watch + * @param cacheData if true, node contents are cached in addition to the stat + * @param dataIsCompressed if true, data in the path is compressed + * @param executorService ExecutorService to use for the TreeCache's background thread + */ + public TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final ExecutorService executorService) + { + this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(executorService)); + } + + /** + * @param client the client + * @param path path to watch + * @param cacheData if true, node contents are cached in addition to the stat + * @param dataIsCompressed if true, data in the path is compressed + * @param executorService Closeable ExecutorService to use for the TreeCache's background thread + */ + public TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService) + { + this.root = new TreeNode(path, null); + this.client = client; + this.cacheData = cacheData; + this.dataIsCompressed = dataIsCompressed; + this.executorService = executorService; + } + + /** + * Start the cache. The cache is not started automatically. You must call this method. 
+ * + * @throws Exception errors + */ + public void start() throws Exception + { + client.getConnectionStateListenable().addListener(connectionStateListener); + root.wasCreated(); + } + + /** + * Close/end the cache + * + * @throws java.io.IOException errors + */ + @Override + public void close() throws IOException + { + if ( treeState.compareAndSet(TreeState.STARTED, TreeState.CLOSED) ) + { + client.getConnectionStateListenable().removeListener(connectionStateListener); + listeners.clear(); + executorService.close(); + try + { + root.wasDeleted(); + } + catch ( Exception e ) + { + handleException(e); + } + } + } + + /** + * Return the cache listenable + * + * @return listenable + */ + public ListenerContainer getListenable() + { + return listeners; + } + + private TreeNode find(String fullPath) + { + if ( !fullPath.startsWith(root.path) ) + { + return null; + } + + TreeNode current = root; + if ( fullPath.length() > root.path.length() ) + { + List split = ZKPaths.split(fullPath.substring(root.path.length())); + for ( String part : split ) + { + ConcurrentMap map = current.children.get(); + if ( map == null ) + { + return null; + } + current = map.get(part); + if ( current == null ) + { + return null; + } + } + } + return current; + } + + /** + * Return the current set of children. There are no guarantees of accuracy. This is + * merely the most recent view of the data. The data is returned in sorted order. If there is + * no child with that path, null is returned. 
+ * + * @param fullPath full path to the node to check + * @return a possibly-empty list of children if the node is alive, or null + */ + public SortedSet getCurrentChildren(String fullPath) + { + TreeNode node = find(fullPath); + if ( node == null || node.nodeState.get() != NodeState.LIVE ) + { + return null; + } + ConcurrentMap map = node.children.get(); + SortedSet result; + if ( map == null ) + { + result = Collections.emptySortedSet(); --- End diff -- Collections.emptySortedSet() is a new addition to Java 1.8. Curator is still building against Java 1.6. > Recursive Node Cache > -------------------- > > Key: CURATOR-33 > URL: https://issues.apache.org/jira/browse/CURATOR-33 > Project: Apache Curator > Issue Type: Improvement > Components: Recipes > Reporter: John Vines > Fix For: awaiting-response > > Attachments: CURATOR-33.2.patch, CURATOR-33.patch > > > Currently the PathChildrenCache will trigger listen events for all children at the given node. However, it would be useful to have a cache that would trigger listen events for the entire hierarchy below the given node. -- This message was sent by Atlassian JIRA (v6.2#6252)