Return-Path: X-Original-To: apmail-curator-commits-archive@minotaur.apache.org Delivered-To: apmail-curator-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E3C721109D for ; Mon, 21 Apr 2014 19:52:57 +0000 (UTC) Received: (qmail 29092 invoked by uid 500); 21 Apr 2014 19:52:57 -0000 Delivered-To: apmail-curator-commits-archive@curator.apache.org Received: (qmail 29061 invoked by uid 500); 21 Apr 2014 19:52:57 -0000 Mailing-List: contact commits-help@curator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@curator.apache.org Delivered-To: mailing list commits@curator.apache.org Received: (qmail 29054 invoked by uid 99); 21 Apr 2014 19:52:57 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 21 Apr 2014 19:52:57 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id A585398EB18; Mon, 21 Apr 2014 19:52:56 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: randgalt@apache.org To: commits@curator.apache.org Message-Id: <3e82201ca30940bfa363c93366927b14@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: Major issue! Thread that was canceled due to CancelLeadershipException was getting re-used and thus exiting immediately. Instead, a new thread from the pool should be used Date: Mon, 21 Apr 2014 19:52:56 +0000 (UTC) Repository: curator Updated Branches: refs/heads/master 99a1b7c1d -> 62494bd63 Major issue! Thread that was canceled due to CancelLeadershipException was getting re-used and thus exiting immediately. Instead, a new thread from the pool should be used Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/62494bd6 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/62494bd6 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/62494bd6 Branch: refs/heads/master Commit: 62494bd639bf02ed3f654b98270a3c082902f923 Parents: 99a1b7c Author: randgalt Authored: Mon Apr 21 13:22:40 2014 -0500 Committer: randgalt Committed: Mon Apr 21 13:22:40 2014 -0500 ---------------------------------------------------------------------- .../recipes/leader/LeaderSelector.java | 61 +++++------ .../recipes/leader/TestLeaderSelector.java | 101 +++++++++++++++++++ 2 files changed, 132 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/62494bd6/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java index ac10733..1a2470c 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java @@ -224,11 +224,15 @@ public class LeaderSelector implements Closeable * * @return true if re-queue is successful */ - public synchronized boolean requeue() + public boolean requeue() { Preconditions.checkState(state.get() == State.STARTED, "close() has already been called"); + return internalRequeue(); + } - if ( !isQueued ) + public synchronized boolean internalRequeue() + { + if ( !isQueued && (state.get() == State.STARTED) ) { isQueued = true; Future task = executorService.submit(new Callable() @@ -243,6 +247,10 @@ public class LeaderSelector implements Closeable finally { clearIsQueued(); + if ( autoRequeue.get() ) + { + internalRequeue(); + } } return null; } @@ -393,6 +401,7 @@ public class LeaderSelector implements Closeable catch ( InterruptedException e ) { Thread.currentThread().interrupt(); + throw e; } catch ( Throwable e ) { @@ -406,6 +415,7 @@ public class LeaderSelector implements Closeable catch ( InterruptedException e ) { Thread.currentThread().interrupt(); + throw e; } catch ( Exception e ) { @@ -428,36 +438,27 @@ public class LeaderSelector implements Closeable private void doWorkLoop() throws Exception { - do + KeeperException exception = null; + try { - KeeperException exception = null; - try - { - doWork(); - } - catch ( KeeperException.ConnectionLossException e ) - { - exception = e; - } - catch ( KeeperException.SessionExpiredException e ) - { - exception = e; - } - catch ( InterruptedException ignore ) - { - Future task = ourTask.get(); - if ( (task == null) || !task.isCancelled() ) // if interruptLeadership() was called, not re-set the interrupt state of the thread - { - Thread.currentThread().interrupt(); - } - break; - } - if ( (exception != null) && !autoRequeue.get() ) // autoRequeue should ignore connection loss or session expired and just keep trying - { - throw exception; - } + doWork(); + } + catch ( KeeperException.ConnectionLossException e ) + { + exception = e; + } + catch ( KeeperException.SessionExpiredException e ) + { + exception = e; + } + catch ( InterruptedException ignore ) + { + Thread.currentThread().interrupt(); + } + if ( (exception != null) && !autoRequeue.get() ) // autoRequeue should ignore connection loss or session expired and just keep trying + { + throw exception; } - while ( autoRequeue.get() && (state.get() == State.STARTED) && !Thread.currentThread().isInterrupted() ); } private synchronized void clearIsQueued() http://git-wip-us.apache.org/repos/asf/curator/blob/62494bd6/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java index 1ae041b..a4ae2ba 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java @@ -371,6 +371,107 @@ public class TestLeaderSelector extends BaseClassForTests } } + /** + * This is similar to TestLeaderSelector.testKillSessionThenCloseShouldElectNewLeader + * The differences are: + * it restarts the TestingServer instead of killing the session + * it uses autoRequeue instead of explicitly calling requeue + */ + @Test + public void testKillServerThenCloseShouldElectNewLeader() throws Exception + { + final Timing timing = new Timing(); + + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); + client.start(); + try + { + final Semaphore semaphore = new Semaphore(0); + final CountDownLatch interruptedLatch = new CountDownLatch(1); + final AtomicInteger leaderCount = new AtomicInteger(0); + LeaderSelectorListener listener = new LeaderSelectorListenerAdapter() + { + @Override + public void takeLeadership(CuratorFramework client) throws Exception + { + leaderCount.incrementAndGet(); + try + { + semaphore.release(); + try + { + Thread.currentThread().join(); + } + catch ( InterruptedException e ) + { + Thread.currentThread().interrupt(); + interruptedLatch.countDown(); + } + } + finally + { + leaderCount.decrementAndGet(); + } + } + }; + LeaderSelector leaderSelector1 = new LeaderSelector(client, PATH_NAME, listener); + LeaderSelector leaderSelector2 = new LeaderSelector(client, PATH_NAME, listener); + + boolean leaderSelector1Closed = false; + boolean leaderSelector2Closed = false; + + leaderSelector1.autoRequeue(); + leaderSelector2.autoRequeue(); + + leaderSelector1.start(); + leaderSelector2.start(); + + Assert.assertTrue(timing.acquireSemaphore(semaphore, 1)); + + int port = server.getPort(); + server.stop(); + timing.sleepABit(); + server = new TestingServer(port); + Assert.assertTrue(timing.awaitLatch(interruptedLatch)); + timing.sleepABit(); + + Assert.assertTrue(timing.acquireSemaphore(semaphore, 1)); + Assert.assertEquals(leaderCount.get(), 1); + + if ( leaderSelector1.hasLeadership() ) + { + leaderSelector1.close(); + leaderSelector1Closed = true; + } + else if ( leaderSelector2.hasLeadership() ) + { + leaderSelector2.close(); + leaderSelector2Closed = true; + } + else + { + fail("No leaderselector has leadership!"); + } + + // Verify that the other leader took over leadership. + Assert.assertTrue(timing.acquireSemaphore(semaphore, 1)); + Assert.assertEquals(leaderCount.get(), 1); + + if ( !leaderSelector1Closed ) + { + leaderSelector1.close(); + } + if ( !leaderSelector2Closed ) + { + leaderSelector2.close(); + } + } + finally + { + client.close(); + } + } + @Test public void testClosing() throws Exception {