Return-Path: X-Original-To: apmail-curator-commits-archive@minotaur.apache.org Delivered-To: apmail-curator-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1F8F517356 for ; Mon, 27 Apr 2015 22:09:54 +0000 (UTC) Received: (qmail 27267 invoked by uid 500); 27 Apr 2015 22:09:54 -0000 Delivered-To: apmail-curator-commits-archive@curator.apache.org Received: (qmail 27222 invoked by uid 500); 27 Apr 2015 22:09:54 -0000 Mailing-List: contact commits-help@curator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@curator.apache.org Delivered-To: mailing list commits@curator.apache.org Received: (qmail 27210 invoked by uid 99); 27 Apr 2015 22:09:53 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 27 Apr 2015 22:09:53 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D4609E10A5; Mon, 27 Apr 2015 22:09:53 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: randgalt@apache.org To: commits@curator.apache.org Date: Mon, 27 Apr 2015 22:09:53 -0000 Message-Id: <6d208746131d46098ce206cdaf8c97d0@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [01/12] curator git commit: when unregistering a service remove it from the internal map first and then delete (guaranteed) the node Repository: curator Updated Branches: refs/heads/master 6e16d0d5c -> 06af6ff1c when unregistering a service remove it from the internal map first and then delete (guaranteed) the node Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/915d83ad Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/915d83ad Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/915d83ad Branch: refs/heads/master Commit: 915d83add911d624ab3584508f566344827fbae6 Parents: c65e091 Author: randgalt Authored: Tue Apr 21 12:31:17 2015 -0500 Committer: randgalt Committed: Tue Apr 21 12:31:17 2015 -0500 ---------------------------------------------------------------------- .../discovery/details/ServiceDiscoveryImpl.java | 62 ++++++++-------- .../x/discovery/TestServiceDiscovery.java | 74 +++++++++++++++++++- 2 files changed, 103 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/915d83ad/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java index 41c5d77..824eb75 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.curator.x.discovery.details; import com.google.common.annotations.VisibleForTesting; @@ -149,7 +150,7 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery ServiceInstance service = it.next(); String path = pathForInstance(service.getName(), service.getId()); boolean doRemove = true; - + try { client.delete().forPath(path); @@ -163,13 +164,13 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery doRemove = false; log.error("Could not unregister instance: " + service.getName(), e); } - + if ( doRemove ) { it.remove(); } } - + client.getConnectionStateListenable().removeListener(connectionStateListener); } @@ -189,25 +190,25 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery @Override public void updateService(ServiceInstance service) throws Exception { - byte[] bytes = serializer.serialize(service); - String path = pathForInstance(service.getName(), service.getId()); + byte[] bytes = serializer.serialize(service); + String path = pathForInstance(service.getName(), service.getId()); client.setData().forPath(path, bytes); services.put(service.getId(), service); } @VisibleForTesting - protected void internalRegisterService(ServiceInstance service) throws Exception + protected void internalRegisterService(ServiceInstance service) throws Exception { - byte[] bytes = serializer.serialize(service); - String path = pathForInstance(service.getName(), service.getId()); + byte[] bytes = serializer.serialize(service); + String path = pathForInstance(service.getName(), service.getId()); - final int MAX_TRIES = 2; - boolean isDone = false; + final int MAX_TRIES = 2; + boolean isDone = false; for ( int i = 0; !isDone && (i < MAX_TRIES); ++i ) { try { - CreateMode mode = (service.getServiceType() == ServiceType.DYNAMIC) ? CreateMode.EPHEMERAL : CreateMode.PERSISTENT; + CreateMode mode = (service.getServiceType() == ServiceType.DYNAMIC) ? CreateMode.EPHEMERAL : CreateMode.PERSISTENT; client.create().creatingParentsIfNeeded().withMode(mode).forPath(path, bytes); isDone = true; } @@ -225,18 +226,19 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery * @throws Exception errors */ @Override - public void unregisterService(ServiceInstance service) throws Exception + public void unregisterService(ServiceInstance service) throws Exception { - String path = pathForInstance(service.getName(), service.getId()); + services.remove(service.getId()); + + String path = pathForInstance(service.getName(), service.getId()); try { - client.delete().forPath(path); + client.delete().guaranteed().forPath(path); } catch ( KeeperException.NoNodeException ignore ) { // ignore } - services.remove(service.getId()); } /** @@ -271,9 +273,9 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery * @throws Exception errors */ @Override - public Collection queryForNames() throws Exception + public Collection queryForNames() throws Exception { - List names = client.getChildren().forPath(basePath); + List names = client.getChildren().forPath(basePath); return ImmutableList.copyOf(names); } @@ -285,7 +287,7 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery * @throws Exception errors */ @Override - public Collection> queryForInstances(String name) throws Exception + public Collection> queryForInstances(String name) throws Exception { return queryForInstances(name, null); } @@ -301,10 +303,10 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery @Override public ServiceInstance queryForInstance(String name, String id) throws Exception { - String path = pathForInstance(name, id); + String path = pathForInstance(name, id); try { - byte[] bytes = client.getData().forPath(path); + byte[] bytes = client.getData().forPath(path); return serializer.deserialize(bytes); } catch ( KeeperException.NoNodeException ignore ) @@ -314,22 +316,22 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery return null; } - void cacheOpened(ServiceCache cache) + void cacheOpened(ServiceCache cache) { caches.add(cache); } - void cacheClosed(ServiceCache cache) + void cacheClosed(ServiceCache cache) { caches.remove(cache); } - void providerOpened(ServiceProvider provider) + void providerOpened(ServiceProvider provider) { providers.add(provider); } - void providerClosed(ServiceProvider cache) + void providerClosed(ServiceProvider cache) { providers.remove(cache); } @@ -339,7 +341,7 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery return client; } - String pathForName(String name) + String pathForName(String name) { return ZKPaths.makePath(basePath, name); } @@ -349,11 +351,11 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery return serializer; } - List> queryForInstances(String name, Watcher watcher) throws Exception + List> queryForInstances(String name, Watcher watcher) throws Exception { - ImmutableList.Builder> builder = ImmutableList.builder(); - String path = pathForName(name); - List instanceIds; + ImmutableList.Builder> builder = ImmutableList.builder(); + String path = pathForName(name); + List instanceIds; if ( watcher != null ) { @@ -384,7 +386,7 @@ public class ServiceDiscoveryImpl implements ServiceDiscovery private List getChildrenWatched(String path, Watcher watcher, boolean recurse) throws Exception { - List instanceIds; + List instanceIds; try { instanceIds = client.getChildren().usingWatcher(watcher).forPath(path); http://git-wip-us.apache.org/repos/asf/curator/blob/915d83ad/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java index 6eb9ebb..40d491a 100644 --- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java +++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java @@ -28,6 +28,7 @@ import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.KillSession; import org.apache.curator.test.Timing; import org.apache.curator.utils.CloseableUtils; +import org.apache.curator.x.discovery.details.InstanceSerializer; import org.apache.curator.x.discovery.details.JsonInstanceSerializer; import org.apache.curator.x.discovery.details.ServiceDiscoveryImpl; import org.testng.Assert; @@ -37,7 +38,9 @@ import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; public class TestServiceDiscovery extends BaseClassForTests { @@ -269,15 +272,15 @@ public class TestServiceDiscovery extends BaseClassForTests public void testNoServerOnStart() throws Exception { server.stop(); - List closeables = Lists.newArrayList(); + List closeables = Lists.newArrayList(); try { CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); closeables.add(client); client.start(); - ServiceInstance instance = ServiceInstance.builder().payload("thing").name("test").port(10064).build(); - ServiceDiscovery discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build(); + ServiceInstance instance = ServiceInstance.builder().payload("thing").name("test").port(10064).build(); + ServiceDiscovery discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build(); closeables.add(discovery); discovery.start(); @@ -297,4 +300,69 @@ public class TestServiceDiscovery extends BaseClassForTests } } } + + // CURATOR-164 + @Test + public void testUnregisterService() throws Exception + { + final String name = "name"; + + final CountDownLatch restartLatch = new CountDownLatch(1); + List closeables = Lists.newArrayList(); + + InstanceSerializer slowSerializer = new JsonInstanceSerializer(String.class) + { + private boolean first = true; + + @Override + public byte[] serialize(ServiceInstance instance) throws Exception + { + if ( first ) + { + System.out.println("Serializer first registration."); + first = false; + } + else + { + System.out.println("Waiting for reconnect to finish."); + // Simulate the serialize method being slow. + // This could just be a timed wait, but that's kind of non-deterministic. + restartLatch.await(); + } + return super.serialize(instance); + } + }; + + try + { + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + closeables.add(client); + client.start(); + + ServiceInstance instance = ServiceInstance.builder().payload("thing").name(name).port(10064).build(); + ServiceDiscovery discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).serializer(slowSerializer).build(); + closeables.add(discovery); + discovery.start(); + + Assert.assertFalse(discovery.queryForInstances(name).isEmpty(), "Service should start registered."); + + server.stop(); + server.restart(); + + discovery.unregisterService(instance); + restartLatch.countDown(); + + TimeUnit.SECONDS.sleep(1); // Wait for the rest of registration to finish. + + Assert.assertTrue(discovery.queryForInstances(name).isEmpty(), "Service should have unregistered."); + } + finally + { + Collections.reverse(closeables); + for ( Closeable c : closeables ) + { + CloseableUtils.closeQuietly(c); + } + } + } }