curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From randg...@apache.org
Subject git commit: Added APIs for getting/setting shared values with versions for better utility
Date Thu, 02 Oct 2014 18:04:35 GMT
Repository: curator
Updated Branches:
  refs/heads/CURATOR-151 [created] 0c2a5a5b4


Added APIs for getting/setting shared values with versions for better utility


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/0c2a5a5b
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/0c2a5a5b
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/0c2a5a5b

Branch: refs/heads/CURATOR-151
Commit: 0c2a5a5b4f6cc07b9b1cd9c83ad77c3fa0170779
Parents: b66e9b6
Author: randgalt <randgalt@apache.org>
Authored: Thu Oct 2 13:04:26 2014 -0500
Committer: randgalt <randgalt@apache.org>
Committed: Thu Oct 2 13:04:26 2014 -0500

----------------------------------------------------------------------
 .../framework/recipes/shared/SharedCount.java   |  28 ++-
 .../recipes/shared/SharedCountReader.java       |   9 +-
 .../framework/recipes/shared/SharedValue.java   | 138 ++++++++-----
 .../recipes/shared/SharedValueReader.java       |   7 +
 .../recipes/shared/VersionedValue.java          |  32 +++
 .../recipes/shared/TestSharedCount.java         | 195 +++++++++++++------
 6 files changed, 298 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/0c2a5a5b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
index b2d1218..49b2f2d 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
@@ -18,6 +18,7 @@
  */
 package org.apache.curator.framework.recipes.shared;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.curator.framework.CuratorFramework;
@@ -54,6 +55,13 @@ public class SharedCount implements Closeable, SharedCountReader, Listenable<Sha
         return fromBytes(sharedValue.getValue());
     }
 
+    @Override
+    public VersionedValue<Integer> getVersionedValue()
+    {
+        VersionedValue<byte[]> localValue = sharedValue.getVersionedValue();
+        return new VersionedValue<Integer>(localValue.getVersion(), fromBytes(localValue.getValue()));
+    }
+
     /**
      * Change the shared count value irrespective of its previous state
      *
@@ -81,6 +89,23 @@ public class SharedCount implements Closeable, SharedCountReader, Listenable<Sha
         return sharedValue.trySetValue(toBytes(newCount));
     }
 
+    /**
+     * Changes the shared count only if its value has not changed since the version specified by
+     * newCount. If the count has changed, the value is not set and this client's view of the
+     * value is updated. i.e. if the count is not successful you can get the updated value
+     * by calling {@link #getCount()}.
+     *
+     * @param newCount the new value to attempt
+     * @return true if the change attempt was successful, false if not. If the change
+     * was not successful, {@link #getCount()} will return the updated value
+     * @throws Exception ZK errors, interruptions, etc.
+     */
+    public boolean  trySetCount(VersionedValue<Integer> newCount) throws Exception
+    {
+        VersionedValue<byte[]> copy = new VersionedValue<byte[]>(newCount.getVersion(),
toBytes(newCount.getValue()));
+        return sharedValue.trySetValue(copy);
+    }
+
     @Override
     public void     addListener(SharedCountListener listener)
     {
@@ -131,7 +156,8 @@ public class SharedCount implements Closeable, SharedCountReader, Listenable<Sha
         sharedValue.close();
     }
 
-    private static byte[]   toBytes(int value)
+    @VisibleForTesting
+    static byte[]   toBytes(int value)
     {
         byte[]      bytes = new byte[4];
         ByteBuffer.wrap(bytes).putInt(value);

http://git-wip-us.apache.org/repos/asf/curator/blob/0c2a5a5b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCountReader.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCountReader.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCountReader.java
index 3d3d6a4..cae31bb 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCountReader.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCountReader.java
@@ -30,5 +30,12 @@ public interface SharedCountReader extends Listenable<SharedCountListener>
      *
      * @return count
      */
-    int      getCount();
+    public int      getCount();
+
+    /**
+     * Return the current count and version
+     *
+     * @return count and version
+     */
+    public VersionedValue<Integer> getVersionedValue();
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/0c2a5a5b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
index 5c4b53b..80fa53f 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.recipes.shared;
 
 import com.google.common.base.Function;
@@ -25,6 +26,7 @@ import org.apache.curator.framework.api.CuratorWatcher;
 import org.apache.curator.framework.listen.ListenerContainer;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.PathUtils;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.data.Stat;
@@ -34,7 +36,6 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicReference;
-import org.apache.curator.utils.PathUtils;
 
 /**
  * Manages a shared value. All clients watching the same path will have the up-to-date
@@ -42,14 +43,15 @@ import org.apache.curator.utils.PathUtils;
  */
 public class SharedValue implements Closeable, SharedValueReader
 {
-    private final Logger                    log = LoggerFactory.getLogger(getClass());
-    private final ListenerContainer<SharedValueListener>    listeners = new ListenerContainer<SharedValueListener>();
-    private final CuratorFramework          client;
-    private final String                    path;
-    private final byte[]                    seedValue;
-    private final AtomicReference<State>    state = new AtomicReference<State>(State.LATENT);
-
-    private final CuratorWatcher            watcher = new CuratorWatcher()
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final ListenerContainer<SharedValueListener> listeners = new ListenerContainer<SharedValueListener>();
+    private final CuratorFramework client;
+    private final String path;
+    private final byte[] seedValue;
+    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+    private final AtomicReference<VersionedValue<byte[]>> currentValue;
+
+    private final CuratorWatcher watcher = new CuratorWatcher()
     {
         @Override
         public void process(WatchedEvent event) throws Exception
@@ -62,7 +64,7 @@ public class SharedValue implements Closeable, SharedValueReader
         }
     };
 
-    private final ConnectionStateListener   connectionStateListener = new ConnectionStateListener()
+    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
     {
         @Override
         public void stateChanged(CuratorFramework client, ConnectionState newState)
@@ -78,12 +80,9 @@ public class SharedValue implements Closeable, SharedValueReader
         CLOSED
     }
 
-    private volatile byte[]     value;
-    private volatile Stat       stat = new Stat();
-
     /**
-     * @param client the client
-     * @param path the shared path - i.e. where the shared value is stored
+     * @param client    the client
+     * @param path      the shared path - i.e. where the shared value is stored
      * @param seedValue the initial value for the value if/f the path has not yet been created
      */
     public SharedValue(CuratorFramework client, String path, byte[] seedValue)
@@ -91,13 +90,21 @@ public class SharedValue implements Closeable, SharedValueReader
         this.client = client;
         this.path = PathUtils.validatePath(path);
         this.seedValue = Arrays.copyOf(seedValue, seedValue.length);
-        value = seedValue;
+        currentValue = new AtomicReference<VersionedValue<byte[]>>(new VersionedValue<byte[]>(0,
Arrays.copyOf(seedValue, seedValue.length)));
     }
 
     @Override
     public byte[] getValue()
     {
-        return Arrays.copyOf(value, value.length);
+        VersionedValue<byte[]> localCopy = currentValue.get();
+        return Arrays.copyOf(localCopy.getValue(), localCopy.getValue().length);
+    }
+
+    @Override
+    public VersionedValue<byte[]> getVersionedValue()
+    {
+        VersionedValue<byte[]> localCopy = currentValue.get();
+        return new VersionedValue<byte[]>(localCopy.getVersion(), Arrays.copyOf(localCopy.getValue(),
localCopy.getValue().length));
     }
 
     /**
@@ -110,9 +117,10 @@ public class SharedValue implements Closeable, SharedValueReader
     {
         Preconditions.checkState(state.get() == State.STARTED, "not started");
 
+        VersionedValue<byte[]> localCopy = currentValue.get();
         client.setData().forPath(path, newValue);
-        stat.setVersion(stat.getVersion() + 1);
-        value = Arrays.copyOf(newValue, newValue.length);
+
+        currentValue.set(new VersionedValue<byte[]>(localCopy.getVersion() + 1, Arrays.copyOf(newValue,
newValue.length)));
     }
 
     /**
@@ -132,9 +140,39 @@ public class SharedValue implements Closeable, SharedValueReader
 
         try
         {
-            client.setData().withVersion(stat.getVersion()).forPath(path, newValue);
-            stat.setVersion(stat.getVersion() + 1);
-            value = Arrays.copyOf(newValue, newValue.length);
+            VersionedValue<byte[]> localCopy = currentValue.get();
+            client.setData().withVersion(localCopy.getVersion()).forPath(path, newValue);
+            currentValue.set(new VersionedValue<byte[]>(localCopy.getVersion() + 1,
Arrays.copyOf(newValue, newValue.length)));
+            return true;
+        }
+        catch ( KeeperException.BadVersionException ignore )
+        {
+            // ignore
+        }
+
+        readValue();
+        return false;
+    }
+
+    /**
+     * Changes the shared value only if its value has not changed since the version specified by
+     * newValue. If the value has changed, the value is not set and this client's view of the
+     * value is updated. i.e. if the value is not successful you can get the updated value
+     * by calling {@link #getValue()}.
+     *
+     * @param newValue the new value to attempt
+     * @return true if the change attempt was successful, false if not. If the change
+     * was not successful, {@link #getValue()} will return the updated value
+     * @throws Exception ZK errors, interruptions, etc.
+     */
+    public boolean trySetValue(VersionedValue<byte[]> newValue) throws Exception
+    {
+        Preconditions.checkState(state.get() == State.STARTED, "not started");
+
+        try
+        {
+            client.setData().withVersion(newValue.getVersion()).forPath(path, newValue.getValue());
+            currentValue.set(new VersionedValue<byte[]>(newValue.getVersion() + 1,
Arrays.copyOf(newValue.getValue(), newValue.getValue().length)));
             return true;
         }
         catch ( KeeperException.BadVersionException ignore )
@@ -162,7 +200,7 @@ public class SharedValue implements Closeable, SharedValueReader
      *
      * @throws Exception ZK errors, interruptions, etc.
      */
-    public void     start() throws Exception
+    public void start() throws Exception
     {
         Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot
be started more than once");
 
@@ -189,48 +227,48 @@ public class SharedValue implements Closeable, SharedValueReader
 
     private synchronized void readValue() throws Exception
     {
-        Stat    localStat = new Stat();
-        byte[]  bytes = client.getData().storingStatIn(localStat).usingWatcher(watcher).forPath(path);
-        stat = localStat;
-        value = bytes;
+        Stat localStat = new Stat();
+        byte[] bytes = client.getData().storingStatIn(localStat).usingWatcher(watcher).forPath(path);
+        currentValue.set(new VersionedValue<byte[]>(localStat.getVersion(), Arrays.copyOf(bytes,
bytes.length)));
     }
 
     private void notifyListeners()
     {
+        final byte[] localValue = getValue();
         listeners.forEach
-        (
-            new Function<SharedValueListener, Void>()
-            {
-                @Override
-                public Void apply(SharedValueListener listener)
+            (
+                new Function<SharedValueListener, Void>()
                 {
-                    try
-                    {
-                        listener.valueHasChanged(SharedValue.this, value);
-                    }
-                    catch ( Exception e )
+                    @Override
+                    public Void apply(SharedValueListener listener)
                     {
-                        log.error("From SharedValue listener", e);
+                        try
+                        {
+                            listener.valueHasChanged(SharedValue.this, localValue);
+                        }
+                        catch ( Exception e )
+                        {
+                            log.error("From SharedValue listener", e);
+                        }
+                        return null;
                     }
-                    return null;
                 }
-            }
-        );
+            );
     }
 
     private void notifyListenerOfStateChanged(final ConnectionState newState)
     {
         listeners.forEach
-        (
-            new Function<SharedValueListener, Void>()
-            {
-                @Override
-                public Void apply(SharedValueListener listener)
+            (
+                new Function<SharedValueListener, Void>()
                 {
-                    listener.stateChanged(client, newState);
-                    return null;
+                    @Override
+                    public Void apply(SharedValueListener listener)
+                    {
+                        listener.stateChanged(client, newState);
+                        return null;
+                    }
                 }
-            }
-        );
+            );
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/0c2a5a5b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValueReader.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValueReader.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValueReader.java
index 93ed99c..e298cca 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValueReader.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValueReader.java
@@ -33,6 +33,13 @@ public interface SharedValueReader
     public byte[]   getValue();
 
     /**
+     * Return the current version and value
+     *
+     * @return version/value
+     */
+    public VersionedValue<byte[]> getVersionedValue();
+
+    /**
      * Returns the listenable
      *
      * @return listenable

http://git-wip-us.apache.org/repos/asf/curator/blob/0c2a5a5b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java
new file mode 100644
index 0000000..ef4c9cc
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java
@@ -0,0 +1,32 @@
+package org.apache.curator.framework.recipes.shared;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * POJO for a version and a value
+ */
+public class VersionedValue<T>
+{
+    private final int version;
+    private final T value;
+
+    /**
+     * @param version the version
+     * @param value the value (cannot be null)
+     */
+    public VersionedValue(int version, T value)
+    {
+        this.version = version;
+        this.value = Preconditions.checkNotNull(value, "value cannot be null");
+    }
+
+    public int getVersion()
+    {
+        return version;
+    }
+
+    public T getValue()
+    {
+        return value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/0c2a5a5b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
index 08f7263..9fdf20f 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
@@ -16,16 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.recipes.shared;
 
 import com.google.common.collect.Lists;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.utils.CloseableUtils;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.List;
@@ -42,85 +44,88 @@ import java.util.concurrent.TimeUnit;
 public class TestSharedCount extends BaseClassForTests
 {
     @Test
-    public void     testMultiClients() throws Exception
+    public void testMultiClients() throws Exception
     {
-        final int           CLIENT_QTY = 5;
+        final int CLIENT_QTY = 5;
 
-        List<Future<List<Integer>>>     futures = Lists.newArrayList();
-        final List<CuratorFramework>    clients = new CopyOnWriteArrayList<CuratorFramework>();
+        List<Future<List<Integer>>> futures = Lists.newArrayList();
+        final List<CuratorFramework> clients = new CopyOnWriteArrayList<CuratorFramework>();
+        final List<SharedCount> counts = new CopyOnWriteArrayList<SharedCount>();
         try
         {
-            final CountDownLatch    startLatch = new CountDownLatch(CLIENT_QTY);
-            final Semaphore         semaphore = new Semaphore(0);
-            ExecutorService         service = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("Test-%d").build());
+            final CountDownLatch startLatch = new CountDownLatch(CLIENT_QTY);
+            final Semaphore semaphore = new Semaphore(0);
+            ExecutorService service = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("Test-%d").build());
             for ( int i = 0; i < CLIENT_QTY; ++i )
             {
                 Future<List<Integer>> future = service.submit
-                (
-                    new Callable<List<Integer>>()
-                    {
-                        @Override
-                        public List<Integer> call() throws Exception
+                    (
+                        new Callable<List<Integer>>()
                         {
-                            final List<Integer> countList = Lists.newArrayList();
-                            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1));
-                            clients.add(client);
-                            client.start();
-
-                            SharedCount count = new SharedCount(client, "/count", 10);
-
-                            final CountDownLatch        latch = new CountDownLatch(1);
-                            count.addListener
-                            (
-                                new SharedCountListener()
-                                {
-                                    @Override
-                                    public void countHasChanged(SharedCountReader sharedCount,
int newCount) throws Exception
-                                    {
-                                        if ( newCount < 0 )
-                                        {
-                                            latch.countDown();
-                                        }
-                                        else
+                            @Override
+                            public List<Integer> call() throws Exception
+                            {
+                                final List<Integer> countList = Lists.newArrayList();
+                                CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1));
+                                clients.add(client);
+                                client.start();
+
+                                SharedCount count = new SharedCount(client, "/count", 10);
+                                counts.add(count);
+
+                                final CountDownLatch latch = new CountDownLatch(1);
+                                count.addListener
+                                    (
+                                        new SharedCountListener()
                                         {
-                                            countList.add(newCount);
-                                        }
+                                            @Override
+                                            public void countHasChanged(SharedCountReader
sharedCount, int newCount) throws Exception
+                                            {
+                                                if ( newCount < 0 )
+                                                {
+                                                    latch.countDown();
+                                                }
+                                                else
+                                                {
+                                                    countList.add(newCount);
+                                                }
+
+                                                semaphore.release();
+                                            }
 
-                                        semaphore.release();
-                                    }
-
-                                    @Override
-                                    public void stateChanged(CuratorFramework client, ConnectionState
newState)
-                                    {
-                                    }
-                                }
-                            );
-                            count.start();
-                            startLatch.countDown();
-                            latch.await();
-                            return countList;
+                                            @Override
+                                            public void stateChanged(CuratorFramework client,
ConnectionState newState)
+                                            {
+                                            }
+                                        }
+                                    );
+                                count.start();
+                                startLatch.countDown();
+                                latch.await();
+                                return countList;
+                            }
                         }
-                    }
-                );
+                    );
                 futures.add(future);
             }
 
             CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1));
             clients.add(client);
             client.start();
-            
+
             Assert.assertTrue(startLatch.await(10, TimeUnit.SECONDS));
 
             SharedCount count = new SharedCount(client, "/count", 10);
+            counts.add(count);
             count.start();
 
             List<Integer> countList = Lists.newArrayList();
-            Random        random = new Random();
+            Random random = new Random();
             for ( int i = 0; i < 100; ++i )
             {
                 Thread.sleep(random.nextInt(10));
 
-                int     next = random.nextInt(100);
+                int next = random.nextInt(100);
                 countList.add(next);
                 count.setCount(next);
 
@@ -130,12 +135,16 @@ public class TestSharedCount extends BaseClassForTests
 
             for ( Future<List<Integer>> future : futures )
             {
-                List<Integer>       thisCountList = future.get();
+                List<Integer> thisCountList = future.get();
                 Assert.assertEquals(thisCountList, countList);
             }
         }
         finally
         {
+            for ( SharedCount count : counts )
+            {
+                CloseableUtils.closeQuietly(count);
+            }
             for ( CuratorFramework client : clients )
             {
                 CloseableUtils.closeQuietly(client);
@@ -144,13 +153,13 @@ public class TestSharedCount extends BaseClassForTests
     }
 
     @Test
-    public void     testSimple() throws Exception
+    public void testSimple() throws Exception
     {
         CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1));
-        client.start();
+        SharedCount count = new SharedCount(client, "/count", 0);
         try
         {
-            SharedCount         count = new SharedCount(client, "/count", 0);
+            client.start();
             count.start();
 
             Assert.assertTrue(count.trySetCount(1));
@@ -160,7 +169,75 @@ public class TestSharedCount extends BaseClassForTests
         }
         finally
         {
-            client.close();
+            CloseableUtils.closeQuietly(count);
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testSimpleVersioned() throws Exception
+    {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1));
+        SharedCount count = new SharedCount(client, "/count", 0);
+        client.start();
+        try
+        {
+            count.start();
+
+            Assert.assertTrue(count.trySetCount(new VersionedValue<Integer>(0, 1)));
+            Assert.assertTrue(count.trySetCount(new VersionedValue<Integer>(1, 5)));
+            Assert.assertTrue(count.trySetCount(new VersionedValue<Integer>(2, 10)));
+            Assert.assertEquals(count.getCount(), 10);
+            Assert.assertFalse(count.trySetCount(new VersionedValue<Integer>(10, 20)));
+
+            VersionedValue<Integer> versionedValue = count.getVersionedValue();
+            Assert.assertTrue(count.trySetCount(new VersionedValue<Integer>(versionedValue.getVersion(),
100)));
+            versionedValue = count.getVersionedValue();
+            client.setData().forPath("/count", SharedCount.toBytes(88));
+            Assert.assertFalse(count.trySetCount(new VersionedValue<Integer>(versionedValue.getVersion(),
234)));
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(count);
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testMultiClientVersioned() throws Exception
+    {
+        Timing timing = new Timing();
+        CuratorFramework client1 = CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1));
+        CuratorFramework client2 = CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1));
+        SharedCount count1 = new SharedCount(client1, "/count", 0);
+        SharedCount count2 = new SharedCount(client2, "/count", 0);
+        try
+        {
+            client1.start();
+            client2.start();
+            count1.start();
+            count2.start();
+
+            VersionedValue<Integer> versionedValue = count1.getVersionedValue();
+            Assert.assertTrue(count1.trySetCount(new VersionedValue<Integer>(versionedValue.getVersion(),
10)));
+            timing.sleepABit();
+            versionedValue = count2.getVersionedValue();
+            Assert.assertTrue(count2.trySetCount(new VersionedValue<Integer>(versionedValue.getVersion(),
20)));
+            timing.sleepABit();
+
+            VersionedValue<Integer> versionedValue1 = count1.getVersionedValue();
+            VersionedValue<Integer> versionedValue2 = count2.getVersionedValue();
+            Assert.assertTrue(count2.trySetCount(new VersionedValue<Integer>(versionedValue2.getVersion(),
30)));
+            Assert.assertFalse(count1.trySetCount(new VersionedValue<Integer>(versionedValue1.getVersion(),
40)));
+            versionedValue1 = count1.getVersionedValue();
+            Assert.assertTrue(count1.trySetCount(new VersionedValue<Integer>(versionedValue1.getVersion(),
40)));
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(count2);
+            CloseableUtils.closeQuietly(count1);
+            CloseableUtils.closeQuietly(client2);
+            CloseableUtils.closeQuietly(client1);
         }
     }
 }


Mime
View raw message