Return-Path: X-Original-To: apmail-curator-commits-archive@minotaur.apache.org Delivered-To: apmail-curator-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C837A183C7 for ; Mon, 17 Aug 2015 17:02:09 +0000 (UTC) Received: (qmail 79979 invoked by uid 500); 17 Aug 2015 17:02:09 -0000 Delivered-To: apmail-curator-commits-archive@curator.apache.org Received: (qmail 79880 invoked by uid 500); 17 Aug 2015 17:02:09 -0000 Mailing-List: contact commits-help@curator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@curator.apache.org Delivered-To: mailing list commits@curator.apache.org Received: (qmail 79352 invoked by uid 99); 17 Aug 2015 17:02:09 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 17 Aug 2015 17:02:09 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 499D2E0415; Mon, 17 Aug 2015 17:02:09 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: dragonsinth@apache.org To: commits@curator.apache.org Date: Mon, 17 Aug 2015 17:02:20 -0000 Message-Id: In-Reply-To: <7ac50fe071104b4493121735a87f2dc8@git.apache.org> References: <7ac50fe071104b4493121735a87f2dc8@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [12/41] curator git commit: [CURATOR-223] Add executorService methods to ServiceCacheBuilder [CURATOR-223] Add executorService methods to ServiceCacheBuilder Add executorService methods to ServiceCacheBuilder to allow the caller to specify an ExecutorService or a CloseableExecutorService to be used by the PathChildrenCache embedded in ServiceCacheImpl. Extracts ExecuteCalledWatchingExecutorService (and DelegatingExecutorService) into the curator-test module for use by TestServiceCache. 
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/6ca77776 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/6ca77776 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/6ca77776 Branch: refs/heads/CURATOR-3.0 Commit: 6ca77776d3d2c71b1e541c0edd60d2c17efe9c66 Parents: 20e92a5 Author: Tom Dyas Authored: Tue Jun 16 17:38:18 2015 -0400 Committer: Tom Dyas Committed: Wed Jun 17 13:03:17 2015 -0400 ---------------------------------------------------------------------- .../recipes/cache/TestPathChildrenCache.java | 124 +------------------ .../curator/test/DelegatingExecutorService.java | 119 ++++++++++++++++++ .../ExecuteCalledWatchingExecutorService.java | 48 +++++++ .../x/discovery/ServiceCacheBuilder.java | 24 +++- .../details/ServiceCacheBuilderImpl.java | 39 +++++- .../x/discovery/details/ServiceCacheImpl.java | 17 ++- .../curator/x/discovery/TestServiceCache.java | 53 ++++++++ 7 files changed, 297 insertions(+), 127 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/6ca77776/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java index b904bdc..216660f 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java @@ -31,6 +31,7 @@ import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.imps.CuratorFrameworkImpl; import org.apache.curator.retry.RetryOneTime; import 
org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.ExecuteCalledWatchingExecutorService; import org.apache.curator.test.KillSession; import org.apache.curator.test.Timing; import org.apache.curator.utils.CloseableUtils; @@ -1039,127 +1040,4 @@ public class TestPathChildrenCache extends BaseClassForTests CloseableUtils.closeQuietly(client); } } - - public static class ExecuteCalledWatchingExecutorService extends DelegatingExecutorService - { - boolean executeCalled = false; - - public ExecuteCalledWatchingExecutorService(ExecutorService delegate) - { - super(delegate); - } - - @Override - public synchronized void execute(Runnable command) - { - executeCalled = true; - super.execute(command); - } - - public synchronized boolean isExecuteCalled() - { - return executeCalled; - } - - public synchronized void setExecuteCalled(boolean executeCalled) - { - this.executeCalled = executeCalled; - } - } - - public static class DelegatingExecutorService implements ExecutorService - { - private final ExecutorService delegate; - - public DelegatingExecutorService( - ExecutorService delegate - ) - { - this.delegate = delegate; - } - - - @Override - public void shutdown() - { - delegate.shutdown(); - } - - @Override - public List shutdownNow() - { - return delegate.shutdownNow(); - } - - @Override - public boolean isShutdown() - { - return delegate.isShutdown(); - } - - @Override - public boolean isTerminated() - { - return delegate.isTerminated(); - } - - @Override - public boolean awaitTermination(long timeout, TimeUnit unit) - throws InterruptedException - { - return delegate.awaitTermination(timeout, unit); - } - - @Override - public Future submit(Callable task) - { - return delegate.submit(task); - } - - @Override - public Future submit(Runnable task, T result) - { - return delegate.submit(task, result); - } - - @Override - public Future submit(Runnable task) - { - return delegate.submit(task); - } - - @Override - public List> invokeAll(Collection> 
tasks) - throws InterruptedException - { - return delegate.invokeAll(tasks); - } - - @Override - public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) - throws InterruptedException - { - return delegate.invokeAll(tasks, timeout, unit); - } - - @Override - public T invokeAny(Collection> tasks) - throws InterruptedException, ExecutionException - { - return delegate.invokeAny(tasks); - } - - @Override - public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException - { - return delegate.invokeAny(tasks, timeout, unit); - } - - @Override - public void execute(Runnable command) - { - delegate.execute(command); - } - } } http://git-wip-us.apache.org/repos/asf/curator/blob/6ca77776/curator-test/src/main/java/org/apache/curator/test/DelegatingExecutorService.java ---------------------------------------------------------------------- diff --git a/curator-test/src/main/java/org/apache/curator/test/DelegatingExecutorService.java b/curator-test/src/main/java/org/apache/curator/test/DelegatingExecutorService.java new file mode 100644 index 0000000..eff34dd --- /dev/null +++ b/curator-test/src/main/java/org/apache/curator/test/DelegatingExecutorService.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.test; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.*; + +public class DelegatingExecutorService implements ExecutorService +{ + private final ExecutorService delegate; + + public DelegatingExecutorService( + ExecutorService delegate + ) + { + this.delegate = delegate; + } + + + @Override + public void shutdown() + { + delegate.shutdown(); + } + + @Override + public List<Runnable> shutdownNow() + { + return delegate.shutdownNow(); + } + + @Override + public boolean isShutdown() + { + return delegate.isShutdown(); + } + + @Override + public boolean isTerminated() + { + return delegate.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) + throws InterruptedException + { + return delegate.awaitTermination(timeout, unit); + } + + @Override + public <T> Future<T> submit(Callable<T> task) + { + return delegate.submit(task); + } + + @Override + public <T> Future<T> submit(Runnable task, T result) + { + return delegate.submit(task, result); + } + + @Override + public Future<?> submit(Runnable task) + { + return delegate.submit(task); + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) + throws InterruptedException + { + return delegate.invokeAll(tasks); + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) + throws InterruptedException + { + return delegate.invokeAll(tasks, timeout, unit); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) + throws InterruptedException, ExecutionException + { + return delegate.invokeAny(tasks); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException + { + return delegate.invokeAny(tasks, timeout, unit); + } + + @Override + public void execute(Runnable command) + { + delegate.execute(command); + } +} 
http://git-wip-us.apache.org/repos/asf/curator/blob/6ca77776/curator-test/src/main/java/org/apache/curator/test/ExecuteCalledWatchingExecutorService.java ---------------------------------------------------------------------- diff --git a/curator-test/src/main/java/org/apache/curator/test/ExecuteCalledWatchingExecutorService.java b/curator-test/src/main/java/org/apache/curator/test/ExecuteCalledWatchingExecutorService.java new file mode 100644 index 0000000..da7bc66 --- /dev/null +++ b/curator-test/src/main/java/org/apache/curator/test/ExecuteCalledWatchingExecutorService.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.curator.test; + +import java.util.concurrent.ExecutorService; + +public class ExecuteCalledWatchingExecutorService extends DelegatingExecutorService +{ + boolean executeCalled = false; + + public ExecuteCalledWatchingExecutorService(ExecutorService delegate) + { + super(delegate); + } + + @Override + public synchronized void execute(Runnable command) + { + executeCalled = true; + super.execute(command); + } + + public synchronized boolean isExecuteCalled() + { + return executeCalled; + } + + public synchronized void setExecuteCalled(boolean executeCalled) + { + this.executeCalled = executeCalled; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/6ca77776/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java index 10ce305..290d9b1 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java @@ -18,6 +18,8 @@ */ package org.apache.curator.x.discovery; +import org.apache.curator.utils.CloseableExecutorService; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadFactory; public interface ServiceCacheBuilder @@ -38,10 +40,30 @@ public interface ServiceCacheBuilder public ServiceCacheBuilder name(String name); /** - * Optional thread factory to use for the cache's internal thread + * Optional thread factory to use for the cache's internal thread. The specified ThreadFactory + * overrides any prior ThreadFactory or ExecutorService set on the ServiceCacheBuilder. 
* * @param threadFactory factory * @return this */ public ServiceCacheBuilder threadFactory(ThreadFactory threadFactory); + + /** + * Optional ExecutorService to use for the cache's background thread. The specified ExecutorService + * will be wrapped in a CloseableExecutorService and overrides any prior ThreadFactory or ExecutorService + * set on the ServiceCacheBuilder. + * + * @param executorService executor service + * @return this + */ + public ServiceCacheBuilder executorService(ExecutorService executorService); + + /** + * Optional CloseableExecutorService to use for the cache's background thread. The specified ExecutorService + * overrides any prior ThreadFactory or ExecutorService set on the ServiceCacheBuilder. + * + * @param executorService an instance of CloseableExecutorService + * @return this + */ + public ServiceCacheBuilder executorService(CloseableExecutorService executorService); } http://git-wip-us.apache.org/repos/asf/curator/blob/6ca77776/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java index c4104f4..8922233 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java @@ -18,8 +18,10 @@ */ package org.apache.curator.x.discovery.details; +import org.apache.curator.utils.CloseableExecutorService; import org.apache.curator.x.discovery.ServiceCache; import org.apache.curator.x.discovery.ServiceCacheBuilder; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadFactory; /** @@ -30,6 +32,7 @@ class ServiceCacheBuilderImpl 
implements ServiceCacheBuilder private ServiceDiscoveryImpl discovery; private String name; private ThreadFactory threadFactory; + private CloseableExecutorService executorService; ServiceCacheBuilderImpl(ServiceDiscoveryImpl discovery) { @@ -44,7 +47,14 @@ class ServiceCacheBuilderImpl implements ServiceCacheBuilder @Override public ServiceCache build() { - return new ServiceCacheImpl(discovery, name, threadFactory); + if (executorService != null) + { + return new ServiceCacheImpl(discovery, name, executorService); + } + else + { + return new ServiceCacheImpl(discovery, name, threadFactory); + } } /** @@ -70,6 +80,33 @@ class ServiceCacheBuilderImpl implements ServiceCacheBuilder public ServiceCacheBuilder threadFactory(ThreadFactory threadFactory) { this.threadFactory = threadFactory; + this.executorService = null; + return this; + } + + /** + * Optional executor service to use for the cache's background thread + * + * @param executorService executor service + * @return this + */ + @Override + public ServiceCacheBuilder executorService(ExecutorService executorService) { + this.executorService = new CloseableExecutorService(executorService); + this.threadFactory = null; + return this; + } + + /** + * Optional CloseableExecutorService to use for the cache's background thread + * + * @param executorService an instance of CloseableExecutorService + * @return this + */ + @Override + public ServiceCacheBuilder executorService(CloseableExecutorService executorService) { + this.executorService = executorService; + this.threadFactory = null; return this; } } http://git-wip-us.apache.org/repos/asf/curator/blob/6ca77776/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java index 0269d24..b8f39d5 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java @@ -22,6 +22,7 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.curator.utils.CloseableExecutorService; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.listen.ListenerContainer; @@ -36,6 +37,7 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicReference; @@ -54,15 +56,26 @@ public class ServiceCacheImpl implements ServiceCache, PathChildrenCacheLi STOPPED } + private static CloseableExecutorService convertThreadFactory(ThreadFactory threadFactory) + { + Preconditions.checkNotNull(threadFactory, "threadFactory cannot be null"); + return new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory)); + } + ServiceCacheImpl(ServiceDiscoveryImpl discovery, String name, ThreadFactory threadFactory) { + this(discovery, name, convertThreadFactory(threadFactory)); + } + + ServiceCacheImpl(ServiceDiscoveryImpl discovery, String name, CloseableExecutorService executorService) + { Preconditions.checkNotNull(discovery, "discovery cannot be null"); Preconditions.checkNotNull(name, "name cannot be null"); - Preconditions.checkNotNull(threadFactory, "threadFactory cannot be null"); + Preconditions.checkNotNull(executorService, "executorService cannot be null"); this.discovery = discovery; - cache = new 
PathChildrenCache(discovery.getClient(), discovery.pathForName(name), true, threadFactory); + cache = new PathChildrenCache(discovery.getClient(), discovery.pathForName(name), true, false, executorService); cache.getListenable().addListener(this); } http://git-wip-us.apache.org/repos/asf/curator/blob/6ca77776/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java index be114d4..5850961 100644 --- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java +++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java @@ -20,6 +20,7 @@ package org.apache.curator.x.discovery; import com.google.common.collect.Lists; import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.ExecuteCalledWatchingExecutorService; import org.apache.curator.test.Timing; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.framework.CuratorFramework; @@ -35,6 +36,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -255,4 +257,55 @@ public class TestServiceCache extends BaseClassForTests } } } + + @Test + public void testExecutorServiceIsInvoked() throws Exception { + List closeables = Lists.newArrayList(); + try { + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + closeables.add(client); + client.start(); + + ServiceDiscovery discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/discovery").client(client).build(); + 
closeables.add(discovery); + discovery.start(); + + ExecuteCalledWatchingExecutorService exec = new ExecuteCalledWatchingExecutorService(Executors.newSingleThreadExecutor()); + Assert.assertFalse(exec.isExecuteCalled()); + + ServiceCache cache = discovery.serviceCacheBuilder().name("test").executorService(exec).build(); + closeables.add(cache); + cache.start(); + + final Semaphore semaphore = new Semaphore(0); + ServiceCacheListener listener = new ServiceCacheListener() + { + @Override + public void cacheChanged() + { + semaphore.release(); + } + + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + } + }; + cache.addListener(listener); + + ServiceInstance instance1 = ServiceInstance.builder().payload("thing").name("test").port(10064).build(); + discovery.registerService(instance1); + Assert.assertTrue(semaphore.tryAcquire(10, TimeUnit.SECONDS)); + + Assert.assertTrue(exec.isExecuteCalled()); + } + finally + { + Collections.reverse(closeables); + for ( Closeable c : closeables ) + { + CloseableUtils.closeQuietly(c); + } + } + } }