curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From randg...@apache.org
Subject [7/9] git commit: Finalized implementation and generalized concept of an instance filter.
Date Sun, 07 Jul 2013 17:27:02 GMT
Finalized implementation and generalized concept of an instance filter.


Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/86857cb5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/86857cb5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/86857cb5

Branch: refs/heads/master
Commit: 86857cb5aa5511240544f49848054cb6a6453ed4
Parents: 46d112f
Author: randgalt <randgalt@apache.org>
Authored: Fri Jun 28 14:02:33 2013 -0500
Committer: randgalt <randgalt@apache.org>
Committed: Fri Jun 28 14:02:33 2013 -0500

----------------------------------------------------------------------
 .../curator/x/discovery/DownInstancePolicy.java | 21 +++++++++---
 .../curator/x/discovery/InstanceFilter.java     |  3 ++
 .../curator/x/discovery/ServiceProvider.java    |  6 ++++
 .../x/discovery/ServiceProviderBuilder.java     | 15 ++++++++-
 .../discovery/details/DownInstanceManager.java  | 21 ++++++++----
 .../details/ServiceProviderBuilderImpl.java     |  2 --
 .../details/TestDownInstanceManager.java        | 35 ++++++++++++++------
 7 files changed, 78 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86857cb5/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/DownInstancePolicy.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/DownInstancePolicy.java
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/DownInstancePolicy.java
index 0360200..53ad671 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/DownInstancePolicy.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/DownInstancePolicy.java
@@ -21,23 +21,34 @@ package org.apache.curator.x.discovery;
 
 import java.util.concurrent.TimeUnit;
 
+/**
+ * Abstraction for values that determine when an instance is down
+ */
 public class DownInstancePolicy
 {
     private final long timeoutMs;
-    private final int threshold;
+    private final int errorThreshold;
 
     private static final long DEFAULT_TIMEOUT_MS = 30000;
     private static final int DEFAULT_THRESHOLD = 2;
 
+    /**
+     * Policy with default values
+     */
     public DownInstancePolicy()
     {
         this(DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS, DEFAULT_THRESHOLD);
     }
 
-    public DownInstancePolicy(long timeout, TimeUnit unit, int threshold)
+    /**
+     * @param timeout window of time for down instances
+     * @param unit time unit
+     * @param errorThreshold number of errors within time window that denotes a down instance
+     */
+    public DownInstancePolicy(long timeout, TimeUnit unit, int errorThreshold)
     {
         this.timeoutMs = unit.toMillis(timeout);
-        this.threshold = threshold;
+        this.errorThreshold = errorThreshold;
     }
 
     public long getTimeoutMs()
@@ -45,8 +56,8 @@ public class DownInstancePolicy
         return timeoutMs;
     }
 
-    public int getThreshold()
+    public int getErrorThreshold()
     {
-        return threshold;
+        return errorThreshold;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86857cb5/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/InstanceFilter.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/InstanceFilter.java
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/InstanceFilter.java
index 72cd410..ddccb4d 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/InstanceFilter.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/InstanceFilter.java
@@ -20,6 +20,9 @@ package org.apache.curator.x.discovery;
 
 import com.google.common.base.Predicate;
 
+/**
+ * Typedef for an Instance predicate
+ */
 public interface InstanceFilter<T> extends Predicate<ServiceInstance<T>>
 {
 }

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86857cb5/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java
index 44ee291..e8fcdcf 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java
@@ -44,5 +44,11 @@ public interface ServiceProvider<T> extends Closeable
      */
     public ServiceInstance<T> getInstance() throws Exception;
 
+    /**
+     * Take note of an error connecting to the given instance. The instance will potentially
+     * be marked as "down" depending on the {@link DownInstancePolicy}.
+     *
+     * @param instance instance that had an error
+     */
     public void noteError(ServiceInstance<T> instance);
 }

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86857cb5/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java
index 4a0e636..2be311f 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java
@@ -20,7 +20,6 @@ package org.apache.curator.x.discovery;
 
 import org.apache.curator.x.discovery.strategies.RoundRobinStrategy;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
 
 public interface ServiceProviderBuilder<T>
 {
@@ -55,7 +54,21 @@ public interface ServiceProviderBuilder<T>
      */
     public ServiceProviderBuilder<T> threadFactory(ThreadFactory threadFactory);
 
+    /**
+     * Set the down instance policy
+     *
+     * @param downInstancePolicy new policy
+     * @return this
+     */
     public ServiceProviderBuilder<T> downInstancePolicy(DownInstancePolicy downInstancePolicy);
 
+    /**
+     * Add an instance filter. NOTE: this does not remove previously added filters. i.e.
+     * a l;ist is created of all added filters. Filters are called in the order they were
+     * added.
+     *
+     * @param filter filter to add
+     * @return this
+     */
     public ServiceProviderBuilder<T> additionalFilter(InstanceFilter<T> filter);
 }

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86857cb5/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/DownInstanceManager.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/DownInstanceManager.java
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/DownInstanceManager.java
index a70f205..cda0968 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/DownInstanceManager.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/DownInstanceManager.java
@@ -25,11 +25,13 @@ import org.apache.curator.x.discovery.ServiceInstance;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 class DownInstanceManager<T> implements InstanceFilter<T>
 {
     private final ConcurrentMap<ServiceInstance<?>, Status> statuses = Maps.newConcurrentMap();
     private final DownInstancePolicy downInstancePolicy;
+    private final AtomicLong lastPurge = new AtomicLong(System.currentTimeMillis());
 
     private static class Status
     {
@@ -37,11 +39,6 @@ class DownInstanceManager<T> implements InstanceFilter<T>
         private final AtomicInteger errorCount = new AtomicInteger(0);
     }
 
-    DownInstanceManager()
-    {
-        this(new DownInstancePolicy());
-    }
-
     DownInstanceManager(DownInstancePolicy downInstancePolicy)
     {
         this.downInstancePolicy = downInstancePolicy;
@@ -63,11 +60,23 @@ class DownInstanceManager<T> implements InstanceFilter<T>
         purge();
 
         Status status = statuses.get(instance);
-        return (status == null) || (status.errorCount.get() < downInstancePolicy.getThreshold());
+        return (status == null) || (status.errorCount.get() < downInstancePolicy.getErrorThreshold());
     }
 
     private void purge()
     {
+        long localLastPurge = lastPurge.get();
+        long ticksSinceLastPurge = System.currentTimeMillis() - localLastPurge;
+        if ( ticksSinceLastPurge < (downInstancePolicy.getTimeoutMs() / 2) )
+        {
+            return;
+        }
+
+        if ( !lastPurge.compareAndSet(localLastPurge, System.currentTimeMillis()) )
+        {
+            return;
+        }
+
         for ( Map.Entry<ServiceInstance<?>, Status> entry : statuses.entrySet()
)
         {
             long elapsedMs = System.currentTimeMillis() - entry.getValue().startMs;

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86857cb5/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java
index 5c020c5..f094c59 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java
@@ -25,10 +25,8 @@ import org.apache.curator.x.discovery.ProviderStrategy;
 import org.apache.curator.x.discovery.ServiceProvider;
 import org.apache.curator.x.discovery.ServiceProviderBuilder;
 import org.apache.curator.x.discovery.strategies.RoundRobinStrategy;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Builder for service providers

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86857cb5/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestDownInstanceManager.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestDownInstanceManager.java
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestDownInstanceManager.java
index ff4d4f8..0b60655 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestDownInstanceManager.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestDownInstanceManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.curator.x.discovery.details;
 
+import org.apache.curator.x.discovery.DownInstancePolicy;
 import org.apache.curator.x.discovery.ServiceInstance;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -25,13 +26,16 @@ import java.util.concurrent.TimeUnit;
 
 public class TestDownInstanceManager
 {
+    private static final DownInstancePolicy debugDownInstancePolicy = new DownInstancePolicy(1,
TimeUnit.SECONDS, 1);
+    private static final DownInstancePolicy debugMultiDownInstancePolicy = new DownInstancePolicy(1,
TimeUnit.SECONDS, 2);
+
     @Test
     public void testBasic() throws Exception
     {
         ServiceInstance<Void> instance1 = ServiceInstance.<Void>builder().name("hey").id("1").build();
         ServiceInstance<Void> instance2 = ServiceInstance.<Void>builder().name("hey").id("2").build();
 
-        DownInstanceManager<Void> downInstanceManager = new DownInstanceManager<Void>();
+        DownInstanceManager<Void> downInstanceManager = new DownInstanceManager<Void>(debugDownInstancePolicy);
         Assert.assertTrue(downInstanceManager.apply(instance1));
         Assert.assertTrue(downInstanceManager.apply(instance2));
 
@@ -41,30 +45,39 @@ public class TestDownInstanceManager
     }
 
     @Test
-    public void testExpiration() throws Exception
+    public void testThreshold() throws Exception
     {
         ServiceInstance<Void> instance1 = ServiceInstance.<Void>builder().name("hey").id("1").build();
         ServiceInstance<Void> instance2 = ServiceInstance.<Void>builder().name("hey").id("2").build();
 
-        DownInstanceManager<Void> downInstanceManager = new DownInstanceManager<Void>();
+        DownInstanceManager<Void> downInstanceManager = new DownInstanceManager<Void>(debugMultiDownInstancePolicy);
+        Assert.assertTrue(downInstanceManager.apply(instance1));
+        Assert.assertTrue(downInstanceManager.apply(instance2));
 
         downInstanceManager.add(instance1);
-        Assert.assertFalse(downInstanceManager.apply(instance1));
+        Assert.assertTrue(downInstanceManager.apply(instance1));
         Assert.assertTrue(downInstanceManager.apply(instance2));
 
-        Thread.sleep(1000);
-
-        Assert.assertTrue(downInstanceManager.apply(instance1));
+        downInstanceManager.add(instance1);
+        Assert.assertFalse(downInstanceManager.apply(instance1));
         Assert.assertTrue(downInstanceManager.apply(instance2));
     }
 
-    //@Test
-    public void testInProvider() throws Exception
+    @Test
+    public void testExpiration() throws Exception
     {
         ServiceInstance<Void> instance1 = ServiceInstance.<Void>builder().name("hey").id("1").build();
         ServiceInstance<Void> instance2 = ServiceInstance.<Void>builder().name("hey").id("2").build();
 
-        DownInstanceManager<Void> downInstanceManager = new DownInstanceManager<Void>();
-        ServiceDiscoveryImpl<Void> discovery = new ServiceDiscoveryImpl<Void>(null,
null, null, null);
+        DownInstanceManager<Void> downInstanceManager = new DownInstanceManager<Void>(debugDownInstancePolicy);
+
+        downInstanceManager.add(instance1);
+        Assert.assertFalse(downInstanceManager.apply(instance1));
+        Assert.assertTrue(downInstanceManager.apply(instance2));
+
+        Thread.sleep(debugDownInstancePolicy.getTimeoutMs());
+
+        Assert.assertTrue(downInstanceManager.apply(instance1));
+        Assert.assertTrue(downInstanceManager.apply(instance2));
     }
 }


Mime
View raw message