curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From randg...@apache.org
Subject [1/2] git commit: Added optional leader selector support
Date Wed, 02 Apr 2014 18:17:42 GMT
Repository: curator
Updated Branches:
  refs/heads/CURATOR-76 [created] 2ef7181ea


Added optional leader selector support


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/aed3c3b4
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/aed3c3b4
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/aed3c3b4

Branch: refs/heads/CURATOR-76
Commit: aed3c3b4acb90b7ad5ebce50ae482bef2d52b793
Parents: d19aff7
Author: randgalt <randgalt@apache.org>
Authored: Wed Apr 2 13:15:31 2014 -0500
Committer: randgalt <randgalt@apache.org>
Committed: Wed Apr 2 13:15:31 2014 -0500

----------------------------------------------------------------------
 .../curator/framework/recipes/locks/Reaper.java |  96 +++++++--
 .../framework/recipes/locks/TestReaper.java     | 203 ++++++++++++-------
 2 files changed, 211 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/aed3c3b4/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
index 037eacd..8802372 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
@@ -21,8 +21,9 @@ package org.apache.curator.framework.recipes.locks;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
 import org.apache.curator.utils.CloseableScheduledExecutorService;
 import org.apache.curator.utils.ThreadUtils;
 import org.apache.zookeeper.KeeperException;
@@ -31,10 +32,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Set;
+import java.util.Map;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -46,8 +48,10 @@ public class Reaper implements Closeable
     private final CuratorFramework client;
     private final CloseableScheduledExecutorService executor;
     private final int reapingThresholdMs;
-    private final Set<String> activePaths = Sets.newSetFromMap(Maps.<String, Boolean>newConcurrentMap());
+    private final Map<String, PathHolder> activePaths = Maps.newConcurrentMap();
     private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+    private final LeaderLatch leaderLatch;
+    private final AtomicBoolean reapingIsActive = new AtomicBoolean(true);
 
     private enum State
     {
@@ -107,7 +111,7 @@ public class Reaper implements Closeable
      */
     public Reaper(CuratorFramework client)
     {
-        this(client, newExecutorService(), DEFAULT_REAPING_THRESHOLD_MS);
+        this(client, newExecutorService(), DEFAULT_REAPING_THRESHOLD_MS, null);
     }
 
     /**
@@ -118,7 +122,7 @@ public class Reaper implements Closeable
      */
     public Reaper(CuratorFramework client, int reapingThresholdMs)
     {
-        this(client, newExecutorService(), reapingThresholdMs);
+        this(client, newExecutorService(), reapingThresholdMs, null);
     }
 
     /**
@@ -128,9 +132,27 @@ public class Reaper implements Closeable
      */
     public Reaper(CuratorFramework client, ScheduledExecutorService executor, int reapingThresholdMs)
     {
+        this(client, executor, reapingThresholdMs, null);
+    }
+
+    /**
+     * @param client             client
+     * @param executor           thread pool
+     * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
+     * @param leaderPath         if not null, uses a leader selection so that only 1 reaper is active in the cluster
+     */
+    public Reaper(CuratorFramework client, ScheduledExecutorService executor, int reapingThresholdMs, String leaderPath)
+    {
         this.client = client;
         this.executor = new CloseableScheduledExecutorService(executor);
         this.reapingThresholdMs = reapingThresholdMs / EMPTY_COUNT_THRESHOLD;
+
+        LeaderLatch localLeaderLatch = null;
+        if ( leaderPath != null )
+        {
+            localLeaderLatch = makeLeaderLatch(client, leaderPath);
+        }
+        leaderLatch = localLeaderLatch;
     }
 
     /**
@@ -153,8 +175,9 @@ public class Reaper implements Closeable
      */
     public void addPath(String path, Mode mode)
     {
-        activePaths.add(path);
-        schedule(new PathHolder(path, mode, 0), reapingThresholdMs);
+        PathHolder pathHolder = new PathHolder(path, mode, 0);
+        activePaths.put(path, pathHolder);
+        schedule(pathHolder, reapingThresholdMs);
     }
 
     /**
@@ -165,7 +188,7 @@ public class Reaper implements Closeable
      */
     public boolean removePath(String path)
     {
-        return activePaths.remove(path);
+        return activePaths.remove(path) != null;
     }
 
     /**
@@ -176,6 +199,11 @@ public class Reaper implements Closeable
     public void start() throws Exception
     {
         Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
+
+        if ( leaderLatch != null )
+        {
+            leaderLatch.start();
+        }
     }
 
     @Override
@@ -184,18 +212,27 @@ public class Reaper implements Closeable
         if ( state.compareAndSet(State.STARTED, State.CLOSED) )
         {
             executor.close();
+            if ( leaderLatch != null )
+            {
+                leaderLatch.close();
+            }
         }
     }
 
     @VisibleForTesting
     protected Future<?> schedule(PathHolder pathHolder, int reapingThresholdMs)
     {
-        return executor.schedule(pathHolder, reapingThresholdMs, TimeUnit.MILLISECONDS);
+        if ( reapingIsActive.get() )
+        {
+            return executor.schedule(pathHolder, reapingThresholdMs, TimeUnit.MILLISECONDS);
+        }
+        return null;
     }
 
-    private void reap(PathHolder holder)
+    @VisibleForTesting
+    protected void reap(PathHolder holder)
     {
-        if ( !activePaths.contains(holder.path) )
+        if ( !activePaths.containsKey(holder.path) )
         {
             return;
         }
@@ -256,14 +293,47 @@ public class Reaper implements Closeable
         {
             activePaths.remove(holder.path);
         }
-        else if ( !Thread.currentThread().isInterrupted() && (state.get() == State.STARTED) && activePaths.contains(holder.path) )
+        else if ( !Thread.currentThread().isInterrupted() && (state.get() == State.STARTED) && activePaths.containsKey(holder.path) )
         {
+            activePaths.put(holder.path, holder);
             schedule(new PathHolder(holder.path, holder.mode, newEmptyCount), reapingThresholdMs);
         }
     }
 
-    private static ScheduledExecutorService newExecutorService()
+    /**
+     * Allocate an executor service for the reaper
+     *
+     * @return service
+     */
+    public static ScheduledExecutorService newExecutorService()
     {
         return ThreadUtils.newSingleThreadScheduledExecutor("Reaper");
     }
+
+    private LeaderLatch makeLeaderLatch(CuratorFramework client, String leaderPath)
+    {
+        reapingIsActive.set(false);
+
+        LeaderLatch localLeaderLatch = new LeaderLatch(client, leaderPath);
+        LeaderLatchListener listener = new LeaderLatchListener()
+        {
+            @Override
+            public void isLeader()
+            {
+                reapingIsActive.set(true);
+                for ( PathHolder holder : activePaths.values() )
+                {
+                    schedule(holder, reapingThresholdMs);
+                }
+            }
+
+            @Override
+            public void notLeader()
+            {
+                reapingIsActive.set(false);
+            }
+        };
+        localLeaderLatch.addListener(listener);
+        return localLeaderLatch;
+    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/aed3c3b4/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java
index b11bc8b..39c4817 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java
@@ -16,9 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.recipes.locks;
 
-import org.apache.curator.utils.CloseableUtils;
 import junit.framework.Assert;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -28,6 +28,7 @@ import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.data.Stat;
 import org.testng.annotations.Test;
@@ -42,21 +43,92 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class TestReaper extends BaseClassForTests
 {
     @Test
     public void testUsingLeader() throws Exception
     {
-        final Timing            timing = new Timing();
-        final CuratorFramework  client = makeClient(timing, null);
-        final CountDownLatch    latch = new CountDownLatch(1);
-        LeaderSelectorListener  listener = new LeaderSelectorListener()
+        final Timing timing = new Timing();
+        CuratorFramework client = makeClient(timing, null);
+        Reaper reaper1 = null;
+        Reaper reaper2 = null;
+        try
+        {
+            final AtomicInteger reaper1Count = new AtomicInteger();
+            reaper1 = new Reaper(client, Reaper.newExecutorService(), 1, "/reaper/leader")
+            {
+                @Override
+                protected void reap(PathHolder holder)
+                {
+                    reaper1Count.incrementAndGet();
+                    super.reap(holder);
+                }
+            };
+
+            final AtomicInteger reaper2Count = new AtomicInteger();
+            reaper2 = new Reaper(client, Reaper.newExecutorService(), 1, "/reaper/leader")
+            {
+                @Override
+                protected void reap(PathHolder holder)
+                {
+                    reaper2Count.incrementAndGet();
+                    super.reap(holder);
+                }
+            };
+
+            client.start();
+            client.create().creatingParentsIfNeeded().forPath("/one/two/three");
+
+            reaper1.start();
+            reaper2.start();
+
+            reaper1.addPath("/one/two/three");
+            reaper2.addPath("/one/two/three");
+
+            timing.sleepABit();
+
+            Assert.assertTrue((reaper1Count.get() == 0) || (reaper2Count.get() == 0));
+            Assert.assertTrue((reaper1Count.get() > 0) || (reaper2Count.get() > 0));
+
+            Reaper activeReaper;
+            AtomicInteger inActiveReaperCount;
+            if ( reaper1Count.get() > 0 )
+            {
+                activeReaper = reaper1;
+                inActiveReaperCount = reaper2Count;
+            }
+            else
+            {
+                activeReaper = reaper2;
+                inActiveReaperCount = reaper1Count;
+            }
+            Assert.assertEquals(inActiveReaperCount.get(), 0);
+            activeReaper.close();
+            timing.sleepABit();
+            Assert.assertTrue(inActiveReaperCount.get() > 0);
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(reaper1);
+            CloseableUtils.closeQuietly(reaper2);
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testUsingManualLeader() throws Exception
+    {
+        final Timing timing = new Timing();
+        final CuratorFramework client = makeClient(timing, null);
+        final CountDownLatch latch = new CountDownLatch(1);
+        LeaderSelectorListener listener = new LeaderSelectorListener()
         {
             @Override
             public void takeLeadership(CuratorFramework client) throws Exception
             {
-                Reaper      reaper = new Reaper(client, 1);
+                Reaper reaper = new Reaper(client, 1);
                 try
                 {
                     reaper.addPath("/one/two/three", Reaper.Mode.REAP_UNTIL_DELETE);
@@ -76,7 +148,7 @@ public class TestReaper extends BaseClassForTests
             {
             }
         };
-        LeaderSelector  selector = new LeaderSelector(client, "/leader", listener);
+        LeaderSelector selector = new LeaderSelector(client, "/leader", listener);
         try
         {
             client.start();
@@ -99,12 +171,11 @@ public class TestReaper extends BaseClassForTests
     @Test
     public void testSparseUseNoReap() throws Exception
     {
-        final int   THRESHOLD = 3000;
+        final int THRESHOLD = 3000;
 
-        Timing                  timing = new Timing();
-        Reaper                  reaper = null;
-        Future<Void>            watcher = null;
-        CuratorFramework        client = makeClient(timing, null);
+        Timing timing = new Timing();
+        Reaper reaper = null;
+        CuratorFramework client = makeClient(timing, null);
         try
         {
             client.start();
@@ -112,43 +183,36 @@ public class TestReaper extends BaseClassForTests
 
             Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
 
-            final Queue<Reaper.PathHolder>  holders = new ConcurrentLinkedQueue<Reaper.PathHolder>();
-            final ExecutorService           pool = Executors.newCachedThreadPool();
+            final Queue<Reaper.PathHolder> holders = new ConcurrentLinkedQueue<Reaper.PathHolder>();
+            final ExecutorService pool = Executors.newCachedThreadPool();
             ScheduledExecutorService service = new ScheduledThreadPoolExecutor(1);
 
-            reaper = new Reaper
-            (
-                client,
-                service,
-                THRESHOLD
-            )
+            reaper = new Reaper(client, service, THRESHOLD)
             {
                 @Override
                 protected Future<Void> schedule(final PathHolder pathHolder, int reapingThresholdMs)
                 {
                     holders.add(pathHolder);
-                    final Future<?>    f = super.schedule(pathHolder, reapingThresholdMs);
-                    pool.submit
-                        (
-                            new Callable<Void>()
+                    final Future<?> f = super.schedule(pathHolder, reapingThresholdMs);
+                    pool.submit(new Callable<Void>()
+                        {
+                            @Override
+                            public Void call() throws Exception
                             {
-                                @Override
-                                public Void call() throws Exception
-                                {
-                                    f.get();
-                                    holders.remove(pathHolder);
-                                    return null;
-                                }
+                                f.get();
+                                holders.remove(pathHolder);
+                                return null;
                             }
-                        );
+                        }
+                               );
                     return null;
                 }
             };
             reaper.start();
             reaper.addPath("/one/two/three");
 
-            long        start = System.currentTimeMillis();
-            boolean     emptyCountIsCorrect = false;
+            long start = System.currentTimeMillis();
+            boolean emptyCountIsCorrect = false;
             while ( ((System.currentTimeMillis() - start) < timing.forWaiting().milliseconds()) && !emptyCountIsCorrect )   // need to loop as the Holder can go in/out of the Reaper's DelayQueue
             {
                 for ( Reaper.PathHolder holder : holders )
@@ -176,10 +240,6 @@ public class TestReaper extends BaseClassForTests
         }
         finally
         {
-            if ( watcher != null )
-            {
-                watcher.cancel(true);
-            }
             CloseableUtils.closeQuietly(reaper);
             CloseableUtils.closeQuietly(client);
         }
@@ -197,7 +257,7 @@ public class TestReaper extends BaseClassForTests
         testReapUntilDelete("test");
     }
 
-     @Test
+    @Test
     public void testReapUntilGone() throws Exception
     {
         testReapUntilGone(null);
@@ -259,9 +319,9 @@ public class TestReaper extends BaseClassForTests
 
     private void testReapUntilDelete(String namespace) throws Exception
     {
-        Timing                  timing = new Timing();
-        Reaper                  reaper = null;
-        CuratorFramework        client = makeClient(timing, namespace);
+        Timing timing = new Timing();
+        Reaper reaper = null;
+        CuratorFramework client = makeClient(timing, namespace);
         try
         {
             client.start();
@@ -290,13 +350,13 @@ public class TestReaper extends BaseClassForTests
 
     private void testReapUntilGone(String namespace) throws Exception
     {
-        Timing                  timing = new Timing();
-        Reaper                  reaper = null;
-        CuratorFramework        client = makeClient(timing, namespace);
+        Timing timing = new Timing();
+        Reaper reaper = null;
+        CuratorFramework client = makeClient(timing, namespace);
         try
         {
             client.start();
-            
+
             reaper = new Reaper(client, 100);
             reaper.start();
 
@@ -318,14 +378,9 @@ public class TestReaper extends BaseClassForTests
         }
     }
 
-
     private CuratorFramework makeClient(Timing timing, String namespace) throws IOException
     {
-        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
-            .connectionTimeoutMs(timing.connection())
-            .sessionTimeoutMs(timing.session())
-            .connectString(server.getConnectString())
-            .retryPolicy(new RetryOneTime(1));
+        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().connectionTimeoutMs(timing.connection()).sessionTimeoutMs(timing.session()).connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1));
         if ( namespace != null )
         {
             builder = builder.namespace(namespace);
@@ -335,9 +390,9 @@ public class TestReaper extends BaseClassForTests
 
     private void testRemove(String namespace) throws Exception
     {
-        Timing                  timing = new Timing();
-        Reaper                  reaper = null;
-        CuratorFramework        client = makeClient(timing, namespace);
+        Timing timing = new Timing();
+        Reaper reaper = null;
+        CuratorFramework client = makeClient(timing, namespace);
         try
         {
             client.start();
@@ -368,16 +423,16 @@ public class TestReaper extends BaseClassForTests
 
     private void testSimulationWithLocks(String namespace) throws Exception
     {
-        final int           LOCK_CLIENTS = 10;
-        final int           ITERATIONS = 250;
-        final int           MAX_WAIT_MS = 10;
+        final int LOCK_CLIENTS = 10;
+        final int ITERATIONS = 250;
+        final int MAX_WAIT_MS = 10;
 
-        ExecutorService                     service = Executors.newFixedThreadPool(LOCK_CLIENTS);
-        ExecutorCompletionService<Object>   completionService = new ExecutorCompletionService<Object>(service);
+        ExecutorService service = Executors.newFixedThreadPool(LOCK_CLIENTS);
+        ExecutorCompletionService<Object> completionService = new ExecutorCompletionService<Object>(service);
 
-        Timing                      timing = new Timing();
-        Reaper                      reaper = null;
-        final CuratorFramework      client = makeClient(timing, namespace);
+        Timing timing = new Timing();
+        Reaper reaper = null;
+        final CuratorFramework client = makeClient(timing, namespace);
         try
         {
             client.start();
@@ -388,14 +443,12 @@ public class TestReaper extends BaseClassForTests
 
             for ( int i = 0; i < LOCK_CLIENTS; ++i )
             {
-                completionService.submit
-                (
-                    new Callable<Object>()
+                completionService.submit(new Callable<Object>()
                     {
                         @Override
                         public Object call() throws Exception
                         {
-                            final InterProcessMutex     lock = new InterProcessMutex(client, "/a/b");
+                            final InterProcessMutex lock = new InterProcessMutex(client, "/a/b");
                             for ( int i = 0; i < ITERATIONS; ++i )
                             {
                                 lock.acquire();
@@ -411,7 +464,7 @@ public class TestReaper extends BaseClassForTests
                             return null;
                         }
                     }
-                );
+                                        );
             }
 
             for ( int i = 0; i < LOCK_CLIENTS; ++i )
@@ -435,10 +488,10 @@ public class TestReaper extends BaseClassForTests
 
     private void testWithEphemerals(String namespace) throws Exception
     {
-        Timing                  timing = new Timing();
-        Reaper                  reaper = null;
-        CuratorFramework        client2 = null;
-        CuratorFramework        client = makeClient(timing, namespace);
+        Timing timing = new Timing();
+        Reaper reaper = null;
+        CuratorFramework client2 = null;
+        CuratorFramework client = makeClient(timing, namespace);
         try
         {
             client.start();
@@ -479,9 +532,9 @@ public class TestReaper extends BaseClassForTests
 
     private void testBasic(String namespace) throws Exception
     {
-        Timing                  timing = new Timing();
-        Reaper                  reaper = null;
-        CuratorFramework        client = makeClient(timing, namespace);
+        Timing timing = new Timing();
+        Reaper reaper = null;
+        CuratorFramework client = makeClient(timing, namespace);
         try
         {
             client.start();


Mime
View raw message