curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From randg...@apache.org
Subject [2/3] Initial pass at integrating DescendantHandlingMode.ALL_DESCENDANTS patch
Date Thu, 26 Dec 2013 16:00:59 GMT
http://git-wip-us.apache.org/repos/asf/curator/blob/03bc3bee/CURATOR-33.patch
----------------------------------------------------------------------
diff --git a/CURATOR-33.patch b/CURATOR-33.patch
new file mode 100644
index 0000000..dbcb9d4
--- /dev/null
+++ b/CURATOR-33.patch
@@ -0,0 +1,2664 @@
+diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+index d66f7f3..03b169e 100644
+--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
++++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+@@ -1,32 +1,28 @@
+ /**
+- * Licensed to the Apache Software Foundation (ASF) under one
+- * or more contributor license agreements.  See the NOTICE file
+- * distributed with this work for additional information
+- * regarding copyright ownership.  The ASF licenses this file
+- * to you under the Apache License, Version 2.0 (the
+- * "License"); you may not use this file except in compliance
+- * with the License.  You may obtain a copy of the License at
+- *
+- *   http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing,
+- * software distributed under the License is distributed on an
+- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+- * KIND, either express or implied.  See the License for the
+- * specific language governing permissions and limitations
+- * under the License.
++ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
++ * agreements. See the NOTICE file distributed with this work for additional information regarding
++ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance with the License. You may obtain a
++ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
++ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
++ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
++ * for the specific language governing permissions and limitations under the License.
+  */
+ 
+ package org.apache.curator.framework.recipes.cache;
+ 
+-import com.google.common.annotations.VisibleForTesting;
+-import com.google.common.base.Function;
+-import com.google.common.base.Preconditions;
+-import com.google.common.base.Predicate;
+-import com.google.common.collect.ImmutableList;
+-import com.google.common.collect.Lists;
+-import com.google.common.collect.Maps;
+-import com.google.common.collect.Sets;
++import java.io.Closeable;
++import java.io.IOException;
++import java.util.List;
++import java.util.Map;
++import java.util.Set;
++import java.util.concurrent.ConcurrentMap;
++import java.util.concurrent.Exchanger;
++import java.util.concurrent.ExecutorService;
++import java.util.concurrent.Executors;
++import java.util.concurrent.ThreadFactory;
++import java.util.concurrent.atomic.AtomicReference;
++
+ import org.apache.curator.framework.CuratorFramework;
+ import org.apache.curator.framework.api.BackgroundCallback;
+ import org.apache.curator.framework.api.CuratorEvent;
+@@ -43,79 +39,71 @@ import org.apache.zookeeper.Watcher;
+ import org.apache.zookeeper.data.Stat;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+-import java.io.Closeable;
+-import java.io.IOException;
+-import java.util.List;
+-import java.util.Map;
+-import java.util.Set;
+-import java.util.concurrent.ConcurrentMap;
+-import java.util.concurrent.Exchanger;
+-import java.util.concurrent.ExecutorService;
+-import java.util.concurrent.Executors;
+-import java.util.concurrent.ThreadFactory;
+-import java.util.concurrent.atomic.AtomicReference;
++
++import com.google.common.annotations.VisibleForTesting;
++import com.google.common.base.Function;
++import com.google.common.base.Preconditions;
++import com.google.common.base.Predicate;
++import com.google.common.collect.ImmutableList;
++import com.google.common.collect.Lists;
++import com.google.common.collect.Maps;
++import com.google.common.collect.Sets;
+ 
+ /**
+- * <p>A utility that attempts to keep all data from all children of a ZK path locally cached. This class
+- * will watch the ZK path, respond to update/create/delete events, pull down the data, etc. You can
+- * register a listener that will get notified when changes occur.</p>
++ * <p>
++ * A utility that attempts to keep all data from all children of a ZK path locally cached. This
++ * class will watch the ZK path, respond to update/create/delete events, pull down the data, etc.
++ * You can register a listener that will get notified when changes occur.
++ * </p>
+  * <p/>
+- * <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must
+- * be prepared for false-positives and false-negatives. Additionally, always use the version number
+- * when updating data to avoid overwriting another process' change.</p>
++ * <p>
++ * <b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must be
++ * prepared for false-positives and false-negatives. Additionally, always use the version number
++ * when updating data to avoid overwriting another process' change.
++ * </p>
+  */
+ @SuppressWarnings("NullableProblems")
+-public class PathChildrenCache implements Closeable
+-{
++public class PathChildrenCache implements Closeable {
++
+     private final Logger log = LoggerFactory.getLogger(getClass());
+     private final CuratorFramework client;
+     private final String path;
+     private final CloseableExecutorService executorService;
+     private final boolean cacheData;
++    private final DescendantHandlingMode descendantHandlingMode;
+     private final boolean dataIsCompressed;
+     private final EnsurePath ensurePath;
+     private final ListenerContainer<PathChildrenCacheListener> listeners = new ListenerContainer<PathChildrenCacheListener>();
+     private final ConcurrentMap<String, ChildData> currentData = Maps.newConcurrentMap();
+     private final AtomicReference<Map<String, ChildData>> initialSet = new AtomicReference<Map<String, ChildData>>();
+-    private final Set<Operation> operationsQuantizer = Sets.newSetFromMap(Maps.<Operation, Boolean>newConcurrentMap());
++    private final Set<Operation> operationsQuantizer = Sets.newSetFromMap(Maps.<Operation, Boolean> newConcurrentMap());
+     private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+ 
+-    private enum State
+-    {
+-        LATENT,
+-        STARTED,
+-        CLOSED
++    private enum State {
++        LATENT, STARTED, CLOSED
+     }
+ 
+     private static final ChildData NULL_CHILD_DATA = new ChildData(null, null, null);
+ 
+-    private final Watcher childrenWatcher = new Watcher()
+-    {
++    private final Watcher childrenWatcher = new Watcher() {
++
+         @Override
+-        public void process(WatchedEvent event)
+-        {
++        public void process(WatchedEvent event) {
+             offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.STANDARD));
+         }
+     };
+ 
+-    private final Watcher dataWatcher = new Watcher()
+-    {
++    private final Watcher dataWatcher = new Watcher() {
++
+         @Override
+-        public void process(WatchedEvent event)
+-        {
+-            try
+-            {
+-                if ( event.getType() == Event.EventType.NodeDeleted )
+-                {
++        public void process(WatchedEvent event) {
++            try {
++                if (event.getType() == Event.EventType.NodeDeleted) {
+                     remove(event.getPath());
+-                }
+-                else if ( event.getType() == Event.EventType.NodeDataChanged )
+-                {
++                } else if (event.getType() == Event.EventType.NodeDataChanged) {
+                     offerOperation(new GetDataOperation(PathChildrenCache.this, event.getPath()));
+                 }
+-            }
+-            catch ( Exception e )
+-            {
++            } catch (Exception e) {
+                 handleException(e);
+             }
+         }
+@@ -124,98 +112,208 @@ public class PathChildrenCache implements Closeable
+     @VisibleForTesting
+     volatile Exchanger<Object> rebuildTestExchanger;
+ 
+-    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
+-    {
++    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() {
++
+         @Override
+-        public void stateChanged(CuratorFramework client, ConnectionState newState)
+-        {
++        public void stateChanged(CuratorFramework client, ConnectionState newState) {
+             handleStateChange(newState);
+         }
+     };
+     private static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("PathChildrenCache");
+ 
+     /**
+-     * @param client the client
+-     * @param path   path to watch
+-     * @param mode   caching mode
++     * Method of processing descendants of the root node: whether all nodes under the root node
++     * should be considered or only its first-level (direct) children.
++     */
++    public enum DescendantHandlingMode {
++        /**
++         * Only children of the root node will be considered by the cache. This is the default.
++         */
++        DIRECT_DESCENDANTS_ONLY,
++
++        /**
++         * The root node's children, and their children etc., will all be considered by the cache.
++         */
++        ALL_DESCENDANTS
++    }
++
++    /**
++     * @param client
++     *            the client
++     * @param path
++     *            path to watch
++     * @param mode
++     *            caching mode
+      * @deprecated use {@link #PathChildrenCache(CuratorFramework, String, boolean)} instead
+      */
++    @Deprecated
+     @SuppressWarnings("deprecation")
+-    public PathChildrenCache(CuratorFramework client, String path, PathChildrenCacheMode mode)
+-    {
+-        this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true));
++    public PathChildrenCache(CuratorFramework client, String path, PathChildrenCacheMode mode) {
++        this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY, false,
++             new CloseableExecutorService(
++                     Executors.newSingleThreadExecutor(defaultThreadFactory), true));
+     }
+ 
+     /**
+-     * @param client        the client
+-     * @param path          path to watch
+-     * @param mode          caching mode
+-     * @param threadFactory factory to use when creating internal threads
+-     * @deprecated use {@link #PathChildrenCache(CuratorFramework, String, boolean, ThreadFactory)} instead
++     * @param client
++     *            the client
++     * @param path
++     *            path to watch
++     * @param mode
++     *            caching mode
++     * @param threadFactory
++     *            factory to use when creating internal threads
++     * @deprecated use {@link #PathChildrenCache(CuratorFramework, String, boolean, ThreadFactory)}
++     *             instead
+      */
++    @Deprecated
+     @SuppressWarnings("deprecation")
+-    public PathChildrenCache(CuratorFramework client, String path, PathChildrenCacheMode mode, ThreadFactory threadFactory)
+-    {
+-        this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true));
++    public PathChildrenCache(CuratorFramework client,
++                             String path,
++                             PathChildrenCacheMode mode,
++                             ThreadFactory threadFactory) {
++        this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY, false,
++             new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true));
+     }
+ 
+     /**
+-     * @param client    the client
+-     * @param path      path to watch
+-     * @param cacheData if true, node contents are cached in addition to the stat
++     * @param client
++     *            the client
++     * @param path
++     *            path to watch
++     * @param cacheData
++     *            if true, node contents are cached in addition to the stat
+      */
+-    public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)
+-    {
+-        this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true));
++    public PathChildrenCache(CuratorFramework client, String path, boolean cacheData) {
++        this(client, path, cacheData, false, new CloseableExecutorService(
++                Executors.newSingleThreadExecutor(defaultThreadFactory), true));
+     }
+ 
+     /**
+-     * @param client        the client
+-     * @param path          path to watch
+-     * @param cacheData     if true, node contents are cached in addition to the stat
+-     * @param threadFactory factory to use when creating internal threads
++     * @param client
++     *            the client
++     * @param path
++     *            path to watch
++     * @param cacheData
++     *            if true, node contents are cached in addition to the stat
++     * @param descendantHandlingMode
++     *            Mode defining whether only direct children of the root node are considered or
++     *            whether the entire tree of descendants is.
+      */
+-    public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory)
+-    {
+-        this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true));
++    public PathChildrenCache(CuratorFramework client,
++                             String path,
++                             boolean cacheData,
++                             DescendantHandlingMode descendantHandlingMode) {
++        this(client, path, cacheData, descendantHandlingMode, false, new CloseableExecutorService(
++                Executors.newSingleThreadExecutor(defaultThreadFactory), true));
+     }
+ 
+     /**
+-     * @param client           the client
+-     * @param path             path to watch
+-     * @param cacheData        if true, node contents are cached in addition to the stat
+-     * @param dataIsCompressed if true, data in the path is compressed
+-     * @param threadFactory    factory to use when creating internal threads
++     * @param client
++     *            the client
++     * @param path
++     *            path to watch
++     * @param cacheData
++     *            if true, node contents are cached in addition to the stat
++     * @param threadFactory
++     *            factory to use when creating internal threads
+      */
+-    public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, ThreadFactory threadFactory)
+-    {
+-        this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true));
++    public PathChildrenCache(CuratorFramework client,
++                             String path,
++                             boolean cacheData,
++                             ThreadFactory threadFactory) {
++        this(client, path, cacheData, false, new CloseableExecutorService(
++                Executors.newSingleThreadExecutor(threadFactory), true));
+     }
+ 
+     /**
+-     * @param client           the client
+-     * @param path             path to watch
+-     * @param cacheData        if true, node contents are cached in addition to the stat
+-     * @param dataIsCompressed if true, data in the path is compressed
+-     * @param executorService  ExecutorService to use for the PathChildrenCache's background thread
++     * @param client
++     *            the client
++     * @param path
++     *            path to watch
++     * @param cacheData
++     *            if true, node contents are cached in addition to the stat
++     * @param dataIsCompressed
++     *            if true, data in the path is compressed
++     * @param threadFactory
++     *            factory to use when creating internal threads
+      */
+-    public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final ExecutorService executorService)
+-    {
+-        this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(executorService));
++    public PathChildrenCache(CuratorFramework client,
++                             String path,
++                             boolean cacheData,
++                             boolean dataIsCompressed,
++                             ThreadFactory threadFactory) {
++        this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(
++                Executors.newSingleThreadExecutor(threadFactory), true));
+     }
+ 
+     /**
+-     * @param client           the client
+-     * @param path             path to watch
+-     * @param cacheData        if true, node contents are cached in addition to the stat
+-     * @param dataIsCompressed if true, data in the path is compressed
+-     * @param executorService  Closeable ExecutorService to use for the PathChildrenCache's background thread
++     * @param client
++     *            the client
++     * @param path
++     *            path to watch
++     * @param cacheData
++     *            if true, node contents are cached in addition to the stat
++     * @param dataIsCompressed
++     *            if true, data in the path is compressed
++     * @param executorService
++     *            ExecutorService to use for the PathChildrenCache's background thread
+      */
+-    public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService)
+-    {
++    public PathChildrenCache(CuratorFramework client,
++                             String path,
++                             boolean cacheData,
++                             boolean dataIsCompressed,
++                             final ExecutorService executorService) {
++        this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(
++                executorService));
++    }
++
++    /**
++     * @param client
++     *            the client
++     * @param path
++     *            path to watch
++     * @param cacheData
++     *            if true, node contents are cached in addition to the stat
++     * @param dataIsCompressed
++     *            if true, data in the path is compressed
++     * @param executorService
++     *            Closeable ExecutorService to use for the PathChildrenCache's background thread
++     */
++    public PathChildrenCache(CuratorFramework client,
++                             String path,
++                             boolean cacheData,
++                             boolean dataIsCompressed,
++                             final CloseableExecutorService executorService) {
++        this(client, path, cacheData, DescendantHandlingMode.DIRECT_DESCENDANTS_ONLY,
++             dataIsCompressed, executorService);
++    }
++
++    /**
++     * @param client
++     *            the client
++     * @param path
++     *            path to watch
++     * @param cacheData
++     *            if true, node contents are cached in addition to the stat
++     * @param descendantHandlingMode
++     *            Mode defining whether only direct children of the root node are considered or
++     *            whether the entire tree of descendants is.
++     * @param dataIsCompressed
++     *            if true, data in the path is compressed
++     * @param executorService
++     *            Closeable ExecutorService to use for the PathChildrenCache's background thread
++     */
++    public PathChildrenCache(CuratorFramework client,
++                             String path,
++                             boolean cacheData,
++                             DescendantHandlingMode descendantHandlingMode,
++                             boolean dataIsCompressed,
++                             final CloseableExecutorService executorService) {
+         this.client = client;
+         this.path = path;
+         this.cacheData = cacheData;
++        this.descendantHandlingMode = descendantHandlingMode;
+         this.dataIsCompressed = dataIsCompressed;
+         this.executorService = executorService;
+         ensurePath = client.newNamespaceAwareEnsurePath(path);
+@@ -223,42 +321,43 @@ public class PathChildrenCache implements Closeable
+ 
+     /**
+      * Start the cache. The cache is not started automatically. You must call this method.
+-     *
+-     * @throws Exception errors
++     * 
++     * @throws Exception
++     *             errors
+      */
+-    public void start() throws Exception
+-    {
++    public void start() throws Exception {
+         start(StartMode.NORMAL);
+     }
+ 
+     /**
+      * Same as {@link #start()} but gives the option of doing an initial build
+-     *
+-     * @param buildInitial if true, {@link #rebuild()} will be called before this method
+-     *                     returns in order to get an initial view of the node; otherwise,
+-     *                     the cache will be initialized asynchronously
+-     * @throws Exception errors
++     * 
++     * @param buildInitial
++     *            if true, {@link #rebuild()} will be called before this method returns in order to
++     *            get an initial view of the node; otherwise, the cache will be initialized
++     *            asynchronously
++     * @throws Exception
++     *             errors
+      * @deprecated use {@link #start(StartMode)}
+      */
+-    public void start(boolean buildInitial) throws Exception
+-    {
++    @Deprecated
++    public void start(boolean buildInitial) throws Exception {
+         start(buildInitial ? StartMode.BUILD_INITIAL_CACHE : StartMode.NORMAL);
+     }
+ 
+     /**
+      * Method of priming cache on {@link PathChildrenCache#start(StartMode)}
+      */
+-    public enum StartMode
+-    {
++    public enum StartMode {
+         /**
+-         * cache will _not_ be primed. i.e. it will start empty and you will receive
+-         * events for all nodes added, etc.
++         * cache will _not_ be primed. i.e. it will start empty and you will receive events for all
++         * nodes added, etc.
+          */
+         NORMAL,
+ 
+         /**
+-         * {@link PathChildrenCache#rebuild()} will be called before this method returns in
+-         * order to get an initial view of the node.
++         * {@link PathChildrenCache#rebuild()} will be called before this method returns in order to
++         * get an initial view of the node.
+          */
+         BUILD_INITIAL_CACHE,
+ 
+@@ -271,34 +370,32 @@ public class PathChildrenCache implements Closeable
+ 
+     /**
+      * Start the cache. The cache is not started automatically. You must call this method.
+-     *
+-     * @param mode Method for priming the cache
+-     * @throws Exception errors
++     * 
++     * @param mode
++     *            Method for priming the cache
++     * @throws Exception
++     *             errors
+      */
+-    public void start(StartMode mode) throws Exception
+-    {
+-        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "already started");
++    public void start(StartMode mode) throws Exception {
++        Preconditions.checkState(
++                state.compareAndSet(State.LATENT, State.STARTED), "already started");
+         mode = Preconditions.checkNotNull(mode, "mode cannot be null");
+ 
+         client.getConnectionStateListenable().addListener(connectionStateListener);
+ 
+-        switch ( mode )
+-        {
+-            case NORMAL:
+-            {
++        switch (mode) {
++            case NORMAL: {
+                 offerOperation(new RefreshOperation(this, RefreshMode.STANDARD));
+                 break;
+             }
+ 
+-            case BUILD_INITIAL_CACHE:
+-            {
++            case BUILD_INITIAL_CACHE: {
+                 rebuild();
+                 break;
+             }
+ 
+-            case POST_INITIALIZED_EVENT:
+-            {
+-                initialSet.set(Maps.<String, ChildData>newConcurrentMap());
++            case POST_INITIALIZED_EVENT: {
++                initialSet.set(Maps.<String, ChildData> newConcurrentMap());
+                 offerOperation(new RefreshOperation(this, RefreshMode.POST_INITIALIZED));
+                 break;
+             }
+@@ -306,13 +403,13 @@ public class PathChildrenCache implements Closeable
+     }
+ 
+     /**
+-     * NOTE: this is a BLOCKING method. Completely rebuild the internal cache by querying
+-     * for all needed data WITHOUT generating any events to send to listeners.
+-     *
+-     * @throws Exception errors
++     * NOTE: this is a BLOCKING method. Completely rebuild the internal cache by querying for all
++     * needed data WITHOUT generating any events to send to listeners.
++     * 
++     * @throws Exception
++     *             errors
+      */
+-    public void rebuild() throws Exception
+-    {
++    public void rebuild() throws Exception {
+         Preconditions.checkState(!executorService.isShutdown(), "cache has been closed");
+ 
+         ensurePath.ensure(client.getZookeeperClient());
+@@ -320,13 +417,11 @@ public class PathChildrenCache implements Closeable
+         clear();
+ 
+         List<String> children = client.getChildren().forPath(path);
+-        for ( String child : children )
+-        {
++        for (String child : children) {
+             String fullPath = ZKPaths.makePath(path, child);
+             internalRebuildNode(fullPath);
+ 
+-            if ( rebuildTestExchanger != null )
+-            {
++            if (rebuildTestExchanger != null) {
+                 rebuildTestExchanger.exchange(new Object());
+             }
+         }
+@@ -335,16 +430,34 @@ public class PathChildrenCache implements Closeable
+         offerOperation(new RefreshOperation(this, RefreshMode.FORCE_GET_DATA_AND_STAT));
+     }
+ 
++    private void rebuild(String root) throws Exception {
++        Preconditions.checkState(!executorService.isShutdown(), "cache has been closed");
++
++        ensurePath.ensure(client.getZookeeperClient());
++
++        List<String> children = client.getChildren().forPath(path);
++        if (children != null && !children.isEmpty()) {
++            for (String child : children) {
++                rebuild(child);
++            }
++
++        }
++
++    }
++
+     /**
+      * NOTE: this is a BLOCKING method. Rebuild the internal cache for the given node by querying
+      * for all needed data WITHOUT generating any events to send to listeners.
+-     *
+-     * @param fullPath full path of the node to rebuild
+-     * @throws Exception errors
++     * 
++     * @param fullPath
++     *            full path of the node to rebuild
++     * @throws Exception
++     *             errors
+      */
+-    public void rebuildNode(String fullPath) throws Exception
+-    {
+-        Preconditions.checkArgument(ZKPaths.getPathAndNode(fullPath).getPath().equals(path), "Node is not part of this cache: " + fullPath);
++    public void rebuildNode(String fullPath) throws Exception {
++        Preconditions.checkArgument(
++                ZKPaths.getPathAndNode(fullPath).getPath().equals(path),
++                "Node is not part of this cache: " + fullPath);
+         Preconditions.checkState(!executorService.isShutdown(), "cache has been closed");
+ 
+         ensurePath.ensure(client.getZookeeperClient());
+@@ -357,14 +470,13 @@ public class PathChildrenCache implements Closeable
+ 
+     /**
+      * Close/end the cache
+-     *
+-     * @throws IOException errors
++     * 
++     * @throws IOException
++     *             errors
+      */
+     @Override
+-    public void close() throws IOException
+-    {
+-        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+-        {
++    public void close() throws IOException {
++        if (state.compareAndSet(State.STARTED, State.CLOSED)) {
+             client.getConnectionStateListenable().removeListener(connectionStateListener);
+             executorService.close();
+         }
+@@ -372,64 +484,61 @@ public class PathChildrenCache implements Closeable
+ 
+     /**
+      * Return the cache listenable
+-     *
++     * 
+      * @return listenable
+      */
+-    public ListenerContainer<PathChildrenCacheListener> getListenable()
+-    {
++    public ListenerContainer<PathChildrenCacheListener> getListenable() {
+         return listeners;
+     }
+ 
+     /**
+-     * Return the current data. There are no guarantees of accuracy. This is
+-     * merely the most recent view of the data. The data is returned in sorted order.
+-     *
++     * Return the current data. There are no guarantees of accuracy. This is merely the most recent
++     * view of the data. The data is returned in sorted order.
++     * 
+      * @return list of children and data
+      */
+-    public List<ChildData> getCurrentData()
+-    {
+-        return ImmutableList.copyOf(Sets.<ChildData>newTreeSet(currentData.values()));
++    public List<ChildData> getCurrentData() {
++        return ImmutableList.copyOf(Sets.<ChildData> newTreeSet(currentData.values()));
+     }
+ 
+     /**
+      * Return the current data for the given path. There are no guarantees of accuracy. This is
+-     * merely the most recent view of the data. If there is no child with that path, <code>null</code>
+-     * is returned.
+-     *
+-     * @param fullPath full path to the node to check
++     * merely the most recent view of the data. If there is no child with that path,
++     * <code>null</code> is returned.
++     * 
++     * @param fullPath
++     *            full path to the node to check
+      * @return data or null
+      */
+-    public ChildData getCurrentData(String fullPath)
+-    {
++    public ChildData getCurrentData(String fullPath) {
+         return currentData.get(fullPath);
+     }
+ 
+     /**
+-     * As a memory optimization, you can clear the cached data bytes for a node. Subsequent
+-     * calls to {@link ChildData#getData()} for this node will return <code>null</code>.
+-     *
+-     * @param fullPath the path of the node to clear
++     * As a memory optimization, you can clear the cached data bytes for a node. Subsequent calls to
++     * {@link ChildData#getData()} for this node will return <code>null</code>.
++     * 
++     * @param fullPath
++     *            the path of the node to clear
+      */
+-    public void clearDataBytes(String fullPath)
+-    {
++    public void clearDataBytes(String fullPath) {
+         clearDataBytes(fullPath, -1);
+     }
+ 
+     /**
+-     * As a memory optimization, you can clear the cached data bytes for a node. Subsequent
+-     * calls to {@link ChildData#getData()} for this node will return <code>null</code>.
+-     *
+-     * @param fullPath  the path of the node to clear
+-     * @param ifVersion if non-negative, only clear the data if the data's version matches this version
++     * As a memory optimization, you can clear the cached data bytes for a node. Subsequent calls to
++     * {@link ChildData#getData()} for this node will return <code>null</code>.
++     * 
++     * @param fullPath
++     *            the path of the node to clear
++     * @param ifVersion
++     *            if non-negative, only clear the data if the data's version matches this version
+      * @return true if the data was cleared
+      */
+-    public boolean clearDataBytes(String fullPath, int ifVersion)
+-    {
++    public boolean clearDataBytes(String fullPath, int ifVersion) {
+         ChildData data = currentData.get(fullPath);
+-        if ( data != null )
+-        {
+-            if ( (ifVersion < 0) || (ifVersion == data.getStat().getVersion()) )
+-            {
++        if (data != null) {
++            if ((ifVersion < 0) || (ifVersion == data.getStat().getVersion())) {
+                 data.clearData();
+                 return true;
+             }
+@@ -439,275 +548,254 @@ public class PathChildrenCache implements Closeable
+ 
+     /**
+      * Clear out current data and begin a new query on the path
+-     *
+-     * @throws Exception errors
++     * 
++     * @throws Exception
++     *             errors
+      */
+-    public void clearAndRefresh() throws Exception
+-    {
++    public void clearAndRefresh() throws Exception {
+         currentData.clear();
+         offerOperation(new RefreshOperation(this, RefreshMode.STANDARD));
+     }
+ 
+     /**
+-     * Clears the current data without beginning a new query and without generating any events
+-     * for listeners.
++     * Clears the current data without beginning a new query and without generating any events for
++     * listeners.
+      */
+-    public void clear()
+-    {
++    public void clear() {
+         currentData.clear();
+     }
+ 
+-    enum RefreshMode
+-    {
+-        STANDARD,
+-        FORCE_GET_DATA_AND_STAT,
+-        POST_INITIALIZED
++    enum RefreshMode {
++        STANDARD, FORCE_GET_DATA_AND_STAT, POST_INITIALIZED
+     }
+ 
+-    void refresh(final RefreshMode mode) throws Exception
+-    {
++    void refresh(final RefreshMode mode, final String nodePath) throws Exception {
+         ensurePath.ensure(client.getZookeeperClient());
+ 
+-        final BackgroundCallback callback = new BackgroundCallback()
+-        {
++        final BackgroundCallback callback = new BackgroundCallback() {
++
+             @Override
+-            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+-            {
+-                processChildren(event.getChildren(), mode);
++            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
++                processChildren(event.getPath(), event.getChildren(), mode);
+             }
+         };
+ 
+-        client.getChildren().usingWatcher(childrenWatcher).inBackground(callback).forPath(path);
++        client.getChildren().usingWatcher(childrenWatcher).inBackground(callback).forPath(nodePath);
+     }
+ 
+-    void callListeners(final PathChildrenCacheEvent event)
+-    {
+-        listeners.forEach
+-            (
+-                new Function<PathChildrenCacheListener, Void>()
+-                {
+-                    @Override
+-                    public Void apply(PathChildrenCacheListener listener)
+-                    {
+-                        try
+-                        {
+-                            listener.childEvent(client, event);
+-                        }
+-                        catch ( Exception e )
+-                        {
+-                            handleException(e);
+-                        }
+-                        return null;
+-                    }
++    void refresh(final RefreshMode mode) throws Exception {
++        refresh(mode, path);
++    }
++
++    void callListeners(final PathChildrenCacheEvent event) {
++        listeners.forEach(new Function<PathChildrenCacheListener, Void>() {
++
++            @Override
++            public Void apply(PathChildrenCacheListener listener) {
++                try {
++                    listener.childEvent(client, event);
++                } catch (Exception e) {
++                    handleException(e);
+                 }
+-            );
++                return null;
++            }
++        });
+     }
+ 
+-    void getDataAndStat(final String fullPath) throws Exception
+-    {
+-        BackgroundCallback existsCallback = new BackgroundCallback()
+-        {
++    void getDataAndStat(final String fullPath) throws Exception {
++        BackgroundCallback existsCallback = new BackgroundCallback() {
++
+             @Override
+-            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+-            {
++            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
+                 applyNewData(fullPath, event.getResultCode(), event.getStat(), null);
+             }
+         };
+ 
+-        BackgroundCallback getDataCallback = new BackgroundCallback()
+-        {
++        BackgroundCallback getDataCallback = new BackgroundCallback() {
++
+             @Override
+-            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+-            {
++            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
+                 applyNewData(fullPath, event.getResultCode(), event.getStat(), event.getData());
+             }
+         };
+ 
+-        if ( cacheData )
+-        {
+-            if ( dataIsCompressed )
+-            {
+-                client.getData().decompressed().usingWatcher(dataWatcher).inBackground(getDataCallback).forPath(fullPath);
+-            }
+-            else
+-            {
+-                client.getData().usingWatcher(dataWatcher).inBackground(getDataCallback).forPath(fullPath);
++        if (cacheData) {
++            if (dataIsCompressed) {
++                client.getData().decompressed().usingWatcher(dataWatcher).inBackground(
++                        getDataCallback).forPath(fullPath);
++            } else {
++                client.getData().usingWatcher(dataWatcher).inBackground(getDataCallback).forPath(
++                        fullPath);
+             }
+-        }
+-        else
+-        {
+-            client.checkExists().usingWatcher(dataWatcher).inBackground(existsCallback).forPath(fullPath);
++        } else {
++            client.checkExists().usingWatcher(dataWatcher).inBackground(existsCallback).forPath(
++                    fullPath);
+         }
+     }
+ 
+     /**
+      * Default behavior is just to log the exception
+-     *
+-     * @param e the exception
++     * 
++     * @param e
++     *            the exception
+      */
+-    protected void handleException(Throwable e)
+-    {
++    protected void handleException(Throwable e) {
+         log.error("", e);
+     }
+ 
+     @VisibleForTesting
+-    protected void remove(String fullPath)
+-    {
++    protected void remove(String fullPath) {
+         ChildData data = currentData.remove(fullPath);
+-        if ( data != null )
+-        {
+-            offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_REMOVED, data)));
++        if (data != null) {
++            offerOperation(new EventOperation(this, new PathChildrenCacheEvent(
++                    PathChildrenCacheEvent.Type.CHILD_REMOVED, data)));
+         }
+ 
+         Map<String, ChildData> localInitialSet = initialSet.get();
+-        if ( localInitialSet != null )
+-        {
++        if (localInitialSet != null) {
+             localInitialSet.remove(fullPath);
+             maybeOfferInitializedEvent(localInitialSet);
+         }
+     }
+ 
+-    private void internalRebuildNode(String fullPath) throws Exception
+-    {
+-        if ( cacheData )
+-        {
+-            try
+-            {
++    private void internalRebuildNode(String fullPath) throws Exception {
++        if (cacheData) {
++            try {
+                 Stat stat = new Stat();
+-                byte[] bytes = dataIsCompressed ? client.getData().decompressed().storingStatIn(stat).forPath(fullPath) : client.getData().storingStatIn(stat).forPath(fullPath);
++                byte[] bytes = dataIsCompressed ? client.getData().decompressed().storingStatIn(
++                        stat).forPath(fullPath) : client.getData().storingStatIn(stat).forPath(
++                        fullPath);
+                 currentData.put(fullPath, new ChildData(fullPath, stat, bytes));
+-            }
+-            catch ( KeeperException.NoNodeException ignore )
+-            {
++            } catch (KeeperException.NoNodeException ignore) {
+                 // node no longer exists - remove it
+                 currentData.remove(fullPath);
+             }
+-        }
+-        else
+-        {
++        } else {
+             Stat stat = client.checkExists().forPath(fullPath);
+-            if ( stat != null )
+-            {
++            if (stat != null) {
+                 currentData.put(fullPath, new ChildData(fullPath, stat, null));
+-            }
+-            else
+-            {
++            } else {
+                 // node no longer exists - remove it
+                 currentData.remove(fullPath);
+             }
+         }
+     }
+ 
+-    private void handleStateChange(ConnectionState newState)
+-    {
+-        switch ( newState )
+-        {
+-        case SUSPENDED:
+-        {
+-            offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_SUSPENDED, null)));
+-            break;
+-        }
+-
+-        case LOST:
+-        {
+-            offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_LOST, null)));
+-            break;
+-        }
++    private void handleStateChange(ConnectionState newState) {
++        switch (newState) {
++            case SUSPENDED: {
++                offerOperation(new EventOperation(this, new PathChildrenCacheEvent(
++                        PathChildrenCacheEvent.Type.CONNECTION_SUSPENDED, null)));
++                break;
++            }
+ 
+-        case RECONNECTED:
+-        {
+-            try
+-            {
+-                offerOperation(new RefreshOperation(this, RefreshMode.FORCE_GET_DATA_AND_STAT));
+-                offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED, null)));
++            case LOST: {
++                offerOperation(new EventOperation(this, new PathChildrenCacheEvent(
++                        PathChildrenCacheEvent.Type.CONNECTION_LOST, null)));
++                break;
+             }
+-            catch ( Exception e )
+-            {
+-                handleException(e);
++
++            case RECONNECTED: {
++                try {
++                    offerOperation(new RefreshOperation(this, RefreshMode.FORCE_GET_DATA_AND_STAT));
++                    offerOperation(new EventOperation(this, new PathChildrenCacheEvent(
++                            PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED, null)));
++                } catch (Exception e) {
++                    handleException(e);
++                }
++                break;
+             }
+-            break;
+-        }
+         }
+     }
+ 
+-    private void processChildren(List<String> children, RefreshMode mode) throws Exception
+-    {
+-        List<String> fullPaths = Lists.newArrayList(Lists.transform
+-            (
+-                children,
+-                new Function<String, String>()
+-                {
++    private void processChildren(final String root, List<String> children, RefreshMode mode)
++            throws Exception {
++        List<String> fullPaths = Lists.newArrayList(Lists.transform(
++                children, new Function<String, String>() {
++
+                     @Override
+-                    public String apply(String child)
+-                    {
+-                        return ZKPaths.makePath(path, child);
++                    public String apply(String child) {
++                        return ZKPaths.makePath(root, child);
+                     }
+-                }
+-            ));
++                }));
+         Set<String> removedNodes = Sets.newHashSet(currentData.keySet());
+         removedNodes.removeAll(fullPaths);
++        removedNodes.remove(root);
+ 
+-        for ( String fullPath : removedNodes )
+-        {
+-            remove(fullPath);
++        Set<String> nodesToKeep = Sets.newHashSet();
++        for (String removedNode : removedNodes) {
++            // Don't remove the current node being processed, or any of its parent nodes
++            if (removedNode.length() <= root.length() && root.startsWith(removedNode)) {
++                nodesToKeep.add(removedNode);
++            }
+         }
+ 
+-        for ( String name : children )
+-        {
+-            String fullPath = ZKPaths.makePath(path, name);
++        for (String fullPath : removedNodes) {
++            if (!nodesToKeep.contains(fullPath)) {
++                remove(fullPath);
++            }
++        }
+ 
+-            if ( (mode == RefreshMode.FORCE_GET_DATA_AND_STAT) || !currentData.containsKey(fullPath) )
+-            {
++        for (String name : children) {
++            String fullPath = ZKPaths.makePath(root, name);
++
++            boolean exists = currentData.containsKey(fullPath);
++
++            if ((mode == RefreshMode.FORCE_GET_DATA_AND_STAT) || !exists) {
+                 getDataAndStat(fullPath);
+             }
+ 
+             updateInitialSet(name, NULL_CHILD_DATA);
++
++            if (!exists && descendantHandlingMode == DescendantHandlingMode.ALL_DESCENDANTS) {
++                if (!removedNodes.contains(fullPath)) {
++                    refresh(mode, fullPath);
++                }
++            }
+         }
+         maybeOfferInitializedEvent(initialSet.get());
+     }
+ 
+-    private void applyNewData(String fullPath, int resultCode, Stat stat, byte[] bytes)
+-    {
+-        if ( resultCode == KeeperException.Code.OK.intValue() ) // otherwise - node must have dropped or something - we should be getting another event
++    private void applyNewData(String fullPath, int resultCode, Stat stat, byte[] bytes) {
++        if (resultCode == KeeperException.Code.OK.intValue()) // otherwise - node must have dropped
++                                                              // or something - we should be getting
++                                                              // another event
+         {
+             ChildData data = new ChildData(fullPath, stat, bytes);
+             ChildData previousData = currentData.put(fullPath, data);
+-            if ( previousData == null ) // i.e. new
+-            {
+-                offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_ADDED, data)));
+-            }
+-            else if ( previousData.getStat().getVersion() != stat.getVersion() )
++            if (previousData == null) // i.e. new
+             {
+-                offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_UPDATED, data)));
++                offerOperation(new EventOperation(this, new PathChildrenCacheEvent(
++                        PathChildrenCacheEvent.Type.CHILD_ADDED, data)));
++            } else if (previousData.getStat().getVersion() != stat.getVersion()) {
++                offerOperation(new EventOperation(this, new PathChildrenCacheEvent(
++                        PathChildrenCacheEvent.Type.CHILD_UPDATED, data)));
+             }
+             updateInitialSet(ZKPaths.getNodeFromPath(fullPath), data);
+         }
+     }
+ 
+-    private void updateInitialSet(String name, ChildData data)
+-    {
++    private void updateInitialSet(String name, ChildData data) {
+         Map<String, ChildData> localInitialSet = initialSet.get();
+-        if ( localInitialSet != null )
+-        {
++        if (localInitialSet != null) {
+             localInitialSet.put(name, data);
+             maybeOfferInitializedEvent(localInitialSet);
+         }
+     }
+ 
+-    private void maybeOfferInitializedEvent(Map<String, ChildData> localInitialSet)
+-    {
+-        if ( !hasUninitialized(localInitialSet) )
+-        {
++    private void maybeOfferInitializedEvent(Map<String, ChildData> localInitialSet) {
++        if (!hasUninitialized(localInitialSet)) {
+             // all initial children have been processed - send initialized message
+ 
+-            if ( initialSet.getAndSet(null) != null )   // avoid edge case - don't send more than 1 INITIALIZED event
++            if (initialSet.getAndSet(null) != null) // avoid edge case - don't send more than 1
++                                                    // INITIALIZED event
+             {
+                 final List<ChildData> children = ImmutableList.copyOf(localInitialSet.values());
+-                PathChildrenCacheEvent event = new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.INITIALIZED, null)
+-                {
++                PathChildrenCacheEvent event = new PathChildrenCacheEvent(
++                        PathChildrenCacheEvent.Type.INITIALIZED, null) {
++
+                     @Override
+-                    public List<ChildData> getInitialData()
+-                    {
++                    public List<ChildData> getInitialData() {
+                         return children;
+                     }
+                 };
+@@ -716,71 +804,56 @@ public class PathChildrenCache implements Closeable
+         }
+     }
+ 
+-    private boolean hasUninitialized(Map<String, ChildData> localInitialSet)
+-    {
+-        if ( localInitialSet == null )
+-        {
++    private boolean hasUninitialized(Map<String, ChildData> localInitialSet) {
++        if (localInitialSet == null) {
+             return false;
+         }
+ 
+-        Map<String, ChildData> uninitializedChildren = Maps.filterValues
+-            (
+-                localInitialSet,
+-                new Predicate<ChildData>()
+-                {
++        Map<String, ChildData> uninitializedChildren = Maps.filterValues(
++                localInitialSet, new Predicate<ChildData>() {
++
+                     @Override
+-                    public boolean apply(ChildData input)
+-                    {
+-                        return (input == NULL_CHILD_DATA);  // check against ref intentional
++                    public boolean apply(ChildData input) {
++                        return (input == NULL_CHILD_DATA); // check against ref intentional
+                     }
+-                }
+-            );
++                });
+         return (uninitializedChildren.size() != 0);
+     }
+ 
+-    private void offerOperation(final Operation operation)
+-    {
+-        if ( operationsQuantizer.add(operation) )
+-        {
+-            submitToExecutor
+-            (
+-                new Runnable()
+-                {
+-                    @Override
+-                    public void run()
+-                    {
+-                        try
+-                        {
+-                            operationsQuantizer.remove(operation);
+-                            operation.invoke();
+-                        }
+-                        catch ( Exception e )
+-                        {
+-                            handleException(e);
+-                        }
++    private void offerOperation(final Operation operation) {
++        if (operationsQuantizer.add(operation)) {
++            submitToExecutor(new Runnable() {
++
++                @Override
++                public void run() {
++                    try {
++                        operationsQuantizer.remove(operation);
++                        operation.invoke();
++                    } catch (Exception e) {
++                        handleException(e);
+                     }
+                 }
+-            );
++            });
+         }
+     }
+ 
+     /**
+      * Submits a runnable to the executor.
+      * <p/>
+-     * This method is synchronized because it has to check state about whether this instance is still open.  Without this check
+-     * there is a race condition with the dataWatchers that get set.  Even after this object is closed() it can still be
+-     * called by those watchers, because the close() method cannot actually disable the watcher.
++     * This method is synchronized because it has to check state about whether this instance is
++     * still open. Without this check there is a race condition with the dataWatchers that get set.
++     * Even after this object is closed() it can still be called by those watchers, because the
++     * close() method cannot actually disable the watcher.
+      * <p/>
+-     * The synchronization overhead should be minimal if non-existant as this is generally only called from the
+-     * ZK client thread and will only contend if close() is called in parallel with an update, and that's the exact state
+-     * we want to protect from.
+-     *
+-     * @param command The runnable to run
++     * The synchronization overhead should be minimal if non-existent as this is generally only
++     * called from the ZK client thread and will only contend if close() is called in parallel with
++     * an update, and that's the exact state we want to protect from.
++     * 
++     * @param command
++     *            The runnable to run
+      */
+-    private synchronized void submitToExecutor(final Runnable command)
+-    {
+-        if ( state.get() == State.STARTED )
+-        {
++    private synchronized void submitToExecutor(final Runnable command) {
++        if (state.get() == State.STARTED) {
+             executorService.submit(command);
+         }
+     }
+diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
+index 4b117fb..e524153 100644
+--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
++++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
+@@ -1,29 +1,37 @@
+ /**
+- * Licensed to the Apache Software Foundation (ASF) under one
+- * or more contributor license agreements.  See the NOTICE file
+- * distributed with this work for additional information
+- * regarding copyright ownership.  The ASF licenses this file
+- * to you under the Apache License, Version 2.0 (the
+- * "License"); you may not use this file except in compliance
+- * with the License.  You may obtain a copy of the License at
+- *
+- *   http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing,
+- * software distributed under the License is distributed on an
+- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+- * KIND, either express or implied.  See the License for the
+- * specific language governing permissions and limitations
+- * under the License.
++ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
++ * agreements. See the NOTICE file distributed with this work for additional information regarding
++ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance with the License. You may obtain a
++ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
++ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
++ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
++ * for the specific language governing permissions and limitations under the License.
+  */
+ package org.apache.curator.framework.recipes.cache;
+ 
+-import com.google.common.collect.Lists;
+-import com.google.common.io.Closeables;
++import java.util.Collection;
++import java.util.List;
++import java.util.concurrent.BlockingQueue;
++import java.util.concurrent.Callable;
++import java.util.concurrent.CountDownLatch;
++import java.util.concurrent.Exchanger;
++import java.util.concurrent.ExecutionException;
++import java.util.concurrent.ExecutorService;
++import java.util.concurrent.Executors;
++import java.util.concurrent.Future;
++import java.util.concurrent.LinkedBlockingQueue;
++import java.util.concurrent.Semaphore;
++import java.util.concurrent.TimeUnit;
++import java.util.concurrent.TimeoutException;
++import java.util.concurrent.atomic.AtomicInteger;
++import java.util.concurrent.atomic.AtomicReference;
++
+ import org.apache.curator.framework.CuratorFramework;
+ import org.apache.curator.framework.CuratorFrameworkFactory;
+ import org.apache.curator.framework.api.UnhandledErrorListener;
+ import org.apache.curator.framework.recipes.BaseClassForTests;
++import org.apache.curator.framework.recipes.cache.PathChildrenCache.DescendantHandlingMode;
+ import org.apache.curator.retry.RetryOneTime;
+ import org.apache.curator.test.KillSession;
+ import org.apache.curator.test.Timing;
+@@ -31,57 +39,48 @@ import org.apache.zookeeper.CreateMode;
+ import org.apache.zookeeper.KeeperException;
+ import org.testng.Assert;
+ import org.testng.annotations.Test;
+-import java.util.Collection;
+-import java.util.List;
+-import java.util.concurrent.*;
+-import java.util.concurrent.atomic.AtomicInteger;
+-import java.util.concurrent.atomic.AtomicReference;
+ 
+-public class TestPathChildrenCache extends BaseClassForTests
+-{
++import com.google.common.collect.Lists;
++import com.google.common.io.Closeables;
++
++public class TestPathChildrenCache extends BaseClassForTests {
++
+     @Test
+-    public void testPostInitializedForEmpty() throws Exception
+-    {
++    public void testPostInitializedForEmpty() throws Exception {
+         Timing timing = new Timing();
+         PathChildrenCache cache = null;
+-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+-        try
+-        {
++        CuratorFramework client = CuratorFrameworkFactory.newClient(
++                server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(
++                        1));
++        try {
+             client.start();
+ 
+             final CountDownLatch latch = new CountDownLatch(1);
+             cache = new PathChildrenCache(client, "/test", true);
+-            cache.getListenable().addListener
+-            (
+-                new PathChildrenCacheListener()
+-                {
+-                    @Override
+-                    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+-                    {
+-                        if ( event.getType() == PathChildrenCacheEvent.Type.INITIALIZED )
+-                        {
+-                            latch.countDown();
+-                        }
++            cache.getListenable().addListener(new PathChildrenCacheListener() {
++
++                @Override
++                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
++                        throws Exception {
++                    if (event.getType() == PathChildrenCacheEvent.Type.INITIALIZED) {
++                        latch.countDown();
+                     }
+                 }
+-            );
++            });
+             cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
+             Assert.assertTrue(timing.awaitLatch(latch));
+-        }
+-        finally
+-        {
++        } finally {
+             Closeables.closeQuietly(cache);
+             Closeables.closeQuietly(client);
+         }
+     }
+ 
+     @Test
+-    public void testAsyncInitialPopulation() throws Exception
+-    {
++    public void testAsyncInitialPopulation() throws Exception {
+         PathChildrenCache cache = null;
+-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+-        try
+-        {
++        CuratorFramework client = CuratorFrameworkFactory.newClient(
++                server.getConnectString(), new RetryOneTime(1));
++        try {
+             client.start();
+ 
+             client.create().forPath("/test");
+@@ -89,17 +88,14 @@ public class TestPathChildrenCache extends BaseClassForTests
+ 
+             final BlockingQueue<PathChildrenCacheEvent> events = new LinkedBlockingQueue<PathChildrenCacheEvent>();
+             cache = new PathChildrenCache(client, "/test", true);
+-            cache.getListenable().addListener
+-                (
+-                    new PathChildrenCacheListener()
+-                    {
+-                        @Override
+-                        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+-                        {
+-                            events.offer(event);
+-                        }
+-                    }
+-                );
++            cache.getListenable().addListener(new PathChildrenCacheListener() {
++
++                @Override
++                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
++                        throws Exception {
++                    events.offer(event);
++                }
++            });
+             cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
+ 
+             PathChildrenCacheEvent event = events.poll(10, TimeUnit.SECONDS);
+@@ -108,22 +104,20 @@ public class TestPathChildrenCache extends BaseClassForTests
+             event = events.poll(10, TimeUnit.SECONDS);
+             Assert.assertEquals(event.getType(), PathChildrenCacheEvent.Type.INITIALIZED);
+             Assert.assertEquals(event.getInitialData().size(), 1);
+-        }
+-        finally
+-        {
++        } finally {
+             Closeables.closeQuietly(cache);
+             Closeables.closeQuietly(client);
+         }
+     }
+ 
+     @Test
+-    public void testChildrenInitialized() throws Exception
+-    {
++    public void testChildrenInitialized() throws Exception {
+         Timing timing = new Timing();
+         PathChildrenCache cache = null;
+-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+-        try
+-        {
++        CuratorFramework client = CuratorFrameworkFactory.newClient(
++                server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(
++                        1));
++        try {
+             client.start();
+             client.create().forPath("/test");
+ 
+@@ -131,24 +125,18 @@ public class TestPathChildrenCache extends BaseClassForTests
+ 
+             final CountDownLatch addedLatch = new CountDownLatch(3);
+             final CountDownLatch initLatch = new CountDownLatch(1);
+-            cache.getListenable().addListener
+-                (
+-                    new PathChildrenCacheListener()
+-                    {
+-                        @Override
+-                        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+-                        {
+-                            if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
+-                            {
+-                                addedLatch.countDown();
+-                            }
+-                            else if ( event.getType() == PathChildrenCacheEvent.Type.INITIALIZED )
+-                            {
+-                                initLatch.countDown();
+-                            }
+-                        }
++            cache.getListenable().addListener(new PathChildrenCacheListener() {
++
++                @Override
++                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
++                        throws Exception {
++                    if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
++                        addedLatch.countDown();
++                    } else if (event.getType() == PathChildrenCacheEvent.Type.INITIALIZED) {
++                        initLatch.countDown();
+                     }
+-                );
++                }
++            });
+ 
+             client.create().forPath("/test/1", "1".getBytes());
+             client.create().forPath("/test/2", "2".getBytes());
+@@ -162,45 +150,37 @@ public class TestPathChildrenCache extends BaseClassForTests
+             Assert.assertEquals(cache.getCurrentData().get(0).getData(), "1".getBytes());
+             Assert.assertEquals(cache.getCurrentData().get(1).getData(), "2".getBytes());
+             Assert.assertEquals(cache.getCurrentData().get(2).getData(), "3".getBytes());
+-        }
+-        finally
+-        {
++        } finally {
+             Closeables.closeQuietly(cache);
+             Closeables.closeQuietly(client);
+         }
+     }
+ 
+     @Test
+-    public void testUpdateWhenNotCachingData() throws Exception
+-    {
++    public void testUpdateWhenNotCachingData() throws Exception {
+         Timing timing = new Timing();
+ 
+-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
++        CuratorFramework client = CuratorFrameworkFactory.newClient(
++                server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(
++                        1));
+         client.start();
+-        try
+-        {
++        try {
+             final CountDownLatch updatedLatch = new CountDownLatch(1);
+             final CountDownLatch addedLatch = new CountDownLatch(1);
+             client.create().creatingParentsIfNeeded().forPath("/test");
+             PathChildrenCache cache = new PathChildrenCache(client, "/test", false);
+-            cache.getListenable().addListener
+-                (
+-                    new PathChildrenCacheListener()
+-                    {
+-                        @Override
+-                        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+-                        {
+-                            if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED )
+-                            {
+-                                updatedLatch.countDown();
+-                            }
+-                            else if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
+-                            {
+-                                addedLatch.countDown();
+-                            }
+-                        }
++            cache.getListenable().addListener(new PathChildrenCacheListener() {
++
++                @Override
++                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
++                        throws Exception {
++                    if (event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED) {
++                        updatedLatch.countDown();
++                    } else if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
++                        addedLatch.countDown();
+                     }
+-                );
++                }
++            });
+             cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
+ 
+             client.create().forPath("/test/foo", "first".getBytes());
+@@ -208,94 +188,72 @@ public class TestPathChildrenCache extends BaseClassForTests
+ 
+             client.setData().forPath("/test/foo", "something new".getBytes());
+             Assert.assertTrue(timing.awaitLatch(updatedLatch));
+-        }
+-        finally
+-        {
++        } finally {
+             Closeables.closeQuietly(client);
+         }
+     }
+ 
+     @Test
+-    public void testEnsurePath() throws Exception
+-    {
++    public void testEnsurePath() throws Exception {
+         Timing timing = new Timing();
+ 
+-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
++        CuratorFramework client = CuratorFrameworkFactory.newClient(
++                server.getConnectString(), new RetryOneTime(1));
+         client.start();
+-        try
+-        {
++        try {
+             PathChildrenCache cache = new PathChildrenCache(client, "/one/two/three", false);
+             cache.start();
+             timing.sleepABit();
+ 
+-            try
+-            {
++            try {
+                 client.create().forPath("/one/two/three/four");
+-            }
+-            catch ( KeeperException.NoNodeException e )
+-            {
++            } catch (KeeperException.NoNodeException e) {
+                 Assert.fail("Path should exist", e);
+             }
+-        }
+-        finally
+-        {
++        } finally {
+             Closeables.closeQuietly(client);
+         }
+     }
+ 
+     @Test
+-    public void testDeleteThenCreate() throws Exception
+-    {
+-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
++    public void testDeleteThenCreate() throws Exception {
++        CuratorFramework client = CuratorFrameworkFactory.newClient(
++                server.getConnectString(), new RetryOneTime(1));
+         client.start();
+-        try
+-        {
++        try {
+             client.create().forPath("/test");
+             client.create().forPath("/test/foo", "one".getBytes());
+ 
+             final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+-            client.getUnhandledErrorListenable().addListener
+-                (
+-                    new UnhandledErrorListener()
+-                    {
+-                        @Override
+-                        public void unhandledError(String message, Throwable e)
+-                        {
+-                            error.set(e);
+-                        }
+-                    }
+-                );
++            client.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() {
++
++                @Override
++                public void unhandledError(String message, Throwable e) {
++                    error.set(e);
++                }
++            });
+ 
+             final CountDownLatch removedLatch = new CountDownLatch(1);
+             final CountDownLatch postRemovedLatch = new CountDownLatch(1);
+             final CountDownLatch dataLatch = new CountDownLatch(1);
+             PathChildrenCache cache = new PathChildrenCache(client, "/test", true);
+-            cache.getListenable().addListener
+-                (
+-                    new PathChildrenCacheListener()
+-                    {
+-                        @Override
+-                        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+-                        {
+-                            if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED )
+-                            {
+-                                removedLatch.countDown();
+-                                Assert.assertTrue(postRemovedLatch.await(10, TimeUnit.SECONDS));
+-                            }
+-                            else
+-                            {
+-                                try
+-                                {
+-                                    Assert.assertEquals(event.getData().getData(), "two".getBytes());
+-                                }
+-                                finally
+-                                {
+-                                    dataLatch.countDown();
+-                                }
+-                            }
++            cache.getListenable().addListener(new PathChildrenCacheListener() {
++
++                @Override
++                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
++                        throws Exception {
++                    if (event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
++                        removedLatch.countDown();
++                        Assert.assertTrue(postRemovedLatch.await(10, TimeUnit.SECONDS));
++                    } else {
++                        try {
++                            Assert.assertEquals(event.getData().getData(), "two".getBytes());
++                        } finally {
++                            dataLatch.countDown();
+                         }
+                     }
+-                );
++                }
++            });
+             cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
+ 
+             client.delete().forPath("/test/foo");
+@@ -305,26 +263,22 @@ public class TestPathChildrenCache extends BaseClassForTests
+             Assert.assertTrue(dataLatch.await(10, TimeUnit.SECONDS));
+ 
+             Throwable t = error.get();
+-            if ( t != null )
+-            {
++            if (t != null) {
+                 Assert.fail("Assert", t);
+             }
+ 
+             cache.close();
+-        }
+-        finally
+-        {
++        } finally {
+             client.close();
+         }
+     }
+ 
+     @Test
+-    public void testRebuildAgainstOtherProcesses() throws Exception
+-    {
+-        final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
++    public void testRebuildAgainstOtherProcesses() throws Exception {
++        final CuratorFramework client = CuratorFrameworkFactory.newClient(
++                server.getConnectString(), new RetryOneTime(1));
+         client.start();
+-        try
+-        {
++        try {
+             client.create().forPath("/test");
+             client.create().forPath("/test/foo");
+             client.create().forPath("/test/bar");
+@@ -332,69 +286,56 @@ public class TestPathChildrenCache extends BaseClassForTests
+ 
+             final CountDownLatch addedLatch = new CountDownLatch(2);
+             final PathChildrenCache cache = new PathChildrenCache(client, "/test", true);
+-            cache.getListenable().addListener
+-                (
+-                    new PathChildrenCacheListener()
+-                    {
+-                        @Override
+-                        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+-                        {
+-                            if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
+-                            {
+-                                if ( event.getData().getPath().equals("/test/test") )
+-                                {
+-                                    addedLatch.countDown();
+-                                }
+-                            }
+-                            else if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED )
+-                            {
+-                                if ( event.getData().getPath().equals("/test/snafu") )
+-                                {
+-                                    addedLatch.countDown();
+-                                }
+-                            }
++            cache.getListenable().addListener(new PathChildrenCacheListener() {
++
++                @Override
++                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
++                        throws Exception {
++                    if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
++                        if (event.getData().getPath().equals("/test/test")) {
++                            addedLatch.countDown();
++                        }
++                    } else if (event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED) {
++                        if (event.getData().getPath().equals("/test/snafu")) {
++                            addedLatch.countDown();
+                         }
+                     }
+-                );
++                }
++            });
+             cache.rebuildTestExchanger = new Exchanger<Object>();
+             ExecutorService service = Executors.newSingleThreadExecutor();
+             final AtomicReference<String> deletedPath = new AtomicReference<String>();
+-            Future<Object> future = service.submit
+-                (
+-                    new Callable<Object>()
+-                    {
+-                        @Override
+-                        public Object call() throws Exception
+-                        {
+-                            cache.rebuildTestExchanger.exchange(new Object());
+-
+-                            // simulate another process adding a node while we're rebuilding
+-                            client.create().forPath("/test/test");
+-
+-                            List<ChildData> currentData = cache.getCurrentData();
+-                            Assert.assertTrue(currentData.size() > 0);
+-
+-                            // simulate another process removing a node while we're rebuilding
+-                            client.delete().forPath(currentData.get(0).getPath());
+-                            deletedPath.set(currentData.get(0).getPath());
+-
+-                            cache.rebuildTestExchanger.exchange(new Object());
+-
+-                            ChildData childData = null;
+-                            while ( childData == null )
+-                            {
+-                                childData = cache.getCurrentData("/test/snafu");
+-                                Thread.sleep(1000);
+-                            }
+-                            Assert.assertEquals(childData.getData(), "original".getBytes());
+-                            client.setData().forPath("/test/snafu", "grilled".getBytes());
+-
+-                            cache.rebuildTestExchanger.exchange(new Object());
+-
+-                            return null;
+-                        }
++            Future<Object> future = service.submit(new Callable<Object>() {
++
++                @Override
++                public Object call() throws Exception {
++                    cache.rebuildTestExchanger.exchange(new Object());
++
++                    // simulate another process adding a node while we're rebuilding
++                    client.create().forPath("/test/test");
++
++                    List<ChildData> currentData = cache.getCurrentData();
++                    Assert.assertTrue(currentData.size() > 0);
++
++                    // simulate another process removing a node while we're rebuilding
++                    client.delete().forPath(currentData.get(0).getPath());
++                    deletedPath.set(currentData.get(0).getPath());
++
++                    cache.rebuildTestExchanger.exchange(new Object());
++
++                    ChildData childData = null;
++                    while (childData == null) {
++                        childData = cache.getCurrentData("/test/snafu");
++                        Thread.sleep(1000);
+                     }
+-                );
++                    Assert.assertEquals(childData.getData(), "original".getBytes());
++                    client.setData().forPath("/test/snafu", "grilled".getBytes());
++
++                    cache.rebuildTestExchanger.exchange(new Object());
++
++                    return null;
++                }
++            });
+             cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
+             future.get();
+ 
+@@ -404,21 +345,18 @@ public class TestPathChildrenCache extends BaseClassForTests
+             Assert.assertEquals(cache.getCurrentData("/test/snafu").getData(), "grilled".getBytes());
+ 
+             cache.close();
+-        }
+-        finally
+-        {
++        } finally {
+             client.close();
+         }
+     }
+ 
+     // see https://github.com/Netflix/curator/issues/27 - was caused by not comparing old->new data
+     @Test
+-    public void testIssue27() throws Exception
+-    {
+-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
++    public void testIssue27() throws Exception {
++        CuratorFramework client = CuratorFrameworkFactory.newClient(
++                server.getConnectString(), new RetryOneTime(1));
+         client.start();
+-        try
+-        {
++        try {
+             client.create().forPath("/base");
+             client.create().forPath("/base/a");
+             client.create().forPath("/base/b");
+@@ -429,18 +367,15 @@ public class TestPathChildrenCache extends BaseClassForTests
+             final List<PathChildrenCacheEvent.Type> events = Lists.newArrayList();
+             final Semaphore semaphore = new Semaphore(0);
+             PathChildrenCache cache = new PathChildrenCache(client, "/base", true);
+-            cache.getListenable().addListener
+-                (
+-                    new PathChildrenCacheListener()
+-                    {
+-                        @Override
+-                        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+-                        {
+-                            events.add(event.getType());
+-                            semaphore.release();
+-                        }
+-                    }
+-                );
++            cache.getListenable().addListener(new PathChildrenCacheListener() {
++
++                @Override
++                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
++                        throws Exception {
++                    events.add(event.getType());
++                    semaphore.release();
++                }
++            });
+             cache.start();
+ 
+             Assert.assertTrue(semaphore.tryAcquire(3, 10, TimeUnit.SECONDS));
+@@ -451,30 +386,25 @@ public class TestPathChildrenCache extends BaseClassForTests
+             client.create().forPath("/base/a");
+             Assert.assertTrue(semaphore.tryAcquire(1, 10, TimeUnit.SECONDS));
+ 
+-            List<PathChildrenCacheEvent.Type> expected = Lists.newArrayList
+-                (
++            List<PathChildrenCacheEvent.Type> expected = Lists.newArrayList(
+                     PathChildrenCacheEvent.Type.CHILD_ADDED,
+                     PathChildrenCacheEvent.Type.CHILD_ADDED,
+                     PathChildrenCacheEvent.Type.CHILD_ADDED,
+                     PathChildrenCacheEvent.Type.CHILD_REMOVED,
+-                    PathChildrenCacheEvent.Type.CHILD_ADDED
+-                );
++                    PathChildrenCacheEvent.Type.CHILD_ADDED);
+             Assert.assertEquals(expected, events);
+-        }
+-        finally
+-        {
++        } finally {
+             client.close();
+         }
+     }
+ 
+     // test Issue 27 using new rebuild() method
+     @Test
+-    public void testIssue27Alt() throws Exception
+-    {
+-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
++    public void testIssue27Alt() throws Exception {
++        CuratorFramework client = CuratorFrameworkFactory.newClient(
++                server.getConnectString(), new RetryOneTime(1));
+         client.start();
+-        try
+-        {
++        try {
+             client.create().forPath("/base");
+             client.create().forPath("/base/a");
+             client.create().forPath("/base/b");
+@@ -485,18 +415,15 @@ public class TestPathChildrenCache extends BaseClassForTests
+             final List<PathChildrenCacheEvent.Type> events = Lists.newArrayList();
+             final Semaphore semaphore = new Semaphore(0);
+             PathChildrenCache cache = new PathChildrenCache(client, "/base", true);
+-            cache.getListenable().addListener
+-                (
+-                    new PathChildrenCacheListener()
+-                    {
+-                        @Override
+-                        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+-                        {
+-                            events.add(event.getType());
+-                            semaphore.release();
+-                        }
+-                    }
+-                );
++            cache.getListenable().addListener(new PathChildrenCacheListener() {
++
++                @Override
++                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
++                        throws Exception {
++                    events.add(event.getType());
++                    semaphore.release();
++                }
++            });
+             cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
+ 
+             client.delete().forPath("/base/a");
+@@ -505,27 +432,23 @@ public class TestPathChildrenCache extends BaseClassForTests
+             client.create().forPath("/base/a");
+             Assert.assertTrue(semaphore.tryAcquire(1, 10, TimeUnit.SECONDS));
+ 
+-            List<PathChildrenCacheEvent.Type> expected = Lists.newArrayList
+-                (
++            List<PathChildrenCacheEvent.Type> expected = Lists.newArrayList(
+                     PathChildrenCacheEvent.Type.CHILD_REMOVED,
+-                    PathChildrenCacheEvent.Type.CHILD_ADDED
+-                );
++                    PathChildrenCacheEvent.Type.CHILD_ADDED);
+             Assert.assertEquals(expected, events);
+-        }
+-        finally
+-        {
++        } finally {
+             client.close();
+         }
+     }
+ 
+     @Test
+-    public void testKilledSession() throws Exception
+-    {
++    public void testKilledSession() throws Exception {
+         Timing timing = new Timing();
+         CuratorFramework client = null;
+-        try
+-        {
+-            client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
++        try {
++            client = CuratorFrameworkFactory.newClient(
++                    server.getConnectString(), timing.session(), timing.connection(),
++                    new RetryOneTime(1));
+             client.start();
+             client.create().forPath("/test");
+ 
+@@ -536,32 +459,22 @@ public class TestPathChildrenCache extends BaseClassForTests
+             final CountDownLatch lostLatch = new CountDownLatch(1);
+             final CountDownLatch reconnectedLatch = new CountDownLatch(1);
+             final CountDownLatch removedLatch = new CountDownLatch(1);
+-            cache.getListenable().addListener
+-                (
+-                    new PathChildrenCacheListener()
+-                    {
+-                        @Override
+-                        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+-                        {
+-                            if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
+-                            {
+-                                childAddedLatch.countDown();
+-                            }
+-                            else if ( event.getType() == PathChildrenCacheEvent.Type.CONNECTION_LOST )
+-                            {
+-                                lostLatch.countDown();
+-                            }
+-                            else if ( event.getType() == PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED )
+-                            {
+-                                reconnectedLatch.countDown();
+-                            }
+-                            else if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED )
+-                            {
+-                                removedLatch.countDown();
+-                            }
+-                        }
++            cache.getListenable().addListener(new PathChildrenCacheListener() {
++
++                @Override
++                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
++                        throws Exception {
++                    if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
++                        childAddedLatch.countDown();
++                    } else if (event.getType() == PathChildrenCacheEvent.Type.CONNECTION_LOST) {
++                        lostLatch.countDown();
++                    } else if (event.getType() == PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED) {
++                        reconnectedLatch.countDown();
++                    } else if (event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
++                        removedLatch.countDown();
+                     }
+-                );
++                }
++            });
+ 
+             client.create().withMode(CreateMode.EPHEMERAL).forPath("/test/me", "data".getBytes());
+             Assert.assertTrue(timing.awaitLatch(childAddedLatch));
+@@ -570,54 +483,48 @@ public class TestPathChildrenCache extends BaseClassForTests
+             Assert.assertTrue(timing.awaitLatch(lostLatch));
+             Assert.assertTrue(timing.awaitLatch(reconnected

<TRUNCATED>

Mime
View raw message