curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From randg...@apache.org
Subject [1/2] curator git commit: Return leases before retrying acquire
Date Sun, 08 May 2016 18:44:28 GMT
Repository: curator
Updated Branches:
  refs/heads/master 0ef5e454a -> 168dfd734


Return leases before retrying acquire


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/613a51be
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/613a51be
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/613a51be

Branch: refs/heads/master
Commit: 613a51be4535544e990d31e485674e237bd7e2da
Parents: 73cd00a
Author: Ulrich Geilmann <ugeilmann@googlemail.com>
Authored: Mon Apr 11 15:32:29 2016 +0200
Committer: Ulrich Geilmann <ugeilmann@googlemail.com>
Committed: Mon Apr 11 23:02:28 2016 +0200

----------------------------------------------------------------------
 .../recipes/locks/InterProcessSemaphoreV2.java  |  8 ++-
 .../locks/TestInterProcessSemaphore.java        | 66 ++++++++++++++++++++
 2 files changed, 73 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/613a51be/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
index 8524075..3d96be2 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
@@ -346,12 +346,15 @@ public class InterProcessSemaphoreV2
         {
             lock.acquire();
         }
+
+        Lease lease = null;
+
         try
         {
             PathAndBytesable<String> createBuilder = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL);
             String path = (nodeData != null) ? createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME), nodeData) : createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME));
             String nodeName = ZKPaths.getNodeFromPath(path);
-            builder.add(makeLease(path));
+            lease = makeLease(path);
 
             synchronized(this)
             {
@@ -361,6 +364,7 @@ public class InterProcessSemaphoreV2
                     if ( !children.contains(nodeName) )
                     {
                         log.error("Sequential path not found: " + path);
+                        returnLease(lease);
                         return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE;
                     }
 
@@ -373,6 +377,7 @@ public class InterProcessSemaphoreV2
                         long thisWaitMs = getThisWaitMs(startMs, waitMs);
                         if ( thisWaitMs <= 0 )
                         {
+                            returnLease(lease);
                             return InternalAcquireResult.RETURN_NULL;
                         }
                         wait(thisWaitMs);
@@ -388,6 +393,7 @@ public class InterProcessSemaphoreV2
         {
             lock.release();
         }
+        builder.add(Preconditions.checkNotNull(lease));
         return InternalAcquireResult.CONTINUE;
     }
 

http://git-wip-us.apache.org/repos/asf/curator/blob/613a51be/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
index 631b7c7..ad45d90 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
@@ -20,6 +20,7 @@
 package org.apache.curator.framework.recipes.locks;
 
 import com.google.common.collect.Lists;
+import org.apache.curator.framework.api.CuratorWatcher;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
@@ -27,6 +28,8 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.recipes.shared.SharedCount;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.Timing;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.Collection;
@@ -571,4 +574,67 @@ public class TestInterProcessSemaphore extends BaseClassForTests
         }
 
     }
+
+    @Test
+    public void testNoOrphanedNodes() throws Exception
+    {
+        final Timing timing = new Timing();
+        final ExecutorService executor = Executors.newFixedThreadPool(1);
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        client.start();
+        try
+        {
+            final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test", 1);
+            Lease lease = semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS);
+            Assert.assertNotNull(lease);
+            final List<String> childNodes = client.getChildren().forPath("/test/leases");
+            Assert.assertEquals(childNodes.size(), 1);
+
+            final CountDownLatch nodeCreatedLatch = new CountDownLatch(1);
+            client.getChildren().usingWatcher(new CuratorWatcher() {
+                @Override
+                public void process(WatchedEvent event) throws Exception {
+                    if (event.getType() == Watcher.Event.EventType.NodeCreated) {
+                        nodeCreatedLatch.countDown();
+                    }
+                }
+            }).forPath("/test/leases");
+
+            final Future<Lease> leaseFuture = executor.submit(new Callable<Lease>() {
+                @Override
+                public Lease call() throws Exception {
+                    return semaphore.acquire(timing.forWaiting().multiple(2).seconds(), TimeUnit.SECONDS);
+                }
+            });
+
+            // wait for second lease to create its node
+            timing.awaitLatch(nodeCreatedLatch);
+            String newNode = null;
+            for (String c : client.getChildren().forPath("/test/leases")) {
+                if (!childNodes.contains(c)) {
+                    newNode = c;
+                }
+            }
+            Assert.assertNotNull(newNode);
+
+            // delete the ephemeral node to trigger a retry
+            client.delete().forPath("/test/leases/" + newNode);
+
+            // release first lease so second one can be acquired
+            lease.close();
+            lease = leaseFuture.get();
+            Assert.assertNotNull(lease);
+            lease.close();
+            Assert.assertEquals(client.getChildren().forPath("/test/leases").size(), 0);
+
+            // no more lease exist. must be possible to acquire a new one
+            Assert.assertNotNull(semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS));
+        }
+        finally
+        {
+            client.close();
+            executor.shutdownNow();
+        }
+    }
+
 }


Mime
View raw message