Return-Path: X-Original-To: apmail-curator-commits-archive@minotaur.apache.org Delivered-To: apmail-curator-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id ACF6017C08 for ; Wed, 6 May 2015 04:13:39 +0000 (UTC) Received: (qmail 94757 invoked by uid 500); 6 May 2015 04:13:39 -0000 Delivered-To: apmail-curator-commits-archive@curator.apache.org Received: (qmail 94680 invoked by uid 500); 6 May 2015 04:13:39 -0000 Mailing-List: contact commits-help@curator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@curator.apache.org Delivered-To: mailing list commits@curator.apache.org Received: (qmail 93361 invoked by uid 99); 6 May 2015 04:13:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 06 May 2015 04:13:36 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8CCEEE17C9; Wed, 6 May 2015 04:13:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: randgalt@apache.org To: commits@curator.apache.org Date: Wed, 06 May 2015 04:14:04 -0000 Message-Id: <26f05789eeb447a891f9563a648732d0@git.apache.org> In-Reply-To: <3e8ce4f06069481e98d71f2596f23c85@git.apache.org> References: <3e8ce4f06069481e98d71f2596f23c85@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [29/50] curator git commit: moved holder into separate class so that it's easier to reason about and lock moved holder into separate class so that it's easier to reason about and lock Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/d6a51f4a Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/d6a51f4a Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/d6a51f4a Branch: refs/heads/CURATOR-3.0 Commit: d6a51f4ae9a0365fba1ae77b9780d9bb43a79c72 Parents: c62b113 Author: randgalt Authored: Tue Apr 21 17:44:53 2015 -0500 Committer: randgalt Committed: Tue Apr 21 17:44:53 2015 -0500 ---------------------------------------------------------------------- .../curator/x/discovery/details/Holder.java | 173 +++++++++++++++++++ .../discovery/details/ServiceDiscoveryImpl.java | 120 ++++++------- .../discovery/details/TestServiceDiscovery.java | 3 - 3 files changed, 227 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/d6a51f4a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java new file mode 100644 index 0000000..69c7667 --- /dev/null +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java @@ -0,0 +1,173 @@ +package org.apache.curator.x.discovery.details; + +import org.apache.curator.framework.recipes.cache.NodeCache; +import org.apache.curator.x.discovery.ServiceInstance; +import java.util.concurrent.locks.ReentrantLock; + +class Holder +{ + enum State + { + NEW, + REGISTERED, + UNREGISTERED + } + + private ServiceInstance service; + private NodeCache cache; + private State state; + private long stateChangeMs; + private final ReentrantLock lock = new ReentrantLock(); + + Holder(ServiceInstance service) + { + this.service = service; + setState(State.NEW); + } + + ServiceInstance getService() + { + lock.lock(); + try + { + return service; + } + finally + { + lock.unlock(); + } + } + + ServiceInstance getServiceIfRegistered() + { + lock.lock(); + try + { + return (state == State.REGISTERED) ? service : null; + } + finally + { + lock.unlock(); + } + } + + void setService(ServiceInstance service) + { + lock.lock(); + try + { + this.service = service; + } + finally + { + lock.unlock(); + } + } + + NodeCache getCache() + { + lock.lock(); + try + { + return cache; + } + finally + { + lock.unlock(); + } + } + + NodeCache getAndClearCache() + { + lock.lock(); + try + { + NodeCache localCache = cache; + cache = null; + return localCache; + } + finally + { + lock.unlock(); + } + } + + void setCache(NodeCache cache) + { + lock.lock(); + try + { + this.cache = cache; + } + finally + { + lock.unlock(); + } + } + + State getState() + { + lock.lock(); + try + { + return state; + } + finally + { + lock.unlock(); + } + } + + boolean isRegistered() + { + lock.lock(); + try + { + return state == State.REGISTERED; + } + finally + { + lock.unlock(); + } + } + + boolean isLapsedUnregistered(int cleanThresholdMs) + { + lock.lock(); + try + { + if ( state == State.UNREGISTERED ) + { + long elapsed = System.currentTimeMillis() - stateChangeMs; + if ( elapsed >= cleanThresholdMs ) + { + return true; + } + } + return false; + } + finally + { + lock.unlock(); + } + } + + void setState(State state) + { + lock.lock(); + try + { + this.state = state; + stateChangeMs = System.currentTimeMillis(); + } + finally + { + lock.unlock(); + } + } + + ReentrantLock getLock() + { + return lock; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/d6a51f4a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java index ba18e42..ec049fd 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java @@ -21,7 +21,9 @@ package org.apache.curator.x.discovery.details; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -53,12 +55,10 @@ import java.util.List; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; /** * A mechanism to register and query service instances using ZooKeeper */ -@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") public class ServiceDiscoveryImpl implements ServiceDiscovery { private final Logger log = LoggerFactory.getLogger(getClass()); @@ -92,33 +92,6 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery private final int CLEAN_THRESHOLD_MS = Integer.getInteger("curator-discovery-clean-threshold-ms", (int)TimeUnit.MINUTES.toMillis(5)); - private enum State - { - NEW, - REGISTERED, - UNREGISTERED - } - - private static class Holder - { - private final AtomicReference> service = new AtomicReference>(); - private final AtomicReference cache = new AtomicReference(); - private final AtomicReference state = new AtomicReference(); - private final AtomicLong stateChangeMs = new AtomicLong(); - - public Holder(ServiceInstance instance) - { - service.set(instance); - setState(State.NEW); - } - - public void setState(State state) - { - this.state.set(state); - stateChangeMs.set(System.currentTimeMillis()); - } - } - /** * @param client the client * @param basePath base path to store data @@ -181,7 +154,7 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery } catch ( Exception e ) { - log.error("Could not unregister instance: " + holder.service.get().getName(), e); + log.error("Could not unregister instance: " + holder.getService().getName(), e); } } @@ -204,23 +177,28 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery } @Override - public void updateService(ServiceInstance service) throws Exception + public void updateService(final ServiceInstance service) throws Exception { clean(); - final Holder holder = getOrMakeHolder(service, null); - synchronized(holder) + Holder holder = getOrMakeHolder(service, null); + holder.getLock().lock(); + try { - if ( holder.state.get() == State.UNREGISTERED ) + if ( holder.getState() == Holder.State.UNREGISTERED ) { throw new Exception("Service has been unregistered: " + service); } - holder.service.set(service); + holder.setService(service); byte[] bytes = serializer.serialize(service); String path = pathForInstance(service.getName(), service.getId()); client.setData().forPath(path, bytes); } + finally + { + holder.getLock().unlock(); + } } @VisibleForTesting @@ -418,7 +396,21 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery @VisibleForTesting int debugServicesQty() { - return services.size(); + return Iterables.size + ( + Iterables.filter + ( + services.values(), + new Predicate>() + { + @Override + public boolean apply(Holder holder) + { + return holder.isRegistered(); + } + } + ) + ); } private List getChildrenWatched(String path, Watcher watcher, boolean recurse) throws Exception @@ -459,28 +451,26 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery @VisibleForTesting ServiceInstance getRegisteredService(String id) { - final Holder holder = services.get(id); - if ( holder != null ) - { - synchronized(holder) - { - return (holder.state.get() == State.REGISTERED) ? holder.service.get() : null; - } - } - return null; + Holder holder = services.get(id); + return (holder != null) ? holder.getServiceIfRegistered() : null; } private void reRegisterServices() throws Exception { for ( final Holder holder : services.values() ) { - synchronized(holder) + holder.getLock().lock(); + try { - if ( holder.state.get() == State.REGISTERED ) + if ( holder.isRegistered() ) { - internalRegisterService(holder.service.get()); + internalRegisterService(holder.getService()); } } + finally + { + holder.getLock().unlock(); + } } } @@ -488,7 +478,7 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery { final NodeCache nodeCache = watchInstances ? new NodeCache(client, pathForInstance(instance.getName(), instance.getId())) : null; Holder holder = getOrMakeHolder(instance, nodeCache); - holder.setState(State.REGISTERED); + holder.setState(Holder.State.REGISTERED); if ( nodeCache != null ) { @@ -511,7 +501,7 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery Holder holder = services.get(newInstance.getId()); if ( holder != null ) { - holder.service.set(newInstance); + holder.setService(newInstance); } } else @@ -529,7 +519,7 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery Holder newHolder = new Holder(instance); Holder oldHolder = services.putIfAbsent(instance.getId(), newHolder); Holder useHolder = (oldHolder != null) ? oldHolder : newHolder; - useHolder.cache.set(nodeCache); + useHolder.setCache(nodeCache); return useHolder; } @@ -540,20 +530,13 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery long elpased = now - localLastCleanMs; if ( (elpased >= CLEAN_THRESHOLD_MS) && lastCleanMs.compareAndSet(localLastCleanMs, now + 1) ) { - Iterator> iterator = services.values().iterator(); + final Iterator> iterator = services.values().iterator(); while ( iterator.hasNext() ) { - final Holder holder = iterator.next(); - synchronized(holder) + Holder holder = iterator.next(); + if ( holder.isLapsedUnregistered(CLEAN_THRESHOLD_MS) ) { - if ( holder.state.get() == State.UNREGISTERED ) - { - long elapsed = System.currentTimeMillis() - holder.stateChangeMs.get(); - if ( elapsed >= CLEAN_THRESHOLD_MS ) - { - iterator.remove(); - } - } + iterator.remove(); } } } @@ -563,16 +546,17 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery { if ( holder != null ) { - synchronized(holder) + holder.getLock().lock(); + try { - holder.setState(State.UNREGISTERED); - NodeCache cache = holder.cache.getAndSet(null); + holder.setState(Holder.State.UNREGISTERED); + NodeCache cache = holder.getAndClearCache(); if ( cache != null ) { CloseableUtils.closeQuietly(cache); } - ServiceInstance service = holder.service.get(); + ServiceInstance service = holder.getService(); String path = pathForInstance(service.getName(), service.getId()); try { @@ -583,6 +567,10 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery // ignore } } + finally + { + holder.getLock().unlock(); + } } } } http://git-wip-us.apache.org/repos/asf/curator/blob/d6a51f4a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java index 2808c5c..f60773f 100644 --- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java +++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java @@ -31,9 +31,6 @@ import org.apache.curator.utils.CloseableUtils; import org.apache.curator.x.discovery.ServiceDiscovery; import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; import org.apache.curator.x.discovery.ServiceInstance; -import org.apache.curator.x.discovery.details.InstanceSerializer; -import org.apache.curator.x.discovery.details.JsonInstanceSerializer; -import org.apache.curator.x.discovery.details.ServiceDiscoveryImpl; import org.testng.Assert; import org.testng.annotations.Test; import java.io.Closeable;