curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From randg...@apache.org
Subject [05/45] curator git commit: Continued work on new LOST behavior. Added some tests. To get correct behavior it's necessary to not retry connection failures. Retrying connection failures was never a good idea and here's a good opportunity to fix it as this
Date Tue, 01 Sep 2015 13:27:57 GMT
Continued work on the new LOST behavior and added some tests. To get correct behavior, it's necessary
to not retry connection failures. Retrying connection failures was never a good idea, and this is
a good opportunity to fix it, as the new behavior requires client action to enable.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/62f3c33c
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/62f3c33c
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/62f3c33c

Branch: refs/heads/CURATOR-248
Commit: 62f3c33cdb556eccf6fe1cc87ee74b3458431777
Parents: 2343daf
Author: randgalt <randgalt@apache.org>
Authored: Fri Aug 21 17:35:44 2015 -0500
Committer: randgalt <randgalt@apache.org>
Committed: Fri Aug 21 17:35:44 2015 -0500

----------------------------------------------------------------------
 .../org/apache/curator/ConnectionState.java     | 24 ++---
 .../apache/curator/CuratorZookeeperClient.java  | 58 +++++++-----
 .../main/java/org/apache/curator/RetryLoop.java | 12 +++
 .../framework/CuratorFrameworkFactory.java      |  2 +-
 .../framework/imps/CuratorFrameworkImpl.java    | 43 ++++++++-
 .../framework/state/ConnectionState.java        |  5 +
 .../framework/state/ConnectionStateManager.java | 13 ++-
 .../framework/imps/TestBlockUntilConnected.java |  1 +
 .../imps/TestEnabledSessionExpiredState.java    | 99 ++++++++++++++++++++
 .../apache/curator/test/BaseClassForTests.java  | 37 +++++++-
 .../java/org/apache/curator/test/Timing.java    |  2 +-
 11 files changed, 253 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-client/src/main/java/org/apache/curator/ConnectionState.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/ConnectionState.java b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
index d3900a1..1dfdbef 100644
--- a/curator-client/src/main/java/org/apache/curator/ConnectionState.java
+++ b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
@@ -171,6 +171,18 @@ class ConnectionState implements Watcher, Closeable
         return ensembleProvider;
     }
 
+    synchronized void reset() throws Exception
+    {
+        log.debug("reset");
+
+        instanceIndex.incrementAndGet();
+
+        isConnected.set(false);
+        connectionStartMs = System.currentTimeMillis();
+        zooKeeper.closeAndReset();
+        zooKeeper.getZooKeeper();   // initiate connection
+    }
+
     private synchronized void checkTimeouts() throws Exception
     {
         int minTimeout = Math.min(sessionTimeoutMs, connectionTimeoutMs);
@@ -206,18 +218,6 @@ class ConnectionState implements Watcher, Closeable
         }
     }
 
-    private synchronized void reset() throws Exception
-    {
-        log.debug("reset");
-
-        instanceIndex.incrementAndGet();
-
-        isConnected.set(false);
-        connectionStartMs = System.currentTimeMillis();
-        zooKeeper.closeAndReset();
-        zooKeeper.getZooKeeper();   // initiate connection
-    }
-
     private boolean checkState(Event.KeeperState state, boolean wasConnected)
     {
         boolean isConnected = wasConnected;

http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
index 09b28b2..fbb2f4c 100644
--- a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
+++ b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator;
 
 import com.google.common.base.Preconditions;
@@ -43,12 +44,12 @@ import java.util.concurrent.atomic.AtomicReference;
 @SuppressWarnings("UnusedDeclaration")
 public class CuratorZookeeperClient implements Closeable
 {
-    private final Logger                            log = LoggerFactory.getLogger(getClass());
-    private final ConnectionState                   state;
-    private final AtomicReference<RetryPolicy>      retryPolicy = new AtomicReference<RetryPolicy>();
-    private final int                               connectionTimeoutMs;
-    private final AtomicBoolean                     started = new AtomicBoolean(false);
-    private final AtomicReference<TracerDriver>     tracer = new AtomicReference<TracerDriver>(new
DefaultTracerDriver());
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final ConnectionState state;
+    private final AtomicReference<RetryPolicy> retryPolicy = new AtomicReference<RetryPolicy>();
+    private final int connectionTimeoutMs;
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    private final AtomicReference<TracerDriver> tracer = new AtomicReference<TracerDriver>(new
DefaultTracerDriver());
 
     /**
      *
@@ -159,7 +160,7 @@ public class CuratorZookeeperClient implements Closeable
         Preconditions.checkState(started.get(), "Client is not started");
 
         log.debug("blockUntilConnectedOrTimedOut() start");
-        TimeTrace       trace = startTracer("blockUntilConnectedOrTimedOut");
+        TimeTrace trace = startTracer("blockUntilConnectedOrTimedOut");
 
         internalBlockUntilConnectedOrTimedOut();
 
@@ -176,7 +177,7 @@ public class CuratorZookeeperClient implements Closeable
      *
      * @throws IOException errors
      */
-    public void     start() throws Exception
+    public void start() throws Exception
     {
         log.debug("Starting");
 
@@ -192,7 +193,7 @@ public class CuratorZookeeperClient implements Closeable
     /**
      * Close the client
      */
-    public void     close()
+    public void close()
     {
         log.debug("Closing");
 
@@ -212,7 +213,7 @@ public class CuratorZookeeperClient implements Closeable
      *
      * @param policy new policy
      */
-    public void     setRetryPolicy(RetryPolicy policy)
+    public void setRetryPolicy(RetryPolicy policy)
     {
         Preconditions.checkNotNull(policy, "policy cannot be null");
 
@@ -234,7 +235,7 @@ public class CuratorZookeeperClient implements Closeable
      * @param name name of the event
      * @return the new tracer ({@link TimeTrace#commit()} must be called)
      */
-    public TimeTrace          startTracer(String name)
+    public TimeTrace startTracer(String name)
     {
         return new TimeTrace(name, tracer.get());
     }
@@ -244,7 +245,7 @@ public class CuratorZookeeperClient implements Closeable
      *
      * @return tracing driver
      */
-    public TracerDriver       getTracerDriver()
+    public TracerDriver getTracerDriver()
     {
         return tracer.get();
     }
@@ -254,7 +255,7 @@ public class CuratorZookeeperClient implements Closeable
      *
      * @param tracer new tracing driver
      */
-    public void               setTracerDriver(TracerDriver tracer)
+    public void setTracerDriver(TracerDriver tracer)
     {
         this.tracer.set(tracer);
     }
@@ -265,7 +266,7 @@ public class CuratorZookeeperClient implements Closeable
      *
      * @return connection string
      */
-    public String             getCurrentConnectionString()
+    public String getCurrentConnectionString()
     {
         return state.getEnsembleProvider().getConnectionString();
     }
@@ -281,6 +282,16 @@ public class CuratorZookeeperClient implements Closeable
     }
 
     /**
+     * For internal use only - reset the internally managed ZK handle
+     *
+     * @throws Exception errors
+     */
+    public void reset() throws Exception
+    {
+        state.reset();
+    }
+
+    /**
      * Every time a new {@link ZooKeeper} instance is allocated, the "instance index"
      * is incremented.
      *
@@ -291,22 +302,27 @@ public class CuratorZookeeperClient implements Closeable
         return state.getInstanceIndex();
     }
 
-    void        addParentWatcher(Watcher watcher)
+    public boolean retryConnectionTimeouts()
+    {
+        return true;
+    }
+
+    void addParentWatcher(Watcher watcher)
     {
         state.addParentWatcher(watcher);
     }
 
-    void        removeParentWatcher(Watcher watcher)
+    void removeParentWatcher(Watcher watcher)
     {
         state.removeParentWatcher(watcher);
     }
 
     void internalBlockUntilConnectedOrTimedOut() throws InterruptedException
     {
-        long            waitTimeMs = connectionTimeoutMs;
+        long waitTimeMs = connectionTimeoutMs;
         while ( !state.isConnected() && (waitTimeMs > 0) )
         {
-            final CountDownLatch            latch = new CountDownLatch(1);
+            final CountDownLatch latch = new CountDownLatch(1);
             Watcher tempWatcher = new Watcher()
             {
                 @Override
@@ -315,9 +331,9 @@ public class CuratorZookeeperClient implements Closeable
                     latch.countDown();
                 }
             };
-            
+
             state.addParentWatcher(tempWatcher);
-            long        startTimeMs = System.currentTimeMillis();
+            long startTimeMs = System.currentTimeMillis();
             try
             {
                 latch.await(1, TimeUnit.SECONDS);
@@ -326,7 +342,7 @@ public class CuratorZookeeperClient implements Closeable
             {
                 state.removeParentWatcher(tempWatcher);
             }
-            long        elapsed = Math.max(1, System.currentTimeMillis() - startTimeMs);
+            long elapsed = Math.max(1, System.currentTimeMillis() - startTimeMs);
             waitTimeMs -= elapsed;
         }
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-client/src/main/java/org/apache/curator/RetryLoop.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/RetryLoop.java b/curator-client/src/main/java/org/apache/curator/RetryLoop.java
index 065ebef..8d77cf7 100644
--- a/curator-client/src/main/java/org/apache/curator/RetryLoop.java
+++ b/curator-client/src/main/java/org/apache/curator/RetryLoop.java
@@ -98,11 +98,17 @@ public class RetryLoop
     {
         T               result = null;
         RetryLoop       retryLoop = client.newRetryLoop();
+        boolean         connectionFailed = false;
         while ( retryLoop.shouldContinue() )
         {
             try
             {
                 client.internalBlockUntilConnectedOrTimedOut();
+                if ( !client.isConnected() && !client.retryConnectionTimeouts() )
+                {
+                    connectionFailed = true;
+                    break;
+                }
                 
                 result = proc.call();
                 retryLoop.markComplete();
@@ -112,6 +118,12 @@ public class RetryLoop
                 retryLoop.takeException(e);
             }
         }
+
+        if ( connectionFailed )
+        {
+            throw new KeeperException.ConnectionLossException();
+        }
+
         return result;
     }
 

http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
index 6209b06..fad4fc2 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
@@ -117,7 +117,7 @@ public class CuratorFrameworkFactory
         private ACLProvider aclProvider = DEFAULT_ACL_PROVIDER;
         private boolean canBeReadOnly = false;
         private boolean useContainerParentsIfAvailable = true;
-        private boolean enableSessionExpiredState = false;
+        private boolean enableSessionExpiredState = Boolean.getBoolean("curator-enable-session-expired-state");
 
         /**
          * Apply the current values and build a new CuratorFramework

http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index c64fb8f..c359fdc 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -61,6 +61,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class CuratorFrameworkImpl implements CuratorFramework
@@ -84,6 +85,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
     private final NamespaceWatcherMap namespaceWatcherMap = new NamespaceWatcherMap(this);
     private final boolean useContainerParentsIfAvailable;
     private final boolean enableSessionExpiredState;
+    private final AtomicLong currentInstanceIndex = new AtomicLong(-1);
 
     private volatile ExecutorService executorService;
     private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false);
@@ -104,15 +106,31 @@ public class CuratorFrameworkImpl implements CuratorFramework
     public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
     {
         ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
-        this.client = new CuratorZookeeperClient(localZookeeperFactory, builder.getEnsembleProvider(),
builder.getSessionTimeoutMs(), builder.getConnectionTimeoutMs(), new Watcher()
+        this.client = new CuratorZookeeperClient
+        (
+            localZookeeperFactory,
+            builder.getEnsembleProvider(),
+            builder.getSessionTimeoutMs(),
+            builder.getConnectionTimeoutMs(),
+            new Watcher()
+            {
+                @Override
+                public void process(WatchedEvent watchedEvent)
+                {
+                    CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this,
CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()),
null, null, null, null, null, watchedEvent, null, null);
+                    processEvent(event);
+                }
+            },
+            builder.getRetryPolicy(),
+            builder.canBeReadOnly()
+        )
         {
             @Override
-            public void process(WatchedEvent watchedEvent)
+            public boolean retryConnectionTimeouts()
             {
-                CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED,
watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null,
null, null, null, watchedEvent, null, null);
-                processEvent(event);
+                return !enableSessionExpiredState;
             }
-        }, builder.getRetryPolicy(), builder.canBeReadOnly());
+        };
 
         listeners = new ListenerContainer<CuratorListener>();
         unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();
@@ -675,14 +693,29 @@ public class CuratorFrameworkImpl implements CuratorFramework
         }
         else if ( state == Watcher.Event.KeeperState.SyncConnected )
         {
+            checkNewConnection();
             connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
         }
         else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
         {
+            checkNewConnection();
             connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
         }
     }
 
+    private void checkNewConnection()
+    {
+        if ( enableSessionExpiredState )
+        {
+            long instanceIndex = client.getInstanceIndex();
+            long newInstanceIndex = currentInstanceIndex.getAndSet(instanceIndex);
+            if ( (newInstanceIndex >= 0) && (instanceIndex != newInstanceIndex)
)   // currentInstanceIndex is initially -1 - ignore this
+            {
+                connectionStateManager.addStateChange(ConnectionState.LOST);
+            }
+        }
+    }
+
     Watcher.Event.KeeperState codeToState(KeeperException.Code code)
     {
         switch ( code )

http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
index 49d0044..79f3b62 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
@@ -86,6 +86,11 @@ public enum ConnectionState
      *     b) Curator closes the internally managed ZooKeeper instance; c) The configured session timeout
      *     elapses during a network partition.
      * </p>
+     *
+     * <p>
+     *     NOTE: the new behavior for the LOST state can also be enabled via the command line
+     *     property "curator-enable-session-expired-state" (e.g. -Dcurator-enable-session-expired-state=true)
+     * </p>
      */
     LOST
     {

http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index c0feb84..553faac 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -251,7 +251,8 @@ public class ConnectionStateManager implements Closeable
         {
             while ( !Thread.currentThread().isInterrupted() )
             {
-                final ConnectionState newState = eventQueue.poll(sessionTimeoutMs, TimeUnit.MILLISECONDS);
+                int pollMaxMs = (sessionTimeoutMs * 2) / 3; // 2/3 of session timeout
+                final ConnectionState newState = eventQueue.poll(pollMaxMs, TimeUnit.MILLISECONDS);
                 if ( newState != null )
                 {
                     if ( listeners.size() == 0 )
@@ -294,7 +295,15 @@ public class ConnectionStateManager implements Closeable
             long elapsedMs = System.currentTimeMillis() - startOfSuspendedEpoch;
             if ( elapsedMs >= sessionTimeoutMs )
             {
-                log.info(String.format("Session timeout has elapsed while SUSPENDED. Posting
LOST event. Elapsed ms: %d", elapsedMs));
+                log.info(String.format("Session timeout has elapsed while SUSPENDED. Posting
LOST event and resetting the connection. Elapsed ms: %d", elapsedMs));
+                try
+                {
+                    client.getZookeeperClient().reset();
+                }
+                catch ( Exception e )
+                {
+                    log.error("Could not reset the connection", e);
+                }
                 addStateChange(ConnectionState.LOST);
             }
         }

http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
index f649afb..eeec797 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
@@ -114,6 +114,7 @@ public class TestBlockUntilConnected extends BaseClassForTests
         Timing timing = new Timing();
         CuratorFramework client = CuratorFrameworkFactory.builder().
             connectString(server.getConnectString()).
+            sessionTimeoutMs(timing.session()).
             retryPolicy(new RetryOneTime(1)).
             build();
 

http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
new file mode 100644
index 0000000..030a292
--- /dev/null
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
@@ -0,0 +1,99 @@
+package org.apache.curator.framework.imps;
+
+import com.google.common.collect.Queues;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.KillSession;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.KeeperException;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class TestEnabledSessionExpiredState extends BaseClassForTests
+{
+    private final Timing timing = new Timing();
+
+    private CuratorFramework client;
+    private BlockingQueue<ConnectionState> states;
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception
+    {
+        super.setup();
+
+        client = CuratorFrameworkFactory.builder()
+            .connectString(server.getConnectString())
+            .connectionTimeoutMs(timing.connection())
+            .sessionTimeoutMs(timing.session())
+            .enableSessionExpiredState()
+            .retryPolicy(new RetryOneTime(1))
+            .build();
+        client.start();
+
+        states = Queues.newLinkedBlockingQueue();
+        ConnectionStateListener listener = new ConnectionStateListener()
+        {
+            @Override
+            public void stateChanged(CuratorFramework client, ConnectionState newState)
+            {
+                states.add(newState);
+            }
+        };
+        client.getConnectionStateListenable().addListener(listener);
+    }
+
+    @AfterMethod
+    @Override
+    public void teardown() throws Exception
+    {
+        CloseableUtils.closeQuietly(client);
+
+        super.teardown();
+    }
+
+    @Test
+    public void testKillSession() throws Exception
+    {
+        Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED);
+
+        KillSession.kill(client.getZookeeperClient().getZooKeeper(), server.getConnectString());
+
+        Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED);
+        Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.LOST);
+        Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED);
+    }
+
+    @Test
+    public void testReconnectWithoutExpiration() throws Exception
+    {
+        Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED);
+        server.stop();
+        try
+        {
+            client.checkExists().forPath("/");  // any API call that will invoke the retry
policy, etc.
+        }
+        catch ( KeeperException.ConnectionLossException ignore )
+        {
+        }
+        Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED);
+        server.restart();
+        client.checkExists().forPath("/");
+        Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED);
+    }
+
+    @Override
+    protected boolean enabledSessionExpiredStateAware()
+    {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
----------------------------------------------------------------------
diff --git a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
index 6ef3bb0..c9f3524 100644
--- a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
+++ b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
@@ -16,10 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.test;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.IInvokedMethod;
+import org.testng.IInvokedMethodListener;
 import org.testng.IRetryAnalyzer;
 import org.testng.ITestContext;
+import org.testng.ITestNGListener;
 import org.testng.ITestNGMethod;
 import org.testng.ITestResult;
 import org.testng.annotations.AfterMethod;
@@ -32,11 +38,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
 public class BaseClassForTests
 {
     protected TestingServer server;
+    private final Logger log = LoggerFactory.getLogger(getClass());
 
-    private static final int    RETRY_WAIT_MS = 5000;
+    private static final int RETRY_WAIT_MS = 5000;
     private static final String INTERNAL_PROPERTY_DONT_LOG_CONNECTION_ISSUES;
     private static final String INTERNAL_PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND;
     private static final String INTERNAL_RETRY_FAILED_TESTS;
+
     static
     {
         String logConnectionIssues = null;
@@ -70,8 +78,30 @@ public class BaseClassForTests
     @BeforeSuite(alwaysRun = true)
     public void beforeSuite(ITestContext context)
     {
+        if ( !enabledSessionExpiredStateAware() )
+        {
+            ITestNGListener listener = new IInvokedMethodListener()
+            {
+                @Override
+                public void beforeInvocation(IInvokedMethod method, ITestResult testResult)
+                {
+                    int invocationCount = method.getTestMethod().getCurrentInvocationCount();
+                    System.setProperty("curator-enable-session-expired-state", Boolean.toString(invocationCount
== 1));
+                    log.info("curator-enable-session-expired-state: " + Boolean.toString(invocationCount
== 1));
+                }
+
+                @Override
+                public void afterInvocation(IInvokedMethod method, ITestResult testResult)
+                {
+                    System.clearProperty("curator-enable-session-expired-state");
+                }
+            };
+            context.getSuite().addListener(listener);
+        }
+
         for ( ITestNGMethod method : context.getAllTestMethods() )
         {
+            method.setInvocationCount(enabledSessionExpiredStateAware() ? 1 : 2);
             method.setRetryAnalyzer(new RetryTest());
         }
     }
@@ -117,6 +147,11 @@ public class BaseClassForTests
         }
     }
 
+    protected boolean enabledSessionExpiredStateAware()
+    {
+        return false;
+    }
+
     private static class RetryTest implements IRetryAnalyzer
     {
         private final AtomicBoolean hasBeenRetried = new AtomicBoolean(!Boolean.getBoolean(INTERNAL_RETRY_FAILED_TESTS));

http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-test/src/main/java/org/apache/curator/test/Timing.java
----------------------------------------------------------------------
diff --git a/curator-test/src/main/java/org/apache/curator/test/Timing.java b/curator-test/src/main/java/org/apache/curator/test/Timing.java
index f29b1c5..753d62d 100644
--- a/curator-test/src/main/java/org/apache/curator/test/Timing.java
+++ b/curator-test/src/main/java/org/apache/curator/test/Timing.java
@@ -34,7 +34,7 @@ public class Timing
 
     private static final int DEFAULT_SECONDS = 10;
     private static final int DEFAULT_WAITING_MULTIPLE = 5;
-    private static final double SESSION_MULTIPLE = .25;
+    private static final double SESSION_MULTIPLE = 1.5;
 
     /**
      * Use the default base time


Mime
View raw message