curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From randg...@apache.org
Subject git commit: ensurePath might throw an exception if connection couldn't succeed. The exception was being swallowed by createNode causing the recipe to fail if there wasn't a good connection when the object was started. Switched to use creatingParentsIfNee
Date Fri, 06 Sep 2013 17:50:01 GMT
Updated Branches:
  refs/heads/master 86e442a1d -> fd4cd433e


ensurePath might throw an exception if connection couldn't succeed. The exception was being
swallowed by
createNode causing the recipe to fail if there wasn't a good connection when the object was
started. Switched
to use creatingParentsIfNeeded. It's more efficient anyway. Also, fixed swallowed exceptions.

TODO - check other recipes' uses of ensurePath for similar problem.


Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/fd4cd433
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/fd4cd433
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/fd4cd433

Branch: refs/heads/master
Commit: fd4cd433eec2ea302fa5ca0c1f362f94ceb53bf7
Parents: 86e442a
Author: randgalt <randgalt@apache.org>
Authored: Fri Sep 6 10:48:11 2013 -0700
Committer: randgalt <randgalt@apache.org>
Committed: Fri Sep 6 10:48:11 2013 -0700

----------------------------------------------------------------------
 .../recipes/nodes/PersistentEphemeralNode.java  | 182 +++++++++----------
 .../nodes/TestPersistentEphemeralNode.java      |  47 +++++
 2 files changed, 138 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/fd4cd433/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
index 9366492..b178a00 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
@@ -16,9 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.recipes.nodes;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
@@ -28,8 +28,6 @@ import org.apache.curator.framework.api.CreateModable;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.curator.utils.EnsurePath;
-import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
@@ -37,6 +35,7 @@ import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.io.Closeable;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -45,31 +44,28 @@ import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * <p>
- *     A persistent ephemeral node is an ephemeral node that attempts to stay present in
- *     ZooKeeper, even through connection and session interruptions.
+ * A persistent ephemeral node is an ephemeral node that attempts to stay present in
+ * ZooKeeper, even through connection and session interruptions.
  * </p>
- *
+ * <p/>
  * <p>
- *     Thanks to bbeck (https://github.com/bbeck) for the initial coding and design
+ * Thanks to bbeck (https://github.com/bbeck) for the initial coding and design
  * </p>
  */
 public class PersistentEphemeralNode implements Closeable
 {
-    @VisibleForTesting
-    volatile CountDownLatch         initialCreateLatch = new CountDownLatch(1);
-
-    private final Logger                    log = LoggerFactory.getLogger(getClass());
-    private final CuratorFramework          client;
-    private final EnsurePath                ensurePath;
-    private final CreateModable<ACLBackgroundPathAndBytesable<String>>  createMethod;
-    private final AtomicReference<String>   nodePath = new AtomicReference<String>(null);
-    private final String                    basePath;
-    private final Mode                      mode;
-    private final AtomicReference<byte[]>   data = new AtomicReference<byte[]>();
-    private final AtomicReference<State>    state = new AtomicReference<State>(State.LATENT);
-    private final AtomicBoolean             isSuspended = new AtomicBoolean(false);
-    private final BackgroundCallback        backgroundCallback;
-    private final Watcher                   watcher = new Watcher()
+    private final AtomicReference<CountDownLatch> initialCreateLatch = new AtomicReference<CountDownLatch>(new
CountDownLatch(1));
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final CuratorFramework client;
+    private final CreateModable<ACLBackgroundPathAndBytesable<String>> createMethod;
+    private final AtomicReference<String> nodePath = new AtomicReference<String>(null);
+    private final String basePath;
+    private final Mode mode;
+    private final AtomicReference<byte[]> data = new AtomicReference<byte[]>();
+    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+    private final AtomicBoolean isSuspended = new AtomicBoolean(false);
+    private final BackgroundCallback backgroundCallback;
+    private final Watcher watcher = new Watcher()
     {
         @Override
         public void process(WatchedEvent event)
@@ -80,7 +76,7 @@ public class PersistentEphemeralNode implements Closeable
             }
         }
     };
-    private final ConnectionStateListener   listener = new ConnectionStateListener()
+    private final ConnectionStateListener listener = new ConnectionStateListener()
     {
         @Override
         public void stateChanged(CuratorFramework client, ConnectionState newState)
@@ -92,7 +88,7 @@ public class PersistentEphemeralNode implements Closeable
             }
         }
     };
-    private final BackgroundCallback        checkExistsCallback = new BackgroundCallback()
+    private final BackgroundCallback checkExistsCallback = new BackgroundCallback()
     {
         @Override
         public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
@@ -120,75 +116,73 @@ public class PersistentEphemeralNode implements Closeable
          * Same as {@link CreateMode#EPHEMERAL}
          */
         EPHEMERAL()
-        {
-            @Override
-            protected CreateMode getCreateMode(boolean pathIsSet)
             {
-                return CreateMode.EPHEMERAL;
-            }
+                @Override
+                protected CreateMode getCreateMode(boolean pathIsSet)
+                {
+                    return CreateMode.EPHEMERAL;
+                }
 
-            @Override
-            protected boolean isProtected()
-            {
-                return false;
-            }
-        },
+                @Override
+                protected boolean isProtected()
+                {
+                    return false;
+                }
+            },
 
         /**
          * Same as {@link CreateMode#EPHEMERAL_SEQUENTIAL}
          */
         EPHEMERAL_SEQUENTIAL()
-        {
-            @Override
-            protected CreateMode getCreateMode(boolean pathIsSet)
             {
-                return pathIsSet ? CreateMode.EPHEMERAL : CreateMode.EPHEMERAL_SEQUENTIAL;
-            }
+                @Override
+                protected CreateMode getCreateMode(boolean pathIsSet)
+                {
+                    return pathIsSet ? CreateMode.EPHEMERAL : CreateMode.EPHEMERAL_SEQUENTIAL;
+                }
 
-            @Override
-            protected boolean isProtected()
-            {
-                return false;
-            }
-        },
+                @Override
+                protected boolean isProtected()
+                {
+                    return false;
+                }
+            },
 
         /**
          * Same as {@link CreateMode#EPHEMERAL} with protection
          */
         PROTECTED_EPHEMERAL()
-        {
-            @Override
-            protected CreateMode getCreateMode(boolean pathIsSet)
             {
-                return CreateMode.EPHEMERAL;
-            }
+                @Override
+                protected CreateMode getCreateMode(boolean pathIsSet)
+                {
+                    return CreateMode.EPHEMERAL;
+                }
 
-            @Override
-            protected boolean isProtected()
-            {
-                return true;
-            }
-        },
+                @Override
+                protected boolean isProtected()
+                {
+                    return true;
+                }
+            },
 
         /**
          * Same as {@link CreateMode#EPHEMERAL_SEQUENTIAL} with protection
          */
         PROTECTED_EPHEMERAL_SEQUENTIAL()
-        {
-            @Override
-            protected CreateMode getCreateMode(boolean pathIsSet)
-            {
-                return pathIsSet ? CreateMode.EPHEMERAL : CreateMode.EPHEMERAL_SEQUENTIAL;
-            }
-
-            @Override
-            protected boolean isProtected()
             {
-                return true;
-            }
-        }
+                @Override
+                protected CreateMode getCreateMode(boolean pathIsSet)
+                {
+                    return pathIsSet ? CreateMode.EPHEMERAL : CreateMode.EPHEMERAL_SEQUENTIAL;
+                }
 
-        ;
+                @Override
+                protected boolean isProtected()
+                {
+                    return true;
+                }
+            };
 
         protected abstract CreateMode getCreateMode(boolean pathIsSet);
 
@@ -196,10 +190,10 @@ public class PersistentEphemeralNode implements Closeable
     }
 
     /**
-     * @param client client instance
-     * @param mode creation/protection mode
+     * @param client   client instance
+     * @param mode     creation/protection mode
      * @param basePath the base path for the node
-     * @param data data for the node
+     * @param data     data for the node
      */
     public PersistentEphemeralNode(CuratorFramework client, Mode mode, String basePath, byte[]
data)
     {
@@ -208,15 +202,12 @@ public class PersistentEphemeralNode implements Closeable
         this.mode = Preconditions.checkNotNull(mode, "mode cannot be null");
         data = Preconditions.checkNotNull(data, "data cannot be null");
 
-        String parentDir = ZKPaths.getPathAndNode(basePath).getPath();
-        ensurePath = client.newNamespaceAwareEnsurePath(parentDir);
-
         backgroundCallback = new BackgroundCallback()
         {
             @Override
             public void processResult(CuratorFramework client, CuratorEvent event) throws
Exception
             {
-                String      path = null;
+                String path = null;
                 if ( event.getResultCode() == KeeperException.Code.NODEEXISTS.intValue()
)
                 {
                     path = event.getPath();
@@ -230,8 +221,7 @@ public class PersistentEphemeralNode implements Closeable
                     nodePath.set(path);
                     watchNode();
 
-                    CountDownLatch localLatch = initialCreateLatch;
-                    initialCreateLatch = null;
+                    CountDownLatch localLatch = initialCreateLatch.getAndSet(null);
                     if ( localLatch != null )
                     {
                         localLatch.countDown();
@@ -244,7 +234,7 @@ public class PersistentEphemeralNode implements Closeable
             }
         };
 
-        createMethod = mode.isProtected() ? client.create().withProtection() : client.create();
+        createMethod = mode.isProtected() ? client.create().creatingParentsIfNeeded().withProtection()
: client.create().creatingParentsIfNeeded();
         this.data.set(Arrays.copyOf(data, data.length));
     }
 
@@ -265,7 +255,7 @@ public class PersistentEphemeralNode implements Closeable
      * the timeout elapses.
      *
      * @param timeout the maximum time to wait
-     * @param unit time unit
+     * @param unit    time unit
      * @return if the node was created before timeout
      * @throws InterruptedException if the thread is interrupted
      */
@@ -273,11 +263,12 @@ public class PersistentEphemeralNode implements Closeable
     {
         Preconditions.checkState(state.get() == State.STARTED, "Not started");
 
-        return initialCreateLatch.await(timeout, unit);
+        CountDownLatch localLatch = initialCreateLatch.get();
+        return (localLatch == null) || localLatch.await(timeout, unit);
     }
 
     @Override
-    public void close()
+    public void close() throws IOException
     {
         if ( !state.compareAndSet(State.STARTED, State.CLOSED) )
         {
@@ -286,7 +277,14 @@ public class PersistentEphemeralNode implements Closeable
 
         client.getConnectionStateListenable().removeListener(listener);
 
-        deleteNode();
+        try
+        {
+            deleteNode();
+        }
+        catch ( Exception e )
+        {
+            throw new IOException(e);
+        }
     }
 
     /**
@@ -315,9 +313,9 @@ public class PersistentEphemeralNode implements Closeable
         }
     }
 
-    private void deleteNode()
+    private void deleteNode() throws Exception
     {
-        String          localNodePath = nodePath.getAndSet(null);
+        String localNodePath = nodePath.getAndSet(null);
         if ( localNodePath != null )
         {
             try
@@ -331,6 +329,7 @@ public class PersistentEphemeralNode implements Closeable
             catch ( Exception e )
             {
                 log.error("Deleting node: " + localNodePath, e);
+                throw e;
             }
         }
     }
@@ -344,25 +343,25 @@ public class PersistentEphemeralNode implements Closeable
 
         try
         {
-            String      existingPath = nodePath.get();
-            String      createPath = (existingPath != null) ? existingPath : basePath;
-            ensurePath.ensure(client.getZookeeperClient());
+            String existingPath = nodePath.get();
+            String createPath = (existingPath != null) ? existingPath : basePath;
             createMethod.withMode(mode.getCreateMode(existingPath != null)).inBackground(backgroundCallback).forPath(createPath,
data.get());
         }
         catch ( Exception e )
         {
             log.error("Creating node. BasePath: " + basePath, e);
+            throw new RuntimeException(e);  // should never happen unless there's a programming
error - so throw RuntimeException
         }
     }
 
-    private void watchNode()
+    private void watchNode() throws Exception
     {
         if ( !isActive() )
         {
             return;
         }
 
-        String          localNodePath = nodePath.get();
+        String localNodePath = nodePath.get();
         if ( localNodePath != null )
         {
             try
@@ -372,6 +371,7 @@ public class PersistentEphemeralNode implements Closeable
             catch ( Exception e )
             {
                 log.error("Watching node: " + localNodePath, e);
+                throw e;
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/fd4cd433/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
index 865a43d..767afb0 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
@@ -20,11 +20,15 @@ package org.apache.curator.framework.recipes.nodes;
 
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.recipes.BaseClassForTests;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.KillSession;
+import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.Timing;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.WatchedEvent;
@@ -66,6 +70,49 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
         super.teardown();
     }
 
+    @Test
+    public void testNoServerAtStart() throws Exception
+    {
+        server.close();
+
+        Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
timing.session(), timing.connection(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+            PersistentEphemeralNode node = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL,
"/abc/node", "hello".getBytes());
+            node.start();
+
+            final CountDownLatch connectedLatch = new CountDownLatch(1);
+            ConnectionStateListener listener = new ConnectionStateListener()
+            {
+                @Override
+                public void stateChanged(CuratorFramework client, ConnectionState newState)
+                {
+                    if ( newState == ConnectionState.CONNECTED )
+                    {
+                        connectedLatch.countDown();
+                    }
+                }
+            };
+            client.getConnectionStateListenable().addListener(listener);
+
+            timing.sleepABit();
+
+            server = new TestingServer(server.getPort());
+
+            Assert.assertTrue(timing.awaitLatch(connectedLatch));
+
+            timing.sleepABit();
+
+            Assert.assertTrue(node.waitForInitialCreate(timing.forWaiting().milliseconds(),
TimeUnit.MILLISECONDS));
+        }
+        finally
+        {
+            Closeables.closeQuietly(client);
+        }
+    }
+
     @Test(expectedExceptions = NullPointerException.class)
     public void testNullCurator() throws Exception
     {


Mime
View raw message