Return-Path: X-Original-To: apmail-curator-commits-archive@minotaur.apache.org Delivered-To: apmail-curator-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E16D817D9F for ; Wed, 6 May 2015 04:56:00 +0000 (UTC) Received: (qmail 68302 invoked by uid 500); 6 May 2015 04:56:00 -0000 Delivered-To: apmail-curator-commits-archive@curator.apache.org Received: (qmail 68228 invoked by uid 500); 6 May 2015 04:56:00 -0000 Mailing-List: contact commits-help@curator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@curator.apache.org Delivered-To: mailing list commits@curator.apache.org Received: (qmail 66929 invoked by uid 99); 6 May 2015 04:55:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 06 May 2015 04:55:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 65F48E3598; Wed, 6 May 2015 04:55:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: randgalt@apache.org To: commits@curator.apache.org Date: Wed, 06 May 2015 04:56:30 -0000 Message-Id: In-Reply-To: <34422cdf5d34428891ea339c0ed20424@git.apache.org> References: <34422cdf5d34428891ea339c0ed20424@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [33/50] curator git commit: vast simplication. Holder isn't needed. This is better vast simplication. Holder isn't needed. This is better Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/03879d1e Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/03879d1e Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/03879d1e Branch: refs/heads/CURATOR-160 Commit: 03879d1e627e93bd867bb7a0fdfdd875b033560e Parents: 1a16f82 Author: randgalt Authored: Mon Apr 27 15:12:11 2015 -0500 Committer: randgalt Committed: Mon Apr 27 15:12:11 2015 -0500 ---------------------------------------------------------------------- .../curator/x/discovery/details/Holder.java | 72 -------- .../discovery/details/ServiceDiscoveryImpl.java | 163 +++++++------------ .../discovery/details/TestServiceDiscovery.java | 5 - 3 files changed, 57 insertions(+), 183 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/03879d1e/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java deleted file mode 100644 index d088f8d..0000000 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java +++ /dev/null @@ -1,72 +0,0 @@ -package org.apache.curator.x.discovery.details; - -import org.apache.curator.framework.recipes.cache.NodeCache; -import org.apache.curator.x.discovery.ServiceInstance; - -class Holder -{ - enum State - { - NEW, - REGISTERED, - UNREGISTERED - } - - private ServiceInstance service; - private NodeCache cache; - private State state; - private long stateChangeMs; - - Holder(ServiceInstance service, NodeCache nodeCache) - { - cache = nodeCache; - this.service = service; - setState(State.NEW); - } - - synchronized ServiceInstance getService() - { - return service; - } - - synchronized ServiceInstance getServiceIfRegistered() - { - return (state == State.REGISTERED) ? service : null; - } - - synchronized void setService(ServiceInstance service) - { - this.service = service; - } - - synchronized NodeCache getAndClearCache() - { - NodeCache localCache = cache; - cache = null; - return localCache; - } - - synchronized boolean isRegistered() - { - return state == State.REGISTERED; - } - - synchronized boolean isLapsedUnregistered(int cleanThresholdMs) - { - if ( state == State.UNREGISTERED ) - { - long elapsed = System.currentTimeMillis() - stateChangeMs; - if ( elapsed >= cleanThresholdMs ) - { - return true; - } - } - return false; - } - - synchronized void setState(State state) - { - this.state = state; - stateChangeMs = System.currentTimeMillis(); - } -} http://git-wip-us.apache.org/repos/asf/curator/blob/03879d1e/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java index a35cd3a..7b2a9ec 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java @@ -21,9 +21,7 @@ package org.apache.curator.x.discovery.details; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -50,11 +48,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collection; -import java.util.Iterator; import java.util.List; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; /** * A mechanism to register and query service instances using ZooKeeper @@ -66,11 +61,10 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery private final CuratorFramework client; private final String basePath; private final InstanceSerializer serializer; - private final ConcurrentMap> services = Maps.newConcurrentMap(); + private final ConcurrentMap> services = Maps.newConcurrentMap(); private final Collection> caches = Sets.newSetFromMap(Maps., Boolean>newConcurrentMap()); private final Collection> providers = Sets.newSetFromMap(Maps., Boolean>newConcurrentMap()); private final boolean watchInstances; - private final AtomicLong lastCleanMs = new AtomicLong(System.currentTimeMillis()); private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() { @Override @@ -91,7 +85,16 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery } }; - private final int CLEAN_THRESHOLD_MS = Integer.getInteger("curator-discovery-clean-threshold-ms", (int)TimeUnit.MINUTES.toMillis(5)); + private static class Entry + { + private volatile ServiceInstance service; + private volatile NodeCache cache; + + private Entry(ServiceInstance service) + { + this.service = service; + } + } /** * @param client the client @@ -108,7 +111,9 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery this.serializer = Preconditions.checkNotNull(serializer, "serializer cannot be null"); if ( thisInstance != null ) { - setService(thisInstance); + Entry entry = new Entry(thisInstance); + entry.cache = makeNodeCache(thisInstance); + services.put(thisInstance.getId(), entry); } } @@ -143,11 +148,11 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery CloseableUtils.closeQuietly(provider); } - for ( Holder holder : services.values() ) + for ( Entry entry : services.values() ) { try { - internalUnregisterService(holder); + internalUnregisterService(entry); } catch ( KeeperException.NoNodeException ignore ) { @@ -155,7 +160,7 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery } catch ( Exception e ) { - log.error("Could not unregister instance: " + holder.getService().getName(), e); + log.error("Could not unregister instance: " + entry.service.getName(), e); } } @@ -171,26 +176,30 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery @Override public void registerService(ServiceInstance service) throws Exception { - clean(); - - setService(service); - internalRegisterService(service); + Entry newEntry = new Entry(service); + Entry oldEntry = services.putIfAbsent(service.getId(), newEntry); + Entry useEntry = (oldEntry != null) ? oldEntry : newEntry; + synchronized(useEntry) + { + if ( useEntry == newEntry ) // i.e. is new + { + useEntry.cache = makeNodeCache(service); + } + internalRegisterService(service); + } } @Override public void updateService(final ServiceInstance service) throws Exception { - clean(); - - final Holder holder = getOrMakeHolder(service, null); - synchronized(holder) + Entry entry = services.get(service.getId()); + if ( entry == null ) { - if ( !holder.isRegistered() ) - { - throw new Exception("Service has been unregistered: " + service); - } - - holder.setService(service); + throw new Exception("Service has been unregistered: " + service); + } + synchronized(entry) + { + entry.service = service; byte[] bytes = serializer.serialize(service); String path = pathForInstance(service.getName(), service.getId()); client.setData().forPath(path, bytes); @@ -229,9 +238,7 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery @Override public void unregisterService(ServiceInstance service) throws Exception { - clean(); - - internalUnregisterService(getOrMakeHolder(service, null)); + internalUnregisterService(services.remove(service.getId())); } /** @@ -242,8 +249,6 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery @Override public ServiceProviderBuilder serviceProviderBuilder() { - clean(); - return new ServiceProviderBuilderImpl(this) .providerStrategy(new RoundRobinStrategy()) .threadFactory(ThreadUtils.newThreadFactory("ServiceProvider")); @@ -257,8 +262,6 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery @Override public ServiceCacheBuilder serviceCacheBuilder() { - clean(); - return new ServiceCacheBuilderImpl(this) .threadFactory(ThreadUtils.newThreadFactory("ServiceCache")); } @@ -272,8 +275,6 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery @Override public Collection queryForNames() throws Exception { - clean(); - List names = client.getChildren().forPath(basePath); return ImmutableList.copyOf(names); } @@ -302,8 +303,6 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery @Override public ServiceInstance queryForInstance(String name, String id) throws Exception { - clean(); - String path = pathForInstance(name, id); try { @@ -339,8 +338,6 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery CuratorFramework getClient() { - clean(); - return client; } @@ -356,8 +353,6 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery List> queryForInstances(String name, Watcher watcher) throws Exception { - clean(); - ImmutableList.Builder> builder = ImmutableList.builder(); String path = pathForName(name); List instanceIds; @@ -392,21 +387,7 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery @VisibleForTesting int debugServicesQty() { - return Iterables.size - ( - Iterables.filter - ( - services.values(), - new Predicate>() - { - @Override - public boolean apply(Holder holder) - { - return holder.isRegistered(); - } - } - ) - ); + return services.size(); } private List getChildrenWatched(String path, Watcher watcher, boolean recurse) throws Exception @@ -447,30 +428,24 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery @VisibleForTesting ServiceInstance getRegisteredService(String id) { - Holder holder = services.get(id); - return (holder != null) ? holder.getServiceIfRegistered() : null; + Entry entry = services.get(id); + return (entry != null) ? entry.service : null; } private void reRegisterServices() throws Exception { - for ( final Holder holder : services.values() ) + for ( final Entry entry : services.values() ) { - synchronized(holder) + synchronized(entry) { - if ( holder.isRegistered() ) - { - internalRegisterService(holder.getService()); - } + internalRegisterService(entry.service); } } } - private void setService(final ServiceInstance instance) + private NodeCache makeNodeCache(final ServiceInstance instance) { final NodeCache nodeCache = watchInstances ? new NodeCache(client, pathForInstance(instance.getName(), instance.getId())) : null; - Holder holder = getOrMakeHolder(instance, nodeCache); - holder.setState(Holder.State.REGISTERED); - if ( nodeCache != null ) { try @@ -489,10 +464,13 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery if ( nodeCache.getCurrentData() != null ) { ServiceInstance newInstance = serializer.deserialize(nodeCache.getCurrentData().getData()); - Holder holder = services.get(newInstance.getId()); - if ( holder != null ) + Entry entry = services.get(newInstance.getId()); + if ( entry != null ) { - holder.setService(newInstance); + synchronized(entry) + { + entry.service = newInstance; + } } } else @@ -503,49 +481,22 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery }; nodeCache.getListenable().addListener(listener); } + return nodeCache; } - private Holder getOrMakeHolder(ServiceInstance instance, NodeCache nodeCache) - { - Holder newHolder = new Holder(instance, nodeCache); - Holder oldHolder = services.putIfAbsent(instance.getId(), newHolder); - return (oldHolder != null) ? oldHolder : newHolder; - } - - private void clean() - { - long localLastCleanMs = lastCleanMs.get(); - long now = System.currentTimeMillis(); - long elpased = now - localLastCleanMs; - if ( (elpased >= CLEAN_THRESHOLD_MS) && lastCleanMs.compareAndSet(localLastCleanMs, now + 1) ) - { - final Iterator> iterator = services.values().iterator(); - while ( iterator.hasNext() ) - { - Holder holder = iterator.next(); - if ( holder.isLapsedUnregistered(CLEAN_THRESHOLD_MS) ) - { - iterator.remove(); - } - } - } - } - - private void internalUnregisterService(final Holder holder) throws Exception + private void internalUnregisterService(final Entry entry) throws Exception { - if ( holder != null ) + if ( entry != null ) { - synchronized(holder) + synchronized(entry) { - holder.setState(Holder.State.UNREGISTERED); - NodeCache cache = holder.getAndClearCache(); - if ( cache != null ) + if ( entry.cache != null ) { - CloseableUtils.closeQuietly(cache); + CloseableUtils.closeQuietly(entry.cache); + entry.cache = null; } - ServiceInstance service = holder.getService(); - String path = pathForInstance(service.getName(), service.getId()); + String path = pathForInstance(entry.service.getName(), entry.service.getId()); try { client.delete().guaranteed().forPath(path); http://git-wip-us.apache.org/repos/asf/curator/blob/03879d1e/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java index f60773f..8b1e5fc 100644 --- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java +++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java @@ -367,7 +367,6 @@ public class TestServiceDiscovery extends BaseClassForTests @Test public void testCleaning() throws Exception { - System.setProperty("curator-discovery-clean-threshold-ms", "10"); List closeables = Lists.newArrayList(); try { @@ -381,14 +380,10 @@ public class TestServiceDiscovery extends BaseClassForTests discovery.start(); discovery.unregisterService(instance); - Thread.sleep(100); - - discovery.queryForNames(); // causes a clean Assert.assertEquals(((ServiceDiscoveryImpl)discovery).debugServicesQty(), 0); } finally { - System.clearProperty("curator-discovery-clean-threshold-ms"); Collections.reverse(closeables); for ( Closeable c : closeables ) {