Return-Path: X-Original-To: apmail-curator-commits-archive@minotaur.apache.org Delivered-To: apmail-curator-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 74C05DB14 for ; Fri, 17 May 2013 20:39:34 +0000 (UTC) Received: (qmail 82887 invoked by uid 500); 17 May 2013 20:39:34 -0000 Delivered-To: apmail-curator-commits-archive@curator.apache.org Received: (qmail 82851 invoked by uid 500); 17 May 2013 20:39:34 -0000 Mailing-List: contact commits-help@curator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@curator.apache.org Delivered-To: mailing list commits@curator.apache.org Received: (qmail 82836 invoked by uid 99); 17 May 2013 20:39:34 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 May 2013 20:39:34 +0000 X-ASF-Spam-Status: No, hits=-2000.6 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Fri, 17 May 2013 20:39:32 +0000 Received: (qmail 81948 invoked by uid 99); 17 May 2013 20:39:12 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 May 2013 20:39:12 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id EF09137A6A; Fri, 17 May 2013 20:39:11 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: randgalt@apache.org To: commits@curator.incubator.apache.org Date: Fri, 17 May 2013 20:39:19 -0000 Message-Id: <90706fd5d6674313b409f4b27dbad040@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [09/12] git commit: Introduced CloseableExecutorService. Instead of blindly shutting down exectors, this container shuts down any futures created by an executor. This resolves issues where custom executors are given to Curator. X-Virus-Checked: Checked by ClamAV on apache.org Introduced CloseableExecutorService. Instead of blindly shutting down exectors, this container shuts down any futures created by an executor. This resolves issues where custom executors are given to Curator. Merge branch 'CURATOR-17' into 2.0.1-incubating Conflicts: curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/6e3c9e27 Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/6e3c9e27 Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/6e3c9e27 Branch: refs/heads/master Commit: 6e3c9e27b8e6e2cda6ad146b8a63a60346badb0d Parents: 11ae23a 601bc4c Author: randgalt Authored: Fri May 10 19:11:23 2013 -0700 Committer: randgalt Committed: Fri May 10 19:11:23 2013 -0700 ---------------------------------------------------------------------- .../curator/utils/CloseableExecutorService.java | 123 +++++++ .../utils/CloseableScheduledExecutorService.java | 72 ++++ .../utils/TestCloseableExecutorService.java | 252 +++++++++++++++ .../framework/recipes/cache/PathChildrenCache.java | 9 +- .../framework/recipes/locks/ChildReaper.java | 9 +- .../curator/framework/recipes/locks/Reaper.java | 25 +- .../recipes/cache/TestPathChildrenCache.java | 27 +-- .../framework/recipes/locks/TestReaper.java | 55 ++-- 8 files changed, 502 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/6e3c9e27/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java ---------------------------------------------------------------------- diff --cc curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java index 76efc6c,61c3af7..ec2d328 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java @@@ -350,11 -355,10 +351,11 @@@ public class PathChildrenCache implemen @Override public void close() throws IOException { - Preconditions.checkState(!executorService.isShutdown(), "has not been started"); - - client.getConnectionStateListenable().removeListener(connectionStateListener); - executorService.close(); + if ( state.compareAndSet(State.STARTED, State.CLOSED) ) + { + client.getConnectionStateListenable().removeListener(connectionStateListener); - executorService.shutdownNow(); ++ executorService.close(); + } } /** @@@ -751,24 -749,9 +752,24 @@@ } } - private void offerOperation(Operation operation) + /** + * Submits a runnable to the executor. + *

+ * This method is synchronized because it has to check state about whether this instance is still open. Without this check + * there is a race condition with the dataWatchers that get set. Even after this object is closed() it can still be + * called by those watchers, because the close() method cannot actually disable the watcher. + *

+ * The synchronization overhead should be minimal if non-existant as this is generally only called from the + * ZK client thread and will only contend if close() is called in parallel with an update, and that's the exact state + * we want to protect from. + * + * @param command The runnable to run + */ + private synchronized void submitToExecutor(final Runnable command) { - operations.remove(operation); // avoids herding for refresh operations - operations.offer(operation); + if ( state.get() == State.STARTED ) + { - executorService.execute(command); ++ executorService.submit(command); + } } } http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/6e3c9e27/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java ---------------------------------------------------------------------- diff --cc curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java index 7fed5f8,e51125b..4b117fb --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java @@@ -730,253 -728,4 +729,229 @@@ public class TestPathChildrenCache exte client.close(); } } + + @Test + public void testBasicsOnTwoCachesWithSameExecutor() throws Exception + { + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + client.start(); + try + { + client.create().forPath("/test"); + + final BlockingQueue events = new LinkedBlockingQueue(); - final ExecutorService exec = new ShutdownNowIgnoringExecutorService(Executors.newSingleThreadExecutor()); ++ final ExecutorService exec = Executors.newSingleThreadExecutor(); + PathChildrenCache cache = new PathChildrenCache(client, "/test", true, false, exec); + cache.getListenable().addListener + ( + new PathChildrenCacheListener() + { + @Override + public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception + { + if ( event.getData().getPath().equals("/test/one") ) + { + events.offer(event.getType()); + } + } + } + ); + cache.start(); + + final BlockingQueue events2 = new LinkedBlockingQueue(); + PathChildrenCache cache2 = new PathChildrenCache(client, "/test", true, false, exec); + cache2.getListenable().addListener( + new PathChildrenCacheListener() { + @Override + public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) + throws Exception + { + if ( event.getData().getPath().equals("/test/one") ) + { + events2.offer(event.getType()); + } + } + } + ); + cache2.start(); + + client.create().forPath("/test/one", "hey there".getBytes()); + Assert.assertEquals(events.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED); + Assert.assertEquals(events2.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED); + + client.setData().forPath("/test/one", "sup!".getBytes()); + Assert.assertEquals(events.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED); + Assert.assertEquals(events2.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED); + Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "sup!"); + Assert.assertEquals(new String(cache2.getCurrentData("/test/one").getData()), "sup!"); + + client.delete().forPath("/test/one"); + Assert.assertEquals(events.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED); + Assert.assertEquals(events2.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED); + + cache.close(); + cache2.close(); + } + finally + { + client.close(); + } + } + + @Test + public void testDeleteNodeAfterCloseDoesntCallExecutor() + throws Exception + { + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + client.start(); + try + { + client.create().forPath("/test"); + + final ExecuteCalledWatchingExecutorService exec = new ExecuteCalledWatchingExecutorService(Executors.newSingleThreadExecutor()); + PathChildrenCache cache = new PathChildrenCache(client, "/test", true, false, exec); + + cache.start(); + client.create().forPath("/test/one", "hey there".getBytes()); + + cache.rebuild(); + Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there"); + Assert.assertTrue(exec.isExecuteCalled()); + + exec.setExecuteCalled(false); + cache.close(); + Assert.assertFalse(exec.isExecuteCalled()); + + client.delete().forPath("/test/one"); + Thread.sleep(100); + Assert.assertFalse(exec.isExecuteCalled()); + } + finally { + client.close(); + } + + } + + public static class ExecuteCalledWatchingExecutorService extends DelegatingExecutorService + { + boolean executeCalled = false; + + public ExecuteCalledWatchingExecutorService(ExecutorService delegate) + { + super(delegate); + } + + @Override + public synchronized void execute(Runnable command) + { + executeCalled = true; + super.execute(command); + } + + public synchronized boolean isExecuteCalled() + { + return executeCalled; + } + + public synchronized void setExecuteCalled(boolean executeCalled) + { + this.executeCalled = executeCalled; + } + } + - /** - * This is required to work around https://issues.apache.org/jira/browse/CURATOR-17 - */ - public static class ShutdownNowIgnoringExecutorService extends DelegatingExecutorService - { - public ShutdownNowIgnoringExecutorService(ExecutorService delegate) - { - super(delegate); - } - - @Override - public void shutdown() - { - // ignore - } - - @Override - public List shutdownNow() - { - // ignore - return ImmutableList.of(); - } - } - + public static class DelegatingExecutorService implements ExecutorService + { + private final ExecutorService delegate; + + public DelegatingExecutorService( + ExecutorService delegate + ) + { + this.delegate = delegate; + } + + + @Override + public void shutdown() + { + delegate.shutdown(); + } + + @Override + public List shutdownNow() + { + return delegate.shutdownNow(); + } + + @Override + public boolean isShutdown() + { + return delegate.isShutdown(); + } + + @Override + public boolean isTerminated() + { + return delegate.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) + throws InterruptedException + { + return delegate.awaitTermination(timeout, unit); + } + + @Override + public Future submit(Callable task) + { + return delegate.submit(task); + } + + @Override + public Future submit(Runnable task, T result) + { + return delegate.submit(task, result); + } + + @Override + public Future submit(Runnable task) + { + return delegate.submit(task); + } + + @Override + public List> invokeAll(Collection> tasks) + throws InterruptedException + { + return delegate.invokeAll(tasks); + } + + @Override + public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException + { + return delegate.invokeAll(tasks, timeout, unit); + } + + @Override + public T invokeAny(Collection> tasks) + throws InterruptedException, ExecutionException + { + return delegate.invokeAny(tasks); + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException + { + return delegate.invokeAny(tasks, timeout, unit); + } + + @Override + public void execute(Runnable command) + { + delegate.execute(command); + } + } }