Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id C294E200AC8 for ; Sun, 8 May 2016 20:44:30 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id C1305160A06; Sun, 8 May 2016 18:44:30 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E46601609B2 for ; Sun, 8 May 2016 20:44:29 +0200 (CEST) Received: (qmail 72183 invoked by uid 500); 8 May 2016 18:44:29 -0000 Mailing-List: contact commits-help@curator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@curator.apache.org Delivered-To: mailing list commits@curator.apache.org Received: (qmail 72165 invoked by uid 99); 8 May 2016 18:44:29 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 08 May 2016 18:44:29 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D1F5ADFE04; Sun, 8 May 2016 18:44:28 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: randgalt@apache.org To: commits@curator.apache.org Date: Sun, 08 May 2016 18:44:28 -0000 Message-Id: <64c693b0866b4963956fe965fb586e82@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] curator git commit: Return leases before retrying aquire archived-at: Sun, 08 May 2016 18:44:30 -0000 Repository: curator Updated Branches: refs/heads/master 0ef5e454a -> 168dfd734 Return leases before retrying aquire Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/613a51be Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/613a51be Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/613a51be Branch: refs/heads/master Commit: 613a51be4535544e990d31e485674e237bd7e2da Parents: 73cd00a Author: Ulrich Geilmann Authored: Mon Apr 11 15:32:29 2016 +0200 Committer: Ulrich Geilmann Committed: Mon Apr 11 23:02:28 2016 +0200 ---------------------------------------------------------------------- .../recipes/locks/InterProcessSemaphoreV2.java | 8 ++- .../locks/TestInterProcessSemaphore.java | 66 ++++++++++++++++++++ 2 files changed, 73 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/613a51be/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java index 8524075..3d96be2 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java @@ -346,12 +346,15 @@ public class InterProcessSemaphoreV2 { lock.acquire(); } + + Lease lease = null; + try { PathAndBytesable createBuilder = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL); String path = (nodeData != null) ? createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME), nodeData) : createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME)); String nodeName = ZKPaths.getNodeFromPath(path); - builder.add(makeLease(path)); + lease = makeLease(path); synchronized(this) { @@ -361,6 +364,7 @@ public class InterProcessSemaphoreV2 if ( !children.contains(nodeName) ) { log.error("Sequential path not found: " + path); + returnLease(lease); return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE; } @@ -373,6 +377,7 @@ public class InterProcessSemaphoreV2 long thisWaitMs = getThisWaitMs(startMs, waitMs); if ( thisWaitMs <= 0 ) { + returnLease(lease); return InternalAcquireResult.RETURN_NULL; } wait(thisWaitMs); @@ -388,6 +393,7 @@ public class InterProcessSemaphoreV2 { lock.release(); } + builder.add(Preconditions.checkNotNull(lease)); return InternalAcquireResult.CONTINUE; } http://git-wip-us.apache.org/repos/asf/curator/blob/613a51be/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java index 631b7c7..ad45d90 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java @@ -20,6 +20,7 @@ package org.apache.curator.framework.recipes.locks; import com.google.common.collect.Lists; +import org.apache.curator.framework.api.CuratorWatcher; import org.apache.curator.test.BaseClassForTests; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.framework.CuratorFramework; @@ -27,6 +28,8 @@ import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.shared.SharedCount; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.Timing; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; import org.testng.Assert; import org.testng.annotations.Test; import java.util.Collection; @@ -571,4 +574,67 @@ public class TestInterProcessSemaphore extends BaseClassForTests } } + + @Test + public void testNoOrphanedNodes() throws Exception + { + final Timing timing = new Timing(); + final ExecutorService executor = Executors.newFixedThreadPool(1); + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); + client.start(); + try + { + final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test", 1); + Lease lease = semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS); + Assert.assertNotNull(lease); + final List childNodes = client.getChildren().forPath("/test/leases"); + Assert.assertEquals(childNodes.size(), 1); + + final CountDownLatch nodeCreatedLatch = new CountDownLatch(1); + client.getChildren().usingWatcher(new CuratorWatcher() { + @Override + public void process(WatchedEvent event) throws Exception { + if (event.getType() == Watcher.Event.EventType.NodeCreated) { + nodeCreatedLatch.countDown(); + } + } + }).forPath("/test/leases"); + + final Future leaseFuture = executor.submit(new Callable() { + @Override + public Lease call() throws Exception { + return semaphore.acquire(timing.forWaiting().multiple(2).seconds(), TimeUnit.SECONDS); + } + }); + + // wait for second lease to create its node + timing.awaitLatch(nodeCreatedLatch); + String newNode = null; + for (String c : client.getChildren().forPath("/test/leases")) { + if (!childNodes.contains(c)) { + newNode = c; + } + } + Assert.assertNotNull(newNode); + + // delete the ephemeral node to trigger a retry + client.delete().forPath("/test/leases/" + newNode); + + // release first lease so second one can be acquired + lease.close(); + lease = leaseFuture.get(); + Assert.assertNotNull(lease); + lease.close(); + Assert.assertEquals(client.getChildren().forPath("/test/leases").size(), 0); + + // no more lease exist. must be possible to acquire a new one + Assert.assertNotNull(semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS)); + } + finally + { + client.close(); + executor.shutdownNow(); + } + } + }