curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From randg...@apache.org
Subject [19/32] git commit: Initial work at marking down instances
Date Mon, 22 Jul 2013 23:26:54 GMT
Initial work at marking down instances


Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/d3e42d10
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/d3e42d10
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/d3e42d10

Branch: refs/heads/CURATOR-14
Commit: d3e42d1099afea852aaa8481df87afad139b86c1
Parents: 5ff48fc
Author: randgalt <randgalt@apache.org>
Authored: Sun Jun 23 13:41:51 2013 -0500
Committer: randgalt <randgalt@apache.org>
Committed: Sun Jun 23 13:41:51 2013 -0500

----------------------------------------------------------------------
 .../x/discovery/DownInstanceManager.java        | 88 ++++++++++++++++++++
 .../x/discovery/ServiceProviderBuilder.java     |  2 +
 .../details/ServiceProviderBuilderImpl.java     | 11 ++-
 .../discovery/details/ServiceProviderImpl.java  | 36 +++++++-
 4 files changed, 134 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/d3e42d10/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/DownInstanceManager.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/DownInstanceManager.java
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/DownInstanceManager.java
new file mode 100644
index 0000000..5a46351
--- /dev/null
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/DownInstanceManager.java
@@ -0,0 +1,88 @@
+package org.apache.curator.x.discovery;
+
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tracks service instances that have been marked as down. An instance stays
+ * marked for a fixed timeout, measured from the moment it was added; once the
+ * timeout has elapsed the entry is discarded by the next call to
+ * {@link #contains(ServiceInstance)}.
+ */
+public class DownInstanceManager
+{
+    private final long timeoutMs;
+    private final DelayQueue<Entry> queue = new DelayQueue<Entry>();
+
+    /**
+     * Queue element pairing an instance with the time it was marked down.
+     * Ordering (via {@link #compareTo(Delayed)}) is by remaining delay, while
+     * equality is by the wrapped instance only.
+     */
+    private class Entry implements Delayed
+    {
+        private final long startMs = System.currentTimeMillis();
+        private final ServiceInstance<?> instance;
+
+        private Entry(ServiceInstance<?> instance)
+        {
+            this.instance = instance;
+        }
+
+        /**
+         * @param unit the unit the result must be expressed in
+         * @return time left before this entry expires, in {@code unit}; 0 once expired
+         */
+        @Override
+        public long getDelay(TimeUnit unit)
+        {
+            long elapsedMs = System.currentTimeMillis() - startMs;
+            long remainingMs = timeoutMs - elapsedMs;
+            // Convert FROM milliseconds TO the requested unit. DelayQueue asks for
+            // nanoseconds; with the arguments reversed the value was divided instead
+            // of multiplied, so entries looked expired almost immediately.
+            return (remainingMs > 0) ? unit.convert(remainingMs, TimeUnit.MILLISECONDS) : 0;
+        }
+
+        @Override
+        // note: not semantically the same as equals()/hashCode()
+        public int compareTo(Delayed rhs)
+        {
+            long diff = getDelay(TimeUnit.MILLISECONDS) - rhs.getDelay(TimeUnit.MILLISECONDS);
+            return (diff < 0) ? -1 :((diff > 0) ? 1 : 0);
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if ( this == o )
+            {
+                return true;
+            }
+            if ( o == null || getClass() != o.getClass() )
+            {
+                return false;
+            }
+
+            // identity of an entry is the instance it wraps; the start time is ignored
+            Entry entry = (Entry)o;
+            return instance.equals(entry.instance);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return instance.hashCode();
+        }
+    }
+
+    /**
+     * @param timeout how long an added instance remains marked as down
+     * @param unit unit of {@code timeout}
+     */
+    public DownInstanceManager(long timeout, TimeUnit unit)
+    {
+        timeoutMs = unit.toMillis(timeout);
+    }
+
+    /**
+     * Mark an instance as down, starting its timeout now.
+     *
+     * @param instance the instance to mark
+     */
+    public void add(ServiceInstance<?> instance)
+    {
+        queue.add(new Entry(instance));
+    }
+
+    /**
+     * Check whether an instance is currently marked as down. As a side effect,
+     * entries whose timeout has elapsed are removed from the queue first.
+     *
+     * @param instance the instance to look up
+     * @return true if an unexpired entry for the instance is present
+     */
+    public boolean contains(ServiceInstance<?> instance)
+    {
+        //noinspection StatementWithEmptyBody
+        while ( queue.poll() != null ){}    // pull out expired entries
+        return queue.contains(new Entry(instance));
+    }
+
+    /**
+     * Expired entries are only removed by {@link #contains(ServiceInstance)}, so
+     * this may report true while only expired entries remain.
+     *
+     * @return true if the queue holds any entries
+     */
+    public boolean hasEntries()
+    {
+        return !queue.isEmpty();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/d3e42d10/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java
index 00d8924..1c8a880 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java
@@ -63,4 +63,6 @@ public interface ServiceProviderBuilder<T>
      * @return this
      */
     public ServiceProviderBuilder<T> refreshPaddingMs(int refreshPaddingMs);
+
+    public ServiceProviderBuilder<T> downInstanceManager(DownInstanceManager downInstanceManager); // sets the manager of instances marked as down that the built provider should use
 }

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/d3e42d10/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java
index 4f6c64c..2917270 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.curator.x.discovery.details;
 
+import org.apache.curator.x.discovery.DownInstanceManager;
 import org.apache.curator.x.discovery.ProviderStrategy;
 import org.apache.curator.x.discovery.ServiceProvider;
 import org.apache.curator.x.discovery.ServiceProviderBuilder;
@@ -34,6 +35,7 @@ class ServiceProviderBuilderImpl<T> implements ServiceProviderBuilder<T>
     private ProviderStrategy<T> providerStrategy;
     private ThreadFactory threadFactory;
     private int refreshPaddingMs;
+    private DownInstanceManager downInstanceManager;
 
     /**
      * Allocate a new service provider based on the current builder settings
@@ -43,7 +45,7 @@ class ServiceProviderBuilderImpl<T> implements ServiceProviderBuilder<T>
     @Override
     public ServiceProvider<T> build()
     {
-        return new ServiceProviderImpl<T>(discovery, serviceName, providerStrategy,
threadFactory);
+        return new ServiceProviderImpl<T>(discovery, serviceName, providerStrategy,
threadFactory, downInstanceManager);
     }
 
     ServiceProviderBuilderImpl(ServiceDiscoveryImpl<T> discovery)
@@ -104,4 +106,11 @@ class ServiceProviderBuilderImpl<T> implements ServiceProviderBuilder<T>
         this.refreshPaddingMs = refreshPaddingMs;
         return this;
     }
+
+    @Override // builder setter for the manager of instances marked as down
+    public ServiceProviderBuilder<T> downInstanceManager(DownInstanceManager downInstanceManager)
+    {
+        this.downInstanceManager = downInstanceManager; // held until build() passes it to ServiceProviderImpl
+        return this; // return the builder so calls can be chained
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/d3e42d10/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java
index 39d6ca8..b7f237b 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java
@@ -18,11 +18,16 @@
  */
 package org.apache.curator.x.discovery.details;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.apache.curator.x.discovery.DownInstanceManager;
 import org.apache.curator.x.discovery.ProviderStrategy;
 import org.apache.curator.x.discovery.ServiceCache;
 import org.apache.curator.x.discovery.ServiceInstance;
 import org.apache.curator.x.discovery.ServiceProvider;
 import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.ThreadFactory;
 
 /**
@@ -32,14 +37,41 @@ import java.util.concurrent.ThreadFactory;
 public class ServiceProviderImpl<T> implements ServiceProvider<T>
 {
     private final ServiceCache<T> cache;
+    private final InstanceProvider<T> instanceProvider;
     private final ServiceDiscoveryImpl<T> discovery;
     private final ProviderStrategy<T> providerStrategy;
 
-    public ServiceProviderImpl(ServiceDiscoveryImpl<T> discovery, String serviceName,
ProviderStrategy<T> providerStrategy, ThreadFactory threadFactory)
+    public ServiceProviderImpl(ServiceDiscoveryImpl<T> discovery, String serviceName,
ProviderStrategy<T> providerStrategy, ThreadFactory threadFactory, final DownInstanceManager
downInstanceManager)
     {
         this.discovery = discovery;
         this.providerStrategy = providerStrategy;
         cache = discovery.serviceCacheBuilder().name(serviceName).threadFactory(threadFactory).build();
+
+        instanceProvider = new InstanceProvider<T>() // supplies the cached instances minus any the manager reports as down
+        {
+            @Override
+            public List<ServiceInstance<T>> getInstances() throws Exception
+            {
+                List<ServiceInstance<T>> instances = cache.getInstances(); // start from everything in the service cache
+                if ( (downInstanceManager != null) && downInstanceManager.hasEntries()
)
+                { // filter only when a manager was supplied and it currently holds entries
+                    Iterable<ServiceInstance<T>> filtered = Iterables.filter
+                    (
+                        instances,
+                        new Predicate<ServiceInstance<T>>()
+                        {
+                            @Override
+                            public boolean apply(ServiceInstance<T> instance)
+                            {
+                                return !downInstanceManager.contains(instance); // keep instances the manager does not contain
+                            }
+                        }
+                    );
+                    instances = ImmutableList.copyOf(filtered); // copy the filtered result into an immutable list
+                }
+                return instances; // the cache's own list when no filtering was applied
+            }
+        };
     }
 
     /**
@@ -74,6 +106,6 @@ public class ServiceProviderImpl<T> implements ServiceProvider<T>
     @Override
     public ServiceInstance<T> getInstance() throws Exception
     {
-        return providerStrategy.getInstance(cache);
+        return providerStrategy.getInstance(instanceProvider);
     }
 }


Mime
View raw message