curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From randg...@apache.org
Subject [15/29] curator git commit: More refinement of classic/new connection handling. Reworked how the retry policy is invoked for each. New behavior is now confirmed to be: wait for connection timeout only once. Some tests will need work due to this
Date Tue, 01 Sep 2015 13:02:39 GMT
More refinement of classic/new connection handling. Reworked how the retry policy is invoked
for each. New behavior is now confirmed to be: wait for connection timeout only once. Some
tests will need work due to this


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/5f094f8b
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/5f094f8b
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/5f094f8b

Branch: refs/heads/CURATOR-3.0
Commit: 5f094f8bb6dca3c056051cb8800b418839cca0e1
Parents: face403
Author: randgalt <randgalt@apache.org>
Authored: Sun Aug 23 07:49:17 2015 -0500
Committer: randgalt <randgalt@apache.org>
Committed: Sun Aug 23 07:49:17 2015 -0500

----------------------------------------------------------------------
 .../apache/curator/CuratorZookeeperClient.java  |  7 ++-
 .../main/java/org/apache/curator/RetryLoop.java | 57 ++------------------
 .../ClassicConnectionHandlingPolicy.java        | 29 +++++++---
 .../connection/ConnectionHandlingPolicy.java    | 37 +++----------
 .../StandardConnectionHandlingPolicy.java       | 39 ++++++++------
 .../imps/TestEnabledSessionExpiredState.java    |  6 +--
 ...estResetConnectionWithBackgroundFailure.java | 10 ++--
 .../locks/TestInterProcessMutexBase.java        | 19 +++----
 .../java/org/apache/curator/test/Timing.java    | 36 ++++++++-----
 9 files changed, 99 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/5f094f8b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
index 9342acf..c8a9936 100644
--- a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
+++ b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
@@ -358,7 +358,12 @@ public class CuratorZookeeperClient implements Closeable
         state.removeParentWatcher(watcher);
     }
 
-    void internalBlockUntilConnectedOrTimedOut() throws InterruptedException
+    /**
+     * For internal use only
+     *
+     * @throws InterruptedException interruptions
+     */
+    public void internalBlockUntilConnectedOrTimedOut() throws InterruptedException
     {
         long waitTimeMs = connectionTimeoutMs;
         while ( !state.isConnected() && (waitTimeMs > 0) )

http://git-wip-us.apache.org/repos/asf/curator/blob/5f094f8b/curator-client/src/main/java/org/apache/curator/RetryLoop.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/RetryLoop.java b/curator-client/src/main/java/org/apache/curator/RetryLoop.java
index a17cbf3..4353c61 100644
--- a/curator-client/src/main/java/org/apache/curator/RetryLoop.java
+++ b/curator-client/src/main/java/org/apache/curator/RetryLoop.java
@@ -96,62 +96,13 @@ public class RetryLoop
      */
     public static<T> T      callWithRetry(CuratorZookeeperClient client, Callable<T>
proc) throws Exception
     {
-        T               result = null;
-        RetryLoop       retryLoop = client.newRetryLoop();
-        boolean         connectionFailed = false;
-        while ( retryLoop.shouldContinue() )
+        Exception debugException = client.getDebugException();
+        if ( debugException != null )
         {
-            try
-            {
-                Exception debugException = client.getDebugException();
-                if ( debugException != null )
-                {
-                    throw debugException;
-                }
-
-                client.internalBlockUntilConnectedOrTimedOut();
-
-                switch ( client.getConnectionHandlingPolicy().preRetry(client) )
-                {
-                    default:
-                    case CALL_PROC:
-                    {
-                        result = proc.call();
-                        retryLoop.markComplete();
-                        break;
-                    }
-
-                    case WAIT_FOR_CONNECTION:
-                    {
-                        break;  // just loop
-                    }
-
-                    case EXIT_RETRIES:
-                    {
-                        retryLoop.markComplete();
-                        break;
-                    }
-
-                    case CONNECTION_TIMEOUT:
-                    {
-                        connectionFailed = true;
-                        retryLoop.markComplete();
-                        break;
-                    }
-                }
-            }
-            catch ( Exception e )
-            {
-                retryLoop.takeException(e);
-            }
-        }
-
-        if ( connectionFailed )
-        {
-            throw new KeeperException.ConnectionLossException();
+            throw debugException;
         }
 
-        return result;
+        return client.getConnectionHandlingPolicy().callWithRetry(client, proc);
     }
 
     RetryLoop(RetryPolicy retryPolicy, AtomicReference<TracerDriver> tracer)

http://git-wip-us.apache.org/repos/asf/curator/blob/5f094f8b/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java
b/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java
index 71dc065..d0db0bb 100644
--- a/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java
+++ b/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java
@@ -1,6 +1,7 @@
 package org.apache.curator.connection;
 
 import org.apache.curator.CuratorZookeeperClient;
+import org.apache.curator.RetryLoop;
 import java.util.concurrent.Callable;
 
 public class ClassicConnectionHandlingPolicy implements ConnectionHandlingPolicy
@@ -12,6 +13,28 @@ public class ClassicConnectionHandlingPolicy implements ConnectionHandlingPolicy
     }
 
     @Override
+    public <T> T callWithRetry(CuratorZookeeperClient client, Callable<T> proc)
throws Exception
+    {
+        T result = null;
+        RetryLoop retryLoop = client.newRetryLoop();
+        while ( retryLoop.shouldContinue() )
+        {
+            try
+            {
+                client.internalBlockUntilConnectedOrTimedOut();
+                result = proc.call();
+                retryLoop.markComplete();
+            }
+            catch ( Exception e )
+            {
+                retryLoop.takeException(e);
+            }
+        }
+
+        return result;
+    }
+
+    @Override
     public CheckTimeoutsResult checkTimeouts(Callable<Boolean> hasNewConnectionString,
long connectionStartMs, int sessionTimeoutMs, int connectionTimeoutMs) throws Exception
     {
         CheckTimeoutsResult result = CheckTimeoutsResult.NOP;
@@ -39,10 +62,4 @@ public class ClassicConnectionHandlingPolicy implements ConnectionHandlingPolicy
 
         return result;
     }
-
-    @Override
-    public PreRetryResult preRetry(CuratorZookeeperClient client) throws Exception
-    {
-        return PreRetryResult.CALL_PROC;
-    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/5f094f8b/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java
b/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java
index 7f19159..51bdccc 100644
--- a/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java
+++ b/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java
@@ -1,7 +1,6 @@
 package org.apache.curator.connection;
 
 import org.apache.curator.CuratorZookeeperClient;
-import org.apache.zookeeper.KeeperException;
 import java.util.concurrent.Callable;
 
 public interface ConnectionHandlingPolicy
@@ -13,6 +12,8 @@ public interface ConnectionHandlingPolicy
      */
     boolean isEmulatingClassicHandling();
 
+    <T> T callWithRetry(CuratorZookeeperClient client, Callable<T> proc) throws
Exception;
+
     enum CheckTimeoutsResult
     {
         /**
@@ -55,35 +56,9 @@ public interface ConnectionHandlingPolicy
      */
     CheckTimeoutsResult checkTimeouts(Callable<Boolean> hasNewConnectionString, long
connectionStartMs, int sessionTimeoutMs, int connectionTimeoutMs) throws Exception;
 
-    enum PreRetryResult
-    {
-        /**
-         * The retry loop should call the procedure
-         */
-        CALL_PROC,
-
-        /**
-         * Wait again for connection success or timeout
-         */
-        WAIT_FOR_CONNECTION,
-
-        /**
-         * Do not call the procedure and exit the retry loop
-         */
-        EXIT_RETRIES,
+/*
+    int getDefaultConnectionTimeoutMs();
 
-        /**
-         * Do not call the procedure and throw {@link KeeperException.ConnectionLossException}
-         */
-        CONNECTION_TIMEOUT
-    }
-
-    /**
-     * Called prior to each iteration of a procedure in a retry loop
-     *
-     * @param client the client
-     * @return result
-     * @throws Exception errors
-     */
-    PreRetryResult preRetry(CuratorZookeeperClient client) throws Exception;
+    int getDefaultSessionTimeoutMs();
+*/
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/5f094f8b/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
b/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
index cbbceac..b16cd53 100644
--- a/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
+++ b/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
@@ -1,7 +1,7 @@
 package org.apache.curator.connection;
 
-import com.google.common.base.Splitter;
 import org.apache.curator.CuratorZookeeperClient;
+import org.apache.curator.RetryLoop;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.concurrent.Callable;
@@ -17,30 +17,35 @@ public class StandardConnectionHandlingPolicy implements ConnectionHandlingPolic
     }
 
     @Override
-    public CheckTimeoutsResult checkTimeouts(Callable<Boolean> hasNewConnectionString,
long connectionStartMs, int sessionTimeoutMs, int connectionTimeoutMs) throws Exception
+    public <T> T callWithRetry(CuratorZookeeperClient client, Callable<T> proc)
throws Exception
     {
-        if ( hasNewConnectionString.call() )
+        client.internalBlockUntilConnectedOrTimedOut();
+
+        T result = null;
+        RetryLoop retryLoop = client.newRetryLoop();
+        while ( retryLoop.shouldContinue() )
         {
-            return CheckTimeoutsResult.NEW_CONNECTION_STRING;
+            try
+            {
+                result = proc.call();
+                retryLoop.markComplete();
+            }
+            catch ( Exception e )
+            {
+                retryLoop.takeException(e);
+            }
         }
-        return CheckTimeoutsResult.NOP;
+
+        return result;
     }
 
     @Override
-    public PreRetryResult preRetry(CuratorZookeeperClient client) throws Exception
+    public CheckTimeoutsResult checkTimeouts(Callable<Boolean> hasNewConnectionString,
long connectionStartMs, int sessionTimeoutMs, int connectionTimeoutMs) throws Exception
     {
-        if ( !client.isConnected() )
+        if ( hasNewConnectionString.call() )
         {
-            int serverCount = Splitter.on(",").omitEmptyStrings().splitToList(client.getCurrentConnectionString()).size();
-            if ( serverCount > 1 )
-            {
-                log.info("Connection timed out and connection string is > 1. Resetting
connection and trying again.");
-                client.reset(); // unfortunately, there's no way to guarantee that ZK tries
a different server. Internally it calls Collections.shuffle(). Hopefully, this will result
in a different server each time.
-                return PreRetryResult.WAIT_FOR_CONNECTION;
-            }
-            return PreRetryResult.CONNECTION_TIMEOUT;
+            return CheckTimeoutsResult.NEW_CONNECTION_STRING;
         }
-
-        return PreRetryResult.CALL_PROC;
+        return CheckTimeoutsResult.NOP;
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/5f094f8b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
index 4d6f473..a41d581 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
@@ -114,7 +114,7 @@ public class TestEnabledSessionExpiredState extends BaseClassForTests
         Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED);
         server.stop();
         Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED);
-        Assert.assertEquals(states.poll(timing.sessionSleep(), TimeUnit.MILLISECONDS), ConnectionState.LOST);
+        Assert.assertEquals(states.poll(timing.forSessionSleep().milliseconds(), TimeUnit.MILLISECONDS),
ConnectionState.LOST);
     }
 
     @Test
@@ -122,9 +122,9 @@ public class TestEnabledSessionExpiredState extends BaseClassForTests
     {
         Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED);
         server.stop();
-        timing.sleepForSession();
+        timing.forSessionSleep().sleep();
         Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED);
-        Assert.assertEquals(states.poll(timing.sessionSleep(), TimeUnit.MILLISECONDS), ConnectionState.LOST);
+        Assert.assertEquals(states.poll(timing.forSessionSleep().milliseconds(), TimeUnit.MILLISECONDS),
ConnectionState.LOST);
         server.restart();
         client.checkExists().forPath("/");
         Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED);

http://git-wip-us.apache.org/repos/asf/curator/blob/5f094f8b/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java
b/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java
index b90311b..af31499 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java
@@ -93,21 +93,21 @@ public class TestResetConnectionWithBackgroundFailure extends BaseClassForTests
 
             log.debug("Stopping ZK server");
             server.stop();
-            timing.sleepForSession();
+            timing.forSessionSleep().sleep();
 
             log.debug("Starting ZK server");
             server.restart();
 
             Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS),
ConnectionState.CONNECTED);
-            Assert.assertEquals(states.poll(timing.sessionSleep(), TimeUnit.MILLISECONDS),
ConnectionState.SUSPENDED);
-            Assert.assertEquals(states.poll(timing.sessionSleep(), TimeUnit.MILLISECONDS),
ConnectionState.LOST);
+            Assert.assertEquals(states.poll(timing.forSessionSleep().milliseconds(), TimeUnit.MILLISECONDS),
ConnectionState.SUSPENDED);
+            Assert.assertEquals(states.poll(timing.forSessionSleep().milliseconds(), TimeUnit.MILLISECONDS),
ConnectionState.LOST);
             Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS),
ConnectionState.RECONNECTED);
 
             log.debug("Stopping ZK server");
             server.close();
 
-            Assert.assertEquals(states.poll(timing.sessionSleep(), TimeUnit.MILLISECONDS),
ConnectionState.SUSPENDED);
-            Assert.assertEquals(states.poll(timing.sessionSleep(), TimeUnit.MILLISECONDS),
ConnectionState.LOST);
+            Assert.assertEquals(states.poll(timing.forSessionSleep().milliseconds(), TimeUnit.MILLISECONDS),
ConnectionState.SUSPENDED);
+            Assert.assertEquals(states.poll(timing.forSessionSleep().milliseconds(), TimeUnit.MILLISECONDS),
ConnectionState.LOST);
         }
         finally
         {

http://git-wip-us.apache.org/repos/asf/curator/blob/5f094f8b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
index 9eb4144..a784e46 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
@@ -32,7 +32,6 @@ import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.ZKPaths;
-import org.apache.zookeeper.CreateMode;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.List;
@@ -71,7 +70,7 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
                 @Override
                 public void stateChanged(CuratorFramework client, ConnectionState newState)
                 {
-                    if ( newState == ConnectionState.LOST )
+                    if ( !newState.isConnected() )
                     {
                         latch.countDown();
                     }
@@ -80,6 +79,7 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
             client.getConnectionStateListenable().addListener(listener);
 
             final AtomicBoolean isFirst = new AtomicBoolean(true);
+            final Object result = new Object();
             ExecutorCompletionService<Object> service = new ExecutorCompletionService<Object>(Executors.newFixedThreadPool(2));
             for ( int i = 0; i < 2; ++i )
             {
@@ -99,22 +99,15 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
                                     timing.sleepABit();
 
                                     server.stop();
-                                    Assert.assertTrue(timing.awaitLatch(latch));
+                                    Assert.assertTrue(timing.forWaiting().awaitLatch(latch));
                                     server.restart();
                                 }
                             }
                             finally
                             {
-                                try
-                                {
-                                    lock.release();
-                                }
-                                catch ( Exception e )
-                                {
-                                    // ignore
-                                }
+                                lock.release();
                             }
-                            return null;
+                            return result;
                         }
                     }
                 );
@@ -122,7 +115,7 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
 
             for ( int i = 0; i < 2; ++i )
             {
-                service.take().get(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS);
+                Assert.assertEquals(service.take().get(timing.forWaiting().milliseconds(),
TimeUnit.MILLISECONDS), result);
             }
         }
         finally

http://git-wip-us.apache.org/repos/asf/curator/blob/5f094f8b/curator-test/src/main/java/org/apache/curator/test/Timing.java
----------------------------------------------------------------------
diff --git a/curator-test/src/main/java/org/apache/curator/test/Timing.java b/curator-test/src/main/java/org/apache/curator/test/Timing.java
index fc4b314..5eb5bc0 100644
--- a/curator-test/src/main/java/org/apache/curator/test/Timing.java
+++ b/curator-test/src/main/java/org/apache/curator/test/Timing.java
@@ -35,7 +35,7 @@ public class Timing
     private static final int DEFAULT_SECONDS = 10;
     private static final int DEFAULT_WAITING_MULTIPLE = 5;
     private static final double SESSION_MULTIPLE = 1.5;
-    private static final double SESSION_SLEEP_MULTIPLE = 1.75;  // has to be at least session
+ 2/3 of a session to account for missed heartbeat then session expiration
+    private static final double SESSION_SLEEP_MULTIPLE = SESSION_MULTIPLE * 1.75;  // has
to be at least session + 2/3 of a session to account for missed heartbeat then session expiration
 
     /**
      * Use the default base time
@@ -180,6 +180,18 @@ public class Timing
     }
 
     /**
+     * Return a new timing that is a multiple of the this timing
+     *
+     * @param n the multiple
+     * @param waitingMultiple new waitingMultiple
+     * @return this timing times the multiple
+     */
+    public Timing multiple(double n, int waitingMultiple)
+    {
+        return new Timing((int)(value * n), unit, waitingMultiple);
+    }
+
+    /**
      * Return a new timing with the standard multiple for waiting on latches, etc.
      *
      * @return this timing multiplied
@@ -191,33 +203,33 @@ public class Timing
     }
 
     /**
-     * Sleep for a small amount of time
+     * Return a new timing with a multiple that ensures a ZK session timeout
      *
-     * @throws InterruptedException if interrupted
+     * @return this timing multiplied
      */
-    public void sleepABit() throws InterruptedException
+    public Timing forSessionSleep()
     {
-        unit.sleep(value / 4);
+        return multiple(SESSION_SLEEP_MULTIPLE, 1);
     }
 
     /**
-     * Sleep enough so that the session should expire
+     * Sleep for a small amount of time
      *
      * @throws InterruptedException if interrupted
      */
-    public void sleepForSession() throws InterruptedException
+    public void sleepABit() throws InterruptedException
     {
-        TimeUnit.MILLISECONDS.sleep(sessionSleep());
+        unit.sleep(value / 4);
     }
 
     /**
-     * Return the value to sleep to ensure a ZK session timeout
+     * Sleep for a the full amount of time
      *
-     * @return session sleep timeout
+     * @throws InterruptedException if interrupted
      */
-    public int sessionSleep()
+    public void sleep() throws InterruptedException
     {
-        return multiple(SESSION_SLEEP_MULTIPLE).session();
+        unit.sleep(value);
     }
 
     /**


Mime
View raw message