curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From randg...@apache.org
Subject [4/4] curator git commit: Finished merge of CURATOR-315
Date Sun, 08 May 2016 18:53:29 GMT
Finished merge of CURATOR-315


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/29906f1e
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/29906f1e
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/29906f1e

Branch: refs/heads/CURATOR-3.0
Commit: 29906f1ebb9130b91ec1d7034de9885af9f15d07
Parents: e76eb59 168dfd7
Author: randgalt <randgalt@apache.org>
Authored: Sun May 8 13:53:20 2016 -0500
Committer: randgalt <randgalt@apache.org>
Committed: Sun May 8 13:53:20 2016 -0500

----------------------------------------------------------------------
 .../recipes/locks/InterProcessSemaphoreV2.java  |  8 ++-
 .../locks/TestInterProcessSemaphore.java        | 66 ++++++++++++++++++++
 2 files changed, 73 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/29906f1e/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
index 36dbff4,3d96be2..d967b98
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
@@@ -352,38 -354,37 +355,40 @@@ public class InterProcessSemaphoreV
              PathAndBytesable<String> createBuilder = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL);
              String path = (nodeData != null) ? createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME), nodeData) : createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME));
              String nodeName = ZKPaths.getNodeFromPath(path);
-             builder.add(makeLease(path));
+             lease = makeLease(path);
  
 -            synchronized(this)
 +            try
              {
 -                for(;;)
 +                synchronized(this)
                  {
 -                    List<String> children = client.getChildren().usingWatcher(watcher).forPath(leasesPath);
 -                    if ( !children.contains(nodeName) )
 +                    for(;;)
                      {
 -                        log.error("Sequential path not found: " + path);
 -                        returnLease(lease);
 -                        return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE;
 -                    }
 -
 -                    if ( children.size() <= maxLeases )
 -                    {
 -                        break;
 -                    }
 -                    if ( hasWait )
 -                    {
 -                        long thisWaitMs = getThisWaitMs(startMs, waitMs);
 -                        if ( thisWaitMs <= 0 )
 +                        List<String> children = client.getChildren().usingWatcher(watcher).forPath(leasesPath);
 +                        if ( !children.contains(nodeName) )
                          {
 +                            log.error("Sequential path not found: " + path);
+                             returnLease(lease);
 -                            return InternalAcquireResult.RETURN_NULL;
 +                            return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE;
 +                        }
 +
 +                        if ( children.size() <= maxLeases )
 +                        {
 +                            break;
 +                        }
 +                        if ( hasWait )
 +                        {
 +                            long thisWaitMs = getThisWaitMs(startMs, waitMs);
 +                            if ( thisWaitMs <= 0 )
 +                            {
++                                returnLease(lease);
 +                                return InternalAcquireResult.RETURN_NULL;
 +                            }
 +                            wait(thisWaitMs);
 +                        }
 +                        else
 +                        {
 +                            wait();
                          }
 -                        wait(thisWaitMs);
 -                    }
 -                    else
 -                    {
 -                        wait();
                      }
                  }
              }

http://git-wip-us.apache.org/repos/asf/curator/blob/29906f1e/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
index 2797b5f,ad45d90..802290e
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
@@@ -20,14 -20,16 +20,17 @@@
  package org.apache.curator.framework.recipes.locks;
  
  import com.google.common.collect.Lists;
 -import org.apache.curator.framework.api.CuratorWatcher;
 -import org.apache.curator.test.BaseClassForTests;
 -import org.apache.curator.utils.CloseableUtils;
  import org.apache.curator.framework.CuratorFramework;
  import org.apache.curator.framework.CuratorFrameworkFactory;
++import org.apache.curator.framework.api.CuratorWatcher;
 +import org.apache.curator.framework.imps.TestCleanState;
  import org.apache.curator.framework.recipes.shared.SharedCount;
  import org.apache.curator.retry.RetryOneTime;
 +import org.apache.curator.test.BaseClassForTests;
  import org.apache.curator.test.Timing;
 +import org.apache.curator.utils.CloseableUtils;
+ import org.apache.zookeeper.WatchedEvent;
+ import org.apache.zookeeper.Watcher;
  import org.testng.Assert;
  import org.testng.annotations.Test;
  import java.util.Collection;
@@@ -531,7 -531,110 +534,70 @@@ public class TestInterProcessSemaphore 
              {
                  CloseableUtils.closeQuietly(l);
              }
 -            CloseableUtils.closeQuietly(client);
 +            TestCleanState.closeAndTestClean(client);
          }
      }
+ 
 -    @Test
 -    public void testChildReaperCleansUpLockNodes() throws Exception
 -    {
 -        Timing timing = new Timing();
 -        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
 -        client.start();
 -
 -        ChildReaper childReaper = null;
 -        try
 -        {
 -            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test/lock", 1);
 -            semaphore.returnLease(semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS));
 -
 -            Assert.assertTrue(client.getChildren().forPath("/test").size() > 0);
 -
 -            childReaper = new ChildReaper(
 -                    client,
 -                    "/test",
 -                    Reaper.Mode.REAP_UNTIL_GONE,
 -                    ChildReaper.newExecutorService(),
 -                    1,
 -                    "/test-leader",
 -                    InterProcessSemaphoreV2.LOCK_SCHEMA
 -            );
 -            childReaper.start();
 -
 -            timing.forWaiting().sleepABit();
 -
 -            List<String> children = client.getChildren().forPath("/test");
 -
 -            Assert.assertEquals(children.size(), 0, "All children of /test should have been reaped");
 -        }
 -        finally
 -        {
 -            CloseableUtils.closeQuietly(childReaper);
 -            CloseableUtils.closeQuietly(client);
 -        }
 -
 -    }
+ 
+     @Test
+     public void testNoOrphanedNodes() throws Exception
+     {
+         final Timing timing = new Timing();
+         final ExecutorService executor = Executors.newFixedThreadPool(1);
+         CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+         client.start();
+         try
+         {
+             final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test", 1);
+             Lease lease = semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS);
+             Assert.assertNotNull(lease);
+             final List<String> childNodes = client.getChildren().forPath("/test/leases");
+             Assert.assertEquals(childNodes.size(), 1);
+ 
+             final CountDownLatch nodeCreatedLatch = new CountDownLatch(1);
+             client.getChildren().usingWatcher(new CuratorWatcher() {
+                 @Override
+                 public void process(WatchedEvent event) throws Exception {
+                     if (event.getType() == Watcher.Event.EventType.NodeCreated) {
+                         nodeCreatedLatch.countDown();
+                     }
+                 }
+             }).forPath("/test/leases");
+ 
+             final Future<Lease> leaseFuture = executor.submit(new Callable<Lease>() {
+                 @Override
+                 public Lease call() throws Exception {
+                     return semaphore.acquire(timing.forWaiting().multiple(2).seconds(), TimeUnit.SECONDS);
+                 }
+             });
+ 
+             // wait for second lease to create its node
+             timing.awaitLatch(nodeCreatedLatch);
+             String newNode = null;
+             for (String c : client.getChildren().forPath("/test/leases")) {
+                 if (!childNodes.contains(c)) {
+                     newNode = c;
+                 }
+             }
+             Assert.assertNotNull(newNode);
+ 
+             // delete the ephemeral node to trigger a retry
+             client.delete().forPath("/test/leases/" + newNode);
+ 
+             // release first lease so second one can be acquired
+             lease.close();
+             lease = leaseFuture.get();
+             Assert.assertNotNull(lease);
+             lease.close();
+             Assert.assertEquals(client.getChildren().forPath("/test/leases").size(), 0);
+ 
+             // no more lease exist. must be possible to acquire a new one
+             Assert.assertNotNull(semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS));
+         }
+         finally
+         {
 -            client.close();
+             executor.shutdownNow();
++            TestCleanState.closeAndTestClean(client);
+         }
+     }
 -
  }


Mime
View raw message