curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From randg...@apache.org
Subject [1/3] curator git commit: Initial error policy with two implementations. Also, applied it to LeaderSelector as a test
Date Mon, 24 Aug 2015 17:32:08 GMT
Repository: curator
Updated Branches:
  refs/heads/CURATOR-248 [created] 94dff8a5a


Initial error policy with two implementations. Also, applied it to LeaderSelector as a test


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/45df7ba7
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/45df7ba7
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/45df7ba7

Branch: refs/heads/CURATOR-248
Commit: 45df7ba71f14a5f9751061a7dff956312bfdd421
Parents: f9af0ce
Author: randgalt <randgalt@apache.org>
Authored: Mon Aug 24 12:24:06 2015 -0500
Committer: randgalt <randgalt@apache.org>
Committed: Mon Aug 24 12:24:06 2015 -0500

----------------------------------------------------------------------
 .../curator/framework/CuratorFramework.java     |  8 ++
 .../framework/CuratorFrameworkFactory.java      | 20 +++++
 .../framework/imps/CuratorFrameworkImpl.java    | 10 +++
 .../curator/framework/state/ErrorPolicy.java    | 18 ++++
 .../framework/state/SessionErrorPolicy.java     | 13 +++
 .../framework/state/StandardErrorPolicy.java    | 14 +++
 .../leader/LeaderSelectorListenerAdapter.java   |  2 +-
 .../recipes/leader/TestLeaderSelector.java      | 90 ++++++++++++++++++++
 8 files changed, 174 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/45df7ba7/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index 58c5bf5..d755d28 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -28,6 +28,7 @@ import org.apache.curator.framework.api.transaction.TransactionOp;
 import org.apache.curator.framework.imps.CuratorFrameworkState;
 import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.framework.state.ErrorPolicy;
 import org.apache.curator.utils.EnsurePath;
 import org.apache.zookeeper.Watcher;
 
@@ -297,4 +298,11 @@ public interface CuratorFramework extends Closeable
      * @return facade
      */
     public WatcherRemoveCuratorFramework newWatcherRemoveCuratorFramework();
+
+    /**
+     * Return the configured error policy
+     *
+     * @return error policy
+     */
+    public ErrorPolicy getErrorPolicy();
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/45df7ba7/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
index dcb2ee6..aa5181d 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
@@ -31,6 +31,8 @@ import org.apache.curator.framework.imps.CuratorFrameworkImpl;
 import org.apache.curator.framework.imps.CuratorTempFrameworkImpl;
 import org.apache.curator.framework.imps.DefaultACLProvider;
 import org.apache.curator.framework.imps.GzipCompressionProvider;
+import org.apache.curator.framework.state.ErrorPolicy;
+import org.apache.curator.framework.state.StandardErrorPolicy;
 import org.apache.curator.utils.DefaultZookeeperFactory;
 import org.apache.curator.utils.ZookeeperFactory;
 import org.apache.zookeeper.CreateMode;
@@ -116,6 +118,7 @@ public class CuratorFrameworkFactory
         private ACLProvider aclProvider = DEFAULT_ACL_PROVIDER;
         private boolean canBeReadOnly = false;
         private boolean useContainerParentsIfAvailable = true;
+        private ErrorPolicy errorPolicy = new StandardErrorPolicy();
 
         /**
          * Apply the current values and build a new CuratorFramework
@@ -343,6 +346,18 @@ public class CuratorFrameworkFactory
             return this;
         }
 
+        /**
+         * Set the error policy to use. The default is {@link StandardErrorPolicy}
+         *
+         * @param errorPolicy new error policy
+         * @return this
+         */
+        public Builder errorPolicy(ErrorPolicy errorPolicy)
+        {
+            this.errorPolicy = errorPolicy;
+            return this;
+        }
+
         public ACLProvider getAclProvider()
         {
             return aclProvider;
@@ -398,6 +413,11 @@ public class CuratorFrameworkFactory
             return useContainerParentsIfAvailable;
         }
 
+        public ErrorPolicy getErrorPolicy()
+        {
+            return errorPolicy;
+        }
+
         @Deprecated
         public String getAuthScheme()
         {

http://git-wip-us.apache.org/repos/asf/curator/blob/45df7ba7/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 41bb7cd..3310daf 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -40,6 +40,7 @@ import org.apache.curator.framework.listen.ListenerContainer;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.framework.state.ConnectionStateManager;
+import org.apache.curator.framework.state.ErrorPolicy;
 import org.apache.curator.utils.DebugUtils;
 import org.apache.curator.utils.EnsurePath;
 import org.apache.curator.utils.ThreadUtils;
@@ -83,6 +84,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
     private final NamespaceFacadeCache namespaceFacadeCache;
     private final NamespaceWatcherMap namespaceWatcherMap = new NamespaceWatcherMap(this);
     private final boolean useContainerParentsIfAvailable;
+    private final ErrorPolicy errorPolicy;
 
     private volatile ExecutorService executorService;
     private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false);
@@ -124,6 +126,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         aclProvider = builder.getAclProvider();
         state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
         useContainerParentsIfAvailable = builder.useContainerParentsIfAvailable();
+        errorPolicy = Preconditions.checkNotNull(builder.getErrorPolicy(), "errorPolicy cannot be null");
 
         byte[] builderDefaultData = builder.getDefaultData();
         defaultData = (builderDefaultData != null) ? Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0];
@@ -197,6 +200,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         state = parent.state;
         authInfos = parent.authInfos;
         useContainerParentsIfAvailable = parent.useContainerParentsIfAvailable;
+        errorPolicy = parent.errorPolicy;
     }
 
     @Override
@@ -241,6 +245,12 @@ public class CuratorFrameworkImpl implements CuratorFramework
     }
 
     @Override
+    public ErrorPolicy getErrorPolicy()
+    {
+        return errorPolicy;
+    }
+
+    @Override
     public void start()
     {
         log.info("Starting");

http://git-wip-us.apache.org/repos/asf/curator/blob/45df7ba7/curator-framework/src/main/java/org/apache/curator/framework/state/ErrorPolicy.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ErrorPolicy.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ErrorPolicy.java
new file mode 100644
index 0000000..0e1bfb5
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ErrorPolicy.java
@@ -0,0 +1,18 @@
+package org.apache.curator.framework.state;
+
+/**
+ * Recipes should use the configured error policy to decide how to handle
+ * errors such as {@link ConnectionState} changes.
+ */
+public interface ErrorPolicy
+{
+    /**
+     * Returns true if the given state should cause the recipe to
+     * act as though the connection has been lost. i.e. locks should
+     * exit, etc.
+     *
+     * @param state the state
+     * @return true/false
+     */
+    boolean isErrorState(ConnectionState state);
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/45df7ba7/curator-framework/src/main/java/org/apache/curator/framework/state/SessionErrorPolicy.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/SessionErrorPolicy.java b/curator-framework/src/main/java/org/apache/curator/framework/state/SessionErrorPolicy.java
new file mode 100644
index 0000000..3f68fe4
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/SessionErrorPolicy.java
@@ -0,0 +1,13 @@
+package org.apache.curator.framework.state;
+
+/**
+ * This policy treats only {@link ConnectionState#LOST} as an error
+ */
+public class SessionErrorPolicy implements ErrorPolicy
+{
+    @Override
+    public boolean isErrorState(ConnectionState state)
+    {
+        return state == ConnectionState.LOST;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/45df7ba7/curator-framework/src/main/java/org/apache/curator/framework/state/StandardErrorPolicy.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/StandardErrorPolicy.java b/curator-framework/src/main/java/org/apache/curator/framework/state/StandardErrorPolicy.java
new file mode 100644
index 0000000..ea0c668
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/StandardErrorPolicy.java
@@ -0,0 +1,14 @@
+package org.apache.curator.framework.state;
+
+/**
+ * This policy treats {@link ConnectionState#SUSPENDED} and {@link ConnectionState#LOST}
+ * as errors
+ */
+public class StandardErrorPolicy implements ErrorPolicy
+{
+    @Override
+    public boolean isErrorState(ConnectionState state)
+    {
+        return ((state == ConnectionState.SUSPENDED) || (state == ConnectionState.LOST));
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/45df7ba7/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelectorListenerAdapter.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelectorListenerAdapter.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelectorListenerAdapter.java
index 7402fa7..1b0070a 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelectorListenerAdapter.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelectorListenerAdapter.java
@@ -30,7 +30,7 @@ public abstract class LeaderSelectorListenerAdapter implements LeaderSelectorLis
     @Override
     public void stateChanged(CuratorFramework client, ConnectionState newState)
     {
-        if ( (newState == ConnectionState.SUSPENDED) || (newState == ConnectionState.LOST) )
+        if ( client.getErrorPolicy().isErrorState(newState) )
         {
             throw new CancelLeadershipException();
         }

http://git-wip-us.apache.org/repos/asf/curator/blob/45df7ba7/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
index c7f415c..ae19b3c 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
@@ -20,10 +20,13 @@
 package org.apache.curator.framework.recipes.leader;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.framework.state.SessionErrorPolicy;
+import org.apache.curator.framework.state.StandardErrorPolicy;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.KillSession;
@@ -50,6 +53,93 @@ public class TestLeaderSelector extends BaseClassForTests
     private static final String PATH_NAME = "/one/two/me";
 
     @Test
+    public void testErrorPolicies() throws Exception
+    {
+        Timing timing = new Timing();
+        LeaderSelector selector = null;
+        CuratorFramework client = CuratorFrameworkFactory
+            .builder()
+            .connectString(server.getConnectString())
+            .connectionTimeoutMs(timing.connection())
+            .sessionTimeoutMs(timing.session())
+            .retryPolicy(new RetryOneTime(1))
+            .errorPolicy(new StandardErrorPolicy())
+            .build();
+        try
+        {
+            final BlockingQueue<String> changes = Queues.newLinkedBlockingQueue();
+
+            ConnectionStateListener stateListener = new ConnectionStateListener()
+            {
+                @Override
+                public void stateChanged(CuratorFramework client, ConnectionState newState)
+                {
+                    changes.add(newState.name());
+                }
+            };
+            client.getConnectionStateListenable().addListener(stateListener);
+            client.start();
+            LeaderSelectorListener listener = new LeaderSelectorListenerAdapter()
+            {
+                @Override
+                public void takeLeadership(CuratorFramework client) throws Exception
+                {
+                    changes.add("leader");
+                    try
+                    {
+                        Thread.currentThread().join();
+                    }
+                    catch ( InterruptedException e )
+                    {
+                        changes.add("release");
+                        Thread.currentThread().interrupt();
+                    }
+                }
+            };
+            selector = new LeaderSelector(client, "/test", listener);
+            selector.start();
+
+            Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
+            Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "leader");
+            server.close();
+            Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED.name());
+            Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "release");
+            Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.LOST.name());
+
+            selector.close();
+            client.close();
+            timing.sleepABit();
+            changes.clear();
+
+            server = new TestingServer();
+            client = CuratorFrameworkFactory
+                .builder()
+                .connectString(server.getConnectString())
+                .connectionTimeoutMs(timing.connection())
+                .sessionTimeoutMs(timing.session())
+                .retryPolicy(new RetryOneTime(1))
+                .errorPolicy(new SessionErrorPolicy())
+                .build();
+            client.getConnectionStateListenable().addListener(stateListener);
+            client.start();
+            selector = new LeaderSelector(client, "/test", listener);
+            selector.start();
+
+            Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
+            Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "leader");
+            server.stop();
+            Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED.name());
+            Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.LOST.name());
+            Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "release");
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(selector);
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
     public void testLeaderNodeDeleteOnInterrupt() throws Exception
     {
         Timing timing = new Timing();


Mime
View raw message