curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From randg...@apache.org
Subject [3/6] curator git commit: Trying to make tests more reliable
Date Sat, 10 Oct 2015 21:52:02 GMT
Trying to make tests more reliable


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/b25a8a35
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/b25a8a35
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/b25a8a35

Branch: refs/heads/CURATOR-3.0
Commit: b25a8a35856abf9710d42fae0a7324fbe66c362d
Parents: 967faf1
Author: randgalt <randgalt@apache.org>
Authored: Sat Oct 10 15:15:50 2015 -0500
Committer: randgalt <randgalt@apache.org>
Committed: Sat Oct 10 15:15:50 2015 -0500

----------------------------------------------------------------------
 .../recipes/cache/PathChildrenCache.java        |   6 +-
 .../framework/recipes/cache/TreeCache.java      |  21 +-
 .../recipes/cache/TestPathChildrenCache.java    | 432 +++++++++----------
 3 files changed, 220 insertions(+), 239 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/b25a8a35/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index 99a652d..e4e18d9 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -315,7 +315,7 @@ public class PathChildrenCache implements Closeable
      */
     public void rebuild() throws Exception
     {
-        Preconditions.checkState(!executorService.isShutdown(), "cache has been closed");
+        Preconditions.checkState(state.get() == State.STARTED, "cache has been closed");
 
         ensurePath();
 
@@ -347,7 +347,7 @@ public class PathChildrenCache implements Closeable
     public void rebuildNode(String fullPath) throws Exception
     {
         Preconditions.checkArgument(ZKPaths.getPathAndNode(fullPath).getPath().equals(path), "Node is not part of this cache: " + fullPath);
-        Preconditions.checkState(!executorService.isShutdown(), "cache has been closed");
+        Preconditions.checkState(state.get() == State.STARTED, "cache has been closed");
 
         ensurePath();
         internalRebuildNode(fullPath);
@@ -370,8 +370,6 @@ public class PathChildrenCache implements Closeable
             client.getConnectionStateListenable().removeListener(connectionStateListener);
             listeners.clear();
             executorService.close();
-            client.clearWatcherReferences(childrenWatcher);
-            client.clearWatcherReferences(dataWatcher);
             client.removeWatchers();
 
             // TODO

http://git-wip-us.apache.org/repos/asf/curator/blob/b25a8a35/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
index bda00bf..8030e8b 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
@@ -248,18 +248,24 @@ public class TreeCache implements Closeable
 
         private void doRefreshChildren() throws Exception
         {
-            client.getChildren().usingWatcher(this).inBackground(this).forPath(path);
+            if ( treeState.get() == TreeState.STARTED )
+            {
+                client.getChildren().usingWatcher(this).inBackground(this).forPath(path);
+            }
         }
 
         private void doRefreshData() throws Exception
         {
-            if ( dataIsCompressed )
-            {
-                client.getData().decompressed().usingWatcher(this).inBackground(this).forPath(path);
-            }
-            else
+            if ( treeState.get() == TreeState.STARTED )
             {
-                client.getData().usingWatcher(this).inBackground(this).forPath(path);
+                if ( dataIsCompressed )
+                {
+                    client.getData().decompressed().usingWatcher(this).inBackground(this).forPath(path);
+                }
+                else
+                {
+                    client.getData().usingWatcher(this).inBackground(this).forPath(path);
+                }
             }
         }
 
@@ -285,7 +291,6 @@ public class TreeCache implements Closeable
         {
             stat.set(null);
             data.set(null);
-            client.clearWatcherReferences(this);
             ConcurrentMap<String, TreeNode> childMap = children.getAndSet(null);
             if ( childMap != null )
             {

http://git-wip-us.apache.org/repos/asf/curator/blob/b25a8a35/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
index 3571ca7..a4e2b2e 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
@@ -18,15 +18,9 @@
  */
 package org.apache.curator.framework.recipes.cache;
 
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Collections2;
 import com.google.common.collect.Lists;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.api.Pathable;
 import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.imps.TestCleanState;
 import org.apache.curator.retry.RetryOneTime;
@@ -35,31 +29,12 @@ import org.apache.curator.test.ExecuteCalledWatchingExecutorService;
 import org.apache.curator.test.KillSession;
 import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
-import org.apache.log4j.Appender;
-import org.apache.log4j.AppenderSkeleton;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.log4j.SimpleLayout;
-import org.apache.log4j.spi.LoggingEvent;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.testng.Assert;
 import org.testng.annotations.Test;
-
-import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Exchanger;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -348,48 +323,48 @@ public class TestPathChildrenCache extends BaseClassForTests
             final CountDownLatch removedLatch = new CountDownLatch(1);
             final CountDownLatch postRemovedLatch = new CountDownLatch(1);
             final CountDownLatch dataLatch = new CountDownLatch(1);
-            PathChildrenCache cache = new PathChildrenCache(client, "/test", true);
-            cache.getListenable().addListener
-                (
-                    new PathChildrenCacheListener()
-                    {
-                        @Override
-                        public void childEvent(CuratorFramework client, PathChildrenCacheEvent
event) throws Exception
+            try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", true)
)
+            {
+                cache.getListenable().addListener
+                    (
+                        new PathChildrenCacheListener()
                         {
-                            if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED
)
-                            {
-                                removedLatch.countDown();
-                                Assert.assertTrue(postRemovedLatch.await(10, TimeUnit.SECONDS));
-                            }
-                            else
+                            @Override
+                            public void childEvent(CuratorFramework client, PathChildrenCacheEvent
event) throws Exception
                             {
-                                try
+                                if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED
)
                                 {
-                                    Assert.assertEquals(event.getData().getData(), "two".getBytes());
+                                    removedLatch.countDown();
+                                    Assert.assertTrue(postRemovedLatch.await(10, TimeUnit.SECONDS));
                                 }
-                                finally
+                                else
                                 {
-                                    dataLatch.countDown();
+                                    try
+                                    {
+                                        Assert.assertEquals(event.getData().getData(), "two".getBytes());
+                                    }
+                                    finally
+                                    {
+                                        dataLatch.countDown();
+                                    }
                                 }
                             }
                         }
-                    }
-                );
-            cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
+                    );
+                cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
 
-            client.delete().forPath("/test/foo");
-            Assert.assertTrue(timing.awaitLatch(removedLatch));
-            client.create().forPath("/test/foo", "two".getBytes());
-            postRemovedLatch.countDown();
-            Assert.assertTrue(timing.awaitLatch(dataLatch));
+                client.delete().forPath("/test/foo");
+                Assert.assertTrue(timing.awaitLatch(removedLatch));
+                client.create().forPath("/test/foo", "two".getBytes());
+                postRemovedLatch.countDown();
+                Assert.assertTrue(timing.awaitLatch(dataLatch));
 
-            Throwable t = error.get();
-            if ( t != null )
-            {
-                Assert.fail("Assert", t);
+                Throwable t = error.get();
+                if ( t != null )
+                {
+                    Assert.fail("Assert", t);
+                }
             }
-
-            cache.close();
         }
         finally
         {
@@ -411,79 +386,79 @@ public class TestPathChildrenCache extends BaseClassForTests
             client.create().forPath("/test/snafu", "original".getBytes());
 
             final CountDownLatch addedLatch = new CountDownLatch(2);
-            final PathChildrenCache cache = new PathChildrenCache(client, "/test", true);
-            cache.getListenable().addListener
-                (
-                    new PathChildrenCacheListener()
-                    {
-                        @Override
-                        public void childEvent(CuratorFramework client, PathChildrenCacheEvent
event) throws Exception
+            try ( final PathChildrenCache cache = new PathChildrenCache(client, "/test",
true) )
+            {
+                cache.getListenable().addListener
+                    (
+                        new PathChildrenCacheListener()
                         {
-                            if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED
)
+                            @Override
+                            public void childEvent(CuratorFramework client, PathChildrenCacheEvent
event) throws Exception
                             {
-                                if ( event.getData().getPath().equals("/test/test") )
+                                if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED
)
                                 {
-                                    addedLatch.countDown();
+                                    if ( event.getData().getPath().equals("/test/test") )
+                                    {
+                                        addedLatch.countDown();
+                                    }
                                 }
-                            }
-                            else if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED
)
-                            {
-                                if ( event.getData().getPath().equals("/test/snafu") )
+                                else if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED
)
                                 {
-                                    addedLatch.countDown();
+                                    if ( event.getData().getPath().equals("/test/snafu")
)
+                                    {
+                                        addedLatch.countDown();
+                                    }
                                 }
                             }
                         }
-                    }
-                );
-            cache.rebuildTestExchanger = new Exchanger<Object>();
-            ExecutorService service = Executors.newSingleThreadExecutor();
-            final AtomicReference<String> deletedPath = new AtomicReference<String>();
-            Future<Object> future = service.submit
-                (
-                    new Callable<Object>()
-                    {
-                        @Override
-                        public Object call() throws Exception
+                    );
+                cache.rebuildTestExchanger = new Exchanger<Object>();
+                ExecutorService service = Executors.newSingleThreadExecutor();
+                final AtomicReference<String> deletedPath = new AtomicReference<String>();
+                Future<Object> future = service.submit
+                    (
+                        new Callable<Object>()
                         {
-                            cache.rebuildTestExchanger.exchange(new Object());
+                            @Override
+                            public Object call() throws Exception
+                            {
+                                cache.rebuildTestExchanger.exchange(new Object());
 
-                            // simulate another process adding a node while we're rebuilding
-                            client.create().forPath("/test/test");
+                                // simulate another process adding a node while we're rebuilding
+                                client.create().forPath("/test/test");
 
-                            List<ChildData> currentData = cache.getCurrentData();
-                            Assert.assertTrue(currentData.size() > 0);
+                                List<ChildData> currentData = cache.getCurrentData();
+                                Assert.assertTrue(currentData.size() > 0);
 
-                            // simulate another process removing a node while we're rebuilding
-                            client.delete().forPath(currentData.get(0).getPath());
-                            deletedPath.set(currentData.get(0).getPath());
+                                // simulate another process removing a node while we're rebuilding
+                                client.delete().forPath(currentData.get(0).getPath());
+                                deletedPath.set(currentData.get(0).getPath());
 
-                            cache.rebuildTestExchanger.exchange(new Object());
+                                cache.rebuildTestExchanger.exchange(new Object());
 
-                            ChildData childData = null;
-                            while ( childData == null )
-                            {
-                                childData = cache.getCurrentData("/test/snafu");
-                                Thread.sleep(1000);
-                            }
-                            Assert.assertEquals(childData.getData(), "original".getBytes());
-                            client.setData().forPath("/test/snafu", "grilled".getBytes());
+                                ChildData childData = null;
+                                while ( childData == null )
+                                {
+                                    childData = cache.getCurrentData("/test/snafu");
+                                    Thread.sleep(1000);
+                                }
+                                Assert.assertEquals(childData.getData(), "original".getBytes());
+                                client.setData().forPath("/test/snafu", "grilled".getBytes());
 
-                            cache.rebuildTestExchanger.exchange(new Object());
+                                cache.rebuildTestExchanger.exchange(new Object());
 
-                            return null;
+                                return null;
+                            }
                         }
-                    }
-                );
-            cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
-            future.get();
-
-            Assert.assertTrue(timing.awaitLatch(addedLatch));
-            Assert.assertNotNull(cache.getCurrentData("/test/test"));
-            Assert.assertNull(cache.getCurrentData(deletedPath.get()));
-            Assert.assertEquals(cache.getCurrentData("/test/snafu").getData(), "grilled".getBytes());
+                    );
+                cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
+                future.get();
 
-            cache.close();
+                Assert.assertTrue(timing.awaitLatch(addedLatch));
+                Assert.assertNotNull(cache.getCurrentData("/test/test"));
+                Assert.assertNull(cache.getCurrentData(deletedPath.get()));
+                Assert.assertEquals(cache.getCurrentData("/test/snafu").getData(), "grilled".getBytes());
+            }
         }
         finally
         {
@@ -653,7 +628,7 @@ public class TestPathChildrenCache extends BaseClassForTests
             client.create().withMode(CreateMode.EPHEMERAL).forPath("/test/me", "data".getBytes());
             Assert.assertTrue(timing.awaitLatch(childAddedLatch));
 
-            KillSession.kill(client.getZookeeperClient().getZooKeeper(), server.getConnectString());
+            KillSession.kill(client.getZookeeperClient().getZooKeeper());
             Assert.assertTrue(timing.awaitLatch(lostLatch));
             Assert.assertTrue(timing.awaitLatch(reconnectedLatch));
             Assert.assertTrue(timing.awaitLatch(removedLatch));
@@ -695,9 +670,9 @@ public class TestPathChildrenCache extends BaseClassForTests
         Timing timing = new Timing();
         PathChildrenCache cache = null;
         CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        client.start();
         try
         {
+            client.start();
             client.create().creatingParentsIfNeeded().forPath("/test/one", "one".getBytes());
 
             final CountDownLatch latch = new CountDownLatch(1);
@@ -716,7 +691,7 @@ public class TestPathChildrenCache extends BaseClassForTests
             };
             cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
 
-            latch.await();
+            Assert.assertTrue(timing.awaitLatch(latch));
 
             int saveCounter = counter.get();
             client.setData().forPath("/test/one", "alt".getBytes());
@@ -725,6 +700,7 @@ public class TestPathChildrenCache extends BaseClassForTests
             Assert.assertEquals(saveCounter, counter.get());
 
             semaphore.release(1000);
+            timing.sleepABit();
         }
         finally
         {
@@ -735,44 +711,43 @@ public class TestPathChildrenCache extends BaseClassForTests
 
     private void internalTestMode(CuratorFramework client, boolean cacheData) throws Exception
     {
-        PathChildrenCache cache = new PathChildrenCache(client, "/test", cacheData);
-
-        final CountDownLatch latch = new CountDownLatch(2);
-        cache.getListenable().addListener
-            (
-                new PathChildrenCacheListener()
-                {
-                    @Override
-                    public void childEvent(CuratorFramework client, PathChildrenCacheEvent
event) throws Exception
+        try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", cacheData)
)
+        {
+            final CountDownLatch latch = new CountDownLatch(2);
+            cache.getListenable().addListener
+                (
+                    new PathChildrenCacheListener()
                     {
-                        if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
+                        @Override
+                        public void childEvent(CuratorFramework client, PathChildrenCacheEvent
event) throws Exception
                         {
-                            latch.countDown();
+                            if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED
)
+                            {
+                                latch.countDown();
+                            }
                         }
                     }
-                }
-            );
-        cache.start();
+                );
+            cache.start();
 
-        client.create().forPath("/test/one", "one".getBytes());
-        client.create().forPath("/test/two", "two".getBytes());
-        Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+            client.create().forPath("/test/one", "one".getBytes());
+            client.create().forPath("/test/two", "two".getBytes());
+            Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
 
-        for ( ChildData data : cache.getCurrentData() )
-        {
-            if ( cacheData )
-            {
-                Assert.assertNotNull(data.getData());
-                Assert.assertNotNull(data.getStat());
-            }
-            else
+            for ( ChildData data : cache.getCurrentData() )
             {
-                Assert.assertNull(data.getData());
-                Assert.assertNotNull(data.getStat());
+                if ( cacheData )
+                {
+                    Assert.assertNotNull(data.getData());
+                    Assert.assertNotNull(data.getStat());
+                }
+                else
+                {
+                    Assert.assertNull(data.getData());
+                    Assert.assertNotNull(data.getStat());
+                }
             }
         }
-
-        cache.close();
     }
 
     @Test
@@ -786,34 +761,34 @@ public class TestPathChildrenCache extends BaseClassForTests
             client.create().forPath("/test");
 
             final BlockingQueue<PathChildrenCacheEvent.Type> events = new LinkedBlockingQueue<PathChildrenCacheEvent.Type>();
-            PathChildrenCache cache = new PathChildrenCache(client, "/test", true);
-            cache.getListenable().addListener
-                (
-                    new PathChildrenCacheListener()
-                    {
-                        @Override
-                        public void childEvent(CuratorFramework client, PathChildrenCacheEvent
event) throws Exception
+            try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", true)
)
+            {
+                cache.getListenable().addListener
+                    (
+                        new PathChildrenCacheListener()
                         {
-                            if ( event.getData().getPath().equals("/test/one") )
+                            @Override
+                            public void childEvent(CuratorFramework client, PathChildrenCacheEvent
event) throws Exception
                             {
-                                events.offer(event.getType());
+                                if ( event.getData().getPath().equals("/test/one") )
+                                {
+                                    events.offer(event.getType());
+                                }
                             }
                         }
-                    }
-                );
-            cache.start();
-
-            client.create().forPath("/test/one", "hey there".getBytes());
-            Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS),
PathChildrenCacheEvent.Type.CHILD_ADDED);
+                    );
+                cache.start();
 
-            client.setData().forPath("/test/one", "sup!".getBytes());
-            Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS),
PathChildrenCacheEvent.Type.CHILD_UPDATED);
-            Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()),
"sup!");
+                client.create().forPath("/test/one", "hey there".getBytes());
+                Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS),
PathChildrenCacheEvent.Type.CHILD_ADDED);
 
-            client.delete().forPath("/test/one");
-            Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS),
PathChildrenCacheEvent.Type.CHILD_REMOVED);
+                client.setData().forPath("/test/one", "sup!".getBytes());
+                Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS),
PathChildrenCacheEvent.Type.CHILD_UPDATED);
+                Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()),
"sup!");
 
-            cache.close();
+                client.delete().forPath("/test/one");
+                Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS),
PathChildrenCacheEvent.Type.CHILD_REMOVED);
+            }
         }
         finally
         {
@@ -833,56 +808,58 @@ public class TestPathChildrenCache extends BaseClassForTests
 
             final BlockingQueue<PathChildrenCacheEvent.Type> events = new LinkedBlockingQueue<PathChildrenCacheEvent.Type>();
             final ExecutorService exec = Executors.newSingleThreadExecutor();
-            PathChildrenCache cache = new PathChildrenCache(client, "/test", true, false,
exec);
-            cache.getListenable().addListener
-                (
-                    new PathChildrenCacheListener()
-                    {
-                        @Override
-                        public void childEvent(CuratorFramework client, PathChildrenCacheEvent
event) throws Exception
+            try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", true,
false, exec) )
+            {
+                cache.getListenable().addListener
+                    (
+                        new PathChildrenCacheListener()
                         {
-                            if ( event.getData().getPath().equals("/test/one") )
+                            @Override
+                            public void childEvent(CuratorFramework client, PathChildrenCacheEvent
event) throws Exception
                             {
-                                events.offer(event.getType());
+                                if ( event.getData().getPath().equals("/test/one") )
+                                {
+                                    events.offer(event.getType());
+                                }
                             }
                         }
-                    }
-                );
-            cache.start();
+                    );
+                cache.start();
 
-            final BlockingQueue<PathChildrenCacheEvent.Type> events2 = new LinkedBlockingQueue<PathChildrenCacheEvent.Type>();
-            PathChildrenCache cache2 = new PathChildrenCache(client, "/test", true, false,
exec);
-            cache2.getListenable().addListener(
-                    new PathChildrenCacheListener() {
-                        @Override
-                        public void childEvent(CuratorFramework client, PathChildrenCacheEvent
event)
-                                throws Exception
+                final BlockingQueue<PathChildrenCacheEvent.Type> events2 = new LinkedBlockingQueue<PathChildrenCacheEvent.Type>();
+                try ( PathChildrenCache cache2 = new PathChildrenCache(client, "/test", true,
false, exec) )
+                {
+                    cache2.getListenable().addListener(
+                        new PathChildrenCacheListener()
                         {
-                            if ( event.getData().getPath().equals("/test/one") )
+                            @Override
+                            public void childEvent(CuratorFramework client, PathChildrenCacheEvent
event)
+                                throws Exception
                             {
-                                events2.offer(event.getType());
+                                if ( event.getData().getPath().equals("/test/one") )
+                                {
+                                    events2.offer(event.getType());
+                                }
                             }
                         }
-                    }
-            );
-            cache2.start();
-
-            client.create().forPath("/test/one", "hey there".getBytes());
-            Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS),
PathChildrenCacheEvent.Type.CHILD_ADDED);
-            Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS),
PathChildrenCacheEvent.Type.CHILD_ADDED);
-
-            client.setData().forPath("/test/one", "sup!".getBytes());
-            Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS),
PathChildrenCacheEvent.Type.CHILD_UPDATED);
-            Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS),
PathChildrenCacheEvent.Type.CHILD_UPDATED);
-            Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()),
"sup!");
-            Assert.assertEquals(new String(cache2.getCurrentData("/test/one").getData()),
"sup!");
-
-            client.delete().forPath("/test/one");
-            Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS),
PathChildrenCacheEvent.Type.CHILD_REMOVED);
-            Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS),
PathChildrenCacheEvent.Type.CHILD_REMOVED);
-
-            cache.close();
-            cache2.close();
+                                                      );
+                    cache2.start();
+
+                    client.create().forPath("/test/one", "hey there".getBytes());
+                    Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS),
PathChildrenCacheEvent.Type.CHILD_ADDED);
+                    Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS),
PathChildrenCacheEvent.Type.CHILD_ADDED);
+
+                    client.setData().forPath("/test/one", "sup!".getBytes());
+                    Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS),
PathChildrenCacheEvent.Type.CHILD_UPDATED);
+                    Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS),
PathChildrenCacheEvent.Type.CHILD_UPDATED);
+                    Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()),
"sup!");
+                    Assert.assertEquals(new String(cache2.getCurrentData("/test/one").getData()),
"sup!");
+
+                    client.delete().forPath("/test/one");
+                    Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS),
PathChildrenCacheEvent.Type.CHILD_REMOVED);
+                    Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS),
PathChildrenCacheEvent.Type.CHILD_REMOVED);
+                }
+            }
         }
         finally
         {
@@ -902,17 +879,17 @@ public class TestPathChildrenCache extends BaseClassForTests
             client.create().forPath("/test");
 
             final ExecuteCalledWatchingExecutorService exec = new ExecuteCalledWatchingExecutorService(Executors.newSingleThreadExecutor());
-            PathChildrenCache cache = new PathChildrenCache(client, "/test", true, false,
exec);
-
-            cache.start();
-            client.create().forPath("/test/one", "hey there".getBytes());
+            try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", true,
false, exec) )
+            {
+                cache.start();
+                client.create().forPath("/test/one", "hey there".getBytes());
 
-            cache.rebuild();
-            Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()),
"hey there");
-            Assert.assertTrue(exec.isExecuteCalled());
+                cache.rebuild();
+                Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()),
"hey there");
+                Assert.assertTrue(exec.isExecuteCalled());
 
-            exec.setExecuteCalled(false);
-            cache.close();
+                exec.setExecuteCalled(false);
+            }
             Assert.assertFalse(exec.isExecuteCalled());
 
             client.delete().forPath("/test/one");
@@ -940,28 +917,29 @@ public class TestPathChildrenCache extends BaseClassForTests
         try
         {
             final CountDownLatch latch = new CountDownLatch(1);
-            final PathChildrenCache cache = new PathChildrenCache(client, "/test", false)
{
+            try ( final PathChildrenCache cache = new PathChildrenCache(client, "/test",
false) {
                 @Override
                 protected void handleException(Throwable e)
                 {
                     latch.countDown();
                 }
-            };
-            cache.start();
-
-            cache.offerOperation(new Operation()
+            } )
             {
+                cache.start();
 
-                @Override
-                public void invoke() throws Exception
+                cache.offerOperation(new Operation()
                 {
-                    Thread.sleep(5000);
-                }
-            });
 
-            Thread.sleep(1000);
+                    @Override
+                    public void invoke() throws Exception
+                    {
+                        Thread.sleep(5000);
+                    }
+                });
 
-            cache.close();
+                Thread.sleep(1000);
+
+            }
 
             latch.await(5, TimeUnit.SECONDS);
 


Mime
View raw message