curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From randg...@apache.org
Subject [1/3] Initial pass at integrating DescendantHandlingMode.ALL_DESCENDANTS patch
Date Thu, 26 Dec 2013 16:00:58 GMT
Updated Branches:
  refs/heads/CURATOR-33 [created] 03bc3bee0


http://git-wip-us.apache.org/repos/asf/curator/blob/03bc3bee/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index d66f7f3..87eb525 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -1,20 +1,12 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
  */
 
 package org.apache.curator.framework.recipes.cache;
@@ -56,13 +48,17 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * <p>A utility that attempts to keep all data from all children of a ZK path locally cached. This class
- * will watch the ZK path, respond to update/create/delete events, pull down the data, etc. You can
- * register a listener that will get notified when changes occur.</p>
+ * <p>
+ * A utility that attempts to keep all data from all children of a ZK path locally cached. This
+ * class will watch the ZK path, respond to update/create/delete events, pull down the data, etc.
+ * You can register a listener that will get notified when changes occur.
+ * </p>
  * <p/>
- * <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must
- * be prepared for false-positives and false-negatives. Additionally, always use the version number
- * when updating data to avoid overwriting another process' change.</p>
+ * <p>
+ * <b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must be
+ * prepared for false-positives and false-negatives. Additionally, always use the version number
+ * when updating data to avoid overwriting another process' change.
+ * </p>
  */
 @SuppressWarnings("NullableProblems")
 public class PathChildrenCache implements Closeable
@@ -72,6 +68,7 @@ public class PathChildrenCache implements Closeable
     private final String path;
     private final CloseableExecutorService executorService;
     private final boolean cacheData;
+    private final DescendantHandlingMode descendantHandlingMode;
     private final boolean dataIsCompressed;
     private final EnsurePath ensurePath;
     private final ListenerContainer<PathChildrenCacheListener> listeners = new ListenerContainer<PathChildrenCacheListener>();
@@ -91,6 +88,7 @@ public class PathChildrenCache implements Closeable
 
     private final Watcher childrenWatcher = new Watcher()
     {
+
         @Override
         public void process(WatchedEvent event)
         {
@@ -100,6 +98,7 @@ public class PathChildrenCache implements Closeable
 
     private final Watcher dataWatcher = new Watcher()
     {
+
         @Override
         public void process(WatchedEvent event)
         {
@@ -126,6 +125,7 @@ public class PathChildrenCache implements Closeable
 
     private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
     {
+
         @Override
         public void stateChanged(CuratorFramework client, ConnectionState newState)
         {
@@ -135,11 +135,29 @@ public class PathChildrenCache implements Closeable
     private static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("PathChildrenCache");
 
     /**
+     * Method of processing children of the root node. Whether all children under the root node
+     * should be considered or whether only the first level should be considered.
+     */
+    public enum DescendantHandlingMode
+    {
+        /**
+         * Only children of the root node will be considered by the cache. This is the default behaviour.
+         */
+        DIRECT_DESCENDANTS_ONLY,
+
+        /**
+         * The root node's children, their children, and so on will all be considered by the cache.
+         */
+        ALL_DESCENDANTS
+    }
+
+    /**
      * @param client the client
      * @param path   path to watch
      * @param mode   caching mode
      * @deprecated use {@link #PathChildrenCache(CuratorFramework, String, boolean)} instead
      */
+    @Deprecated
     @SuppressWarnings("deprecation")
     public PathChildrenCache(CuratorFramework client, String path, PathChildrenCacheMode mode)
     {
@@ -151,8 +169,10 @@ public class PathChildrenCache implements Closeable
      * @param path          path to watch
      * @param mode          caching mode
      * @param threadFactory factory to use when creating internal threads
-     * @deprecated use {@link #PathChildrenCache(CuratorFramework, String, boolean, ThreadFactory)} instead
+     * @deprecated use {@link #PathChildrenCache(CuratorFramework, String, boolean, ThreadFactory)}
+     * instead
      */
+    @Deprecated
     @SuppressWarnings("deprecation")
     public PathChildrenCache(CuratorFramework client, String path, PathChildrenCacheMode mode, ThreadFactory threadFactory)
     {
@@ -170,6 +190,18 @@ public class PathChildrenCache implements Closeable
     }
 
     /**
+     * @param client                 the client
+     * @param path                   path to watch
+     * @param cacheData              if true, node contents are cached in addition to the stat
+     * @param descendantHandlingMode mode defining whether only the direct children of the root node are
+     *                               considered or the entire tree below it is
+     */
+    public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, DescendantHandlingMode descendantHandlingMode)
+    {
+        this(client, path, cacheData, descendantHandlingMode, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true));
+    }
+
+    /**
      * @param client        the client
      * @param path          path to watch
      * @param cacheData     if true, node contents are cached in addition to the stat
@@ -213,9 +245,24 @@ public class PathChildrenCache implements Closeable
      */
     public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService)
     {
+        this(client, path, cacheData, DescendantHandlingMode.DIRECT_DESCENDANTS_ONLY, dataIsCompressed, executorService);
+    }
+
+    /**
+     * @param client                 the client
+     * @param path                   path to watch
+     * @param cacheData              if true, node contents are cached in addition to the stat
+     * @param descendantHandlingMode mode defining whether only the direct children of the root node are
+     *                               considered or the entire tree below it is
+     * @param dataIsCompressed       if true, data in the path is compressed
+     * @param executorService        Closeable ExecutorService to use for the PathChildrenCache's background thread
+     */
+    public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, DescendantHandlingMode descendantHandlingMode, boolean dataIsCompressed, final CloseableExecutorService executorService)
+    {
         this.client = client;
         this.path = path;
         this.cacheData = cacheData;
+        this.descendantHandlingMode = descendantHandlingMode;
         this.dataIsCompressed = dataIsCompressed;
         this.executorService = executorService;
         ensurePath = client.newNamespaceAwareEnsurePath(path);
@@ -234,12 +281,13 @@ public class PathChildrenCache implements Closeable
     /**
      * Same as {@link #start()} but gives the option of doing an initial build
      *
-     * @param buildInitial if true, {@link #rebuild()} will be called before this method
-     *                     returns in order to get an initial view of the node; otherwise,
-     *                     the cache will be initialized asynchronously
+     * @param buildInitial if true, {@link #rebuild()} will be called before this method returns in order to
+     *                     get an initial view of the node; otherwise, the cache will be initialized
+     *                     asynchronously
      * @throws Exception errors
      * @deprecated use {@link #start(StartMode)}
      */
+    @Deprecated
     public void start(boolean buildInitial) throws Exception
     {
         start(buildInitial ? StartMode.BUILD_INITIAL_CACHE : StartMode.NORMAL);
@@ -251,14 +299,14 @@ public class PathChildrenCache implements Closeable
     public enum StartMode
     {
         /**
-         * cache will _not_ be primed. i.e. it will start empty and you will receive
-         * events for all nodes added, etc.
+         * cache will _not_ be primed. i.e. it will start empty and you will receive events for all
+         * nodes added, etc.
          */
         NORMAL,
 
         /**
-         * {@link PathChildrenCache#rebuild()} will be called before this method returns in
-         * order to get an initial view of the node.
+         * {@link PathChildrenCache#rebuild()} will be called before this method returns in order to
+         * get an initial view of the node.
          */
         BUILD_INITIAL_CACHE,
 
@@ -284,30 +332,30 @@ public class PathChildrenCache implements Closeable
 
         switch ( mode )
         {
-            case NORMAL:
-            {
-                offerOperation(new RefreshOperation(this, RefreshMode.STANDARD));
-                break;
-            }
+        case NORMAL:
+        {
+            offerOperation(new RefreshOperation(this, RefreshMode.STANDARD));
+            break;
+        }
 
-            case BUILD_INITIAL_CACHE:
-            {
-                rebuild();
-                break;
-            }
+        case BUILD_INITIAL_CACHE:
+        {
+            rebuild();
+            break;
+        }
 
-            case POST_INITIALIZED_EVENT:
-            {
-                initialSet.set(Maps.<String, ChildData>newConcurrentMap());
-                offerOperation(new RefreshOperation(this, RefreshMode.POST_INITIALIZED));
-                break;
-            }
+        case POST_INITIALIZED_EVENT:
+        {
+            initialSet.set(Maps.<String, ChildData>newConcurrentMap());
+            offerOperation(new RefreshOperation(this, RefreshMode.POST_INITIALIZED));
+            break;
+        }
         }
     }
 
     /**
-     * NOTE: this is a BLOCKING method. Completely rebuild the internal cache by querying
-     * for all needed data WITHOUT generating any events to send to listeners.
+     * NOTE: this is a BLOCKING method. Completely rebuild the internal cache by querying for all
+     * needed data WITHOUT generating any events to send to listeners.
      *
      * @throws Exception errors
      */
@@ -381,8 +429,8 @@ public class PathChildrenCache implements Closeable
     }
 
     /**
-     * Return the current data. There are no guarantees of accuracy. This is
-     * merely the most recent view of the data. The data is returned in sorted order.
+     * Return the current data. There are no guarantees of accuracy. This is merely the most recent
+     * view of the data. The data is returned in sorted order.
      *
      * @return list of children and data
      */
@@ -393,8 +441,8 @@ public class PathChildrenCache implements Closeable
 
     /**
      * Return the current data for the given path. There are no guarantees of accuracy. This is
-     * merely the most recent view of the data. If there is no child with that path, <code>null</code>
-     * is returned.
+     * merely the most recent view of the data. If there is no child with that path,
+     * <code>null</code> is returned.
      *
      * @param fullPath full path to the node to check
      * @return data or null
@@ -405,8 +453,8 @@ public class PathChildrenCache implements Closeable
     }
 
     /**
-     * As a memory optimization, you can clear the cached data bytes for a node. Subsequent
-     * calls to {@link ChildData#getData()} for this node will return <code>null</code>.
+     * As a memory optimization, you can clear the cached data bytes for a node. Subsequent calls to
+     * {@link ChildData#getData()} for this node will return <code>null</code>.
      *
      * @param fullPath the path of the node to clear
      */
@@ -416,8 +464,8 @@ public class PathChildrenCache implements Closeable
     }
 
     /**
-     * As a memory optimization, you can clear the cached data bytes for a node. Subsequent
-     * calls to {@link ChildData#getData()} for this node will return <code>null</code>.
+     * As a memory optimization, you can clear the cached data bytes for a node. Subsequent calls to
+     * {@link ChildData#getData()} for this node will return <code>null</code>.
      *
      * @param fullPath  the path of the node to clear
      * @param ifVersion if non-negative, only clear the data if the data's version matches this version
@@ -449,8 +497,8 @@ public class PathChildrenCache implements Closeable
     }
 
     /**
-     * Clears the current data without beginning a new query and without generating any events
-     * for listeners.
+     * Clears the current data without beginning a new query and without generating any events for
+     * listeners.
      */
     public void clear()
     {
@@ -464,49 +512,37 @@ public class PathChildrenCache implements Closeable
         POST_INITIALIZED
     }
 
-    void refresh(final RefreshMode mode) throws Exception
+    void refresh(RefreshMode mode) throws Exception
     {
-        ensurePath.ensure(client.getZookeeperClient());
-
-        final BackgroundCallback callback = new BackgroundCallback()
-        {
-            @Override
-            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
-            {
-                processChildren(event.getChildren(), mode);
-            }
-        };
-
-        client.getChildren().usingWatcher(childrenWatcher).inBackground(callback).forPath(path);
+        internalRefresh(mode, path);
     }
 
     void callListeners(final PathChildrenCacheEvent event)
     {
-        listeners.forEach
-            (
-                new Function<PathChildrenCacheListener, Void>()
+        listeners.forEach(new Function<PathChildrenCacheListener, Void>()
+        {
+
+            @Override
+            public Void apply(PathChildrenCacheListener listener)
+            {
+                try
                 {
-                    @Override
-                    public Void apply(PathChildrenCacheListener listener)
-                    {
-                        try
-                        {
-                            listener.childEvent(client, event);
-                        }
-                        catch ( Exception e )
-                        {
-                            handleException(e);
-                        }
-                        return null;
-                    }
+                    listener.childEvent(client, event);
                 }
-            );
+                catch ( Exception e )
+                {
+                    handleException(e);
+                }
+                return null;
+            }
+        });
     }
 
     void getDataAndStat(final String fullPath) throws Exception
     {
         BackgroundCallback existsCallback = new BackgroundCallback()
         {
+
             @Override
             public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
             {
@@ -516,6 +552,7 @@ public class PathChildrenCache implements Closeable
 
         BackgroundCallback getDataCallback = new BackgroundCallback()
         {
+
             @Override
             public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
             {
@@ -567,6 +604,22 @@ public class PathChildrenCache implements Closeable
         }
     }
 
+    private void internalRefresh(final RefreshMode mode, String nodePath) throws Exception
+    {
+        ensurePath.ensure(client.getZookeeperClient());
+
+        final BackgroundCallback callback = new BackgroundCallback()
+        {
+            @Override
+            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+            {
+                processChildren(event.getPath(), event.getChildren(), mode);
+            }
+        };
+
+        client.getChildren().usingWatcher(childrenWatcher).inBackground(callback).forPath(nodePath);
+    }
+
     private void internalRebuildNode(String fullPath) throws Exception
     {
         if ( cacheData )
@@ -630,45 +683,67 @@ public class PathChildrenCache implements Closeable
         }
     }
 
-    private void processChildren(List<String> children, RefreshMode mode) throws Exception
+    private void processChildren(final String root, List<String> children, RefreshMode mode) throws Exception
     {
-        List<String> fullPaths = Lists.newArrayList(Lists.transform
-            (
-                children,
-                new Function<String, String>()
-                {
-                    @Override
-                    public String apply(String child)
-                    {
-                        return ZKPaths.makePath(path, child);
-                    }
-                }
-            ));
+        List<String> fullPaths = Lists.newArrayList(Lists.transform(children, new Function<String, String>()
+        {
+            @Override
+            public String apply(String child)
+            {
+                return ZKPaths.makePath(root, child);
+            }
+        }));
         Set<String> removedNodes = Sets.newHashSet(currentData.keySet());
         removedNodes.removeAll(fullPaths);
+        removedNodes.remove(root);
+
+        Set<String> nodesToKeep = Sets.newHashSet();
+        for ( String removedNode : removedNodes )
+        {
+            // Don't remove the current node being processed, or any of its parent nodes
+            if ( removedNode.length() <= root.length() && root.startsWith(removedNode) )
+            {
+                nodesToKeep.add(removedNode);
+            }
+        }
 
         for ( String fullPath : removedNodes )
         {
-            remove(fullPath);
+            if ( !nodesToKeep.contains(fullPath) )
+            {
+                remove(fullPath);
+            }
         }
 
         for ( String name : children )
         {
-            String fullPath = ZKPaths.makePath(path, name);
+            String fullPath = ZKPaths.makePath(root, name);
+
+            boolean exists = currentData.containsKey(fullPath);
 
-            if ( (mode == RefreshMode.FORCE_GET_DATA_AND_STAT) || !currentData.containsKey(fullPath) )
+            if ( (mode == RefreshMode.FORCE_GET_DATA_AND_STAT) || !exists )
             {
                 getDataAndStat(fullPath);
             }
 
             updateInitialSet(name, NULL_CHILD_DATA);
+
+            if ( !exists && (descendantHandlingMode == DescendantHandlingMode.ALL_DESCENDANTS) )
+            {
+                if ( !removedNodes.contains(fullPath) )
+                {
+                    internalRefresh(mode, fullPath);
+                }
+            }
         }
         maybeOfferInitializedEvent(initialSet.get());
     }
 
     private void applyNewData(String fullPath, int resultCode, Stat stat, byte[] bytes)
     {
-        if ( resultCode == KeeperException.Code.OK.intValue() ) // otherwise - node must have dropped or something - we should be getting another event
+        if ( resultCode == KeeperException.Code.OK.intValue() ) // otherwise - node must have dropped
+        // or something - we should be getting
+        // another event
         {
             ChildData data = new ChildData(fullPath, stat, bytes);
             ChildData previousData = currentData.put(fullPath, data);
@@ -700,11 +775,13 @@ public class PathChildrenCache implements Closeable
         {
             // all initial children have been processed - send initialized message
 
-            if ( initialSet.getAndSet(null) != null )   // avoid edge case - don't send more than 1 INITIALIZED event
+            if ( initialSet.getAndSet(null) != null ) // avoid edge case - don't send more than 1
+            // INITIALIZED event
             {
                 final List<ChildData> children = ImmutableList.copyOf(localInitialSet.values());
                 PathChildrenCacheEvent event = new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.INITIALIZED, null)
                 {
+
                     @Override
                     public List<ChildData> getInitialData()
                     {
@@ -723,18 +800,15 @@ public class PathChildrenCache implements Closeable
             return false;
         }
 
-        Map<String, ChildData> uninitializedChildren = Maps.filterValues
-            (
-                localInitialSet,
-                new Predicate<ChildData>()
-                {
-                    @Override
-                    public boolean apply(ChildData input)
-                    {
-                        return (input == NULL_CHILD_DATA);  // check against ref intentional
-                    }
-                }
-            );
+        Map<String, ChildData> uninitializedChildren = Maps.filterValues(localInitialSet, new Predicate<ChildData>()
+        {
+
+            @Override
+            public boolean apply(ChildData input)
+            {
+                return (input == NULL_CHILD_DATA); // check against ref intentional
+            }
+        });
         return (uninitializedChildren.size() != 0);
     }
 
@@ -742,38 +816,37 @@ public class PathChildrenCache implements Closeable
     {
         if ( operationsQuantizer.add(operation) )
         {
-            submitToExecutor
-            (
-                new Runnable()
+            submitToExecutor(new Runnable()
+            {
+
+                @Override
+                public void run()
                 {
-                    @Override
-                    public void run()
+                    try
+                    {
+                        operationsQuantizer.remove(operation);
+                        operation.invoke();
+                    }
+                    catch ( Exception e )
                     {
-                        try
-                        {
-                            operationsQuantizer.remove(operation);
-                            operation.invoke();
-                        }
-                        catch ( Exception e )
-                        {
-                            handleException(e);
-                        }
+                        handleException(e);
                     }
                 }
-            );
+            });
         }
     }
 
     /**
      * Submits a runnable to the executor.
      * <p/>
-     * This method is synchronized because it has to check state about whether this instance is still open.  Without this check
-     * there is a race condition with the dataWatchers that get set.  Even after this object is closed() it can still be
-     * called by those watchers, because the close() method cannot actually disable the watcher.
+     * This method is synchronized because it has to check state about whether this instance is
+     * still open. Without this check there is a race condition with the dataWatchers that get set.
+     * Even after this object is closed() it can still be called by those watchers, because the
+     * close() method cannot actually disable the watcher.
      * <p/>
-     * The synchronization overhead should be minimal if non-existant as this is generally only called from the
-     * ZK client thread and will only contend if close() is called in parallel with an update, and that's the exact state
-     * we want to protect from.
+     * The synchronization overhead should be minimal if not non-existent as this is generally only
+     * called from the ZK client thread and will only contend if close() is called in parallel with
+     * an update, and that's the exact state we want to protect from.
      *
      * @param command The runnable to run
      */

http://git-wip-us.apache.org/repos/asf/curator/blob/03bc3bee/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
index 4b117fb..4903b06 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
@@ -1,21 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
  */
+
 package org.apache.curator.framework.recipes.cache;
 
 import com.google.common.collect.Lists;
@@ -24,6 +17,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.recipes.BaseClassForTests;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache.DescendantHandlingMode;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.KillSession;
 import org.apache.curator.test.Timing;
@@ -39,6 +33,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 public class TestPathChildrenCache extends BaseClassForTests
 {
+
     @Test
     public void testPostInitializedForEmpty() throws Exception
     {
@@ -51,20 +46,19 @@ public class TestPathChildrenCache extends BaseClassForTests
 
             final CountDownLatch latch = new CountDownLatch(1);
             cache = new PathChildrenCache(client, "/test", true);
-            cache.getListenable().addListener
-            (
-                new PathChildrenCacheListener()
+            cache.getListenable().addListener(new PathChildrenCacheListener()
+            {
+
+                @Override
+                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
+                    throws Exception
                 {
-                    @Override
-                    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+                    if ( event.getType() == PathChildrenCacheEvent.Type.INITIALIZED )
                     {
-                        if ( event.getType() == PathChildrenCacheEvent.Type.INITIALIZED )
-                        {
-                            latch.countDown();
-                        }
+                        latch.countDown();
                     }
                 }
-            );
+            });
             cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
             Assert.assertTrue(timing.awaitLatch(latch));
         }
@@ -79,7 +73,8 @@ public class TestPathChildrenCache extends BaseClassForTests
     public void testAsyncInitialPopulation() throws Exception
     {
         PathChildrenCache cache = null;
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        CuratorFramework client = CuratorFrameworkFactory.newClient(
+            server.getConnectString(), new RetryOneTime(1));
         try
         {
             client.start();
@@ -89,17 +84,16 @@ public class TestPathChildrenCache extends BaseClassForTests
 
             final BlockingQueue<PathChildrenCacheEvent> events = new LinkedBlockingQueue<PathChildrenCacheEvent>();
             cache = new PathChildrenCache(client, "/test", true);
-            cache.getListenable().addListener
-                (
-                    new PathChildrenCacheListener()
-                    {
-                        @Override
-                        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
-                        {
-                            events.offer(event);
-                        }
-                    }
-                );
+            cache.getListenable().addListener(new PathChildrenCacheListener()
+            {
+
+                @Override
+                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
+                    throws Exception
+                {
+                    events.offer(event);
+                }
+            });
             cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
 
             PathChildrenCacheEvent event = events.poll(10, TimeUnit.SECONDS);
@@ -121,7 +115,9 @@ public class TestPathChildrenCache extends BaseClassForTests
     {
         Timing timing = new Timing();
         PathChildrenCache cache = null;
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        CuratorFramework client = CuratorFrameworkFactory.newClient(
+            server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(
+            1));
         try
         {
             client.start();
@@ -131,24 +127,23 @@ public class TestPathChildrenCache extends BaseClassForTests
 
             final CountDownLatch addedLatch = new CountDownLatch(3);
             final CountDownLatch initLatch = new CountDownLatch(1);
-            cache.getListenable().addListener
-                (
-                    new PathChildrenCacheListener()
+            cache.getListenable().addListener(new PathChildrenCacheListener()
+            {
+
+                @Override
+                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
+                    throws Exception
+                {
+                    if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
                     {
-                        @Override
-                        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
-                        {
-                            if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
-                            {
-                                addedLatch.countDown();
-                            }
-                            else if ( event.getType() == PathChildrenCacheEvent.Type.INITIALIZED )
-                            {
-                                initLatch.countDown();
-                            }
-                        }
+                        addedLatch.countDown();
                     }
-                );
+                    else if ( event.getType() == PathChildrenCacheEvent.Type.INITIALIZED )
+                    {
+                        initLatch.countDown();
+                    }
+                }
+            });
 
             client.create().forPath("/test/1", "1".getBytes());
             client.create().forPath("/test/2", "2".getBytes());
@@ -175,7 +170,9 @@ public class TestPathChildrenCache extends BaseClassForTests
     {
         Timing timing = new Timing();
 
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        CuratorFramework client = CuratorFrameworkFactory.newClient(
+            server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(
+            1));
         client.start();
         try
         {
@@ -183,24 +180,23 @@ public class TestPathChildrenCache extends BaseClassForTests
             final CountDownLatch addedLatch = new CountDownLatch(1);
             client.create().creatingParentsIfNeeded().forPath("/test");
             PathChildrenCache cache = new PathChildrenCache(client, "/test", false);
-            cache.getListenable().addListener
-                (
-                    new PathChildrenCacheListener()
+            cache.getListenable().addListener(new PathChildrenCacheListener()
+            {
+
+                @Override
+                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
+                    throws Exception
+                {
+                    if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED )
                     {
-                        @Override
-                        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
-                        {
-                            if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED )
-                            {
-                                updatedLatch.countDown();
-                            }
-                            else if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
-                            {
-                                addedLatch.countDown();
-                            }
-                        }
+                        updatedLatch.countDown();
                     }
-                );
+                    else if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
+                    {
+                        addedLatch.countDown();
+                    }
+                }
+            });
             cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
 
             client.create().forPath("/test/foo", "first".getBytes());
@@ -220,7 +216,8 @@ public class TestPathChildrenCache extends BaseClassForTests
     {
         Timing timing = new Timing();
 
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        CuratorFramework client = CuratorFrameworkFactory.newClient(
+            server.getConnectString(), new RetryOneTime(1));
         client.start();
         try
         {
@@ -246,7 +243,8 @@ public class TestPathChildrenCache extends BaseClassForTests
     @Test
     public void testDeleteThenCreate() throws Exception
     {
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        CuratorFramework client = CuratorFrameworkFactory.newClient(
+            server.getConnectString(), new RetryOneTime(1));
         client.start();
         try
         {
@@ -254,48 +252,45 @@ public class TestPathChildrenCache extends BaseClassForTests
             client.create().forPath("/test/foo", "one".getBytes());
 
             final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
-            client.getUnhandledErrorListenable().addListener
-                (
-                    new UnhandledErrorListener()
-                    {
-                        @Override
-                        public void unhandledError(String message, Throwable e)
-                        {
-                            error.set(e);
-                        }
-                    }
-                );
+            client.getUnhandledErrorListenable().addListener(new UnhandledErrorListener()
+            {
+
+                @Override
+                public void unhandledError(String message, Throwable e)
+                {
+                    error.set(e);
+                }
+            });
 
             final CountDownLatch removedLatch = new CountDownLatch(1);
             final CountDownLatch postRemovedLatch = new CountDownLatch(1);
             final CountDownLatch dataLatch = new CountDownLatch(1);
             PathChildrenCache cache = new PathChildrenCache(client, "/test", true);
-            cache.getListenable().addListener
-                (
-                    new PathChildrenCacheListener()
+            cache.getListenable().addListener(new PathChildrenCacheListener()
+            {
+
+                @Override
+                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
+                    throws Exception
+                {
+                    if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED )
                     {
-                        @Override
-                        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+                        removedLatch.countDown();
+                        Assert.assertTrue(postRemovedLatch.await(10, TimeUnit.SECONDS));
+                    }
+                    else
+                    {
+                        try
+                        {
+                            Assert.assertEquals(event.getData().getData(), "two".getBytes());
+                        }
+                        finally
                         {
-                            if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED )
-                            {
-                                removedLatch.countDown();
-                                Assert.assertTrue(postRemovedLatch.await(10, TimeUnit.SECONDS));
-                            }
-                            else
-                            {
-                                try
-                                {
-                                    Assert.assertEquals(event.getData().getData(), "two".getBytes());
-                                }
-                                finally
-                                {
-                                    dataLatch.countDown();
-                                }
-                            }
+                            dataLatch.countDown();
                         }
                     }
-                );
+                }
+            });
             cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
 
             client.delete().forPath("/test/foo");
@@ -321,7 +316,8 @@ public class TestPathChildrenCache extends BaseClassForTests
     @Test
     public void testRebuildAgainstOtherProcesses() throws Exception
     {
-        final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        final CuratorFramework client = CuratorFrameworkFactory.newClient(
+            server.getConnectString(), new RetryOneTime(1));
         client.start();
         try
         {
@@ -332,69 +328,66 @@ public class TestPathChildrenCache extends BaseClassForTests
 
             final CountDownLatch addedLatch = new CountDownLatch(2);
             final PathChildrenCache cache = new PathChildrenCache(client, "/test", true);
-            cache.getListenable().addListener
-                (
-                    new PathChildrenCacheListener()
+            cache.getListenable().addListener(new PathChildrenCacheListener()
+            {
+
+                @Override
+                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
+                    throws Exception
+                {
+                    if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
                     {
-                        @Override
-                        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+                        if ( event.getData().getPath().equals("/test/test") )
                         {
-                            if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
-                            {
-                                if ( event.getData().getPath().equals("/test/test") )
-                                {
-                                    addedLatch.countDown();
-                                }
-                            }
-                            else if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED )
-                            {
-                                if ( event.getData().getPath().equals("/test/snafu") )
-                                {
-                                    addedLatch.countDown();
-                                }
-                            }
+                            addedLatch.countDown();
                         }
                     }
-                );
+                    else if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED )
+                    {
+                        if ( event.getData().getPath().equals("/test/snafu") )
+                        {
+                            addedLatch.countDown();
+                        }
+                    }
+                }
+            });
             cache.rebuildTestExchanger = new Exchanger<Object>();
             ExecutorService service = Executors.newSingleThreadExecutor();
             final AtomicReference<String> deletedPath = new AtomicReference<String>();
-            Future<Object> future = service.submit
-                (
-                    new Callable<Object>()
-                    {
-                        @Override
-                        public Object call() throws Exception
-                        {
-                            cache.rebuildTestExchanger.exchange(new Object());
-
-                            // simulate another process adding a node while we're rebuilding
-                            client.create().forPath("/test/test");
+            Future<Object> future = service.submit(new Callable<Object>()
+            {
 
-                            List<ChildData> currentData = cache.getCurrentData();
-                            Assert.assertTrue(currentData.size() > 0);
+                @Override
+                public Object call() throws Exception
+                {
+                    cache.rebuildTestExchanger.exchange(new Object());
 
-                            // simulate another process removing a node while we're rebuilding
-                            client.delete().forPath(currentData.get(0).getPath());
-                            deletedPath.set(currentData.get(0).getPath());
+                    // simulate another process adding a node while we're rebuilding
+                    client.create().forPath("/test/test");
 
-                            cache.rebuildTestExchanger.exchange(new Object());
+                    List<ChildData> currentData = cache.getCurrentData();
+                    Assert.assertTrue(currentData.size() > 0);
 
-                            ChildData childData = null;
-                            while ( childData == null )
-                            {
-                                childData = cache.getCurrentData("/test/snafu");
-                                Thread.sleep(1000);
-                            }
-                            Assert.assertEquals(childData.getData(), "original".getBytes());
-                            client.setData().forPath("/test/snafu", "grilled".getBytes());
+                    // simulate another process removing a node while we're rebuilding
+                    client.delete().forPath(currentData.get(0).getPath());
+                    deletedPath.set(currentData.get(0).getPath());
 
-                            cache.rebuildTestExchanger.exchange(new Object());
+                    cache.rebuildTestExchanger.exchange(new Object());
 
-                            return null;
-                        }
+                    ChildData childData = null;
+                    while ( childData == null )
+                    {
+                        childData = cache.getCurrentData("/test/snafu");
+                        Thread.sleep(1000);
                     }
-                );
+                    Assert.assertEquals(childData.getData(), "original".getBytes());
+                    client.setData().forPath("/test/snafu", "grilled".getBytes());
+
+                    cache.rebuildTestExchanger.exchange(new Object());
+
+                    return null;
+                }
+            });
             cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
             future.get();
 
@@ -415,7 +408,8 @@ public class TestPathChildrenCache extends BaseClassForTests
     @Test
     public void testIssue27() throws Exception
     {
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        CuratorFramework client = CuratorFrameworkFactory.newClient(
+            server.getConnectString(), new RetryOneTime(1));
         client.start();
         try
         {
@@ -429,18 +423,17 @@ public class TestPathChildrenCache extends BaseClassForTests
             final List<PathChildrenCacheEvent.Type> events = Lists.newArrayList();
             final Semaphore semaphore = new Semaphore(0);
             PathChildrenCache cache = new PathChildrenCache(client, "/base", true);
-            cache.getListenable().addListener
-                (
-                    new PathChildrenCacheListener()
-                    {
-                        @Override
-                        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
-                        {
-                            events.add(event.getType());
-                            semaphore.release();
-                        }
-                    }
-                );
+            cache.getListenable().addListener(new PathChildrenCacheListener()
+            {
+
+                @Override
+                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
+                    throws Exception
+                {
+                    events.add(event.getType());
+                    semaphore.release();
+                }
+            });
             cache.start();
 
             Assert.assertTrue(semaphore.tryAcquire(3, 10, TimeUnit.SECONDS));
@@ -451,14 +444,12 @@ public class TestPathChildrenCache extends BaseClassForTests
             client.create().forPath("/base/a");
             Assert.assertTrue(semaphore.tryAcquire(1, 10, TimeUnit.SECONDS));
 
-            List<PathChildrenCacheEvent.Type> expected = Lists.newArrayList
-                (
-                    PathChildrenCacheEvent.Type.CHILD_ADDED,
-                    PathChildrenCacheEvent.Type.CHILD_ADDED,
-                    PathChildrenCacheEvent.Type.CHILD_ADDED,
-                    PathChildrenCacheEvent.Type.CHILD_REMOVED,
-                    PathChildrenCacheEvent.Type.CHILD_ADDED
-                );
+            List<PathChildrenCacheEvent.Type> expected = Lists.newArrayList(
+                PathChildrenCacheEvent.Type.CHILD_ADDED,
+                PathChildrenCacheEvent.Type.CHILD_ADDED,
+                PathChildrenCacheEvent.Type.CHILD_ADDED,
+                PathChildrenCacheEvent.Type.CHILD_REMOVED,
+                PathChildrenCacheEvent.Type.CHILD_ADDED);
             Assert.assertEquals(expected, events);
         }
         finally
@@ -471,7 +462,8 @@ public class TestPathChildrenCache extends BaseClassForTests
     @Test
     public void testIssue27Alt() throws Exception
     {
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        CuratorFramework client = CuratorFrameworkFactory.newClient(
+            server.getConnectString(), new RetryOneTime(1));
         client.start();
         try
         {
@@ -485,18 +477,17 @@ public class TestPathChildrenCache extends BaseClassForTests
             final List<PathChildrenCacheEvent.Type> events = Lists.newArrayList();
             final Semaphore semaphore = new Semaphore(0);
             PathChildrenCache cache = new PathChildrenCache(client, "/base", true);
-            cache.getListenable().addListener
-                (
-                    new PathChildrenCacheListener()
-                    {
-                        @Override
-                        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
-                        {
-                            events.add(event.getType());
-                            semaphore.release();
-                        }
-                    }
-                );
+            cache.getListenable().addListener(new PathChildrenCacheListener()
+            {
+
+                @Override
+                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
+                    throws Exception
+                {
+                    events.add(event.getType());
+                    semaphore.release();
+                }
+            });
             cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
 
             client.delete().forPath("/base/a");
@@ -505,11 +496,9 @@ public class TestPathChildrenCache extends BaseClassForTests
             client.create().forPath("/base/a");
             Assert.assertTrue(semaphore.tryAcquire(1, 10, TimeUnit.SECONDS));
 
-            List<PathChildrenCacheEvent.Type> expected = Lists.newArrayList
-                (
-                    PathChildrenCacheEvent.Type.CHILD_REMOVED,
-                    PathChildrenCacheEvent.Type.CHILD_ADDED
-                );
+            List<PathChildrenCacheEvent.Type> expected = Lists.newArrayList(
+                PathChildrenCacheEvent.Type.CHILD_REMOVED,
+                PathChildrenCacheEvent.Type.CHILD_ADDED);
             Assert.assertEquals(expected, events);
         }
         finally
@@ -525,7 +514,9 @@ public class TestPathChildrenCache extends BaseClassForTests
         CuratorFramework client = null;
         try
         {
-            client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+            client = CuratorFrameworkFactory.newClient(
+                server.getConnectString(), timing.session(), timing.connection(),
+                new RetryOneTime(1));
             client.start();
             client.create().forPath("/test");
 
@@ -536,32 +527,31 @@ public class TestPathChildrenCache extends BaseClassForTests
             final CountDownLatch lostLatch = new CountDownLatch(1);
             final CountDownLatch reconnectedLatch = new CountDownLatch(1);
             final CountDownLatch removedLatch = new CountDownLatch(1);
-            cache.getListenable().addListener
-                (
-                    new PathChildrenCacheListener()
+            cache.getListenable().addListener(new PathChildrenCacheListener()
+            {
+
+                @Override
+                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
+                    throws Exception
+                {
+                    if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
                     {
-                        @Override
-                        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
-                        {
-                            if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
-                            {
-                                childAddedLatch.countDown();
-                            }
-                            else if ( event.getType() == PathChildrenCacheEvent.Type.CONNECTION_LOST )
-                            {
-                                lostLatch.countDown();
-                            }
-                            else if ( event.getType() == PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED )
-                            {
-                                reconnectedLatch.countDown();
-                            }
-                            else if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED )
-                            {
-                                removedLatch.countDown();
-                            }
-                        }
+                        childAddedLatch.countDown();
+                    }
+                    else if ( event.getType() == PathChildrenCacheEvent.Type.CONNECTION_LOST )
+                    {
+                        lostLatch.countDown();
+                    }
+                    else if ( event.getType() == PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED )
+                    {
+                        reconnectedLatch.countDown();
+                    }
+                    else if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED )
+                    {
+                        removedLatch.countDown();
                     }
-                );
+                }
+            });
 
             client.create().withMode(CreateMode.EPHEMERAL).forPath("/test/me", "data".getBytes());
             Assert.assertTrue(timing.awaitLatch(childAddedLatch));
@@ -580,13 +570,16 @@ public class TestPathChildrenCache extends BaseClassForTests
     @Test
     public void testModes() throws Exception
     {
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        CuratorFramework client = CuratorFrameworkFactory.newClient(
+            server.getConnectString(), new RetryOneTime(1));
         client.start();
         try
         {
             client.create().forPath("/test");
 
-            for ( boolean cacheData : new boolean[]{false, true} )
+            for ( boolean cacheData : new boolean[]{
+                false, true
+            } )
             {
                 internalTestMode(client, cacheData);
 
@@ -604,7 +597,8 @@ public class TestPathChildrenCache extends BaseClassForTests
     public void testRebuildNode() throws Exception
     {
         PathChildrenCache cache = null;
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        CuratorFramework client = CuratorFrameworkFactory.newClient(
+            server.getConnectString(), new RetryOneTime(1));
         client.start();
         try
         {
@@ -615,6 +609,7 @@ public class TestPathChildrenCache extends BaseClassForTests
             final Semaphore semaphore = new Semaphore(1);
             cache = new PathChildrenCache(client, "/test", true)
             {
+
                 @Override
                 void getDataAndStat(String fullPath) throws Exception
                 {
@@ -648,20 +643,19 @@ public class TestPathChildrenCache extends BaseClassForTests
         PathChildrenCache cache = new PathChildrenCache(client, "/test", cacheData);
 
         final CountDownLatch latch = new CountDownLatch(2);
-        cache.getListenable().addListener
-            (
-                new PathChildrenCacheListener()
+        cache.getListenable().addListener(new PathChildrenCacheListener()
+        {
+
+            @Override
+            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
+                throws Exception
+            {
+                if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
                 {
-                    @Override
-                    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
-                    {
-                        if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
-                        {
-                            latch.countDown();
-                        }
-                    }
+                    latch.countDown();
                 }
-            );
+            }
+        });
         cache.start();
 
         client.create().forPath("/test/one", "one".getBytes());
@@ -688,7 +682,8 @@ public class TestPathChildrenCache extends BaseClassForTests
     @Test
     public void testBasics() throws Exception
     {
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        CuratorFramework client = CuratorFrameworkFactory.newClient(
+            server.getConnectString(), new RetryOneTime(1));
         client.start();
         try
         {
@@ -696,31 +691,33 @@ public class TestPathChildrenCache extends BaseClassForTests
 
             final BlockingQueue<PathChildrenCacheEvent.Type> events = new LinkedBlockingQueue<PathChildrenCacheEvent.Type>();
             PathChildrenCache cache = new PathChildrenCache(client, "/test", true);
-            cache.getListenable().addListener
-                (
-                    new PathChildrenCacheListener()
+            cache.getListenable().addListener(new PathChildrenCacheListener()
+            {
+
+                @Override
+                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
+                    throws Exception
+                {
+                    if ( event.getData().getPath().equals("/test/one") )
                     {
-                        @Override
-                        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
-                        {
-                            if ( event.getData().getPath().equals("/test/one") )
-                            {
-                                events.offer(event.getType());
-                            }
-                        }
+                        events.offer(event.getType());
                     }
-                );
+                }
+            });
             cache.start();
 
             client.create().forPath("/test/one", "hey there".getBytes());
-            Assert.assertEquals(events.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
+            Assert.assertEquals(
+                events.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
 
             client.setData().forPath("/test/one", "sup!".getBytes());
-            Assert.assertEquals(events.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);
+            Assert.assertEquals(
+                events.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);
             Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "sup!");
 
             client.delete().forPath("/test/one");
-            Assert.assertEquals(events.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);
+            Assert.assertEquals(
+                events.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);
 
             cache.close();
         }
@@ -733,7 +730,8 @@ public class TestPathChildrenCache extends BaseClassForTests
     @Test
     public void testBasicsOnTwoCachesWithSameExecutor() throws Exception
     {
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        CuratorFramework client = CuratorFrameworkFactory.newClient(
+            server.getConnectString(), new RetryOneTime(1));
         client.start();
         try
         {
@@ -742,52 +740,57 @@ public class TestPathChildrenCache extends BaseClassForTests
             final BlockingQueue<PathChildrenCacheEvent.Type> events = new LinkedBlockingQueue<PathChildrenCacheEvent.Type>();
             final ExecutorService exec = Executors.newSingleThreadExecutor();
             PathChildrenCache cache = new PathChildrenCache(client, "/test", true, false, exec);
-            cache.getListenable().addListener
-                (
-                    new PathChildrenCacheListener()
+            cache.getListenable().addListener(new PathChildrenCacheListener()
+            {
+
+                @Override
+                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
+                    throws Exception
+                {
+                    if ( event.getData().getPath().equals("/test/one") )
                     {
-                        @Override
-                        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
-                        {
-                            if ( event.getData().getPath().equals("/test/one") )
-                            {
-                                events.offer(event.getType());
-                            }
-                        }
+                        events.offer(event.getType());
                     }
-                );
+                }
+            });
             cache.start();
 
             final BlockingQueue<PathChildrenCacheEvent.Type> events2 = new LinkedBlockingQueue<PathChildrenCacheEvent.Type>();
             PathChildrenCache cache2 = new PathChildrenCache(client, "/test", true, false, exec);
-            cache2.getListenable().addListener(
-                    new PathChildrenCacheListener() {
-                        @Override
-                        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
-                                throws Exception
-                        {
-                            if ( event.getData().getPath().equals("/test/one") )
-                            {
-                                events2.offer(event.getType());
-                            }
-                        }
+            cache2.getListenable().addListener(new PathChildrenCacheListener()
+            {
+
+                @Override
+                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
+                    throws Exception
+                {
+                    if ( event.getData().getPath().equals("/test/one") )
+                    {
+                        events2.offer(event.getType());
                     }
-            );
+                }
+            });
             cache2.start();
 
             client.create().forPath("/test/one", "hey there".getBytes());
-            Assert.assertEquals(events.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
-            Assert.assertEquals(events2.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
+            Assert.assertEquals(
+                events.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
+            Assert.assertEquals(
+                events2.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
 
             client.setData().forPath("/test/one", "sup!".getBytes());
-            Assert.assertEquals(events.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);
-            Assert.assertEquals(events2.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);
+            Assert.assertEquals(
+                events.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);
+            Assert.assertEquals(
+                events2.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);
             Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "sup!");
             Assert.assertEquals(new String(cache2.getCurrentData("/test/one").getData()), "sup!");
 
             client.delete().forPath("/test/one");
-            Assert.assertEquals(events.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);
-            Assert.assertEquals(events2.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);
+            Assert.assertEquals(
+                events.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);
+            Assert.assertEquals(
+                events2.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);
 
             cache.close();
             cache2.close();
@@ -799,23 +802,25 @@ public class TestPathChildrenCache extends BaseClassForTests
     }
 
     @Test
-    public void testDeleteNodeAfterCloseDoesntCallExecutor()
-            throws Exception
+    public void testDeleteNodeAfterCloseDoesntCallExecutor() throws Exception
     {
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        CuratorFramework client = CuratorFrameworkFactory.newClient(
+            server.getConnectString(), new RetryOneTime(1));
         client.start();
         try
         {
             client.create().forPath("/test");
 
-            final ExecuteCalledWatchingExecutorService exec = new ExecuteCalledWatchingExecutorService(Executors.newSingleThreadExecutor());
+            final ExecuteCalledWatchingExecutorService exec = new ExecuteCalledWatchingExecutorService(
+                Executors.newSingleThreadExecutor());
             PathChildrenCache cache = new PathChildrenCache(client, "/test", true, false, exec);
 
             cache.start();
             client.create().forPath("/test/one", "hey there".getBytes());
 
             cache.rebuild();
-            Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there");
+            Assert.assertEquals(
+                new String(cache.getCurrentData("/test/one").getData()), "hey there");
             Assert.assertTrue(exec.isExecuteCalled());
 
             exec.setExecuteCalled(false);
@@ -826,14 +831,165 @@ public class TestPathChildrenCache extends BaseClassForTests
             Thread.sleep(100);
             Assert.assertFalse(exec.isExecuteCalled());
         }
-        finally {
+        finally
+        {
             client.close();
         }
 
     }
 
+    /**
+     * Tests the case where a whole tree exists but only updates to the direct descendants of the
+     * cached root are of interest.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testCacheDirectDesendantsOnly() throws Exception
+    {
+        Timing timing = new Timing();
+        PathChildrenCache cache = null;
+
+        CuratorFramework client = CuratorFrameworkFactory.newClient(
+            server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(
+            1));
+
+        try
+        {
+            client.start();
+
+            final CountDownLatch initLatch = new CountDownLatch(1);
+
+            final CountDownLatch createLatch = new CountDownLatch(1);
+            final CountDownLatch updateLatch = new CountDownLatch(1);
+            final CountDownLatch deleteLatch = new CountDownLatch(1);
+
+            final String root = "/cachetest";
+
+            cache = new PathChildrenCache(
+                client, root, true, DescendantHandlingMode.DIRECT_DESCENDANTS_ONLY);
+            cache.getListenable().addListener(new PathChildrenCacheListener()
+            {
+
+                @Override
+                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
+                    throws Exception
+                {
+                    switch ( event.getType() )
+                    {
+                    case INITIALIZED:
+                        Assert.assertTrue(initLatch.getCount() >= 1);
+                        initLatch.countDown();
+                        break;
+                    case CHILD_ADDED:
+                        Assert.assertTrue(createLatch.getCount() >= 1);
+                        createLatch.countDown();
+                        break;
+                    case CHILD_REMOVED:
+                        Assert.assertTrue(deleteLatch.getCount() >= 1);
+                        deleteLatch.countDown();
+                        break;
+                    case CHILD_UPDATED:
+                        Assert.assertTrue(updateLatch.getCount() >= 1);
+                        updateLatch.countDown();
+                        break;
+                    }
+                }
+            });
+
+            cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
+            Assert.assertTrue(timing.awaitLatch(initLatch));
+
+            client.create().creatingParentsIfNeeded().forPath(
+                root + "/oneLevel/secondLevel/thirdLevel");
+            Assert.assertTrue(timing.awaitLatch(createLatch));
+            client.setData().forPath(root + "/oneLevel", "NewData1".getBytes());
+            client.setData().forPath(root + "/oneLevel/secondLevel", "NewData2".getBytes());
+            client.setData().forPath(
+                root + "/oneLevel/secondLevel/thirdLevel", "NewData3".getBytes());
+            Assert.assertTrue(timing.awaitLatch(updateLatch));
+            client.delete().deletingChildrenIfNeeded().forPath(root + "/oneLevel");
+            Assert.assertTrue(timing.awaitLatch(deleteLatch));
+        }
+        finally
+        {
+            Closeables.closeQuietly(cache);
+            Closeables.closeQuietly(client);
+        }
+    }
+
+    /**
+     * Tests the case where a whole tree exists and all updates are of interest. See CURATOR-33.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testCacheWholeTree() throws Exception
+    {
+        Timing timing = new Timing();
+        PathChildrenCache cache = null;
+
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+
+            final CountDownLatch initLatch = new CountDownLatch(1);
+
+            final CountDownLatch createLatch = new CountDownLatch(3);
+            final CountDownLatch updateLatch = new CountDownLatch(3);
+            final CountDownLatch deleteLatch = new CountDownLatch(3);
+
+            final String root = "/cachetest";
+
+            cache = new PathChildrenCache(client, root, true, DescendantHandlingMode.ALL_DESCENDANTS);
+            cache.getListenable().addListener(new PathChildrenCacheListener()
+            {
+
+                @Override
+                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
+                    throws Exception
+                {
+                    switch ( event.getType() )
+                    {
+                    case INITIALIZED:
+                        initLatch.countDown();
+                        break;
+                    case CHILD_ADDED:
+                        createLatch.countDown();
+                        break;
+                    case CHILD_REMOVED:
+                        deleteLatch.countDown();
+                        break;
+                    case CHILD_UPDATED:
+                        updateLatch.countDown();
+                        break;
+                    }
+                }
+            });
+
+            cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
+            Assert.assertTrue(timing.awaitLatch(initLatch));
+
+            client.create().creatingParentsIfNeeded().forPath(root + "/oneLevel/secondLevel/thirdLevel");
+            Assert.assertTrue(timing.awaitLatch(createLatch));
+            client.setData().forPath(root + "/oneLevel", "NewData1".getBytes());
+            client.setData().forPath(root + "/oneLevel/secondLevel", "NewData2".getBytes());
+            client.setData().forPath(root + "/oneLevel/secondLevel/thirdLevel", "NewData3".getBytes());
+            Assert.assertTrue(timing.awaitLatch(updateLatch));
+            client.delete().deletingChildrenIfNeeded().forPath(root + "/oneLevel");
+            Assert.assertTrue(timing.awaitLatch(deleteLatch));
+        }
+        finally
+        {
+            Closeables.closeQuietly(cache);
+            Closeables.closeQuietly(client);
+        }
+    }
+
     public static class ExecuteCalledWatchingExecutorService extends DelegatingExecutorService
     {
+
         boolean executeCalled = false;
 
         public ExecuteCalledWatchingExecutorService(ExecutorService delegate)
@@ -861,16 +1017,14 @@ public class TestPathChildrenCache extends BaseClassForTests
 
     public static class DelegatingExecutorService implements ExecutorService
     {
+
         private final ExecutorService delegate;
 
-        public DelegatingExecutorService(
-                ExecutorService delegate
-        )
+        public DelegatingExecutorService(ExecutorService delegate)
         {
             this.delegate = delegate;
         }
 
-
         @Override
         public void shutdown()
         {
@@ -896,8 +1050,7 @@ public class TestPathChildrenCache extends BaseClassForTests
         }
 
         @Override
-        public boolean awaitTermination(long timeout, TimeUnit unit)
-                throws InterruptedException
+        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
         {
             return delegate.awaitTermination(timeout, unit);
         }
@@ -922,28 +1075,29 @@ public class TestPathChildrenCache extends BaseClassForTests
 
         @Override
         public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
-                throws InterruptedException
+            throws InterruptedException
         {
             return delegate.invokeAll(tasks);
         }
 
         @Override
-        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
-                throws InterruptedException
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
+                                             long timeout,
+                                             TimeUnit unit) throws InterruptedException
         {
             return delegate.invokeAll(tasks, timeout, unit);
         }
 
         @Override
         public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
-                throws InterruptedException, ExecutionException
+            throws InterruptedException, ExecutionException
         {
             return delegate.invokeAny(tasks);
         }
 
         @Override
         public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
-                throws InterruptedException, ExecutionException, TimeoutException
+            throws InterruptedException, ExecutionException, TimeoutException
         {
             return delegate.invokeAny(tasks, timeout, unit);
         }


Mime
View raw message