curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From randg...@apache.org
Subject curator git commit: vast simplification. Holder isn't needed. This is better
Date Mon, 27 Apr 2015 20:12:16 GMT
Repository: curator
Updated Branches:
  refs/heads/CURATOR-164 1a16f82ba -> 03879d1e6


vast simplification. Holder isn't needed. This is better


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/03879d1e
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/03879d1e
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/03879d1e

Branch: refs/heads/CURATOR-164
Commit: 03879d1e627e93bd867bb7a0fdfdd875b033560e
Parents: 1a16f82
Author: randgalt <randgalt@apache.org>
Authored: Mon Apr 27 15:12:11 2015 -0500
Committer: randgalt <randgalt@apache.org>
Committed: Mon Apr 27 15:12:11 2015 -0500

----------------------------------------------------------------------
 .../curator/x/discovery/details/Holder.java     |  72 --------
 .../discovery/details/ServiceDiscoveryImpl.java | 163 +++++++------------
 .../discovery/details/TestServiceDiscovery.java |   5 -
 3 files changed, 57 insertions(+), 183 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/03879d1e/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java
deleted file mode 100644
index d088f8d..0000000
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/Holder.java
+++ /dev/null
@@ -1,72 +0,0 @@
-package org.apache.curator.x.discovery.details;
-
-import org.apache.curator.framework.recipes.cache.NodeCache;
-import org.apache.curator.x.discovery.ServiceInstance;
-
-class Holder<T>
-{
-    enum State
-    {
-        NEW,
-        REGISTERED,
-        UNREGISTERED
-    }
-
-    private ServiceInstance<T> service;
-    private NodeCache cache;
-    private State state;
-    private long stateChangeMs;
-
-    Holder(ServiceInstance<T> service, NodeCache nodeCache)
-    {
-        cache = nodeCache;
-        this.service = service;
-        setState(State.NEW);
-    }
-
-    synchronized ServiceInstance<T> getService()
-    {
-        return service;
-    }
-
-    synchronized ServiceInstance<T> getServiceIfRegistered()
-    {
-        return (state == State.REGISTERED) ? service : null;
-    }
-
-    synchronized void setService(ServiceInstance<T> service)
-    {
-        this.service = service;
-    }
-
-    synchronized NodeCache getAndClearCache()
-    {
-        NodeCache localCache = cache;
-        cache = null;
-        return localCache;
-    }
-
-    synchronized boolean isRegistered()
-    {
-        return state == State.REGISTERED;
-    }
-
-    synchronized boolean isLapsedUnregistered(int cleanThresholdMs)
-    {
-        if ( state == State.UNREGISTERED )
-        {
-            long elapsed = System.currentTimeMillis() - stateChangeMs;
-            if ( elapsed >= cleanThresholdMs )
-            {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    synchronized void setState(State state)
-    {
-        this.state = state;
-        stateChangeMs = System.currentTimeMillis();
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/03879d1e/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
index a35cd3a..7b2a9ec 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
@@ -21,9 +21,7 @@ package org.apache.curator.x.discovery.details;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -50,11 +48,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * A mechanism to register and query service instances using ZooKeeper
@@ -66,11 +61,10 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
     private final CuratorFramework client;
     private final String basePath;
     private final InstanceSerializer<T> serializer;
-    private final ConcurrentMap<String, Holder<T>> services = Maps.newConcurrentMap();
+    private final ConcurrentMap<String, Entry<T>> services = Maps.newConcurrentMap();
     private final Collection<ServiceCache<T>> caches = Sets.newSetFromMap(Maps.<ServiceCache<T>, Boolean>newConcurrentMap());
     private final Collection<ServiceProvider<T>> providers = Sets.newSetFromMap(Maps.<ServiceProvider<T>, Boolean>newConcurrentMap());
     private final boolean watchInstances;
-    private final AtomicLong lastCleanMs = new AtomicLong(System.currentTimeMillis());
     private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
     {
         @Override
@@ -91,7 +85,16 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
         }
     };
 
-    private final int CLEAN_THRESHOLD_MS = Integer.getInteger("curator-discovery-clean-threshold-ms", (int)TimeUnit.MINUTES.toMillis(5));
+    private static class Entry<T>
+    {
+        private volatile ServiceInstance<T> service;
+        private volatile NodeCache cache;
+
+        private Entry(ServiceInstance<T> service)
+        {
+            this.service = service;
+        }
+    }
 
     /**
      * @param client the client
@@ -108,7 +111,9 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
         this.serializer = Preconditions.checkNotNull(serializer, "serializer cannot be null");
         if ( thisInstance != null )
         {
-            setService(thisInstance);
+            Entry<T> entry = new Entry<T>(thisInstance);
+            entry.cache = makeNodeCache(thisInstance);
+            services.put(thisInstance.getId(), entry);
         }
     }
 
@@ -143,11 +148,11 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
             CloseableUtils.closeQuietly(provider);
         }
 
-        for ( Holder<T> holder : services.values() )
+        for ( Entry<T> entry : services.values() )
         {
             try
             {
-                internalUnregisterService(holder);
+                internalUnregisterService(entry);
             }
             catch ( KeeperException.NoNodeException ignore )
             {
@@ -155,7 +160,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
             }
             catch ( Exception e )
             {
-                log.error("Could not unregister instance: " + holder.getService().getName(), e);
+                log.error("Could not unregister instance: " + entry.service.getName(), e);
             }
         }
 
@@ -171,26 +176,30 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
     @Override
     public void registerService(ServiceInstance<T> service) throws Exception
     {
-        clean();
-
-        setService(service);
-        internalRegisterService(service);
+        Entry<T> newEntry = new Entry<T>(service);
+        Entry<T> oldEntry = services.putIfAbsent(service.getId(), newEntry);
+        Entry<T> useEntry = (oldEntry != null) ? oldEntry : newEntry;
+        synchronized(useEntry)
+        {
+            if ( useEntry == newEntry ) // i.e. is new
+            {
+                useEntry.cache = makeNodeCache(service);
+            }
+            internalRegisterService(service);
+        }
     }
 
     @Override
     public void updateService(final ServiceInstance<T> service) throws Exception
     {
-        clean();
-
-        final Holder<T> holder = getOrMakeHolder(service, null);
-        synchronized(holder)
+        Entry<T> entry = services.get(service.getId());
+        if ( entry == null )
         {
-            if ( !holder.isRegistered() )
-            {
-                throw new Exception("Service has been unregistered: " + service);
-            }
-
-            holder.setService(service);
+            throw new Exception("Service has been unregistered: " + service);
+        }
+        synchronized(entry)
+        {
+            entry.service = service;
             byte[] bytes = serializer.serialize(service);
             String path = pathForInstance(service.getName(), service.getId());
             client.setData().forPath(path, bytes);
@@ -229,9 +238,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
     @Override
     public void unregisterService(ServiceInstance<T> service) throws Exception
     {
-        clean();
-
-        internalUnregisterService(getOrMakeHolder(service, null));
+        internalUnregisterService(services.remove(service.getId()));
     }
 
     /**
@@ -242,8 +249,6 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
     @Override
     public ServiceProviderBuilder<T> serviceProviderBuilder()
     {
-        clean();
-
         return new ServiceProviderBuilderImpl<T>(this)
             .providerStrategy(new RoundRobinStrategy<T>())
             .threadFactory(ThreadUtils.newThreadFactory("ServiceProvider"));
@@ -257,8 +262,6 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
     @Override
     public ServiceCacheBuilder<T> serviceCacheBuilder()
     {
-        clean();
-
         return new ServiceCacheBuilderImpl<T>(this)
             .threadFactory(ThreadUtils.newThreadFactory("ServiceCache"));
     }
@@ -272,8 +275,6 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
     @Override
     public Collection<String> queryForNames() throws Exception
     {
-        clean();
-
         List<String> names = client.getChildren().forPath(basePath);
         return ImmutableList.copyOf(names);
     }
@@ -302,8 +303,6 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
     @Override
     public ServiceInstance<T> queryForInstance(String name, String id) throws Exception
     {
-        clean();
-
         String path = pathForInstance(name, id);
         try
         {
@@ -339,8 +338,6 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
 
     CuratorFramework getClient()
     {
-        clean();
-
         return client;
     }
 
@@ -356,8 +353,6 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
 
     List<ServiceInstance<T>> queryForInstances(String name, Watcher watcher) throws Exception
     {
-        clean();
-
         ImmutableList.Builder<ServiceInstance<T>> builder = ImmutableList.builder();
         String path = pathForName(name);
         List<String> instanceIds;
@@ -392,21 +387,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
     @VisibleForTesting
     int debugServicesQty()
     {
-        return Iterables.size
-            (
-                Iterables.filter
-                    (
-                        services.values(),
-                        new Predicate<Holder<T>>()
-                        {
-                            @Override
-                            public boolean apply(Holder<T> holder)
-                            {
-                                return holder.isRegistered();
-                            }
-                        }
-                    )
-            );
+        return services.size();
     }
 
     private List<String> getChildrenWatched(String path, Watcher watcher, boolean recurse) throws Exception
@@ -447,30 +428,24 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
     @VisibleForTesting
     ServiceInstance<T> getRegisteredService(String id)
     {
-        Holder<T> holder = services.get(id);
-        return (holder != null) ? holder.getServiceIfRegistered() : null;
+        Entry<T> entry = services.get(id);
+        return (entry != null) ? entry.service : null;
     }
 
     private void reRegisterServices() throws Exception
     {
-        for ( final Holder<T> holder : services.values() )
+        for ( final Entry<T> entry : services.values() )
         {
-            synchronized(holder)
+            synchronized(entry)
             {
-                if ( holder.isRegistered() )
-                {
-                    internalRegisterService(holder.getService());
-                }
+                internalRegisterService(entry.service);
             }
         }
     }
 
-    private void setService(final ServiceInstance<T> instance)
+    private NodeCache makeNodeCache(final ServiceInstance<T> instance)
     {
         final NodeCache nodeCache = watchInstances ? new NodeCache(client, pathForInstance(instance.getName(), instance.getId())) : null;
-        Holder<T> holder = getOrMakeHolder(instance, nodeCache);
-        holder.setState(Holder.State.REGISTERED);
-
         if ( nodeCache != null )
         {
             try
@@ -489,10 +464,13 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
                     if ( nodeCache.getCurrentData() != null )
                     {
                         ServiceInstance<T> newInstance = serializer.deserialize(nodeCache.getCurrentData().getData());
-                        Holder<T> holder = services.get(newInstance.getId());
-                        if ( holder != null )
+                        Entry<T> entry = services.get(newInstance.getId());
+                        if ( entry != null )
                         {
-                            holder.setService(newInstance);
+                            synchronized(entry)
+                            {
+                                entry.service = newInstance;
+                            }
                         }
                     }
                     else
@@ -503,49 +481,22 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
             };
             nodeCache.getListenable().addListener(listener);
         }
+        return nodeCache;
     }
 
-    private Holder<T> getOrMakeHolder(ServiceInstance<T> instance, NodeCache nodeCache)
-    {
-        Holder<T> newHolder = new Holder<T>(instance, nodeCache);
-        Holder<T> oldHolder = services.putIfAbsent(instance.getId(), newHolder);
-        return (oldHolder != null) ? oldHolder : newHolder;
-    }
-
-    private void clean()
-    {
-        long localLastCleanMs = lastCleanMs.get();
-        long now = System.currentTimeMillis();
-        long elpased = now - localLastCleanMs;
-        if ( (elpased >= CLEAN_THRESHOLD_MS) && lastCleanMs.compareAndSet(localLastCleanMs, now + 1) )
-        {
-            final Iterator<Holder<T>> iterator = services.values().iterator();
-            while ( iterator.hasNext() )
-            {
-                Holder<T> holder = iterator.next();
-                if ( holder.isLapsedUnregistered(CLEAN_THRESHOLD_MS) )
-                {
-                    iterator.remove();
-                }
-            }
-        }
-    }
-
-    private void internalUnregisterService(final Holder<T> holder) throws Exception
+    private void internalUnregisterService(final Entry<T> entry) throws Exception
     {
-        if ( holder != null )
+        if ( entry != null )
         {
-            synchronized(holder)
+            synchronized(entry)
             {
-                holder.setState(Holder.State.UNREGISTERED);
-                NodeCache cache = holder.getAndClearCache();
-                if ( cache != null )
+                if ( entry.cache != null )
                 {
-                    CloseableUtils.closeQuietly(cache);
+                    CloseableUtils.closeQuietly(entry.cache);
+                    entry.cache = null;
                 }
 
-                ServiceInstance<T> service = holder.getService();
-                String path = pathForInstance(service.getName(), service.getId());
+                String path = pathForInstance(entry.service.getName(), entry.service.getId());
                 try
                 {
                     client.delete().guaranteed().forPath(path);

http://git-wip-us.apache.org/repos/asf/curator/blob/03879d1e/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
index f60773f..8b1e5fc 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
@@ -367,7 +367,6 @@ public class TestServiceDiscovery extends BaseClassForTests
     @Test
     public void testCleaning() throws Exception
     {
-        System.setProperty("curator-discovery-clean-threshold-ms", "10");
         List<Closeable> closeables = Lists.newArrayList();
         try
         {
@@ -381,14 +380,10 @@ public class TestServiceDiscovery extends BaseClassForTests
             discovery.start();
             discovery.unregisterService(instance);
 
-            Thread.sleep(100);
-
-            discovery.queryForNames();  // causes a clean
             Assert.assertEquals(((ServiceDiscoveryImpl)discovery).debugServicesQty(), 0);
         }
         finally
         {
-            System.clearProperty("curator-discovery-clean-threshold-ms");
             Collections.reverse(closeables);
             for ( Closeable c : closeables )
             {


Mime
View raw message