curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From randg...@apache.org
Subject [2/2] curator git commit: Added a way to watch registered services so that tools such as admin consoles can change values and have SD recognize the changes
Date Sun, 04 Jan 2015 22:03:41 GMT
Added a way to watch registered services so that tools such as admin consoles can change values
and have SD recognize the changes


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/37dc4478
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/37dc4478
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/37dc4478

Branch: refs/heads/CURATOR-176
Commit: 37dc4478597c6db0dcab83b636318b51bb389c58
Parents: 742e092
Author: randgalt <randgalt@apache.org>
Authored: Sun Jan 4 17:03:29 2015 -0500
Committer: randgalt <randgalt@apache.org>
Committed: Sun Jan 4 17:03:29 2015 -0500

----------------------------------------------------------------------
 .../x/discovery/ServiceDiscoveryBuilder.java    | 45 +++++++----
 .../discovery/details/ServiceDiscoveryImpl.java | 59 +++++++++++++--
 .../x/discovery/TestServiceDiscovery.java       | 79 ++++++++++----------
 .../discovery/details/TestWatchedInstances.java | 76 +++++++++++++++++++
 4 files changed, 200 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/37dc4478/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceDiscoveryBuilder.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceDiscoveryBuilder.java
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceDiscoveryBuilder.java
index 2b972ca..e25fc67 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceDiscoveryBuilder.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceDiscoveryBuilder.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.x.discovery;
 
 import org.apache.curator.framework.CuratorFramework;
@@ -25,20 +26,21 @@ import org.apache.curator.x.discovery.details.ServiceDiscoveryImpl;
 
 public class ServiceDiscoveryBuilder<T>
 {
-    private CuratorFramework        client;
-    private String                  basePath;
-    private InstanceSerializer<T>   serializer;
-    private ServiceInstance<T>      thisInstance;
-    private Class<T>                payloadClass;
+    private CuratorFramework client;
+    private String basePath;
+    private InstanceSerializer<T> serializer;
+    private ServiceInstance<T> thisInstance;
+    private Class<T> payloadClass;
+    private boolean watchInstances = false;
 
     /**
      * Return a new builder.
      *
      * @param payloadClass the class of the payload of your service instance (you can use
{@link Void}
-     * if your instances don't need a payload)
+     *                     if your instances don't need a payload)
      * @return new builder
      */
-    public static<T> ServiceDiscoveryBuilder<T>     builder(Class<T> payloadClass)
+    public static <T> ServiceDiscoveryBuilder<T> builder(Class<T> payloadClass)
     {
         return new ServiceDiscoveryBuilder<T>(payloadClass);
     }
@@ -49,12 +51,13 @@ public class ServiceDiscoveryBuilder<T>
      *
      * @return new service discovery
      */
-    public ServiceDiscovery<T>      build()
+    public ServiceDiscovery<T> build()
     {
-        if ( serializer == null ) {
+        if ( serializer == null )
+        {
             serializer(new JsonInstanceSerializer<T>(payloadClass));
         }
-        return new ServiceDiscoveryImpl<T>(client, basePath, serializer, thisInstance);
+        return new ServiceDiscoveryImpl<T>(client, basePath, serializer, thisInstance,
watchInstances);
     }
 
     /**
@@ -63,7 +66,7 @@ public class ServiceDiscoveryBuilder<T>
      * @param client client
      * @return this
      */
-    public ServiceDiscoveryBuilder<T>   client(CuratorFramework client)
+    public ServiceDiscoveryBuilder<T> client(CuratorFramework client)
     {
         this.client = client;
         return this;
@@ -75,7 +78,7 @@ public class ServiceDiscoveryBuilder<T>
      * @param basePath base path
      * @return this
      */
-    public ServiceDiscoveryBuilder<T>   basePath(String basePath)
+    public ServiceDiscoveryBuilder<T> basePath(String basePath)
     {
         this.basePath = basePath;
         return this;
@@ -87,7 +90,7 @@ public class ServiceDiscoveryBuilder<T>
      * @param serializer the serializer
      * @return this
      */
-    public ServiceDiscoveryBuilder<T>   serializer(InstanceSerializer<T> serializer)
+    public ServiceDiscoveryBuilder<T> serializer(InstanceSerializer<T> serializer)
     {
         this.serializer = serializer;
         return this;
@@ -99,12 +102,26 @@ public class ServiceDiscoveryBuilder<T>
      * @param thisInstance initial instance
      * @return this
      */
-    public ServiceDiscoveryBuilder<T>   thisInstance(ServiceInstance<T> thisInstance)
+    public ServiceDiscoveryBuilder<T> thisInstance(ServiceInstance<T> thisInstance)
     {
         this.thisInstance = thisInstance;
         return this;
     }
 
+    /**
+     * Optional - if true, watches for changes to locally registered instances
+     * (via {@link #thisInstance(ServiceInstance)} or {@link ServiceDiscovery#registerService(ServiceInstance)}).
+     * If the data for instances changes, they are reloaded.
+     *
+     * @param watchInstances true to watch instances
+     * @return this
+     */
+    public ServiceDiscoveryBuilder<T> watchInstances(boolean watchInstances)
+    {
+        this.watchInstances = watchInstances;
+        return this;
+    }
+
     ServiceDiscoveryBuilder(Class<T> payloadClass)
     {
         this.payloadClass = payloadClass;

http://git-wip-us.apache.org/repos/asf/curator/blob/37dc4478/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
index a55f678..ca8eabe 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
@@ -24,11 +24,14 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-
-import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.CuratorWatcher;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.curator.x.discovery.ServiceCache;
@@ -41,12 +44,11 @@ import org.apache.curator.x.discovery.ServiceType;
 import org.apache.curator.x.discovery.strategies.RoundRobinStrategy;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
@@ -64,6 +66,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
     private final Map<String, ServiceInstance<T>> services = Maps.newConcurrentMap();
     private final Collection<ServiceCache<T>> caches = Sets.newSetFromMap(Maps.<ServiceCache<T>,
Boolean>newConcurrentMap());
     private final Collection<ServiceProvider<T>> providers = Sets.newSetFromMap(Maps.<ServiceProvider<T>,
Boolean>newConcurrentMap());
+    private final boolean watchInstances;
     private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
     {
         @Override
@@ -89,9 +92,11 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
      * @param basePath base path to store data
      * @param serializer serializer for instances (e.g. {@link JsonInstanceSerializer})
      * @param thisInstance instance that represents the service that is running. The instance
will get auto-registered
+     * @param watchInstances if true, watches for changes to locally registered instances
      */
-    public ServiceDiscoveryImpl(CuratorFramework client, String basePath, InstanceSerializer<T>
serializer, ServiceInstance<T> thisInstance)
+    public ServiceDiscoveryImpl(CuratorFramework client, String basePath, InstanceSerializer<T>
serializer, ServiceInstance<T> thisInstance, boolean watchInstances)
     {
+        this.watchInstances = watchInstances;
         this.client = Preconditions.checkNotNull(client, "client cannot be null");
         this.basePath = Preconditions.checkNotNull(basePath, "basePath cannot be null");
         this.serializer = Preconditions.checkNotNull(serializer, "serializer cannot be null");
@@ -192,6 +197,10 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
             {
                 CreateMode      mode = (service.getServiceType() == ServiceType.DYNAMIC)
? CreateMode.EPHEMERAL : CreateMode.PERSISTENT;
                 client.create().creatingParentsIfNeeded().withMode(mode).forPath(path, bytes);
+                if ( watchInstances )
+                {
+                    resetWatchedInstance(service);
+                }
                 isDone = true;
             }
             catch ( KeeperException.NodeExistsException e )
@@ -365,6 +374,37 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
         return builder.build();
     }
 
+    private void resetWatchedInstance(final ServiceInstance<T> service) throws Exception
+    {
+        CuratorWatcher watcher = new CuratorWatcher()
+        {
+            @Override
+            public void process(WatchedEvent event) throws Exception
+            {
+                if ( event.getType() == Watcher.Event.EventType.NodeDataChanged )
+                {
+                    resetWatchedInstance(service);
+                }
+            }
+        };
+
+        BackgroundCallback callback = new BackgroundCallback()
+        {
+            @Override
+            public void processResult(CuratorFramework client, CuratorEvent event) throws
Exception
+            {
+                if ( event.getType() == CuratorEventType.GET_DATA )
+                {
+                    ServiceInstance<T> newInstance = serializer.deserialize(event.getData());
+                    services.put(newInstance.getId(), newInstance);
+                }
+            }
+        };
+
+        String path = pathForInstance(service.getName(), service.getId());
+        client.getData().usingWatcher(watcher).inBackground(callback).forPath(path);
+    }
+
     private List<String> getChildrenWatched(String path, Watcher watcher, boolean recurse)
throws Exception
     {
         List<String>    instanceIds;
@@ -394,11 +434,18 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
         return instanceIds;
     }
 
-    private String  pathForInstance(String name, String id) throws UnsupportedEncodingException
+    @VisibleForTesting
+    String pathForInstance(String name, String id)
     {
         return ZKPaths.makePath(pathForName(name), id);
     }
 
+    @VisibleForTesting
+    ServiceInstance<T> getRegisteredService(String id)
+    {
+        return services.get(id);
+    }
+
     private void reRegisterServices() throws Exception
     {
         for ( ServiceInstance<T> service : services.values() )

http://git-wip-us.apache.org/repos/asf/curator/blob/37dc4478/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java
index 73de7fc..0465599 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java
@@ -16,17 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.x.discovery;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.KillSession;
 import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
 import org.apache.curator.x.discovery.details.ServiceDiscoveryImpl;
 import org.testng.Assert;
@@ -40,7 +41,7 @@ import java.util.concurrent.Semaphore;
 
 public class TestServiceDiscovery extends BaseClassForTests
 {
-    private static final Comparator<ServiceInstance<Void>>      comparator =
new Comparator<ServiceInstance<Void>>()
+    private static final Comparator<ServiceInstance<Void>> comparator = new Comparator<ServiceInstance<Void>>()
     {
         @Override
         public int compare(ServiceInstance<Void> o1, ServiceInstance<Void> o2)
@@ -50,20 +51,20 @@ public class TestServiceDiscovery extends BaseClassForTests
     };
 
     @Test
-    public void         testCrashedServerMultiInstances() throws Exception
+    public void testCrashedServerMultiInstances() throws Exception
     {
-        List<Closeable>     closeables = Lists.newArrayList();
+        List<Closeable> closeables = Lists.newArrayList();
         try
         {
-            Timing              timing = new Timing();
-            CuratorFramework    client = CuratorFrameworkFactory.newClient(server.getConnectString(),
timing.session(), timing.connection(), new RetryOneTime(1));
+            Timing timing = new Timing();
+            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
timing.session(), timing.connection(), new RetryOneTime(1));
             closeables.add(client);
             client.start();
 
-            final Semaphore             semaphore = new Semaphore(0);
-            ServiceInstance<String>     instance1 = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
-            ServiceInstance<String>     instance2 = ServiceInstance.<String>builder().payload("thing").name("test").port(10065).build();
-            ServiceDiscovery<String>    discovery = new ServiceDiscoveryImpl<String>(client,
"/test", new JsonInstanceSerializer<String>(String.class), instance1)
+            final Semaphore semaphore = new Semaphore(0);
+            ServiceInstance<String> instance1 = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
+            ServiceInstance<String> instance2 = ServiceInstance.<String>builder().payload("thing").name("test").port(10065).build();
+            ServiceDiscovery<String> discovery = new ServiceDiscoveryImpl<String>(client,
"/test", new JsonInstanceSerializer<String>(String.class), instance1, false)
             {
                 @Override
                 protected void internalRegisterService(ServiceInstance<String> service)
throws Exception
@@ -98,19 +99,19 @@ public class TestServiceDiscovery extends BaseClassForTests
     }
 
     @Test
-    public void         testCrashedServer() throws Exception
+    public void testCrashedServer() throws Exception
     {
-        List<Closeable>     closeables = Lists.newArrayList();
+        List<Closeable> closeables = Lists.newArrayList();
         try
         {
-            Timing              timing = new Timing();
-            CuratorFramework    client = CuratorFrameworkFactory.newClient(server.getConnectString(),
timing.session(), timing.connection(), new RetryOneTime(1));
+            Timing timing = new Timing();
+            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
timing.session(), timing.connection(), new RetryOneTime(1));
             closeables.add(client);
             client.start();
 
-            final Semaphore             semaphore = new Semaphore(0);
-            ServiceInstance<String>     instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
-            ServiceDiscovery<String>    discovery = new ServiceDiscoveryImpl<String>(client,
"/test", new JsonInstanceSerializer<String>(String.class), instance)
+            final Semaphore semaphore = new Semaphore(0);
+            ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
+            ServiceDiscovery<String> discovery = new ServiceDiscoveryImpl<String>(client,
"/test", new JsonInstanceSerializer<String>(String.class), instance, false)
             {
                 @Override
                 protected void internalRegisterService(ServiceInstance<String> service)
throws Exception
@@ -144,24 +145,24 @@ public class TestServiceDiscovery extends BaseClassForTests
     }
 
     @Test
-    public void         testCrashedInstance() throws Exception
+    public void testCrashedInstance() throws Exception
     {
-        List<Closeable>     closeables = Lists.newArrayList();
+        List<Closeable> closeables = Lists.newArrayList();
         try
         {
-            Timing              timing = new Timing();
+            Timing timing = new Timing();
 
-            CuratorFramework    client = CuratorFrameworkFactory.newClient(server.getConnectString(),
timing.session(), timing.connection(), new RetryOneTime(1));
+            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
timing.session(), timing.connection(), new RetryOneTime(1));
             closeables.add(client);
             client.start();
 
-            ServiceInstance<String>     instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
-            ServiceDiscovery<String>    discovery = new ServiceDiscoveryImpl<String>(client,
"/test", new JsonInstanceSerializer<String>(String.class), instance);
+            ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
+            ServiceDiscovery<String> discovery = new ServiceDiscoveryImpl<String>(client,
"/test", new JsonInstanceSerializer<String>(String.class), instance, false);
             closeables.add(discovery);
             discovery.start();
 
             Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
-            
+
             KillSession.kill(client.getZookeeperClient().getZooKeeper(), server.getConnectString());
             Thread.sleep(timing.multiple(1.5).session());
 
@@ -178,24 +179,24 @@ public class TestServiceDiscovery extends BaseClassForTests
     }
 
     @Test
-    public void         testMultipleInstances() throws Exception
+    public void testMultipleInstances() throws Exception
     {
-        final String        SERVICE_ONE = "one";
-        final String        SERVICE_TWO = "two";
+        final String SERVICE_ONE = "one";
+        final String SERVICE_TWO = "two";
 
-        List<Closeable>     closeables = Lists.newArrayList();
+        List<Closeable> closeables = Lists.newArrayList();
         try
         {
             CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1));
             closeables.add(client);
             client.start();
 
-            ServiceInstance<Void>       s1_i1 = ServiceInstance.<Void>builder().name(SERVICE_ONE).build();
-            ServiceInstance<Void>       s1_i2 = ServiceInstance.<Void>builder().name(SERVICE_ONE).build();
-            ServiceInstance<Void>       s2_i1 = ServiceInstance.<Void>builder().name(SERVICE_TWO).build();
-            ServiceInstance<Void>       s2_i2 = ServiceInstance.<Void>builder().name(SERVICE_TWO).build();
+            ServiceInstance<Void> s1_i1 = ServiceInstance.<Void>builder().name(SERVICE_ONE).build();
+            ServiceInstance<Void> s1_i2 = ServiceInstance.<Void>builder().name(SERVICE_ONE).build();
+            ServiceInstance<Void> s2_i1 = ServiceInstance.<Void>builder().name(SERVICE_TWO).build();
+            ServiceInstance<Void> s2_i2 = ServiceInstance.<Void>builder().name(SERVICE_TWO).build();
 
-            ServiceDiscovery<Void>      discovery = ServiceDiscoveryBuilder.builder(Void.class).client(client).basePath("/test").build();
+            ServiceDiscovery<Void> discovery = ServiceDiscoveryBuilder.builder(Void.class).client(client).basePath("/test").build();
             closeables.add(discovery);
             discovery.start();
 
@@ -234,17 +235,17 @@ public class TestServiceDiscovery extends BaseClassForTests
     }
 
     @Test
-    public void         testBasic() throws Exception
+    public void testBasic() throws Exception
     {
-        List<Closeable>     closeables = Lists.newArrayList();
+        List<Closeable> closeables = Lists.newArrayList();
         try
         {
             CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1));
             closeables.add(client);
             client.start();
-            
-            ServiceInstance<String>     instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
-            ServiceDiscovery<String>    discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build();
+
+            ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
+            ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build();
             closeables.add(discovery);
             discovery.start();
 

http://git-wip-us.apache.org/repos/asf/curator/blob/37dc4478/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestWatchedInstances.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestWatchedInstances.java
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestWatchedInstances.java
new file mode 100644
index 0000000..0a19b41
--- /dev/null
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestWatchedInstances.java
@@ -0,0 +1,76 @@
+package org.apache.curator.x.discovery.details;
+
+import com.google.common.collect.Lists;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.x.discovery.ServiceDiscovery;
+import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
+import org.apache.curator.x.discovery.ServiceInstance;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.io.Closeable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class TestWatchedInstances extends BaseClassForTests
+{
+    @Test
+    public void testWatchedInstances() throws Exception
+    {
+        Timing timing = new Timing();
+        List<Closeable> closeables = Lists.newArrayList();
+        try
+        {
+            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1));
+            closeables.add(client);
+            client.start();
+
+            ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
+            ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder
+                .builder(String.class)
+                .basePath("/test")
+                .client(client)
+                .thisInstance(instance)
+                .watchInstances(true)
+                .build();
+            closeables.add(discovery);
+            discovery.start();
+
+            Assert.assertEquals(discovery.queryForNames(), Arrays.asList("test"));
+
+            List<ServiceInstance<String>> list = Lists.newArrayList();
+            list.add(instance);
+            Assert.assertEquals(discovery.queryForInstances("test"), list);
+
+            ServiceDiscoveryImpl<String> discoveryImpl = (ServiceDiscoveryImpl<String>)discovery;
+            ServiceInstance<String> changedInstance = ServiceInstance.<String>builder()
+                .id(instance.getId())
+                .address(instance.getAddress())
+                .payload("different")
+                .name(instance.getName())
+                .port(instance.getPort())
+                .build();
+            String path = discoveryImpl.pathForInstance("test", instance.getId());
+            byte[] bytes = discoveryImpl.getSerializer().serialize(changedInstance);
+            client.setData().forPath(path, bytes);
+            timing.sleepABit();
+
+            ServiceInstance<String> registeredService = discoveryImpl.getRegisteredService(instance.getId());
+            Assert.assertNotNull(registeredService);
+            Assert.assertEquals(registeredService.getPayload(), "different");
+        }
+        finally
+        {
+            Collections.reverse(closeables);
+            for ( Closeable c : closeables )
+            {
+                CloseableUtils.closeQuietly(c);
+            }
+        }
+    }
+}


Mime
View raw message