Return-Path: X-Original-To: apmail-curator-commits-archive@minotaur.apache.org Delivered-To: apmail-curator-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 11CD910769 for ; Mon, 22 Jul 2013 23:27:10 +0000 (UTC) Received: (qmail 65610 invoked by uid 500); 22 Jul 2013 23:27:10 -0000 Delivered-To: apmail-curator-commits-archive@curator.apache.org Received: (qmail 65582 invoked by uid 500); 22 Jul 2013 23:27:09 -0000 Mailing-List: contact commits-help@curator.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@curator.incubator.apache.org Delivered-To: mailing list commits@curator.incubator.apache.org Received: (qmail 65572 invoked by uid 99); 22 Jul 2013 23:27:09 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 22 Jul 2013 23:27:09 +0000 X-ASF-Spam-Status: No, hits=-2001.5 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Mon, 22 Jul 2013 23:27:02 +0000 Received: (qmail 63146 invoked by uid 99); 22 Jul 2013 23:26:37 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 22 Jul 2013 23:26:37 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id D002D8B13BC; Mon, 22 Jul 2013 23:26:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: randgalt@apache.org To: commits@curator.incubator.apache.org Date: Mon, 22 Jul 2013 23:26:54 -0000 Message-Id: <3517aae22ce049729e0a2171f93333fc@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [19/32] git commit: Initial work at marking down instances X-Virus-Checked: Checked by ClamAV on apache.org Initial work at marking down instances Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/d3e42d10 Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/d3e42d10 Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/d3e42d10 Branch: refs/heads/CURATOR-14 Commit: d3e42d1099afea852aaa8481df87afad139b86c1 Parents: 5ff48fc Author: randgalt Authored: Sun Jun 23 13:41:51 2013 -0500 Committer: randgalt Committed: Sun Jun 23 13:41:51 2013 -0500 ---------------------------------------------------------------------- .../x/discovery/DownInstanceManager.java | 88 ++++++++++++++++++++ .../x/discovery/ServiceProviderBuilder.java | 2 + .../details/ServiceProviderBuilderImpl.java | 11 ++- .../discovery/details/ServiceProviderImpl.java | 36 +++++++- 4 files changed, 134 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/d3e42d10/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/DownInstanceManager.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/DownInstanceManager.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/DownInstanceManager.java new file mode 100644 index 0000000..5a46351 --- /dev/null +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/DownInstanceManager.java @@ -0,0 +1,88 @@ +package org.apache.curator.x.discovery; + +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; + +public class DownInstanceManager +{ + private final long timeoutMs; + private final DelayQueue queue = new DelayQueue(); + + private class Entry implements Delayed + { + private final long startMs = System.currentTimeMillis(); + private final ServiceInstance instance; + + private Entry(ServiceInstance instance) + { + this.instance = instance; + } + + public long getDelay(TimeUnit unit) + { + long elapsedMs = System.currentTimeMillis() - startMs; + long remainingMs = timeoutMs - elapsedMs; + return (remainingMs > 0) ? TimeUnit.MILLISECONDS.convert(remainingMs, unit) : 0; + } + + @Override + // note: note semantically the same as equals()/hashCode() + public int compareTo(Delayed rhs) + { + long diff = getDelay(TimeUnit.MILLISECONDS) - rhs.getDelay(TimeUnit.MILLISECONDS); + return (diff < 0) ? -1 :((diff > 0) ? 1 : 0); + } + + @Override + public boolean equals(Object o) + { + if ( this == o ) + { + return true; + } + if ( o == null || getClass() != o.getClass() ) + { + return false; + } + + Entry entry = (Entry)o; + + //noinspection RedundantIfStatement + if ( !instance.equals(entry.instance) ) + { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + return instance.hashCode(); + } + } + + public DownInstanceManager(long timeout, TimeUnit unit) + { + timeoutMs = unit.toMillis(timeout); + } + + public void add(ServiceInstance instance) + { + queue.add(new Entry(instance)); + } + + public boolean contains(ServiceInstance instance) + { + //noinspection StatementWithEmptyBody + while ( queue.poll() != null ){} // pull out expired entries + return queue.contains(new Entry(instance)); + } + + public boolean hasEntries() + { + return !queue.isEmpty(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/d3e42d10/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java index 00d8924..1c8a880 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java @@ -63,4 +63,6 @@ public interface ServiceProviderBuilder * @return this */ public ServiceProviderBuilder refreshPaddingMs(int refreshPaddingMs); + + public ServiceProviderBuilder downInstanceManager(DownInstanceManager downInstanceManager); } http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/d3e42d10/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java index 4f6c64c..2917270 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java @@ -18,6 +18,7 @@ */ package org.apache.curator.x.discovery.details; +import org.apache.curator.x.discovery.DownInstanceManager; import org.apache.curator.x.discovery.ProviderStrategy; import org.apache.curator.x.discovery.ServiceProvider; import org.apache.curator.x.discovery.ServiceProviderBuilder; @@ -34,6 +35,7 @@ class ServiceProviderBuilderImpl implements ServiceProviderBuilder private ProviderStrategy providerStrategy; private ThreadFactory threadFactory; private int refreshPaddingMs; + private DownInstanceManager downInstanceManager; /** * Allocate a new service provider based on the current builder settings @@ -43,7 +45,7 @@ class ServiceProviderBuilderImpl implements ServiceProviderBuilder @Override public ServiceProvider build() { - return new ServiceProviderImpl(discovery, serviceName, providerStrategy, threadFactory); + return new ServiceProviderImpl(discovery, serviceName, providerStrategy, threadFactory, downInstanceManager); } ServiceProviderBuilderImpl(ServiceDiscoveryImpl discovery) @@ -104,4 +106,11 @@ class ServiceProviderBuilderImpl implements ServiceProviderBuilder this.refreshPaddingMs = refreshPaddingMs; return this; } + + @Override + public ServiceProviderBuilder downInstanceManager(DownInstanceManager downInstanceManager) + { + this.downInstanceManager = downInstanceManager; + return this; + } } http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/d3e42d10/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java index 39d6ca8..b7f237b 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java @@ -18,11 +18,16 @@ */ package org.apache.curator.x.discovery.details; +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import org.apache.curator.x.discovery.DownInstanceManager; import org.apache.curator.x.discovery.ProviderStrategy; import org.apache.curator.x.discovery.ServiceCache; import org.apache.curator.x.discovery.ServiceInstance; import org.apache.curator.x.discovery.ServiceProvider; import java.io.IOException; +import java.util.List; import java.util.concurrent.ThreadFactory; /** @@ -32,14 +37,41 @@ import java.util.concurrent.ThreadFactory; public class ServiceProviderImpl implements ServiceProvider { private final ServiceCache cache; + private final InstanceProvider instanceProvider; private final ServiceDiscoveryImpl discovery; private final ProviderStrategy providerStrategy; - public ServiceProviderImpl(ServiceDiscoveryImpl discovery, String serviceName, ProviderStrategy providerStrategy, ThreadFactory threadFactory) + public ServiceProviderImpl(ServiceDiscoveryImpl discovery, String serviceName, ProviderStrategy providerStrategy, ThreadFactory threadFactory, final DownInstanceManager downInstanceManager) { this.discovery = discovery; this.providerStrategy = providerStrategy; cache = discovery.serviceCacheBuilder().name(serviceName).threadFactory(threadFactory).build(); + + instanceProvider = new InstanceProvider() + { + @Override + public List> getInstances() throws Exception + { + List> instances = cache.getInstances(); + if ( (downInstanceManager != null) && downInstanceManager.hasEntries() ) + { + Iterable> filtered = Iterables.filter + ( + instances, + new Predicate>() + { + @Override + public boolean apply(ServiceInstance instance) + { + return !downInstanceManager.contains(instance); + } + } + ); + instances = ImmutableList.copyOf(filtered); + } + return instances; + } + }; } /** @@ -74,6 +106,6 @@ public class ServiceProviderImpl implements ServiceProvider @Override public ServiceInstance getInstance() throws Exception { - return providerStrategy.getInstance(cache); + return providerStrategy.getInstance(instanceProvider); } }