Return-Path: X-Original-To: apmail-curator-commits-archive@minotaur.apache.org Delivered-To: apmail-curator-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id F0B8D17A98 for ; Thu, 16 Oct 2014 20:54:52 +0000 (UTC) Received: (qmail 38940 invoked by uid 500); 16 Oct 2014 20:54:52 -0000 Delivered-To: apmail-curator-commits-archive@curator.apache.org Received: (qmail 38905 invoked by uid 500); 16 Oct 2014 20:54:52 -0000 Mailing-List: contact commits-help@curator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@curator.apache.org Delivered-To: mailing list commits@curator.apache.org Received: (qmail 38892 invoked by uid 99); 16 Oct 2014 20:54:52 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Oct 2014 20:54:52 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 89EC69D47C7; Thu, 16 Oct 2014 20:54:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: dragonsinth@apache.org To: commits@curator.apache.org Date: Thu, 16 Oct 2014 20:54:52 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] git commit: CURATOR-151: SharedValue/SharedCount API update Repository: curator Updated Branches: refs/heads/master 861ba15b7 -> ac2b903d9 CURATOR-151: SharedValue/SharedCount API update Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/0bc73191 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/0bc73191 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/0bc73191 Branch: refs/heads/master Commit: 0bc73191ba0fce5fb28894f8e4124d9d42655b37 Parents: 86299ed Author: Scott Blum Authored: Fri Oct 3 22:04:36 2014 -0400 Committer: Scott Blum Committed: Fri Oct 3 22:04:36 2014 -0400 ---------------------------------------------------------------------- .../framework/recipes/shared/SharedCount.java | 11 +++- .../framework/recipes/shared/SharedValue.java | 63 ++++++++++++-------- .../recipes/shared/VersionedValue.java | 2 +- .../recipes/shared/TestSharedCount.java | 42 +++++++++---- 4 files changed, 76 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/0bc73191/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java index 49b2f2d..87fffdd 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java @@ -79,11 +79,16 @@ public class SharedCount implements Closeable, SharedCountReader, Listenable newCount) throws Exception + public boolean trySetCount(VersionedValue previous, int newCount) throws Exception { - VersionedValue copy = new VersionedValue(newCount.getVersion(), toBytes(newCount.getValue())); - return sharedValue.trySetValue(copy); + VersionedValue previousCopy = new VersionedValue(previous.getVersion(), toBytes(previous.getValue())); + return sharedValue.trySetValue(previousCopy, toBytes(newCount)); } @Override http://git-wip-us.apache.org/repos/asf/curator/blob/0bc73191/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java index 80fa53f..6ca53ec 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java @@ -117,10 +117,8 @@ public class SharedValue implements Closeable, SharedValueReader { Preconditions.checkState(state.get() == State.STARTED, "not started"); - VersionedValue localCopy = currentValue.get(); - client.setData().forPath(path, newValue); - - currentValue.set(new VersionedValue(localCopy.getVersion() + 1, Arrays.copyOf(newValue, newValue.length))); + Stat result = client.setData().forPath(path, newValue); + updateValue(result.getVersion(), Arrays.copyOf(newValue, newValue.length)); } /** @@ -129,29 +127,19 @@ public class SharedValue implements Closeable, SharedValueReader * value is updated. i.e. if the value is not successful you can get the updated value * by calling {@link #getValue()}. * + * @deprecated use {@link #trySetValue(VersionedValue, byte[])} for stronger atomicity + * guarantees. Even if this object's internal state is up-to-date, the caller has no way to + * ensure that they've read the most recently seen value. + * * @param newValue the new value to attempt * @return true if the change attempt was successful, false if not. If the change * was not successful, {@link #getValue()} will return the updated value * @throws Exception ZK errors, interruptions, etc. */ + @Deprecated public boolean trySetValue(byte[] newValue) throws Exception { - Preconditions.checkState(state.get() == State.STARTED, "not started"); - - try - { - VersionedValue localCopy = currentValue.get(); - client.setData().withVersion(localCopy.getVersion()).forPath(path, newValue); - currentValue.set(new VersionedValue(localCopy.getVersion() + 1, Arrays.copyOf(newValue, newValue.length))); - return true; - } - catch ( KeeperException.BadVersionException ignore ) - { - // ignore - } - - readValue(); - return false; + return trySetValue(currentValue.get(), newValue); } /** @@ -165,14 +153,20 @@ public class SharedValue implements Closeable, SharedValueReader * was not successful, {@link #getValue()} will return the updated value * @throws Exception ZK errors, interruptions, etc. */ - public boolean trySetValue(VersionedValue newValue) throws Exception + public boolean trySetValue(VersionedValue previous, byte[] newValue) throws Exception { Preconditions.checkState(state.get() == State.STARTED, "not started"); + VersionedValue current = currentValue.get(); + if ( previous.getVersion() != current.getVersion() || !Arrays.equals(previous.getValue(), current.getValue()) ) + { + return false; + } + try { - client.setData().withVersion(newValue.getVersion()).forPath(path, newValue.getValue()); - currentValue.set(new VersionedValue(newValue.getVersion() + 1, Arrays.copyOf(newValue.getValue(), newValue.getValue().length))); + Stat result = client.setData().withVersion(previous.getVersion()).forPath(path, newValue); + updateValue(result.getVersion(), Arrays.copyOf(newValue, newValue.length)); return true; } catch ( KeeperException.BadVersionException ignore ) @@ -184,6 +178,25 @@ public class SharedValue implements Closeable, SharedValueReader return false; } + private void updateValue(int version, byte[] bytes) + { + while (true) + { + VersionedValue current = currentValue.get(); + if (current.getVersion() >= version) + { + // A newer version was concurrently set. + return; + } + if ( currentValue.compareAndSet(current, new VersionedValue(version, bytes)) ) + { + // Successfully set. + return; + } + // Lost a race, retry. + } + } + /** * Returns the listenable * @@ -225,11 +238,11 @@ public class SharedValue implements Closeable, SharedValueReader listeners.clear(); } - private synchronized void readValue() throws Exception + private void readValue() throws Exception { Stat localStat = new Stat(); byte[] bytes = client.getData().storingStatIn(localStat).usingWatcher(watcher).forPath(path); - currentValue.set(new VersionedValue(localStat.getVersion(), Arrays.copyOf(bytes, bytes.length))); + updateValue(localStat.getVersion(), bytes); } private void notifyListeners() http://git-wip-us.apache.org/repos/asf/curator/blob/0bc73191/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java index beb38a5..a3c15ab 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java @@ -32,7 +32,7 @@ public class VersionedValue * @param version the version * @param value the value (cannot be null) */ - public VersionedValue(int version, T value) + VersionedValue(int version, T value) { this.version = version; this.value = Preconditions.checkNotNull(value, "value cannot be null"); http://git-wip-us.apache.org/repos/asf/curator/blob/0bc73191/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java index 9fdf20f..659154a 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java @@ -184,17 +184,33 @@ public class TestSharedCount extends BaseClassForTests { count.start(); - Assert.assertTrue(count.trySetCount(new VersionedValue(0, 1))); - Assert.assertTrue(count.trySetCount(new VersionedValue(1, 5))); - Assert.assertTrue(count.trySetCount(new VersionedValue(2, 10))); + VersionedValue current = count.getVersionedValue(); + Assert.assertEquals(current.getVersion(), 0); + + Assert.assertTrue(count.trySetCount(current, 1)); + current = count.getVersionedValue(); + Assert.assertEquals(current.getVersion(), 1); + Assert.assertEquals(count.getCount(), 1); + + Assert.assertTrue(count.trySetCount(current, 5)); + current = count.getVersionedValue(); + Assert.assertEquals(current.getVersion(), 2); + Assert.assertEquals(count.getCount(), 5); + + Assert.assertTrue(count.trySetCount(current, 10)); + + current = count.getVersionedValue(); + Assert.assertEquals(current.getVersion(), 3); Assert.assertEquals(count.getCount(), 10); - Assert.assertFalse(count.trySetCount(new VersionedValue(10, 20))); - VersionedValue versionedValue = count.getVersionedValue(); - Assert.assertTrue(count.trySetCount(new VersionedValue(versionedValue.getVersion(), 100))); - versionedValue = count.getVersionedValue(); + // Wrong value + Assert.assertFalse(count.trySetCount(new VersionedValue(3, 20), 7)); + // Wrong version + Assert.assertFalse(count.trySetCount(new VersionedValue(10, 10), 7)); + + // Server changed client.setData().forPath("/count", SharedCount.toBytes(88)); - Assert.assertFalse(count.trySetCount(new VersionedValue(versionedValue.getVersion(), 234))); + Assert.assertFalse(count.trySetCount(current, 234)); } finally { @@ -219,18 +235,18 @@ public class TestSharedCount extends BaseClassForTests count2.start(); VersionedValue versionedValue = count1.getVersionedValue(); - Assert.assertTrue(count1.trySetCount(new VersionedValue(versionedValue.getVersion(), 10))); + Assert.assertTrue(count1.trySetCount(versionedValue, 10)); timing.sleepABit(); versionedValue = count2.getVersionedValue(); - Assert.assertTrue(count2.trySetCount(new VersionedValue(versionedValue.getVersion(), 20))); + Assert.assertTrue(count2.trySetCount(versionedValue, 20)); timing.sleepABit(); VersionedValue versionedValue1 = count1.getVersionedValue(); VersionedValue versionedValue2 = count2.getVersionedValue(); - Assert.assertTrue(count2.trySetCount(new VersionedValue(versionedValue2.getVersion(), 30))); - Assert.assertFalse(count1.trySetCount(new VersionedValue(versionedValue1.getVersion(), 40))); + Assert.assertTrue(count2.trySetCount(versionedValue2, 30)); + Assert.assertFalse(count1.trySetCount(versionedValue1, 40)); versionedValue1 = count1.getVersionedValue(); - Assert.assertTrue(count1.trySetCount(new VersionedValue(versionedValue1.getVersion(), 40))); + Assert.assertTrue(count1.trySetCount(versionedValue1, 40)); } finally {