curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From randg...@apache.org
Subject [1/2] git commit: Added CloseMode support to LeaderLatch in order to be able to trigger the notLeader() callback when a Latch is manually closed.
Date Thu, 06 Mar 2014 10:59:13 GMT
Repository: curator
Updated Branches:
  refs/heads/master 0d9eaed1c -> a40b81940


Added CloseMode support to LeaderLatch in order to be able to trigger the notLeader() callback
when a Latch is manually closed.

This closes #1


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/2580ef3f
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/2580ef3f
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/2580ef3f

Branch: refs/heads/master
Commit: 2580ef3f5df148a67fe5d2769e4d04890e8b4fd6
Parents: 0d9eaed
Author: David Trott <github@davidtrott.com>
Authored: Wed Mar 5 16:57:04 2014 -0800
Committer: randgalt <randgalt@apache.org>
Committed: Wed Mar 5 21:53:16 2014 -0500

----------------------------------------------------------------------
 .../framework/recipes/leader/LeaderLatch.java   |  50 +++++-
 .../recipes/leader/TestLeaderLatch.java         | 171 +++++++++++++++++++
 2 files changed, 218 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/2580ef3f/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 8d9a11f..310919b 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -67,6 +67,7 @@ public class LeaderLatch implements Closeable
     private final AtomicBoolean hasLeadership = new AtomicBoolean(false);
     private final AtomicReference<String> ourPath = new AtomicReference<String>();
     private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>();
+    private final CloseMode closeMode;
 
     private final ConnectionStateListener listener = new ConnectionStateListener()
     {
@@ -95,13 +96,19 @@ public class LeaderLatch implements Closeable
         CLOSED
     }
 
+    public enum CloseMode
+    {
+        SILENT,
+        NOTIFY_LEADER
+    }
+
     /**
      * @param client    the client
      * @param latchPath the path for this leadership group
      */
     public LeaderLatch(CuratorFramework client, String latchPath)
     {
-        this(client, latchPath, "");
+        this(client, latchPath, "", CloseMode.SILENT);
     }
 
     /**
@@ -111,9 +118,21 @@ public class LeaderLatch implements Closeable
      */
     public LeaderLatch(CuratorFramework client, String latchPath, String id)
     {
+        this(client, latchPath, id, CloseMode.SILENT);
+    }
+
+    /**
+     * @param client    the client
+     * @param latchPath the path for this leadership group
+     * @param id        participant ID
+     * @param closeMode behaviour of listener on explicit close.
+     */
+    public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMode closeMode)
+    {
         this.client = Preconditions.checkNotNull(client, "client cannot be null");
         this.latchPath = Preconditions.checkNotNull(latchPath, "mutexPath cannot be null");
         this.id = Preconditions.checkNotNull(id, "id cannot be null");
+        this.closeMode = Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
     }
 
     /**
@@ -139,7 +158,22 @@ public class LeaderLatch implements Closeable
     @Override
     public void close() throws IOException
     {
+        close(this.closeMode);
+    }
+
+    /**
+     * Remove this instance from the leadership election. If this instance is the leader, leadership
+     * is released. IMPORTANT: the only way to release leadership is by calling close(). All LeaderLatch
+     * instances must eventually be closed.
+     *
+     * @param closeMode allows the default close mode to be overridden at the time the latch is closed.
+     *
+     * @throws IOException errors
+     */
+    public void close(CloseMode closeMode) throws IOException
+    {
         Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");
+        Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
 
         try
         {
@@ -152,8 +186,18 @@ public class LeaderLatch implements Closeable
         finally
         {
             client.getConnectionStateListenable().removeListener(listener);
-            listeners.clear();
-            setLeadership(false);
+
+            switch(closeMode)
+            {
+                case NOTIFY_LEADER:
+                    setLeadership(false);
+                    listeners.clear();
+                    break;
+                default:
+                    listeners.clear();
+                    setLeadership(false);
+                    break;
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/curator/blob/2580ef3f/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index f4b5590..067c817 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -365,6 +365,177 @@ public class TestLeaderLatch extends BaseClassForTests
         }
     }
 
+    @Test
+    public void testCallbackNotifyLeader() throws Exception
+    {
+        final int PARTICIPANT_QTY = 10;
+        final int SILENT_QTY = 3;
+
+        final CountDownLatch timesSquare = new CountDownLatch(PARTICIPANT_QTY);
+        final AtomicLong masterCounter = new AtomicLong(0);
+        final AtomicLong dunceCounter = new AtomicLong(0);
+
+        Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        ExecutorService exec = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("callbackNotifyLeader-%s").build());
+
+        List<LeaderLatch> latches = Lists.newArrayList();
+        for ( int i = 0; i < PARTICIPANT_QTY; ++i )
+        {
+            LeaderLatch.CloseMode closeMode = i < SILENT_QTY ?
+                    LeaderLatch.CloseMode.SILENT :
+                    LeaderLatch.CloseMode.NOTIFY_LEADER;
+
+            final LeaderLatch latch = new LeaderLatch(client, PATH_NAME, "", closeMode);
+            latch.addListener(
+                new LeaderLatchListener()
+                {
+                    boolean beenLeader = false;
+
+                    @Override
+                    public void isLeader()
+                    {
+                        if ( !beenLeader )
+                        {
+                            masterCounter.incrementAndGet();
+                            beenLeader = true;
+                            try
+                            {
+                                latch.reset();
+                            }
+                            catch ( Exception e )
+                            {
+                                throw Throwables.propagate(e);
+                            }
+                        }
+                        else
+                        {
+                            masterCounter.incrementAndGet();
+                            CloseableUtils.closeQuietly(latch);
+                            timesSquare.countDown();
+                        }
+                    }
+
+                    @Override
+                    public void notLeader()
+                    {
+                        dunceCounter.incrementAndGet();
+                    }
+                },
+                exec
+            );
+            latches.add(latch);
+        }
+
+        try
+        {
+            client.start();
+
+            for ( LeaderLatch latch : latches )
+            {
+                latch.start();
+            }
+
+            timesSquare.await();
+
+            Assert.assertEquals(masterCounter.get(), PARTICIPANT_QTY * 2);
+            Assert.assertEquals(dunceCounter.get(), PARTICIPANT_QTY * 2 - SILENT_QTY);
+            for ( LeaderLatch latch : latches )
+            {
+                Assert.assertEquals(latch.getState(), LeaderLatch.State.CLOSED);
+            }
+        }
+        finally
+        {
+            for ( LeaderLatch latch : latches )
+            {
+                if ( latch.getState() != LeaderLatch.State.CLOSED )
+                {
+                    CloseableUtils.closeQuietly(latch);
+                }
+            }
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testCallbackDontNotifyDunce() throws Exception {
+        final AtomicLong masterCounter = new AtomicLong(0);
+        final AtomicLong dunceCounter = new AtomicLong(0);
+
+        Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+
+        final LeaderLatch leader = new LeaderLatch(client, PATH_NAME);
+        final LeaderLatch dunce = new LeaderLatch(client, PATH_NAME, "", LeaderLatch.CloseMode.NOTIFY_LEADER);
+
+        leader.addListener(new LeaderLatchListener()
+        {
+            @Override
+            public void isLeader()
+            {
+            }
+
+            @Override
+            public void notLeader()
+            {
+                masterCounter.incrementAndGet();
+            }
+        });
+
+        dunce.addListener(new LeaderLatchListener()
+        {
+            @Override
+            public void isLeader()
+            {
+            }
+
+            @Override
+            public void notLeader()
+            {
+                dunceCounter.incrementAndGet();
+            }
+        });
+
+        try
+        {
+            client.start();
+
+            leader.start();
+
+            timing.sleepABit();
+
+            dunce.start();
+
+            timing.sleepABit();
+
+            dunce.close();
+
+            timing.sleepABit();
+
+            // Test the close override
+            leader.close(LeaderLatch.CloseMode.NOTIFY_LEADER);
+
+            Assert.assertEquals(leader.getState(), LeaderLatch.State.CLOSED);
+            Assert.assertEquals(dunce.getState(), LeaderLatch.State.CLOSED);
+
+            Assert.assertEquals(masterCounter.get(), 1);
+            Assert.assertEquals(dunceCounter.get(), 0);
+        }
+        finally
+        {
+            if (leader.getState() != LeaderLatch.State.CLOSED)
+            {
+                CloseableUtils.closeQuietly(leader);
+            }
+            if (dunce.getState() != LeaderLatch.State.CLOSED)
+            {
+                CloseableUtils.closeQuietly(dunce);
+            }
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
     private enum Mode
     {
         START_IMMEDIATELY,


Mime
View raw message