curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From randg...@apache.org
Subject [07/12] git commit: CURATOR-24 Improve the handling of hung ZooKeeper connections
Date Fri, 17 May 2013 20:39:17 GMT
CURATOR-24
Improve the handling of hung ZooKeeper connections


Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/11ae23ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/11ae23ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/11ae23ad

Branch: refs/heads/master
Commit: 11ae23adc013fa6526ac001bce2f5b2967a1d9b5
Parents: 38f28b5
Author: randgalt <randgalt@apache.org>
Authored: Thu May 9 17:18:09 2013 -0700
Committer: randgalt <randgalt@apache.org>
Committed: Thu May 9 17:18:09 2013 -0700

----------------------------------------------------------------------
 .../java/org/apache/curator/ConnectionState.java   |  169 +++++++--------
 .../org/apache/curator/CuratorZookeeperClient.java |    9 -
 .../framework/state/ConnectionStateManager.java    |    5 -
 3 files changed, 84 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/11ae23ad/curator-client/src/main/java/org/apache/curator/ConnectionState.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/ConnectionState.java b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
index 8de3e27..bbb0588 100644
--- a/curator-client/src/main/java/org/apache/curator/ConnectionState.java
+++ b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
@@ -38,24 +38,23 @@ import java.util.concurrent.atomic.AtomicReference;
 
 class ConnectionState implements Watcher, Closeable
 {
-    private volatile long connectionStartMs = 0;
-
-    private final Logger                        log = LoggerFactory.getLogger(getClass());
-    private final HandleHolder                  zooKeeper;
-    private final AtomicBoolean                 isConnected = new AtomicBoolean(false);
-    private final AtomicBoolean                 lost = new AtomicBoolean(false);
-    private final EnsembleProvider              ensembleProvider;
-    private final int                           connectionTimeoutMs;
+    private static final int MAX_BACKGROUND_EXCEPTIONS = 10;
+    private static final boolean LOG_EVENTS = Boolean.getBoolean(DebugUtils.PROPERTY_LOG_EVENTS);
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final HandleHolder zooKeeper;
+    private final AtomicBoolean isConnected = new AtomicBoolean(false);
+    private final EnsembleProvider ensembleProvider;
+    private final int sessionTimeoutMs;
+    private final int connectionTimeoutMs;
     private final AtomicReference<TracerDriver> tracer;
-    private final Queue<Exception>              backgroundExceptions = new ConcurrentLinkedQueue<Exception>();
-    private final Queue<Watcher>                parentWatchers = new ConcurrentLinkedQueue<Watcher>();
-
-    private static final int        MAX_BACKGROUND_EXCEPTIONS = 10;
-    private static final boolean    LOG_EVENTS = Boolean.getBoolean(DebugUtils.PROPERTY_LOG_EVENTS);
+    private final Queue<Exception> backgroundExceptions = new ConcurrentLinkedQueue<Exception>();
+    private final Queue<Watcher> parentWatchers = new ConcurrentLinkedQueue<Watcher>();
+    private volatile long connectionStartMs = 0;
 
     ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly)
     {
         this.ensembleProvider = ensembleProvider;
+        this.sessionTimeoutMs = sessionTimeoutMs;
         this.connectionTimeoutMs = connectionTimeoutMs;
         this.tracer = tracer;
         if ( parentWatcher != null )
@@ -73,12 +72,6 @@ class ConnectionState implements Watcher, Closeable
             throw new SessionFailRetryLoop.SessionFailedException();
         }
 
-        if ( lost.compareAndSet(true, false) )
-        {
-            log.info("resetting after loss");
-            reset();
-        }
-
         Exception exception = backgroundExceptions.poll();
         if ( exception != null )
         {
@@ -90,24 +83,7 @@ class ConnectionState implements Watcher, Closeable
         boolean localIsConnected = isConnected.get();
         if ( !localIsConnected )
         {
-            long        elapsed = System.currentTimeMillis() - connectionStartMs;
-            if ( elapsed >= connectionTimeoutMs )
-            {
-                if ( zooKeeper.hasNewConnectionString() )
-                {
-                    handleNewConnectionString();
-                }
-                else
-                {
-                    KeeperException.ConnectionLossException connectionLossException = new KeeperException.ConnectionLossException();
-                    if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
-                    {
-                        log.error(String.format("Connection timed out for connection string (%s) and timeout (%d) / elapsed (%d)", zooKeeper.getConnectionString(), connectionTimeoutMs, elapsed), connectionLossException);
-                    }
-                    tracer.get().addCount("connections-timed-out", 1);
-                    throw connectionLossException;
-                }
-            }
+            checkTimeouts();
         }
 
         return zooKeeper.getZooKeeper();
@@ -118,7 +94,7 @@ class ConnectionState implements Watcher, Closeable
         return isConnected.get();
     }
 
-    void        start() throws Exception
+    void start() throws Exception
     {
         log.debug("Starting");
         ensembleProvider.start();
@@ -126,7 +102,7 @@ class ConnectionState implements Watcher, Closeable
     }
 
     @Override
-    public void        close() throws IOException
+    public void close() throws IOException
     {
         log.debug("Closing");
 
@@ -142,27 +118,19 @@ class ConnectionState implements Watcher, Closeable
         finally
         {
             isConnected.set(false);
-            lost.set(false);
         }
     }
 
-    void        addParentWatcher(Watcher watcher)
+    void addParentWatcher(Watcher watcher)
     {
         parentWatchers.offer(watcher);
     }
 
-    void        removeParentWatcher(Watcher watcher)
+    void removeParentWatcher(Watcher watcher)
     {
         parentWatchers.remove(watcher);
     }
 
-    void markLost()
-    {
-        log.info("lost marked");
-
-        lost.set(true);
-    }
-
     @Override
     public void process(WatchedEvent event)
     {
@@ -188,10 +156,6 @@ class ConnectionState implements Watcher, Closeable
         if ( newIsConnected != wasConnected )
         {
             isConnected.set(newIsConnected);
-            if ( newIsConnected )
-            {
-                lost.set(false);
-            }
             connectionStartMs = System.currentTimeMillis();
         }
     }
@@ -201,6 +165,41 @@ class ConnectionState implements Watcher, Closeable
         return ensembleProvider;
     }
 
+    private synchronized void checkTimeouts() throws Exception
+    {
+        int minTimeout = Math.min(sessionTimeoutMs, connectionTimeoutMs);
+        long elapsed = System.currentTimeMillis() - connectionStartMs;
+        if ( elapsed >= minTimeout )
+        {
+            if ( zooKeeper.hasNewConnectionString() )
+            {
+                handleNewConnectionString();
+            }
+            else
+            {
+                int maxTimeout = Math.max(sessionTimeoutMs, connectionTimeoutMs);
+                if ( elapsed > maxTimeout )
+                {
+                    if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
+                    {
+                        log.warn(String.format("Connection attempt unsuccessful after %d (greater than max timeout of %d). Resetting connection and trying again with a new connection.", elapsed, maxTimeout));
+                    }
+                    reset();
+                }
+                else
+                {
+                    KeeperException.ConnectionLossException connectionLossException = new KeeperException.ConnectionLossException();
+                    if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
+                    {
+                        log.error(String.format("Connection timed out for connection string (%s) and timeout (%d) / elapsed (%d)", zooKeeper.getConnectionString(), connectionTimeoutMs, elapsed), connectionLossException);
+                    }
+                    tracer.get().addCount("connections-timed-out", 1);
+                    throw connectionLossException;
+                }
+            }
+        }
+    }
+
     private synchronized void reset() throws Exception
     {
         log.debug("reset");
@@ -213,44 +212,44 @@ class ConnectionState implements Watcher, Closeable
 
     private boolean checkState(Event.KeeperState state, boolean wasConnected)
     {
-        boolean     isConnected = wasConnected;
-        boolean     checkNewConnectionString = true;
+        boolean isConnected = wasConnected;
+        boolean checkNewConnectionString = true;
         switch ( state )
         {
-            default:
-            case Disconnected:
-            {
-                isConnected = false;
-                break;
-            }
+        default:
+        case Disconnected:
+        {
+            isConnected = false;
+            break;
+        }
 
-            case SyncConnected:
-            case ConnectedReadOnly:
-            {
-                isConnected = true;
-                break;
-            }
+        case SyncConnected:
+        case ConnectedReadOnly:
+        {
+            isConnected = true;
+            break;
+        }
 
-            case AuthFailed:
-            {
-                isConnected = false;
-                log.error("Authentication failed");
-                break;
-            }
+        case AuthFailed:
+        {
+            isConnected = false;
+            log.error("Authentication failed");
+            break;
+        }
 
-            case Expired:
-            {
-                isConnected = false;
-                checkNewConnectionString = false;
-                handleExpiredSession();
-                break;
-            }
+        case Expired:
+        {
+            isConnected = false;
+            checkNewConnectionString = false;
+            handleExpiredSession();
+            break;
+        }
 
-            case SaslAuthenticated:
-            {
-                // NOP
-                break;
-            }
+        case SaslAuthenticated:
+        {
+            // NOP
+            break;
+        }
         }
 
         if ( checkNewConnectionString && zooKeeper.hasNewConnectionString() )

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/11ae23ad/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
index be63a9b..f633fe1 100644
--- a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
+++ b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
@@ -184,15 +184,6 @@ public class CuratorZookeeperClient implements Closeable
     }
 
     /**
-     * Mark the connection as lost. The next time {@link #getZooKeeper()} is called,
-     * a new connection will be created.
-     */
-    public void markLost()
-    {
-        state.markLost();
-    }
-
-    /**
      * Close the client
      */
     public void     close()

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/11ae23ad/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 8f5d9ff..2ed60c1 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -141,11 +141,6 @@ public class ConnectionStateManager implements Closeable
             return;
         }
 
-        if ( newState == ConnectionState.LOST )
-        {
-            client.getZookeeperClient().markLost();
-        }
-
         ConnectionState     previousState = currentState.getAndSet(newState);
         if ( previousState == newState )
         {


Mime
View raw message