Return-Path: X-Original-To: apmail-curator-commits-archive@minotaur.apache.org Delivered-To: apmail-curator-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3295B1076B for ; Mon, 22 Jul 2013 23:27:10 +0000 (UTC) Received: (qmail 65689 invoked by uid 500); 22 Jul 2013 23:27:10 -0000 Delivered-To: apmail-curator-commits-archive@curator.apache.org Received: (qmail 65662 invoked by uid 500); 22 Jul 2013 23:27:10 -0000 Mailing-List: contact commits-help@curator.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@curator.incubator.apache.org Delivered-To: mailing list commits@curator.incubator.apache.org Received: (qmail 65655 invoked by uid 99); 22 Jul 2013 23:27:10 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 22 Jul 2013 23:27:10 +0000 X-ASF-Spam-Status: No, hits=-2001.5 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Mon, 22 Jul 2013 23:27:02 +0000 Received: (qmail 63147 invoked by uid 99); 22 Jul 2013 23:26:37 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 22 Jul 2013 23:26:37 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 061008B13C8; Mon, 22 Jul 2013 23:26:37 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: randgalt@apache.org To: commits@curator.incubator.apache.org Date: Mon, 22 Jul 2013 23:27:00 -0000 Message-Id: <9607ba799525475c999ad264ad2cd6eb@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [25/32] git commit: Finalized implementation and generalized concept of an instance filter. X-Virus-Checked: Checked by ClamAV on apache.org Finalized implementation and generalized concept of an instance filter. Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/86857cb5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/86857cb5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/86857cb5 Branch: refs/heads/CURATOR-14 Commit: 86857cb5aa5511240544f49848054cb6a6453ed4 Parents: 46d112f Author: randgalt Authored: Fri Jun 28 14:02:33 2013 -0500 Committer: randgalt Committed: Fri Jun 28 14:02:33 2013 -0500 ---------------------------------------------------------------------- .../curator/x/discovery/DownInstancePolicy.java | 21 +++++++++--- .../curator/x/discovery/InstanceFilter.java | 3 ++ .../curator/x/discovery/ServiceProvider.java | 6 ++++ .../x/discovery/ServiceProviderBuilder.java | 15 ++++++++- .../discovery/details/DownInstanceManager.java | 21 ++++++++---- .../details/ServiceProviderBuilderImpl.java | 2 -- .../details/TestDownInstanceManager.java | 35 ++++++++++++++------ 7 files changed, 78 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86857cb5/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/DownInstancePolicy.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/DownInstancePolicy.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/DownInstancePolicy.java index 0360200..53ad671 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/DownInstancePolicy.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/DownInstancePolicy.java @@ -21,23 +21,34 @@ package org.apache.curator.x.discovery; import java.util.concurrent.TimeUnit; +/** + * Abstraction for values that determine when an instance is down + */ public class DownInstancePolicy { private final long timeoutMs; - private final int threshold; + private final int errorThreshold; private static final long DEFAULT_TIMEOUT_MS = 30000; private static final int DEFAULT_THRESHOLD = 2; + /** + * Policy with default values + */ public DownInstancePolicy() { this(DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS, DEFAULT_THRESHOLD); } - public DownInstancePolicy(long timeout, TimeUnit unit, int threshold) + /** + * @param timeout window of time for down instances + * @param unit time unit + * @param errorThreshold number of errors within time window that denotes a down instance + */ + public DownInstancePolicy(long timeout, TimeUnit unit, int errorThreshold) { this.timeoutMs = unit.toMillis(timeout); - this.threshold = threshold; + this.errorThreshold = errorThreshold; } public long getTimeoutMs() @@ -45,8 +56,8 @@ public class DownInstancePolicy return timeoutMs; } - public int getThreshold() + public int getErrorThreshold() { - return threshold; + return errorThreshold; } } http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86857cb5/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/InstanceFilter.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/InstanceFilter.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/InstanceFilter.java index 72cd410..ddccb4d 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/InstanceFilter.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/InstanceFilter.java @@ -20,6 +20,9 @@ package org.apache.curator.x.discovery; import com.google.common.base.Predicate; +/** + * Typedef for an Instance predicate + */ public interface InstanceFilter extends Predicate> { } http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86857cb5/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java index 44ee291..e8fcdcf 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java @@ -44,5 +44,11 @@ public interface ServiceProvider extends Closeable */ public ServiceInstance getInstance() throws Exception; + /** + * Take note of an error connecting to the given instance. The instance will potentially + * be marked as "down" depending on the {@link DownInstancePolicy}. + * + * @param instance instance that had an error + */ public void noteError(ServiceInstance instance); } http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86857cb5/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java index 4a0e636..2be311f 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java @@ -20,7 +20,6 @@ package org.apache.curator.x.discovery; import org.apache.curator.x.discovery.strategies.RoundRobinStrategy; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; public interface ServiceProviderBuilder { @@ -55,7 +54,21 @@ public interface ServiceProviderBuilder */ public ServiceProviderBuilder threadFactory(ThreadFactory threadFactory); + /** + * Set the down instance policy + * + * @param downInstancePolicy new policy + * @return this + */ public ServiceProviderBuilder downInstancePolicy(DownInstancePolicy downInstancePolicy); + /** + * Add an instance filter. NOTE: this does not remove previously added filters. i.e. + * a l;ist is created of all added filters. Filters are called in the order they were + * added. + * + * @param filter filter to add + * @return this + */ public ServiceProviderBuilder additionalFilter(InstanceFilter filter); } http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86857cb5/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/DownInstanceManager.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/DownInstanceManager.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/DownInstanceManager.java index a70f205..cda0968 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/DownInstanceManager.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/DownInstanceManager.java @@ -25,11 +25,13 @@ import org.apache.curator.x.discovery.ServiceInstance; import java.util.Map; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; class DownInstanceManager implements InstanceFilter { private final ConcurrentMap, Status> statuses = Maps.newConcurrentMap(); private final DownInstancePolicy downInstancePolicy; + private final AtomicLong lastPurge = new AtomicLong(System.currentTimeMillis()); private static class Status { @@ -37,11 +39,6 @@ class DownInstanceManager implements InstanceFilter private final AtomicInteger errorCount = new AtomicInteger(0); } - DownInstanceManager() - { - this(new DownInstancePolicy()); - } - DownInstanceManager(DownInstancePolicy downInstancePolicy) { this.downInstancePolicy = downInstancePolicy; @@ -63,11 +60,23 @@ class DownInstanceManager implements InstanceFilter purge(); Status status = statuses.get(instance); - return (status == null) || (status.errorCount.get() < downInstancePolicy.getThreshold()); + return (status == null) || (status.errorCount.get() < downInstancePolicy.getErrorThreshold()); } private void purge() { + long localLastPurge = lastPurge.get(); + long ticksSinceLastPurge = System.currentTimeMillis() - localLastPurge; + if ( ticksSinceLastPurge < (downInstancePolicy.getTimeoutMs() / 2) ) + { + return; + } + + if ( !lastPurge.compareAndSet(localLastPurge, System.currentTimeMillis()) ) + { + return; + } + for ( Map.Entry, Status> entry : statuses.entrySet() ) { long elapsedMs = System.currentTimeMillis() - entry.getValue().startMs; http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86857cb5/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java index 5c020c5..f094c59 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java @@ -25,10 +25,8 @@ import org.apache.curator.x.discovery.ProviderStrategy; import org.apache.curator.x.discovery.ServiceProvider; import org.apache.curator.x.discovery.ServiceProviderBuilder; import org.apache.curator.x.discovery.strategies.RoundRobinStrategy; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; /** * Builder for service providers http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86857cb5/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestDownInstanceManager.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestDownInstanceManager.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestDownInstanceManager.java index ff4d4f8..0b60655 100644 --- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestDownInstanceManager.java +++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestDownInstanceManager.java @@ -18,6 +18,7 @@ */ package org.apache.curator.x.discovery.details; +import org.apache.curator.x.discovery.DownInstancePolicy; import org.apache.curator.x.discovery.ServiceInstance; import org.testng.Assert; import org.testng.annotations.Test; @@ -25,13 +26,16 @@ import java.util.concurrent.TimeUnit; public class TestDownInstanceManager { + private static final DownInstancePolicy debugDownInstancePolicy = new DownInstancePolicy(1, TimeUnit.SECONDS, 1); + private static final DownInstancePolicy debugMultiDownInstancePolicy = new DownInstancePolicy(1, TimeUnit.SECONDS, 2); + @Test public void testBasic() throws Exception { ServiceInstance instance1 = ServiceInstance.builder().name("hey").id("1").build(); ServiceInstance instance2 = ServiceInstance.builder().name("hey").id("2").build(); - DownInstanceManager downInstanceManager = new DownInstanceManager(); + DownInstanceManager downInstanceManager = new DownInstanceManager(debugDownInstancePolicy); Assert.assertTrue(downInstanceManager.apply(instance1)); Assert.assertTrue(downInstanceManager.apply(instance2)); @@ -41,30 +45,39 @@ public class TestDownInstanceManager } @Test - public void testExpiration() throws Exception + public void testThreshold() throws Exception { ServiceInstance instance1 = ServiceInstance.builder().name("hey").id("1").build(); ServiceInstance instance2 = ServiceInstance.builder().name("hey").id("2").build(); - DownInstanceManager downInstanceManager = new DownInstanceManager(); + DownInstanceManager downInstanceManager = new DownInstanceManager(debugMultiDownInstancePolicy); + Assert.assertTrue(downInstanceManager.apply(instance1)); + Assert.assertTrue(downInstanceManager.apply(instance2)); downInstanceManager.add(instance1); - Assert.assertFalse(downInstanceManager.apply(instance1)); + Assert.assertTrue(downInstanceManager.apply(instance1)); Assert.assertTrue(downInstanceManager.apply(instance2)); - Thread.sleep(1000); - - Assert.assertTrue(downInstanceManager.apply(instance1)); + downInstanceManager.add(instance1); + Assert.assertFalse(downInstanceManager.apply(instance1)); Assert.assertTrue(downInstanceManager.apply(instance2)); } - //@Test - public void testInProvider() throws Exception + @Test + public void testExpiration() throws Exception { ServiceInstance instance1 = ServiceInstance.builder().name("hey").id("1").build(); ServiceInstance instance2 = ServiceInstance.builder().name("hey").id("2").build(); - DownInstanceManager downInstanceManager = new DownInstanceManager(); - ServiceDiscoveryImpl discovery = new ServiceDiscoveryImpl(null, null, null, null); + DownInstanceManager downInstanceManager = new DownInstanceManager(debugDownInstancePolicy); + + downInstanceManager.add(instance1); + Assert.assertFalse(downInstanceManager.apply(instance1)); + Assert.assertTrue(downInstanceManager.apply(instance2)); + + Thread.sleep(debugDownInstancePolicy.getTimeoutMs()); + + Assert.assertTrue(downInstanceManager.apply(instance1)); + Assert.assertTrue(downInstanceManager.apply(instance2)); } }