curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From randg...@apache.org
Subject [10/29] curator git commit: major refactoring. Abstracting old/new behavior into a pluggable ConnectionHandlingPolicy. Also, IMPORTANT, made the new behavior the default. This needs to be discussed but it's a major improvement and we should default to it
Date Tue, 01 Sep 2015 13:02:34 GMT
major refactoring. Abstracting old/new behavior into a pluggable ConnectionHandlingPolicy.
Also, IMPORTANT, made the new behavior the default. This needs to be discussed but it's a
major improvement and we should default to it.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/e2391370
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/e2391370
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/e2391370

Branch: refs/heads/CURATOR-3.0
Commit: e239137019608f02cabb23c27ab13adcef88c027
Parents: 6381ccb
Author: randgalt <randgalt@apache.org>
Authored: Sat Aug 22 19:06:55 2015 -0500
Committer: randgalt <randgalt@apache.org>
Committed: Sat Aug 22 19:06:55 2015 -0500

----------------------------------------------------------------------
 .../org/apache/curator/ConnectionState.java     | 85 ++++++++++++--------
 .../apache/curator/CuratorZookeeperClient.java  | 32 ++++----
 .../main/java/org/apache/curator/RetryLoop.java | 28 +++++--
 .../ClassicConnectionHandlingPolicy.java        | 48 +++++++++++
 .../connection/ConnectionHandlingPolicy.java    | 84 +++++++++++++++++++
 .../StandardConnectionHandlingPolicy.java       | 35 ++++++++
 .../java/org/apache/curator/TestEnsurePath.java |  5 +-
 .../framework/CuratorFrameworkFactory.java      | 54 +++++++++++--
 .../framework/imps/CuratorFrameworkImpl.java    | 43 +++++-----
 .../framework/state/ConnectionState.java        | 20 +----
 .../framework/state/ConnectionStateManager.java |  9 +--
 .../imps/TestEnabledSessionExpiredState.java    |  5 +-
 .../apache/curator/test/BaseClassForTests.java  |  6 +-
 13 files changed, 336 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-client/src/main/java/org/apache/curator/ConnectionState.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/ConnectionState.java b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
index c3d6921..d6ddd33 100644
--- a/curator-client/src/main/java/org/apache/curator/ConnectionState.java
+++ b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
@@ -18,9 +18,10 @@
  */
 package org.apache.curator;
 
-import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.connection.ConnectionHandlingPolicy;
 import org.apache.curator.drivers.TracerDriver;
 import org.apache.curator.ensemble.EnsembleProvider;
+import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.DebugUtils;
 import org.apache.curator.utils.ZookeeperFactory;
 import org.apache.zookeeper.KeeperException;
@@ -32,6 +33,7 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Queue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -48,18 +50,19 @@ class ConnectionState implements Watcher, Closeable
     private final int sessionTimeoutMs;
     private final int connectionTimeoutMs;
     private final AtomicReference<TracerDriver> tracer;
+    private final ConnectionHandlingPolicy connectionHandlingPolicy;
     private final Queue<Exception> backgroundExceptions = new ConcurrentLinkedQueue<Exception>();
     private final Queue<Watcher> parentWatchers = new ConcurrentLinkedQueue<Watcher>();
     private final AtomicLong instanceIndex = new AtomicLong();
     private volatile long connectionStartMs = 0;
-    private final AtomicBoolean enableTimeoutChecks = new AtomicBoolean(true);
 
-    ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider,
int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver>
tracer, boolean canBeReadOnly)
+    ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider,
int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver>
tracer, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy)
     {
         this.ensembleProvider = ensembleProvider;
         this.sessionTimeoutMs = sessionTimeoutMs;
         this.connectionTimeoutMs = connectionTimeoutMs;
         this.tracer = tracer;
+        this.connectionHandlingPolicy = connectionHandlingPolicy;
         if ( parentWatcher != null )
         {
             parentWatchers.offer(parentWatcher);
@@ -68,11 +71,6 @@ class ConnectionState implements Watcher, Closeable
         zooKeeper = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs,
canBeReadOnly);
     }
 
-    void disableTimeoutChecks()
-    {
-        enableTimeoutChecks.set(false);
-    }
-
     ZooKeeper getZooKeeper() throws Exception
     {
         if ( SessionFailRetryLoop.sessionForThreadHasFailed() )
@@ -87,13 +85,10 @@ class ConnectionState implements Watcher, Closeable
             throw exception;
         }
 
-        if ( enableTimeoutChecks.get() )
+        boolean localIsConnected = isConnected.get();
+        if ( !localIsConnected )
         {
-            boolean localIsConnected = isConnected.get();
-            if ( !localIsConnected )
-            {
-                checkTimeouts();
-            }
+            checkTimeouts();
         }
 
         return zooKeeper.getZooKeeper();
@@ -194,35 +189,57 @@ class ConnectionState implements Watcher, Closeable
 
     private synchronized void checkTimeouts() throws Exception
     {
-        int minTimeout = Math.min(sessionTimeoutMs, connectionTimeoutMs);
-        long elapsed = System.currentTimeMillis() - connectionStartMs;
-        if ( elapsed >= minTimeout )
+        Callable<Boolean> hasNewConnectionString  = new Callable<Boolean>()
+        {
+            @Override
+            public Boolean call()
+            {
+                return zooKeeper.hasNewConnectionString();
+            }
+        };
+        ConnectionHandlingPolicy.CheckTimeoutsResult result = connectionHandlingPolicy.checkTimeouts(hasNewConnectionString,
connectionStartMs, sessionTimeoutMs, connectionTimeoutMs);
+        switch ( result )
         {
-            if ( zooKeeper.hasNewConnectionString() )
+            default:
+            case NOP:
+            {
+                break;
+            }
+
+            case NEW_CONNECTION_STRING:
             {
                 handleNewConnectionString();
+                break;
             }
-            else
+
+            case RESET_CONNECTION:
             {
-                int maxTimeout = Math.max(sessionTimeoutMs, connectionTimeoutMs);
-                if ( elapsed > maxTimeout )
+                if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES)
)
                 {
-                    if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES)
)
-                    {
-                        log.warn(String.format("Connection attempt unsuccessful after %d
(greater than max timeout of %d). Resetting connection and trying again with a new connection.",
elapsed, maxTimeout));
-                    }
-                    reset();
+                    long elapsed = System.currentTimeMillis() - connectionStartMs;
+                    int maxTimeout = Math.max(sessionTimeoutMs, connectionTimeoutMs);
+                    log.warn(String.format("Connection attempt unsuccessful after %d (greater
than max timeout of %d). Resetting connection and trying again with a new connection.", elapsed,
maxTimeout));
                 }
-                else
+                reset();
+                break;
+            }
+
+            case CONNECTION_TIMEOUT:
+            {
+                KeeperException.ConnectionLossException connectionLossException = new CuratorConnectionLossException();
+                if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES)
)
                 {
-                    KeeperException.ConnectionLossException connectionLossException = new
CuratorConnectionLossException();
-                    if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES)
)
-                    {
-                        log.error(String.format("Connection timed out for connection string
(%s) and timeout (%d) / elapsed (%d)", zooKeeper.getConnectionString(), connectionTimeoutMs,
elapsed), connectionLossException);
-                    }
-                    tracer.get().addCount("connections-timed-out", 1);
-                    throw connectionLossException;
+                    long elapsed = System.currentTimeMillis() - connectionStartMs;
+                    log.error(String.format("Connection timed out for connection string (%s)
and timeout (%d) / elapsed (%d)", zooKeeper.getConnectionString(), connectionTimeoutMs, elapsed),
connectionLossException);
                 }
+                tracer.get().addCount("connections-timed-out", 1);
+                throw connectionLossException;
+            }
+
+            case SESSION_TIMEOUT:
+            {
+                handleExpiredSession();
+                break;
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
index a065d78..9342acf 100644
--- a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
+++ b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
@@ -21,6 +21,8 @@ package org.apache.curator;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.curator.connection.ClassicConnectionHandlingPolicy;
+import org.apache.curator.connection.ConnectionHandlingPolicy;
 import org.apache.curator.drivers.TracerDriver;
 import org.apache.curator.ensemble.EnsembleProvider;
 import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
@@ -51,7 +53,7 @@ public class CuratorZookeeperClient implements Closeable
     private final int connectionTimeoutMs;
     private final AtomicBoolean started = new AtomicBoolean(false);
     private final AtomicReference<TracerDriver> tracer = new AtomicReference<TracerDriver>(new
DefaultTracerDriver());
-    private final boolean manageTimeouts;
+    private final ConnectionHandlingPolicy connectionHandlingPolicy;
     private final AtomicReference<Exception> debugException = new AtomicReference<>();
 
     /**
@@ -64,7 +66,7 @@ public class CuratorZookeeperClient implements Closeable
      */
     public CuratorZookeeperClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs,
Watcher watcher, RetryPolicy retryPolicy)
     {
-        this(new DefaultZookeeperFactory(), new FixedEnsembleProvider(connectString), sessionTimeoutMs,
connectionTimeoutMs, watcher, retryPolicy, false, true);
+        this(new DefaultZookeeperFactory(), new FixedEnsembleProvider(connectString), sessionTimeoutMs,
connectionTimeoutMs, watcher, retryPolicy, false, new ClassicConnectionHandlingPolicy());
     }
 
     /**
@@ -76,7 +78,7 @@ public class CuratorZookeeperClient implements Closeable
      */
     public CuratorZookeeperClient(EnsembleProvider ensembleProvider, int sessionTimeoutMs,
int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy)
     {
-        this(new DefaultZookeeperFactory(), ensembleProvider, sessionTimeoutMs, connectionTimeoutMs,
watcher, retryPolicy, false, true);
+        this(new DefaultZookeeperFactory(), ensembleProvider, sessionTimeoutMs, connectionTimeoutMs,
watcher, retryPolicy, false, new ClassicConnectionHandlingPolicy());
     }
 
     /**
@@ -93,7 +95,7 @@ public class CuratorZookeeperClient implements Closeable
      */
     public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider,
int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean
canBeReadOnly)
     {
-        this(new DefaultZookeeperFactory(), ensembleProvider, sessionTimeoutMs, connectionTimeoutMs,
watcher, retryPolicy, canBeReadOnly, true);
+        this(new DefaultZookeeperFactory(), ensembleProvider, sessionTimeoutMs, connectionTimeoutMs,
watcher, retryPolicy, canBeReadOnly, new ClassicConnectionHandlingPolicy());
     }
 
     /**
@@ -107,11 +109,12 @@ public class CuratorZookeeperClient implements Closeable
      *                      read only mode in case of a network partition. See
      *                      {@link ZooKeeper#ZooKeeper(String, int, Watcher, long, byte[],
boolean)}
      *                      for details
-     * @param manageTimeouts in general, Curator clients try to manage session/connection
timeouts. If this is false, that management is turned off
+     * @param connectionHandlingPolicy connection handling policy - use one of the pre-defined
policies or write your own
+     * @since 3.0.0
      */
-    public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider,
int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean
canBeReadOnly, boolean manageTimeouts)
+    public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider,
int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean
canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy)
     {
-        this.manageTimeouts = manageTimeouts;
+        this.connectionHandlingPolicy = connectionHandlingPolicy;
         if ( sessionTimeoutMs < connectionTimeoutMs )
         {
             log.warn(String.format("session timeout [%d] is less than connection timeout
[%d]", sessionTimeoutMs, connectionTimeoutMs));
@@ -121,11 +124,7 @@ public class CuratorZookeeperClient implements Closeable
         ensembleProvider = Preconditions.checkNotNull(ensembleProvider, "ensembleProvider
cannot be null");
 
         this.connectionTimeoutMs = connectionTimeoutMs;
-        state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs,
connectionTimeoutMs, watcher, tracer, canBeReadOnly);
-        if ( !manageTimeouts )
-        {
-            state.disableTimeoutChecks();
-        }
+        state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs,
connectionTimeoutMs, watcher, tracer, canBeReadOnly, connectionHandlingPolicy);
         setRetryPolicy(retryPolicy);
     }
 
@@ -328,14 +327,13 @@ public class CuratorZookeeperClient implements Closeable
     }
 
     /**
-     * Returns true if connection timeouts should cause the retry policy to be checked. If
false
-     * is returned, throw a connection exception without retrying
+     * Return the configured connection handling policy
      *
-     * @return true/false
+     * @return ConnectionHandlingPolicy
      */
-    public boolean retryConnectionTimeouts()
+    public ConnectionHandlingPolicy getConnectionHandlingPolicy()
     {
-        return manageTimeouts;
+        return connectionHandlingPolicy;
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-client/src/main/java/org/apache/curator/RetryLoop.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/RetryLoop.java b/curator-client/src/main/java/org/apache/curator/RetryLoop.java
index 92291c1..35d55a1 100644
--- a/curator-client/src/main/java/org/apache/curator/RetryLoop.java
+++ b/curator-client/src/main/java/org/apache/curator/RetryLoop.java
@@ -110,14 +110,30 @@ public class RetryLoop
                 }
 
                 client.internalBlockUntilConnectedOrTimedOut();
-                if ( !client.isConnected() && !client.retryConnectionTimeouts() )
+
+                switch ( client.getConnectionHandlingPolicy().preRetry(client) )
                 {
-                    connectionFailed = true;
-                    break;
+                    default:
+                    case CALL_PROC:
+                    {
+                        result = proc.call();
+                        retryLoop.markComplete();
+                        break;
+                    }
+
+                    case EXIT_RETRIES:
+                    {
+                        retryLoop.markComplete();
+                        break;
+                    }
+
+                    case CONNECTION_TIMEOUT:
+                    {
+                        connectionFailed = true;
+                        retryLoop.markComplete();
+                        break;
+                    }
                 }
-
-                result = proc.call();
-                retryLoop.markComplete();
             }
             catch ( Exception e )
             {

http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java
b/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java
new file mode 100644
index 0000000..71dc065
--- /dev/null
+++ b/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java
@@ -0,0 +1,48 @@
+package org.apache.curator.connection;
+
+import org.apache.curator.CuratorZookeeperClient;
+import java.util.concurrent.Callable;
+
+public class ClassicConnectionHandlingPolicy implements ConnectionHandlingPolicy
+{
+    @Override
+    public boolean isEmulatingClassicHandling()
+    {
+        return true;
+    }
+
+    @Override
+    public CheckTimeoutsResult checkTimeouts(Callable<Boolean> hasNewConnectionString,
long connectionStartMs, int sessionTimeoutMs, int connectionTimeoutMs) throws Exception
+    {
+        CheckTimeoutsResult result = CheckTimeoutsResult.NOP;
+        int minTimeout = Math.min(sessionTimeoutMs, connectionTimeoutMs);
+        long elapsed = System.currentTimeMillis() - connectionStartMs;
+        if ( elapsed >= minTimeout )
+        {
+            if ( hasNewConnectionString.call() )
+            {
+                result = CheckTimeoutsResult.NEW_CONNECTION_STRING;
+            }
+            else
+            {
+                int maxTimeout = Math.max(sessionTimeoutMs, connectionTimeoutMs);
+                if ( elapsed > maxTimeout )
+                {
+                    result = CheckTimeoutsResult.RESET_CONNECTION;
+                }
+                else
+                {
+                    result = CheckTimeoutsResult.CONNECTION_TIMEOUT;
+                }
+            }
+        }
+
+        return result;
+    }
+
+    @Override
+    public PreRetryResult preRetry(CuratorZookeeperClient client) throws Exception
+    {
+        return PreRetryResult.CALL_PROC;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java
b/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java
new file mode 100644
index 0000000..f3ecce6
--- /dev/null
+++ b/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java
@@ -0,0 +1,84 @@
+package org.apache.curator.connection;
+
+import org.apache.curator.CuratorZookeeperClient;
+import org.apache.zookeeper.KeeperException;
+import java.util.concurrent.Callable;
+
+public interface ConnectionHandlingPolicy
+{
+    /**
+     * Return true if this policy should behave like the pre-3.0.0 version of Curator
+     *
+     * @return true/false
+     */
+    boolean isEmulatingClassicHandling();
+
+    enum CheckTimeoutsResult
+    {
+        /**
+         * Do nothing
+         */
+        NOP,
+
+        /**
+         * handle a new connection string
+         */
+        NEW_CONNECTION_STRING,
+
+        /**
+         * reset/recreate the internal ZooKeeper connection
+         */
+        RESET_CONNECTION,
+
+        /**
+         * handle a connection timeout
+         */
+        CONNECTION_TIMEOUT,
+
+        /**
+         * handle a session timeout
+         */
+        SESSION_TIMEOUT
+    }
+
+    /**
+     * Check timeouts. NOTE: this method is only called when an attempt to access the
ZooKeeper instance
+     * is made and the connection has not completed.
+     *
+     * @param hasNewConnectionString proc to call to check if there is a new connection string.
Important: the internal state is cleared after
+     *                               this is called so you MUST handle the new connection
string if <tt>true</tt> is returned
+     * @param connectionStartMs the epoch/ms time that the connection was first initiated
+     * @param sessionTimeoutMs the configured session timeout in milliseconds
+     * @param connectionTimeoutMs the configured connection timeout in milliseconds
+     * @return result
+     * @throws Exception errors
+     */
+    CheckTimeoutsResult checkTimeouts(Callable<Boolean> hasNewConnectionString, long
connectionStartMs, int sessionTimeoutMs, int connectionTimeoutMs) throws Exception;
+
+    enum PreRetryResult
+    {
+        /**
+         * The retry loop should call the procedure
+         */
+        CALL_PROC,
+
+        /**
+         * Do not call the procedure and exit the retry loop
+         */
+        EXIT_RETRIES,
+
+        /**
+         * Do not call the procedure and throw {@link KeeperException.ConnectionLossException}
+         */
+        CONNECTION_TIMEOUT
+    }
+
+    /**
+     * Called prior to each iteration of a procedure in a retry loop
+     *
+     * @param client the client
+     * @return result
+     * @throws Exception errors
+     */
+    PreRetryResult preRetry(CuratorZookeeperClient client) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
b/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
new file mode 100644
index 0000000..06285ca
--- /dev/null
+++ b/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
@@ -0,0 +1,35 @@
+package org.apache.curator.connection;
+
+import org.apache.curator.CuratorZookeeperClient;
+import java.util.concurrent.Callable;
+
+public class StandardConnectionHandlingPolicy implements ConnectionHandlingPolicy
+{
+    @Override
+    public boolean isEmulatingClassicHandling()
+    {
+        return false;
+    }
+
+    @Override
+    public CheckTimeoutsResult checkTimeouts(Callable<Boolean> hasNewConnectionString,
long connectionStartMs, int sessionTimeoutMs, int connectionTimeoutMs) throws Exception
+    {
+        if ( hasNewConnectionString.call() )
+        {
+            return CheckTimeoutsResult.NEW_CONNECTION_STRING;
+        }
+        return CheckTimeoutsResult.NOP;
+    }
+
+    @Override
+    public PreRetryResult preRetry(CuratorZookeeperClient client) throws Exception
+    {
+        // TODO - see if there are other servers to connect to
+        if ( !client.isConnected() )
+        {
+            return PreRetryResult.CONNECTION_TIMEOUT;
+        }
+
+        return PreRetryResult.CALL_PROC;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-client/src/test/java/org/apache/curator/TestEnsurePath.java
----------------------------------------------------------------------
diff --git a/curator-client/src/test/java/org/apache/curator/TestEnsurePath.java b/curator-client/src/test/java/org/apache/curator/TestEnsurePath.java
index 871e4af..59c30ac 100644
--- a/curator-client/src/test/java/org/apache/curator/TestEnsurePath.java
+++ b/curator-client/src/test/java/org/apache/curator/TestEnsurePath.java
@@ -32,6 +32,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.curator.connection.ClassicConnectionHandlingPolicy;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.utils.EnsurePath;
 import org.apache.zookeeper.ZooKeeper;
@@ -51,7 +52,7 @@ public class TestEnsurePath
         CuratorZookeeperClient  curator = mock(CuratorZookeeperClient.class);
         RetryPolicy             retryPolicy = new RetryOneTime(1);
         RetryLoop               retryLoop = new RetryLoop(retryPolicy, null);
-        when(curator.retryConnectionTimeouts()).thenReturn(true);
+        when(curator.getConnectionHandlingPolicy()).thenReturn(new ClassicConnectionHandlingPolicy());
         when(curator.getZooKeeper()).thenReturn(client);
         when(curator.getRetryPolicy()).thenReturn(retryPolicy);
         when(curator.newRetryLoop()).thenReturn(retryLoop);
@@ -77,7 +78,7 @@ public class TestEnsurePath
         RetryPolicy             retryPolicy = new RetryOneTime(1);
         RetryLoop               retryLoop = new RetryLoop(retryPolicy, null);
         final CuratorZookeeperClient  curator = mock(CuratorZookeeperClient.class);
-        when(curator.retryConnectionTimeouts()).thenReturn(true);
+        when(curator.getConnectionHandlingPolicy()).thenReturn(new ClassicConnectionHandlingPolicy());
         when(curator.getZooKeeper()).thenReturn(client);
         when(curator.getRetryPolicy()).thenReturn(retryPolicy);
         when(curator.newRetryLoop()).thenReturn(retryLoop);

http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
index fad4fc2..01a8666 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
@@ -21,6 +21,9 @@ package org.apache.curator.framework;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.curator.RetryPolicy;
+import org.apache.curator.connection.ClassicConnectionHandlingPolicy;
+import org.apache.curator.connection.ConnectionHandlingPolicy;
+import org.apache.curator.connection.StandardConnectionHandlingPolicy;
 import org.apache.curator.ensemble.EnsembleProvider;
 import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
 import org.apache.curator.framework.api.ACLProvider;
@@ -35,6 +38,7 @@ import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.utils.DefaultZookeeperFactory;
 import org.apache.curator.utils.ZookeeperFactory;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
 import java.net.InetAddress;
@@ -117,7 +121,7 @@ public class CuratorFrameworkFactory
         private ACLProvider aclProvider = DEFAULT_ACL_PROVIDER;
         private boolean canBeReadOnly = false;
         private boolean useContainerParentsIfAvailable = true;
-        private boolean enableSessionExpiredState = Boolean.getBoolean("curator-enable-session-expired-state");
+        private ConnectionHandlingPolicy connectionHandlingPolicy = Boolean.getBoolean("curator-use-classic-connection-handling")
? new ClassicConnectionHandlingPolicy() : new StandardConnectionHandlingPolicy();
 
         /**
          * Apply the current values and build a new CuratorFramework
@@ -346,14 +350,50 @@ public class CuratorFrameworkFactory
         }
 
         /**
-         * Changes the meaning of {@link ConnectionState#LOST} from it's pre Curator 3.0.0
meaning
-         * to a true lost session state. See the {@link ConnectionState#LOST} doc for details.
+         * <p>
+         *     Change the connection handling policy. The default policy is {@link StandardConnectionHandlingPolicy}.
+         * </p>
+         * <p>
+         *     <strong>IMPORTANT: </strong> StandardConnectionHandlingPolicy
has different behavior than the connection
+         *     policy handling prior to version 3.0.0. You can specify that the connection
handling be the method
+         *     prior to 3.0.0 by passing in an instance of {@link ClassicConnectionHandlingPolicy}
here or by
+         *     setting the system property "curator-use-classic-connection-handling" to
true (e.g. <tt>-Dcurator-use-classic-connection-handling=true</tt> on the command line).
+         * </p>
+         * <p>
+         *     Major differences from the older behavior are:
+         * </p>
+         * <ul>
+         *     <li>
+         *         Session/connection timeouts are no longer managed by the low-level client.
They are managed
+         *         by the CuratorFramework instance. There should be no noticeable differences.
+         *     </li>
+         *     <li>
+         *         Prior to 3.0.0, an elapsed connection timeout would be presented to the
retry policy, possibly
+         *         causing retries. Now, elapsed connection timeouts are only retried if
there is another server
+         *         in the connection string. i.e. a new instance will be retried should the
retry policy allow a retry.
+         *         If no other servers remain, a {@link KeeperException.ConnectionLossException}
is thrown immediately
+         *         without notifying the retry policy.
+         *     </li>
+         *     <li>
+         *         <strong>MOST IMPORTANTLY!</strong> Prior to 3.0.0, {@link
ConnectionState#LOST} did not imply
+         *         a lost session (much to the confusion of users). Now,
+         *         Curator will set the LOST state only when it believes that the ZooKeeper
session
+         *         has expired. ZooKeeper connections have a session. When the session expires,
clients must take appropriate
+         *         action. In Curator, this is complicated by the fact that Curator internally
manages the ZooKeeper
+         *         connection. Now, Curator will set the LOST state when any of the following
occurs:
+         *         a) ZooKeeper returns a {@link Watcher.Event.KeeperState#Expired} or {@link
KeeperException.Code#SESSIONEXPIRED};
+         *         b) Curator closes the internally managed ZooKeeper instance; c) The configured
session timeout
+         *         elapses during a network partition.
+         *     </li>
+         * </ul>
          *
+         * @param connectionHandlingPolicy the policy
          * @return this
+         * @since 3.0.0
          */
-        public Builder enableSessionExpiredState()
+        public Builder connectionHandlingPolicy(ConnectionHandlingPolicy connectionHandlingPolicy)
         {
-            this.enableSessionExpiredState = true;
+            this.connectionHandlingPolicy = connectionHandlingPolicy;
             return this;
         }
 
@@ -412,9 +452,9 @@ public class CuratorFrameworkFactory
             return useContainerParentsIfAvailable;
         }
 
-        public boolean getEnableSessionExpiredState()
+        public ConnectionHandlingPolicy getConnectionHandlingPolicy()
         {
-            return enableSessionExpiredState;
+            return connectionHandlingPolicy;
         }
 
         @Deprecated

http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index bcbeecd..44a8ec6 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -84,7 +84,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
     private final NamespaceFacadeCache namespaceFacadeCache;
     private final NamespaceWatcherMap namespaceWatcherMap = new NamespaceWatcherMap(this);
     private final boolean useContainerParentsIfAvailable;
-    private final boolean enableSessionExpiredState;
     private final AtomicLong currentInstanceIndex = new AtomicLong(-1);
 
     private volatile ExecutorService executorService;
@@ -107,24 +106,24 @@ public class CuratorFrameworkImpl implements CuratorFramework
     {
         ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
         this.client = new CuratorZookeeperClient
-        (
-            localZookeeperFactory,
-            builder.getEnsembleProvider(),
-            builder.getSessionTimeoutMs(),
-            builder.getConnectionTimeoutMs(),
-            new Watcher()
-            {
-                @Override
-                public void process(WatchedEvent watchedEvent)
+            (
+                localZookeeperFactory,
+                builder.getEnsembleProvider(),
+                builder.getSessionTimeoutMs(),
+                builder.getConnectionTimeoutMs(),
+                new Watcher()
                 {
-                    CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this,
CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()),
null, null, null, null, null, watchedEvent, null, null);
-                    processEvent(event);
-                }
-            },
-            builder.getRetryPolicy(),
-            builder.canBeReadOnly(),
-            !builder.getEnableSessionExpiredState() // inverse is correct here. By default,
CuratorZookeeperClient manages timeouts. The new SessionExpiredState needs this disabled.
-        );
+                    @Override
+                    public void process(WatchedEvent watchedEvent)
+                    {
+                        CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this,
CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()),
null, null, null, null, null, watchedEvent, null, null);
+                        processEvent(event);
+                    }
+                },
+                builder.getRetryPolicy(),
+                builder.canBeReadOnly(),
+                builder.getConnectionHandlingPolicy()
+            );
 
         listeners = new ListenerContainer<CuratorListener>();
         unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();
@@ -132,12 +131,11 @@ public class CuratorFrameworkImpl implements CuratorFramework
         namespace = new NamespaceImpl(this, builder.getNamespace());
         threadFactory = getThreadFactory(builder);
         maxCloseWaitMs = builder.getMaxCloseWaitMs();
-        connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(),
builder.getEnableSessionExpiredState(), builder.getSessionTimeoutMs());
+        connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(),
builder.getSessionTimeoutMs());
         compressionProvider = builder.getCompressionProvider();
         aclProvider = builder.getAclProvider();
         state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
         useContainerParentsIfAvailable = builder.useContainerParentsIfAvailable();
-        enableSessionExpiredState = builder.getEnableSessionExpiredState();
 
         byte[] builderDefaultData = builder.getDefaultData();
         defaultData = (builderDefaultData != null) ? Arrays.copyOf(builderDefaultData, builderDefaultData.length)
: new byte[0];
@@ -211,7 +209,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
         state = parent.state;
         authInfos = parent.authInfos;
         useContainerParentsIfAvailable = parent.useContainerParentsIfAvailable;
-        enableSessionExpiredState = parent.enableSessionExpiredState;
     }
 
     @Override
@@ -699,7 +696,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
 
     private void checkNewConnection()
     {
-        if ( enableSessionExpiredState )
+        if ( !client.getConnectionHandlingPolicy().isEmulatingClassicHandling() )
         {
             long instanceIndex = client.getInstanceIndex();
             long newInstanceIndex = currentInstanceIndex.getAndSet(instanceIndex);
@@ -752,7 +749,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
             return;
         }
 
-        if ( !enableSessionExpiredState )
+        if ( client.getConnectionHandlingPolicy().isEmulatingClassicHandling() )
         {
             doSyncForSuspendedConnection(client.getInstanceIndex());
         }

http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
index 79f3b62..fe40abf 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
@@ -64,32 +64,18 @@ public enum ConnectionState
 
     /**
      * <p>
-     *     NOTE: the meaning of this state depends on how your CuratorFramework instance
-     *     is created.
-     * </p>
-     *
-     * <p>
-     *     The default meaning of LOST (and the only meaning prior to Curator 3.0.0) is:
-     *     The connection is confirmed to be lost (i.e. the retry policy has given up). Close
any locks, leaders, etc. and
-     *     attempt to re-create them. NOTE: it is possible to get a {@link #RECONNECTED}
-     *     state after this but you should still consider any locks, etc. as dirty/unstable
-     * </p>
-     *
-     * <p>
-     *     <strong>Since 3.0.0</strong>, you can alter the meaning of LOST by
calling
-     *     {@link CuratorFrameworkFactory.Builder#enableSessionExpiredState()}. In this mode,
      *     Curator will set the LOST state only when it believes that the ZooKeeper session
      *     has expired. ZooKeeper connections have a session. When the session expires, clients
must take appropriate
      *     action. In Curator, this is complicated by the fact that Curator internally manages
the ZooKeeper
-     *     connection. In this mode, Curator will set the LOST state when any of the following
occurs:
+     *     connection. Curator will set the LOST state when any of the following occurs:
      *     a) ZooKeeper returns a {@link Watcher.Event.KeeperState#Expired} or {@link KeeperException.Code#SESSIONEXPIRED};
      *     b) Curator closes the internally managed ZooKeeper instance; c) The configured
session timeout
      *     elapses during a network partition.
      * </p>
      *
      * <p>
-     *     NOTE: the new behavior for the LOST state can also be enabled via the command
line
-     *     property "curator-enable-session-expired-state" (e.g. -Dcurator-enable-session-expired-state=true)
+     *     NOTE: see {@link CuratorFrameworkFactory.Builder#connectionHandlingPolicy} for
an important note about a
+     *     change in meaning to LOST since 3.0.0
      * </p>
      */
     LOST

http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 52e0d07..2e7492f 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -21,6 +21,7 @@ package org.apache.curator.framework.state;
 
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
+import org.apache.curator.connection.ConnectionHandlingPolicyStyle;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.listen.ListenerContainer;
 import org.apache.curator.utils.ThreadUtils;
@@ -35,7 +36,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -66,7 +66,6 @@ public class ConnectionStateManager implements Closeable
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);
     private final CuratorFramework client;
-    private final boolean enableSessionExpiredState;
     private final int sessionTimeoutMs;
     private final ListenerContainer<ConnectionStateListener> listeners = new ListenerContainer<ConnectionStateListener>();
     private final AtomicBoolean initialConnectMessageSent = new AtomicBoolean(false);
@@ -88,13 +87,11 @@ public class ConnectionStateManager implements Closeable
     /**
      * @param client        the client
      * @param threadFactory thread factory to use or null for a default
-     * @param enableSessionExpiredState if true, applies new meaning for LOST as described
here: {@link ConnectionState#LOST}
      * @param sessionTimeoutMs the ZK session timeout in milliseconds
      */
-    public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, boolean
enableSessionExpiredState, int sessionTimeoutMs)
+    public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int
sessionTimeoutMs)
     {
         this.client = client;
-        this.enableSessionExpiredState = enableSessionExpiredState;
         this.sessionTimeoutMs = sessionTimeoutMs;
         if ( threadFactory == null )
         {
@@ -273,7 +270,7 @@ public class ConnectionStateManager implements Closeable
                             }
                         );
                 }
-                else if ( enableSessionExpiredState )
+                else if ( !client.getZookeeperClient().getConnectionHandlingPolicy().isEmulatingClassicHandling()
)
                 {
                     synchronized(this)
                     {

http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
index cd415b1..4d6f473 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
@@ -53,7 +53,6 @@ public class TestEnabledSessionExpiredState extends BaseClassForTests
             .connectString(server.getConnectString())
             .connectionTimeoutMs(timing.connection())
             .sessionTimeoutMs(timing.session())
-            .enableSessionExpiredState()
             .retryPolicy(new RetryOneTime(1))
             .build();
         client.start();
@@ -115,7 +114,7 @@ public class TestEnabledSessionExpiredState extends BaseClassForTests
         Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED);
         server.stop();
         Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED);
-        Assert.assertEquals(states.poll(timing.multiple(2).session(), TimeUnit.MILLISECONDS),
ConnectionState.LOST);
+        Assert.assertEquals(states.poll(timing.sessionSleep(), TimeUnit.MILLISECONDS), ConnectionState.LOST);
     }
 
     @Test
@@ -125,7 +124,7 @@ public class TestEnabledSessionExpiredState extends BaseClassForTests
         server.stop();
         timing.sleepForSession();
         Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED);
-        Assert.assertEquals(states.poll(timing.multiple(2).session(), TimeUnit.MILLISECONDS),
ConnectionState.LOST);
+        Assert.assertEquals(states.poll(timing.sessionSleep(), TimeUnit.MILLISECONDS), ConnectionState.LOST);
         server.restart();
         client.checkExists().forPath("/");
         Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED);

http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
----------------------------------------------------------------------
diff --git a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
index c9f3524..1f6503d 100644
--- a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
+++ b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
@@ -86,14 +86,14 @@ public class BaseClassForTests
                 public void beforeInvocation(IInvokedMethod method, ITestResult testResult)
                 {
                     int invocationCount = method.getTestMethod().getCurrentInvocationCount();
-                    System.setProperty("curator-enable-session-expired-state", Boolean.toString(invocationCount
== 1));
-                    log.info("curator-enable-session-expired-state: " + Boolean.toString(invocationCount
== 1));
+                    System.setProperty("curator-use-classic-connection-handling", Boolean.toString(invocationCount
== 1));
+                    log.info("curator-use-classic-connection-handling: " + Boolean.toString(invocationCount
== 1));
                 }
 
                 @Override
                 public void afterInvocation(IInvokedMethod method, ITestResult testResult)
                 {
-                    System.clearProperty("curator-enable-session-expired-state");
+                    System.clearProperty("curator-use-classic-connection-handling");
                 }
             };
             context.getSuite().addListener(listener);


Mime
View raw message