curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cammcken...@apache.org
Subject [32/50] curator git commit: Adding the notion of a 'lock schema' to ChildReaper that enables it to reap both the direct children its watching and subnodes of those children. This is necessary with InterProcessSemaphoreV2 as it creates multiple subnodes
Date Tue, 21 Apr 2015 23:07:29 GMT
Adding the notion of a 'lock schema' to ChildReaper that enables it to reap both the direct
children its watching and subnodes of those children.  This is necessary with InterProcessSemaphoreV2
as it creates multiple subnodes beneath its lock nodes and otherwise is unreapable with ChildReaper


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/72aea4a3
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/72aea4a3
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/72aea4a3

Branch: refs/heads/CURATOR-154
Commit: 72aea4a30b36201fe2a673358c1e062d6b5109a7
Parents: 49eb02a
Author: David Kesler <dkesler@yodle.com>
Authored: Mon Feb 9 16:34:20 2015 -0500
Committer: David Kesler <dkesler@yodle.com>
Committed: Mon Feb 9 16:34:20 2015 -0500

----------------------------------------------------------------------
 .../framework/recipes/locks/ChildReaper.java    | 35 +++++++++++++++--
 .../recipes/locks/InterProcessSemaphoreV2.java  |  8 ++++
 .../framework/recipes/locks/LockSchema.java     | 22 +++++++++++
 .../locks/TestInterProcessSemaphore.java        | 40 ++++++++++++++++++++
 4 files changed, 101 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/72aea4a3/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
index 56c56ab..7935f0b 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
@@ -55,6 +55,7 @@ public class ChildReaper implements Closeable
     private final CloseableScheduledExecutorService executor;
     private final int reapingThresholdMs;
     private final LeaderLatch leaderLatch;
+    private final LockSchema lockSchema;
 
     private volatile Future<?> task;
 
@@ -108,6 +109,21 @@ public class ChildReaper implements Closeable
      */
     public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, ScheduledExecutorService
executor, int reapingThresholdMs, String leaderPath)
     {
+        this(client, path, mode, executor, reapingThresholdMs, leaderPath, new LockSchema());
+    }
+
+
+    /**
+     * @param client the client
+     * @param path path to reap children from
+     * @param executor executor to use for background tasks
+     * @param reapingThresholdMs threshold in milliseconds that determines that a path can
be deleted
+     * @param mode reaping mode
+     * @param leaderPath if not null, uses a leader selection so that only 1 reaper is active
in the cluster
+     * @param lockSchema a set of the possible subnodes of the children of path that must
be reaped in addition to the child nodes
+     */
+    public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, ScheduledExecutorService
executor, int reapingThresholdMs, String leaderPath, LockSchema lockSchema)
+    {
         this.client = client;
         this.mode = mode;
         this.executor = new CloseableScheduledExecutorService(executor);
@@ -121,6 +137,7 @@ public class ChildReaper implements Closeable
             leaderLatch = null;
         }
         this.reaper = new Reaper(client, executor, reapingThresholdMs, leaderLatch);
+        this.lockSchema = lockSchema;
         addPath(path);
     }
 
@@ -207,12 +224,13 @@ public class ChildReaper implements Closeable
                     List<String> children = client.getChildren().forPath(path);
                     for ( String name : children )
                     {
-                        String thisPath = ZKPaths.makePath(path, name);
-                        Stat stat = client.checkExists().forPath(thisPath);
-                        if ( (stat != null) && (stat.getNumChildren() == 0) )
+                        String childPath = ZKPaths.makePath(path, name);
+                        addPathToReaperIfEmpty(childPath);
+                        for ( String subNode : lockSchema.getPaths() )
                         {
-                            reaper.addPath(thisPath, mode);
+                            addPathToReaperIfEmpty(ZKPaths.makePath(childPath, subNode));
                         }
+
                     }
                 }
                 catch ( Exception e )
@@ -223,6 +241,15 @@ public class ChildReaper implements Closeable
         }
     }
 
+    private void addPathToReaperIfEmpty(String path) throws Exception
+    {
+        Stat stat = client.checkExists().forPath(path);
+        if ( (stat != null) && (stat.getNumChildren() == 0) )
+        {
+            reaper.addPath(path, mode);
+        }
+    }
+
     private boolean shouldDoWork()
     {
         return this.leaderLatch == null || this.leaderLatch.hasLeadership();

http://git-wip-us.apache.org/repos/asf/curator/blob/72aea4a3/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
index 2e14ee1..55647ad 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
@@ -21,6 +21,8 @@ package org.apache.curator.framework.recipes.locks;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.RetryLoop;
 import org.apache.curator.framework.CuratorFramework;
@@ -92,6 +94,12 @@ public class InterProcessSemaphoreV2
     private static final String LOCK_PARENT = "locks";
     private static final String LEASE_PARENT = "leases";
     private static final String LEASE_BASE_NAME = "lease-";
+    public static final LockSchema LOCK_SCHEMA = new LockSchema(
+            Sets.newHashSet(
+                    LOCK_PARENT,
+                    LEASE_PARENT
+            )
+    );
 
     /**
      * @param client    the client

http://git-wip-us.apache.org/repos/asf/curator/blob/72aea4a3/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockSchema.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockSchema.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockSchema.java
new file mode 100644
index 0000000..5794705
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockSchema.java
@@ -0,0 +1,22 @@
+package org.apache.curator.framework.recipes.locks;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+
+public class LockSchema {
+    private final Set<String> paths;
+
+    public LockSchema() {
+        paths = new HashSet<String>();
+    }
+
+    public LockSchema(Set<String> paths) {
+        this.paths = Sets.newHashSet(paths);
+    }
+
+    public Set<String> getPaths() {
+        return Sets.newHashSet(paths);
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/72aea4a3/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
index dd3f98f..631b7c7 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
@@ -531,4 +531,44 @@ public class TestInterProcessSemaphore extends BaseClassForTests
             CloseableUtils.closeQuietly(client);
         }
     }
+
+    @Test
+    public void testChildReaperCleansUpLockNodes() throws Exception
+    {
+        Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
timing.session(), timing.connection(), new RetryOneTime(1));
+        client.start();
+
+        ChildReaper childReaper = null;
+        try
+        {
+            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test/lock",
1);
+            semaphore.returnLease(semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS));
+
+            Assert.assertTrue(client.getChildren().forPath("/test").size() > 0);
+
+            childReaper = new ChildReaper(
+                    client,
+                    "/test",
+                    Reaper.Mode.REAP_UNTIL_GONE,
+                    ChildReaper.newExecutorService(),
+                    1,
+                    "/test-leader",
+                    InterProcessSemaphoreV2.LOCK_SCHEMA
+            );
+            childReaper.start();
+
+            timing.forWaiting().sleepABit();
+
+            List<String> children = client.getChildren().forPath("/test");
+
+            Assert.assertEquals(children.size(), 0, "All children of /test should have been
reaped");
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(childReaper);
+            CloseableUtils.closeQuietly(client);
+        }
+
+    }
 }


Mime
View raw message