curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dragonsi...@apache.org
Subject [29/31] curator git commit: Added WatcherRemoveCuratorFramework to locks and updated tests to check for cleanliness
Date Tue, 18 Aug 2015 23:18:45 GMT
Added WatcherRemoveCuratorFramework to locks and updated tests to check for cleanliness


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/f0a09db4
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/f0a09db4
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/f0a09db4

Branch: refs/heads/CURATOR-3.0
Commit: f0a09db4423f06455ed93c20778c65aaf7e8b06e
Parents: 2c921d6
Author: randgalt <randgalt@apache.org>
Authored: Tue May 19 22:42:14 2015 -0700
Committer: randgalt <randgalt@apache.org>
Committed: Tue May 19 22:42:14 2015 -0700

----------------------------------------------------------------------
 .../framework/imps/WatcherRemovalFacade.java    |   3 +-
 .../locks/InterProcessSemaphoreMutex.java       |   6 +-
 .../recipes/locks/InterProcessSemaphoreV2.java  |  54 +++--
 .../framework/recipes/locks/LockInternals.java  |   9 +-
 .../locks/TestInterProcessMultiMutex.java       |   7 +-
 .../recipes/locks/TestInterProcessMutex.java    |   5 +-
 .../locks/TestInterProcessMutexBase.java        |  23 +-
 .../locks/TestInterProcessReadWriteLock.java    | 223 +++++++++++--------
 .../locks/TestInterProcessSemaphore.java        |  27 ++-
 .../locks/TestInterProcessSemaphoreCluster.java |   3 +-
 .../framework/recipes/locks/TestLockACLs.java   |   3 +-
 .../locks/TestLockCleanlinessWithFaults.java    |   3 +-
 12 files changed, 213 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/f0a09db4/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
index eee423f..156341e 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.imps;
 
 import org.apache.curator.CuratorZookeeperClient;
@@ -45,7 +46,7 @@ class WatcherRemovalFacade extends CuratorFrameworkImpl implements WatcherRemove
     @Override
     public WatcherRemoveCuratorFramework newWatcherRemoveCuratorFramework()
     {
-        throw new UnsupportedOperationException();
+        return client.newWatcherRemoveCuratorFramework();
     }
 
     WatcherRemovalManager getRemovalManager()

http://git-wip-us.apache.org/repos/asf/curator/blob/f0a09db4/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java
index 88b5f5d..444b10d 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java
@@ -20,6 +20,7 @@ package org.apache.curator.framework.recipes.locks;
 
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -29,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 public class InterProcessSemaphoreMutex implements InterProcessLock
 {
     private final InterProcessSemaphoreV2 semaphore;
+    private final WatcherRemoveCuratorFramework watcherRemoveClient;
     private volatile Lease lease;
 
     /**
@@ -37,7 +39,8 @@ public class InterProcessSemaphoreMutex implements InterProcessLock
      */
     public InterProcessSemaphoreMutex(CuratorFramework client, String path)
     {
-        this.semaphore = new InterProcessSemaphoreV2(client, path, 1);
+        watcherRemoveClient = client.newWatcherRemoveCuratorFramework();
+        this.semaphore = new InterProcessSemaphoreV2(watcherRemoveClient, path, 1);
     }
 
     @Override
@@ -66,6 +69,7 @@ public class InterProcessSemaphoreMutex implements InterProcessLock
         try
         {
             lease.close();
+            watcherRemoveClient.removeWatchers();
         }
         finally
         {

http://git-wip-us.apache.org/repos/asf/curator/blob/f0a09db4/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
index 2e14ee1..2a55107 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
@@ -21,6 +21,7 @@ package org.apache.curator.framework.recipes.locks;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.RetryLoop;
 import org.apache.curator.framework.CuratorFramework;
@@ -75,7 +76,7 @@ public class InterProcessSemaphoreV2
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final InterProcessMutex lock;
-    private final CuratorFramework client;
+    private final WatcherRemoveCuratorFramework client;
     private final String leasesPath;
     private final Watcher watcher = new Watcher()
     {
@@ -115,7 +116,7 @@ public class InterProcessSemaphoreV2
 
     private InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases, SharedCountReader count)
     {
-        this.client = client;
+        this.client = client.newWatcherRemoveCuratorFramework();
         path = PathUtils.validatePath(path);
         lock = new InterProcessMutex(client, ZKPaths.makePath(path, LOCK_PARENT));
         this.maxLeases = (count != null) ? count.getCount() : maxLeases;
@@ -345,36 +346,43 @@ public class InterProcessSemaphoreV2
             String nodeName = ZKPaths.getNodeFromPath(path);
             builder.add(makeLease(path));
 
-            synchronized(this)
+            try
             {
-                for(;;)
+                synchronized(this)
                 {
-                    List<String> children = client.getChildren().usingWatcher(watcher).forPath(leasesPath);
-                    if ( !children.contains(nodeName) )
+                    for(;;)
                     {
-                        log.error("Sequential path not found: " + path);
-                        return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE;
-                    }
+                        List<String> children = client.getChildren().usingWatcher(watcher).forPath(leasesPath);
+                        if ( !children.contains(nodeName) )
+                        {
+                            log.error("Sequential path not found: " + path);
+                            return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE;
+                        }
 
-                    if ( children.size() <= maxLeases )
-                    {
-                        break;
-                    }
-                    if ( hasWait )
-                    {
-                        long thisWaitMs = getThisWaitMs(startMs, waitMs);
-                        if ( thisWaitMs <= 0 )
+                        if ( children.size() <= maxLeases )
                         {
-                            return InternalAcquireResult.RETURN_NULL;
+                            break;
+                        }
+                        if ( hasWait )
+                        {
+                            long thisWaitMs = getThisWaitMs(startMs, waitMs);
+                            if ( thisWaitMs <= 0 )
+                            {
+                                return InternalAcquireResult.RETURN_NULL;
+                            }
+                            wait(thisWaitMs);
+                        }
+                        else
+                        {
+                            wait();
                         }
-                        wait(thisWaitMs);
-                    }
-                    else
-                    {
-                        wait();
                     }
                 }
             }
+            finally
+            {
+                client.removeWatchers();
+            }
         }
         finally
         {

http://git-wip-us.apache.org/repos/asf/curator/blob/f0a09db4/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java
index 2b4d3d9..4b0da11 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java
@@ -24,11 +24,11 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import org.apache.curator.RetryLoop;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
 import org.apache.curator.framework.api.CuratorWatcher;
 import org.apache.curator.framework.imps.CuratorFrameworkState;
 import org.apache.curator.utils.PathUtils;
 import org.apache.curator.utils.ZKPaths;
-import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -42,7 +42,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 public class LockInternals
 {
-    private final CuratorFramework                  client;
+    private final WatcherRemoveCuratorFramework     client;
     private final String                            path;
     private final String                            basePath;
     private final LockInternalsDriver               driver;
@@ -100,7 +100,7 @@ public class LockInternals
         this.lockName = lockName;
         this.maxLeases = maxLeases;
 
-        this.client = client;
+        this.client = client.newWatcherRemoveCuratorFramework();
         this.basePath = PathUtils.validatePath(path);
         this.path = ZKPaths.makePath(path, lockName);
     }
@@ -116,8 +116,9 @@ public class LockInternals
         revocable.set(entry);
     }
 
-    void releaseLock(String lockPath) throws Exception
+    final void releaseLock(String lockPath) throws Exception
     {
+        client.removeWatchers();
         revocable.set(null);
         deleteOurPath(lockPath);
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/f0a09db4/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMultiMutex.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMultiMutex.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMultiMutex.java
index b1631a0..df6a2f5 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMultiMutex.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMultiMutex.java
@@ -20,6 +20,7 @@ package org.apache.curator.framework.recipes.locks;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
 import org.apache.curator.retry.RetryOneTime;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -84,13 +85,14 @@ public class TestInterProcessMultiMutex extends TestInterProcessMutexBase
             }
             catch ( Exception e )
             {
+                // ignore
             }
             Assert.assertFalse(goodLock.isAcquiredInThisProcess());
             Assert.assertTrue(otherGoodLock.isAcquiredInThisProcess());
         }
         finally
         {
-            client.close();
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -142,13 +144,14 @@ public class TestInterProcessMultiMutex extends TestInterProcessMutexBase
             }
             catch ( Exception e )
             {
+                // ignore
             }
             Assert.assertFalse(goodLock.isAcquiredInThisProcess());
             Assert.assertTrue(goodLockWasLocked.get());
         }
         finally
         {
-            client.close();
+            TestCleanState.closeAndTestClean(client);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/f0a09db4/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java
index 453de33..d6f8a1d 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java
@@ -20,6 +20,7 @@ package org.apache.curator.framework.recipes.locks;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.KillSession;
 import org.apache.zookeeper.CreateMode;
@@ -106,7 +107,7 @@ public class TestInterProcessMutex extends TestInterProcessMutexBase
         }
         finally
         {
-            client.close();
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -151,7 +152,7 @@ public class TestInterProcessMutex extends TestInterProcessMutexBase
         }
         finally
         {
-            client.close();
+            TestCleanState.closeAndTestClean(client);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/f0a09db4/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
index 3fe8110..49e5d19 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
@@ -20,16 +20,14 @@
 package org.apache.curator.framework.recipes.locks;
 
 import com.google.common.collect.Lists;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.KillSession;
-import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.Timing;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -123,7 +121,7 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
         }
         finally
         {
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -199,7 +197,7 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
         }
         finally
         {
-            client.close();
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -265,7 +263,7 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
         }
         finally
         {
-            client.close();
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -311,7 +309,7 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
         }
         finally
         {
-            client.close();
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -328,7 +326,7 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
         }
         finally
         {
-            client.close();
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -460,11 +458,14 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
                 Assert.assertTrue(acquiredLatchForClient1.await(10, TimeUnit.SECONDS));
                 Assert.assertTrue(mutexForClient1.isAcquiredInThisProcess());
             }
+
+            future1.get();
+            future2.get();
         }
         finally
         {
-            CloseableUtils.closeQuietly(client1);
-            CloseableUtils.closeQuietly(client2);
+            TestCleanState.closeAndTestClean(client1);
+            TestCleanState.closeAndTestClean(client2);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/f0a09db4/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
index f7636ed..48e4805 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
@@ -16,14 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.recipes.locks;
 
 import com.google.common.collect.Lists;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
 import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.Collection;
@@ -31,6 +32,7 @@ import java.util.List;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -40,21 +42,22 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class TestInterProcessReadWriteLock extends BaseClassForTests
 {
     @Test
-    public void     testGetParticipantNodes() throws Exception
+    public void testGetParticipantNodes() throws Exception
     {
-        final int               READERS = 20;
-        final int               WRITERS = 8;
+        final int READERS = 20;
+        final int WRITERS = 8;
 
-        CuratorFramework        client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
         try
         {
             client.start();
 
-            final CountDownLatch              latch = new CountDownLatch(READERS + WRITERS);
-            final CountDownLatch              readLatch = new CountDownLatch(READERS);
-            final InterProcessReadWriteLock   lock = new InterProcessReadWriteLock(client, "/lock");
+            final CountDownLatch latch = new CountDownLatch(READERS + WRITERS);
+            final CountDownLatch readLatch = new CountDownLatch(READERS);
+            final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
 
-            ExecutorService                   service = Executors.newCachedThreadPool();
+            final CountDownLatch exitLatch = new CountDownLatch(1);
+            ExecutorCompletionService<Void> service = new ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
             for ( int i = 0; i < READERS; ++i )
             {
                 service.submit
@@ -65,8 +68,16 @@ public class TestInterProcessReadWriteLock extends BaseClassForTests
                         public Void call() throws Exception
                         {
                             lock.readLock().acquire();
-                            latch.countDown();
-                            readLatch.countDown();
+                            try
+                            {
+                                latch.countDown();
+                                readLatch.countDown();
+                                exitLatch.await();
+                            }
+                            finally
+                            {
+                                lock.readLock().release();
+                            }
                             return null;
                         }
                     }
@@ -84,6 +95,14 @@ public class TestInterProcessReadWriteLock extends BaseClassForTests
                             Assert.assertTrue(readLatch.await(10, TimeUnit.SECONDS));
                             latch.countDown();  // must be before as there can only be one writer
                             lock.writeLock().acquire();
+                            try
+                            {
+                                exitLatch.await();
+                            }
+                            finally
+                            {
+                                lock.writeLock().release();
+                            }
                             return null;
                         }
                     }
@@ -97,22 +116,28 @@ public class TestInterProcessReadWriteLock extends BaseClassForTests
 
             Assert.assertEquals(readers.size(), READERS);
             Assert.assertEquals(writers.size(), WRITERS);
+
+            exitLatch.countDown();
+            for ( int i = 0; i < (READERS + WRITERS); ++i )
+            {
+                service.take().get();
+            }
         }
         finally
         {
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
     @Test
-    public void     testThatUpgradingIsDisallowed() throws Exception
+    public void testThatUpgradingIsDisallowed() throws Exception
     {
-        CuratorFramework        client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
         try
         {
             client.start();
 
-            InterProcessReadWriteLock   lock = new InterProcessReadWriteLock(client, "/lock");
+            InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
             lock.readLock().acquire();
             Assert.assertFalse(lock.writeLock().acquire(5, TimeUnit.SECONDS));
 
@@ -120,70 +145,80 @@ public class TestInterProcessReadWriteLock extends BaseClassForTests
         }
         finally
         {
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
     @Test
-    public void     testThatDowngradingRespectsThreads() throws Exception
+    public void testThatDowngradingRespectsThreads() throws Exception
     {
-        CuratorFramework        client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
         try
         {
             client.start();
 
-            final InterProcessReadWriteLock   lock = new InterProcessReadWriteLock(client, "/lock");
-            ExecutorService                   t1 = Executors.newSingleThreadExecutor();
-            ExecutorService                   t2 = Executors.newSingleThreadExecutor();
+            final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
+            ExecutorService t1 = Executors.newSingleThreadExecutor();
+            ExecutorService t2 = Executors.newSingleThreadExecutor();
 
-            final CountDownLatch              latch = new CountDownLatch(1);
+            final CountDownLatch latch = new CountDownLatch(1);
 
-            Future<Object>                    f1 = t1.submit
-            (
-                new Callable<Object>()
-                {
-                    @Override
-                    public Object call() throws Exception
+            final CountDownLatch releaseLatch = new CountDownLatch(1);
+            Future<Object> f1 = t1.submit
+                (
+                    new Callable<Object>()
                     {
-                        lock.writeLock().acquire();
-                        latch.countDown();
-                        return null;
+                        @Override
+                        public Object call() throws Exception
+                        {
+                            lock.writeLock().acquire();
+                            latch.countDown();
+                            try
+                            {
+                                releaseLatch.await();
+                            }
+                            finally
+                            {
+                                lock.writeLock().release();
+                            }
+                            return null;
+                        }
                     }
-                }
-            );
+                );
 
-            Future<Object>                    f2 = t2.submit
-            (
-                new Callable<Object>()
-                {
-                    @Override
-                    public Object call() throws Exception
+            Future<Object> f2 = t2.submit
+                (
+                    new Callable<Object>()
                     {
-                        Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
-                        Assert.assertFalse(lock.readLock().acquire(5, TimeUnit.SECONDS));
-                        return null;
+                        @Override
+                        public Object call() throws Exception
+                        {
+                            Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+                            Assert.assertFalse(lock.readLock().acquire(5, TimeUnit.SECONDS));
+                            return null;
+                        }
                     }
-                }
-            );
+                );
 
-            f1.get();
             f2.get();
+            releaseLatch.countDown();
+            f1.get();
         }
         finally
         {
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
     @Test
-    public void     testDowngrading() throws Exception
+    public void testDowngrading() throws Exception
     {
-        CuratorFramework        client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
         try
         {
             client.start();
 
-            InterProcessReadWriteLock   lock = new InterProcessReadWriteLock(client, "/lock");
+            InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
             lock.writeLock().acquire();
             Assert.assertTrue(lock.readLock().acquire(5, TimeUnit.SECONDS));
             lock.writeLock().release();
@@ -192,60 +227,60 @@ public class TestInterProcessReadWriteLock extends BaseClassForTests
         }
         finally
         {
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
     @Test
-    public void     testBasic() throws Exception
+    public void testBasic() throws Exception
     {
-        final int               CONCURRENCY = 8;
-        final int               ITERATIONS = 100;
+        final int CONCURRENCY = 8;
+        final int ITERATIONS = 100;
 
-        final Random            random = new Random();
-        final AtomicInteger     concurrentCount = new AtomicInteger(0);
-        final AtomicInteger     maxConcurrentCount = new AtomicInteger(0);
-        final AtomicInteger     writeCount = new AtomicInteger(0);
-        final AtomicInteger     readCount = new AtomicInteger(0);
+        final Random random = new Random();
+        final AtomicInteger concurrentCount = new AtomicInteger(0);
+        final AtomicInteger maxConcurrentCount = new AtomicInteger(0);
+        final AtomicInteger writeCount = new AtomicInteger(0);
+        final AtomicInteger readCount = new AtomicInteger(0);
 
-        List<Future<Void>>  futures = Lists.newArrayList();
-        ExecutorService     service = Executors.newCachedThreadPool();
+        List<Future<Void>> futures = Lists.newArrayList();
+        ExecutorService service = Executors.newCachedThreadPool();
         for ( int i = 0; i < CONCURRENCY; ++i )
         {
-            Future<Void>    future = service.submit
-            (
-                new Callable<Void>()
-                {
-                    @Override
-                    public Void call() throws Exception
+            Future<Void> future = service.submit
+                (
+                    new Callable<Void>()
                     {
-                        CuratorFramework        client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
-                        client.start();
-                        try
+                        @Override
+                        public Void call() throws Exception
                         {
-                            InterProcessReadWriteLock   lock = new InterProcessReadWriteLock(client, "/lock");
-                            for ( int i = 0; i < ITERATIONS; ++i )
+                            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+                            client.start();
+                            try
                             {
-                                if ( random.nextInt(100) < 10 )
+                                InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
+                                for ( int i = 0; i < ITERATIONS; ++i )
                                 {
-                                    doLocking(lock.writeLock(), concurrentCount, maxConcurrentCount, random, 1);
-                                    writeCount.incrementAndGet();
-                                }
-                                else
-                                {
-                                    doLocking(lock.readLock(), concurrentCount, maxConcurrentCount, random, Integer.MAX_VALUE);
-                                    readCount.incrementAndGet();
+                                    if ( random.nextInt(100) < 10 )
+                                    {
+                                        doLocking(lock.writeLock(), concurrentCount, maxConcurrentCount, random, 1);
+                                        writeCount.incrementAndGet();
+                                    }
+                                    else
+                                    {
+                                        doLocking(lock.readLock(), concurrentCount, maxConcurrentCount, random, Integer.MAX_VALUE);
+                                        readCount.incrementAndGet();
+                                    }
                                 }
                             }
+                            finally
+                            {
+                                TestCleanState.closeAndTestClean(client);
+                            }
+                            return null;
                         }
-                        finally
-                        {
-                            CloseableUtils.closeQuietly(client);
-                        }
-                        return null;
                     }
-                }
-            );
+                );
             futures.add(future);
         }
 
@@ -262,17 +297,17 @@ public class TestInterProcessReadWriteLock extends BaseClassForTests
     }
 
     @Test
-    public void     testSetNodeData() throws Exception
+    public void testSetNodeData() throws Exception
     {
-        CuratorFramework        client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
 
         try
         {
             client.start();
 
-            final byte[] nodeData = new byte[] { 1, 2, 3, 4 };
+            final byte[] nodeData = new byte[]{1, 2, 3, 4};
 
-            InterProcessReadWriteLock   lock = new InterProcessReadWriteLock(client, "/lock", nodeData);
+            InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock", nodeData);
 
             // mutate passed-in node data, lock has made copy
             nodeData[0] = 5;
@@ -284,13 +319,13 @@ public class TestInterProcessReadWriteLock extends BaseClassForTests
 
             byte dataInZk[] = client.getData().forPath("/lock/" + children.get(0));
             Assert.assertNotNull(dataInZk);
-            Assert.assertEquals(new byte[] { 1, 2, 3, 4 }, dataInZk);
+            Assert.assertEquals(new byte[]{1, 2, 3, 4}, dataInZk);
 
             lock.writeLock().release();
         }
         finally
         {
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -299,7 +334,7 @@ public class TestInterProcessReadWriteLock extends BaseClassForTests
         try
         {
             Assert.assertTrue(lock.acquire(10, TimeUnit.SECONDS));
-            int     localConcurrentCount;
+            int localConcurrentCount;
             synchronized(this)
             {
                 localConcurrentCount = concurrentCount.incrementAndGet();

http://git-wip-us.apache.org/repos/asf/curator/blob/f0a09db4/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
index dd3f98f..2797b5f 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
@@ -20,13 +20,14 @@
 package org.apache.curator.framework.recipes.locks;
 
 import com.google.common.collect.Lists;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
 import org.apache.curator.framework.recipes.shared.SharedCount;
 import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.Collection;
@@ -100,10 +101,12 @@ public class TestInterProcessSemaphore extends BaseClassForTests
 
             future1.get();
             future2.get();
+
+            count.close();
         }
         finally
         {
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -140,8 +143,8 @@ public class TestInterProcessSemaphore extends BaseClassForTests
         }
         finally
         {
-            CloseableUtils.closeQuietly(client1);
-            CloseableUtils.closeQuietly(client2);
+            TestCleanState.closeAndTestClean(client1);
+            TestCleanState.closeAndTestClean(client2);
         }
     }
 
@@ -226,7 +229,7 @@ public class TestInterProcessSemaphore extends BaseClassForTests
                                     }
                                     finally
                                     {
-                                        client.close();
+                                        TestCleanState.closeAndTestClean(client);
                                     }
                                     return null;
                                 }
@@ -299,7 +302,7 @@ public class TestInterProcessSemaphore extends BaseClassForTests
                             }
                             finally
                             {
-                                client.close();
+                                TestCleanState.closeAndTestClean(client);
                             }
                             return null;
                         }
@@ -401,7 +404,7 @@ public class TestInterProcessSemaphore extends BaseClassForTests
         }
         finally
         {
-            client.close();
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -445,7 +448,7 @@ public class TestInterProcessSemaphore extends BaseClassForTests
         }
         finally
         {
-            client.close();
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -463,7 +466,7 @@ public class TestInterProcessSemaphore extends BaseClassForTests
         }
         finally
         {
-            client.close();
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -499,7 +502,7 @@ public class TestInterProcessSemaphore extends BaseClassForTests
             {
                 CloseableUtils.closeQuietly(l);
             }
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -528,7 +531,7 @@ public class TestInterProcessSemaphore extends BaseClassForTests
             {
                 CloseableUtils.closeQuietly(l);
             }
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/f0a09db4/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
index 2aa8a72..f4cb7bb 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Lists;
 import org.apache.curator.ensemble.EnsembleProvider;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.ExponentialBackoffRetry;
@@ -147,7 +148,7 @@ public class TestInterProcessSemaphoreCluster
                             }
                             finally
                             {
-                                CloseableUtils.closeQuietly(client);
+                                TestCleanState.closeAndTestClean(client);
                             }
                             return null;
                         }

http://git-wip-us.apache.org/repos/asf/curator/blob/f0a09db4/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockACLs.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockACLs.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockACLs.java
index 2d9a9aa..d1e6db5 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockACLs.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockACLs.java
@@ -19,6 +19,7 @@
 
 package org.apache.curator.framework.recipes.locks;
 
+import org.apache.curator.framework.imps.TestCleanState;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.RetryPolicy;
@@ -74,7 +75,7 @@ public class TestLockACLs extends BaseClassForTests
         }
         finally
         {
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/curator/blob/f0a09db4/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockCleanlinessWithFaults.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockCleanlinessWithFaults.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockCleanlinessWithFaults.java
index 457be75..dc14c11 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockCleanlinessWithFaults.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockCleanlinessWithFaults.java
@@ -20,6 +20,7 @@ package org.apache.curator.framework.recipes.locks;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
 import org.apache.curator.retry.RetryNTimes;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.utils.CloseableUtils;
@@ -67,7 +68,7 @@ public class TestLockCleanlinessWithFaults extends BaseClassForTests
         }
         finally
         {
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 }


Mime
View raw message