curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From randg...@apache.org
Subject [2/2] curator git commit: wip
Date Sat, 22 Aug 2015 15:47:12 GMT
wip


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/847cc0d2
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/847cc0d2
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/847cc0d2

Branch: refs/heads/CURATOR-247
Commit: 847cc0d2415f59c2943d4a2734564119ffb38bb1
Parents: b8d4c3d
Author: randgalt <randgalt@apache.org>
Authored: Sat Aug 22 10:47:01 2015 -0500
Committer: randgalt <randgalt@apache.org>
Committed: Sat Aug 22 10:47:01 2015 -0500

----------------------------------------------------------------------
 .../org/apache/curator/ConnectionState.java     | 15 ++++++--
 .../apache/curator/CuratorZookeeperClient.java  | 36 ++++++++++++++++++--
 .../framework/imps/CuratorFrameworkImpl.java    | 12 ++-----
 .../framework/state/ConnectionStateManager.java |  2 +-
 .../imps/TestEnabledSessionExpiredState.java    |  2 +-
 ...estResetConnectionWithBackgroundFailure.java | 19 +++++++----
 .../java/org/apache/curator/test/Timing.java    | 21 ++++++++++++
 7 files changed, 84 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/847cc0d2/curator-client/src/main/java/org/apache/curator/ConnectionState.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/ConnectionState.java b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
index 1dfdbef..c3d6921 100644
--- a/curator-client/src/main/java/org/apache/curator/ConnectionState.java
+++ b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
@@ -52,6 +52,7 @@ class ConnectionState implements Watcher, Closeable
     private final Queue<Watcher> parentWatchers = new ConcurrentLinkedQueue<Watcher>();
     private final AtomicLong instanceIndex = new AtomicLong();
     private volatile long connectionStartMs = 0;
+    private final AtomicBoolean enableTimeoutChecks = new AtomicBoolean(true);
 
     ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider,
int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver>
tracer, boolean canBeReadOnly)
     {
@@ -67,6 +68,11 @@ class ConnectionState implements Watcher, Closeable
         zooKeeper = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs,
canBeReadOnly);
     }
 
+    void disableTimeoutChecks()
+    {
+        enableTimeoutChecks.set(false);
+    }
+
     ZooKeeper getZooKeeper() throws Exception
     {
         if ( SessionFailRetryLoop.sessionForThreadHasFailed() )
@@ -81,10 +87,13 @@ class ConnectionState implements Watcher, Closeable
             throw exception;
         }
 
-        boolean localIsConnected = isConnected.get();
-        if ( !localIsConnected )
+        if ( enableTimeoutChecks.get() )
         {
-            checkTimeouts();
+            boolean localIsConnected = isConnected.get();
+            if ( !localIsConnected )
+            {
+                checkTimeouts();
+            }
         }
 
         return zooKeeper.getZooKeeper();

http://git-wip-us.apache.org/repos/asf/curator/blob/847cc0d2/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
index fbb2f4c..ce6e9d3 100644
--- a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
+++ b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
@@ -50,6 +50,7 @@ public class CuratorZookeeperClient implements Closeable
     private final int connectionTimeoutMs;
     private final AtomicBoolean started = new AtomicBoolean(false);
     private final AtomicReference<TracerDriver> tracer = new AtomicReference<TracerDriver>(new
DefaultTracerDriver());
+    private final boolean manageTimeouts;
 
     /**
      *
@@ -61,7 +62,7 @@ public class CuratorZookeeperClient implements Closeable
      */
     public CuratorZookeeperClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs,
Watcher watcher, RetryPolicy retryPolicy)
     {
-        this(new DefaultZookeeperFactory(), new FixedEnsembleProvider(connectString), sessionTimeoutMs,
connectionTimeoutMs, watcher, retryPolicy, false);
+        this(new DefaultZookeeperFactory(), new FixedEnsembleProvider(connectString), sessionTimeoutMs,
connectionTimeoutMs, watcher, retryPolicy, false, true);
     }
 
     /**
@@ -73,7 +74,7 @@ public class CuratorZookeeperClient implements Closeable
      */
     public CuratorZookeeperClient(EnsembleProvider ensembleProvider, int sessionTimeoutMs,
int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy)
     {
-        this(new DefaultZookeeperFactory(), ensembleProvider, sessionTimeoutMs, connectionTimeoutMs,
watcher, retryPolicy, false);
+        this(new DefaultZookeeperFactory(), ensembleProvider, sessionTimeoutMs, connectionTimeoutMs,
watcher, retryPolicy, false, true);
     }
 
     /**
@@ -90,6 +91,25 @@ public class CuratorZookeeperClient implements Closeable
      */
     public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider,
int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean
canBeReadOnly)
     {
+        this(new DefaultZookeeperFactory(), ensembleProvider, sessionTimeoutMs, connectionTimeoutMs,
watcher, retryPolicy, canBeReadOnly, true);
+    }
+
+    /**
+     * @param zookeeperFactory factory for creating {@link ZooKeeper} instances
+     * @param ensembleProvider the ensemble provider
+     * @param sessionTimeoutMs session timeout
+     * @param connectionTimeoutMs connection timeout
+     * @param watcher default watcher or null
+     * @param retryPolicy the retry policy to use
+     * @param canBeReadOnly if true, allow ZooKeeper client to enter
+     *                      read only mode in case of a network partition. See
+     *                      {@link ZooKeeper#ZooKeeper(String, int, Watcher, long, byte[],
boolean)}
+     *                      for details
+     * @param manageTimeouts in general, Curator clients try to manage session/connection
timeouts. If this is false, that management is turned off
+     */
+    public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider,
int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean
canBeReadOnly, boolean manageTimeouts)
+    {
+        this.manageTimeouts = manageTimeouts;
         if ( sessionTimeoutMs < connectionTimeoutMs )
         {
             log.warn(String.format("session timeout [%d] is less than connection timeout
[%d]", sessionTimeoutMs, connectionTimeoutMs));
@@ -100,6 +120,10 @@ public class CuratorZookeeperClient implements Closeable
 
         this.connectionTimeoutMs = connectionTimeoutMs;
         state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs,
connectionTimeoutMs, watcher, tracer, canBeReadOnly);
+        if ( !manageTimeouts )
+        {
+            state.disableTimeoutChecks();
+        }
         setRetryPolicy(retryPolicy);
     }
 
@@ -302,9 +326,15 @@ public class CuratorZookeeperClient implements Closeable
         return state.getInstanceIndex();
     }
 
+    /**
+     * Returns true if connection timeouts should cause the retry policy to be checked. If
false
+     * is returned, throw a connection exception without retrying
+     *
+     * @return true/false
+     */
     public boolean retryConnectionTimeouts()
     {
-        return true;
+        return manageTimeouts;
     }
 
     void addParentWatcher(Watcher watcher)

http://git-wip-us.apache.org/repos/asf/curator/blob/847cc0d2/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index c359fdc..bcbeecd 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -122,15 +122,9 @@ public class CuratorFrameworkImpl implements CuratorFramework
                 }
             },
             builder.getRetryPolicy(),
-            builder.canBeReadOnly()
-        )
-        {
-            @Override
-            public boolean retryConnectionTimeouts()
-            {
-                return !enableSessionExpiredState;
-            }
-        };
+            builder.canBeReadOnly(),
+            !builder.getEnableSessionExpiredState() // inverse is correct here. By default,
CuratorZookeeperClient manages timeouts. The new SessionExpiredState needs this disabled.
+        );
 
         listeners = new ListenerContainer<CuratorListener>();
         unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();

http://git-wip-us.apache.org/repos/asf/curator/blob/847cc0d2/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 553faac..52e0d07 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -295,7 +295,7 @@ public class ConnectionStateManager implements Closeable
             long elapsedMs = System.currentTimeMillis() - startOfSuspendedEpoch;
             if ( elapsedMs >= sessionTimeoutMs )
             {
-                log.info(String.format("Session timeout has elapsed while SUSPENDED. Posting
LOST event and resetting the connection. Elapsed ms: %d", elapsedMs));
+                log.warn(String.format("Session timeout has elapsed while SUSPENDED. Posting
LOST event and resetting the connection. Elapsed ms: %d", elapsedMs));
                 try
                 {
                     client.getZookeeperClient().reset();

http://git-wip-us.apache.org/repos/asf/curator/blob/847cc0d2/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
index 150eb50..cd415b1 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
@@ -123,7 +123,7 @@ public class TestEnabledSessionExpiredState extends BaseClassForTests
     {
         Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED);
         server.stop();
-        Thread.sleep(timing.multiple(1.2).session());
+        timing.sleepForSession();
         Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED);
         Assert.assertEquals(states.poll(timing.multiple(2).session(), TimeUnit.MILLISECONDS),
ConnectionState.LOST);
         server.restart();

http://git-wip-us.apache.org/repos/asf/curator/blob/847cc0d2/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java
b/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java
index 7d2cb89..b90311b 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java
@@ -19,6 +19,7 @@
 
 package org.apache.curator.framework.client;
 
+import com.google.common.collect.Queues;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.recipes.leader.LeaderSelector;
@@ -36,6 +37,8 @@ import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 public class TestResetConnectionWithBackgroundFailure extends BaseClassForTests
 {
@@ -53,7 +56,6 @@ public class TestResetConnectionWithBackgroundFailure extends BaseClassForTests
     {
         server.stop();
 
-        final StringBuilder listenerSequence = new StringBuilder();
         LeaderSelector selector = null;
         Timing timing = new Timing();
         CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
timing.session(), timing.connection(), new RetryOneTime(100));
@@ -74,12 +76,13 @@ public class TestResetConnectionWithBackgroundFailure extends BaseClassForTests
             selector.autoRequeue();
             selector.start();
 
+            final BlockingQueue<ConnectionState> states = Queues.newLinkedBlockingQueue();
             ConnectionStateListener listener1 = new ConnectionStateListener()
             {
                 @Override
                 public void stateChanged(CuratorFramework client, ConnectionState newState)
                 {
-                    listenerSequence.append("-").append(newState);
+                    states.add(newState);
                 }
             };
 
@@ -90,17 +93,21 @@ public class TestResetConnectionWithBackgroundFailure extends BaseClassForTests
 
             log.debug("Stopping ZK server");
             server.stop();
-            timing.forWaiting().sleepABit();
+            timing.sleepForSession();
 
             log.debug("Starting ZK server");
             server.restart();
-            timing.forWaiting().sleepABit();
+
+            Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS),
ConnectionState.CONNECTED);
+            Assert.assertEquals(states.poll(timing.sessionSleep(), TimeUnit.MILLISECONDS),
ConnectionState.SUSPENDED);
+            Assert.assertEquals(states.poll(timing.sessionSleep(), TimeUnit.MILLISECONDS),
ConnectionState.LOST);
+            Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS),
ConnectionState.RECONNECTED);
 
             log.debug("Stopping ZK server");
             server.close();
-            timing.forWaiting().sleepABit();
 
-            Assert.assertEquals(listenerSequence.toString(), "-CONNECTED-SUSPENDED-LOST-RECONNECTED-SUSPENDED-LOST");
+            Assert.assertEquals(states.poll(timing.sessionSleep(), TimeUnit.MILLISECONDS),
ConnectionState.SUSPENDED);
+            Assert.assertEquals(states.poll(timing.sessionSleep(), TimeUnit.MILLISECONDS),
ConnectionState.LOST);
         }
         finally
         {

http://git-wip-us.apache.org/repos/asf/curator/blob/847cc0d2/curator-test/src/main/java/org/apache/curator/test/Timing.java
----------------------------------------------------------------------
diff --git a/curator-test/src/main/java/org/apache/curator/test/Timing.java b/curator-test/src/main/java/org/apache/curator/test/Timing.java
index 753d62d..fc4b314 100644
--- a/curator-test/src/main/java/org/apache/curator/test/Timing.java
+++ b/curator-test/src/main/java/org/apache/curator/test/Timing.java
@@ -35,6 +35,7 @@ public class Timing
     private static final int DEFAULT_SECONDS = 10;
     private static final int DEFAULT_WAITING_MULTIPLE = 5;
     private static final double SESSION_MULTIPLE = 1.5;
+    private static final double SESSION_SLEEP_MULTIPLE = 1.75;  // has to be at least session
+ 2/3 of a session to account for missed heartbeat then session expiration
 
     /**
      * Use the default base time
@@ -200,6 +201,26 @@ public class Timing
     }
 
     /**
+     * Sleep enough so that the session should expire
+     *
+     * @throws InterruptedException if interrupted
+     */
+    public void sleepForSession() throws InterruptedException
+    {
+        TimeUnit.MILLISECONDS.sleep(sessionSleep());
+    }
+
+    /**
+     * Return the value to sleep to ensure a ZK session timeout
+     *
+     * @return session sleep timeout
+     */
+    public int sessionSleep()
+    {
+        return multiple(SESSION_SLEEP_MULTIPLE).session();
+    }
+
+    /**
      * Return the value to use for ZK session timeout
      *
      * @return session timeout


Mime
View raw message