curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cammcken...@apache.org
Subject git commit: CURATOR-154 - Modified the handling for creating the ephemeral node so that if it already exists, an attempt will be made to set its data to match the data that the PersistentEphemeralNode has cached.
Date Wed, 29 Oct 2014 05:35:56 GMT
Repository: curator
Updated Branches:
  refs/heads/CURATOR-154 [created] 64973b0d9


CURATOR-154 - Modified the handling for creating the ephemeral node so
that if it already exists, an attempt will be made to set its data to
match the data that the PersistentEphemeralNode has cached.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/64973b0d
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/64973b0d
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/64973b0d

Branch: refs/heads/CURATOR-154
Commit: 64973b0d91c0625b00d400227e6b1971233df595
Parents: 1c194b4
Author: Cameron McKenzie <cameron@unico.com.au>
Authored: Wed Oct 29 16:32:35 2014 +1100
Committer: Cameron McKenzie <cameron@unico.com.au>
Committed: Wed Oct 29 16:32:35 2014 +1100

----------------------------------------------------------------------
 .../recipes/nodes/PersistentEphemeralNode.java  | 52 ++++++++++--
 .../nodes/TestPersistentEphemeralNode.java      | 84 +++++++++++++++++++-
 2 files changed, 124 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/64973b0d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
index d78573c..41c04f6 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
@@ -20,6 +20,7 @@
 package org.apache.curator.framework.recipes.nodes;
 
 import com.google.common.base.Preconditions;
+
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
 import org.apache.curator.framework.api.BackgroundCallback;
@@ -31,14 +32,17 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.curator.utils.PathUtils;
 
 /**
@@ -67,7 +71,10 @@ public class PersistentEphemeralNode implements Closeable
         @Override
         public void process(WatchedEvent event)
         {
-            createNode();
+        	if ( event.getType() == EventType.NodeDeleted)
+        	{
+        		createNode();
+        	}
         }
     };
     private final BackgroundCallback checkExistsCallback = new BackgroundCallback()
@@ -81,6 +88,21 @@ public class PersistentEphemeralNode implements Closeable
             }
         }
     };
+    private final BackgroundCallback setDataCallback = new BackgroundCallback() {
+		
+		@Override
+		public void processResult(CuratorFramework client, CuratorEvent event)
+				throws Exception {
+			//If the result is ok then initialisation is complete (if we're still initialising)
+			//Don't retry on other errors as the only recoverable cases will be connection loss
+			//and the node not existing, both of which are already handled by other watches.
+			if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
+			{
+				//Update is ok, mark initialisation as complete if required.
+				initialisationComplete();
+			}
+		}
+	};	
     private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
     {
         @Override
@@ -188,12 +210,12 @@ public class PersistentEphemeralNode implements Closeable
      * @param basePath the base path for the node
      * @param data     data for the node
      */
-    public PersistentEphemeralNode(CuratorFramework client, Mode mode, String basePath, byte[]
data)
+    public PersistentEphemeralNode(CuratorFramework client, Mode mode, String basePath, byte[]
initData)
     {
         this.client = Preconditions.checkNotNull(client, "client cannot be null");
         this.basePath = PathUtils.validatePath(basePath);
         this.mode = Preconditions.checkNotNull(mode, "mode cannot be null");
-        data = Preconditions.checkNotNull(data, "data cannot be null");
+        final byte[] data = Preconditions.checkNotNull(initData, "data cannot be null");
 
         backgroundCallback = new BackgroundCallback()
         {
@@ -201,9 +223,11 @@ public class PersistentEphemeralNode implements Closeable
             public void processResult(CuratorFramework client, CuratorEvent event) throws
Exception
             {
                 String path = null;
+                boolean nodeExists = false;
                 if ( event.getResultCode() == KeeperException.Code.NODEEXISTS.intValue()
)
-                {
-                    path = event.getPath();
+                {                	
+                	path = event.getPath();
+                	nodeExists = true;
                 }
                 else if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                 {
@@ -214,10 +238,13 @@ public class PersistentEphemeralNode implements Closeable
                     nodePath.set(path);
                     watchNode();
 
-                    CountDownLatch localLatch = initialCreateLatch.getAndSet(null);
-                    if ( localLatch != null )
+                    if(nodeExists)
                     {
-                        localLatch.countDown();
+                    	client.setData().inBackground(setDataCallback).forPath(getActualPath(),
data);
+                    }
+                    else
+                    {
+                    	initialisationComplete();
                     }
                 }
                 else
@@ -230,6 +257,15 @@ public class PersistentEphemeralNode implements Closeable
         createMethod = mode.isProtected() ? client.create().creatingParentsIfNeeded().withProtection()
: client.create().creatingParentsIfNeeded();
         this.data.set(Arrays.copyOf(data, data.length));
     }
+    
+    private void initialisationComplete()
+    {
+        CountDownLatch localLatch = initialCreateLatch.getAndSet(null);
+        if ( localLatch != null )
+        {
+            localLatch.countDown();
+        }
+    }
 
     /**
      * You must call start() to initiate the persistent ephemeral node. An attempt to create
the node

http://git-wip-us.apache.org/repos/asf/curator/blob/64973b0d/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
index 47ae757..31e7ef2 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
@@ -20,6 +20,7 @@ package org.apache.curator.framework.recipes.nodes;
 
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
+
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.state.ConnectionState;
@@ -31,12 +32,15 @@ import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.data.Stat;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
+
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
@@ -75,7 +79,7 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
     @Test
     public void testListenersReconnectedIsFast() throws Exception
     {
-        server.close();
+        server.stop();
 
         CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
timing.session(), timing.connection(), new RetryOneTime(1));
         try
@@ -103,13 +107,13 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
             };
             client.getConnectionStateListenable().addListener(listener);
             timing.sleepABit();
-            server = new TestingServer(server.getPort());
+            server.restart();
             Assert.assertTrue(timing.awaitLatch(connectedLatch));
             timing.sleepABit();
             Assert.assertTrue(node.waitForInitialCreate(timing.forWaiting().milliseconds(),
TimeUnit.MILLISECONDS));
-            server.close();
+            server.stop();
             timing.sleepABit();
-            server = new TestingServer(server.getPort());
+            server.restart();
             timing.sleepABit();
             Assert.assertTrue(timing.awaitLatch(reconnectedLatch));
         }
@@ -459,6 +463,78 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
             node.close();
         }
     }
+    
+    /**
+     * Test that if a persistent ephemeral node is created and the node already exists,
+     * the data held by the PersistentEphemeralNode is still set on the existing node.
+     * @throws Exception
+     */
+    @Test
+    public void testSetDataWhenNodeExists() throws Exception
+    {
+        CuratorFramework curator = newCurator();
+        curator.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(PATH,
"InitialData".getBytes());
+        
+        byte[] data = "Hello World".getBytes();
+             
+        PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL,
PATH, data);
+        node.start();
+        try
+        {
+            node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
+            assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), data));
+        }
+        finally
+        {
+            node.close();
+        }
+    }
+    
+    @Test
+    public void testSetDataWhenDisconnected() throws Exception
+    {
+        CuratorFramework curator = newCurator();
+        
+        byte[] initialData = "Hello World".getBytes();
+        byte[] updatedData = "Updated".getBytes();
+             
+        PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL,
PATH, initialData);
+        node.start();
+        try
+        {
+            node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
+            assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), initialData));
+            
+            server.stop();
+            
+            final CountDownLatch dataUpdateLatch = new CountDownLatch(1);
+            
+            Watcher watcher = new Watcher()
+            {
+				@Override
+				public void process(WatchedEvent event)
+				{
+					if ( event.getType() == EventType.NodeDataChanged )
+					{
+						dataUpdateLatch.countDown();
+					}
+				}            	
+            };
+            
+            curator.getData().usingWatcher(watcher).inBackground().forPath(node.getActualPath());
+            
+            node.setData(updatedData);
+            server.restart();
+
+            assertTrue(timing.awaitLatch(dataUpdateLatch));
+                       
+            assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), updatedData));
+        }
+        finally
+        {
+            node.close();
+        }    	
+    }
 
     private void assertNodeExists(CuratorFramework curator, String path) throws Exception
     {


Mime
View raw message