curator-dev mailing list archives

From "Running Fly (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (CURATOR-320) Discovery reregister triggered even if retry policy suceeds. Connection looping condition.
Date Tue, 17 Jan 2017 18:17:26 GMT

    [ https://issues.apache.org/jira/browse/CURATOR-320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15826385#comment-15826385 ]

Running Fly edited comment on CURATOR-320 at 1/17/17 6:17 PM:
--------------------------------------------------------------

If it helps, I did write some code to treat the symptoms. I never felt I had a good enough
understanding of Curator to create a proper patch.
These are the workaround changes I made to "org.apache.curator.x.discovery.details.ServiceDiscoveryImpl".
It doesn't completely eliminate the problem, but it does help the client recover and keeps it
from getting stuck in an indefinite loop. It essentially aborts the re-register if the connection
drops and starts over. Each time it fails, it waits a little longer before retrying, which helps
soften the load on the ZK server during a sudden connection recovery. It's far from ideal,
but it works and has gotten us by for months.
{code}
/**
  * A mechanism to register and query service instances using ZooKeeper
@@ -66,17 +70,16 @@
   private final Collection<ServiceProvider<T>> providers = Sets
           .newSetFromMap(Maps.<ServiceProvider<T>, Boolean> newConcurrentMap());
   private final boolean watchInstances;
+
+  private ExecutorService reRegisterExecutor = ThreadUtils.newSingleThreadExecutor("reRegister");
+  private volatile Future< ? > reRegisterFuture = null;
+  private int reRegisterRetryCount = 0;
   private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
{
     @Override
     public void stateChanged(CuratorFramework client, ConnectionState newState) {
       if ((newState == ConnectionState.RECONNECTED) || (newState == ConnectionState.CONNECTED))
{
-        try {
-          log.debug("Re-registering due to reconnection");
-          reRegisterServices();
-        } catch (Exception e) {
-          ThreadUtils.checkInterrupted(e);
-          log.error("Could not re-register instances after reconnection", e);
-        }
+        log.warn("Reconnection event. Calling re-register.");
+        reRegisterServices(true);
       }
     }
   };
@@ -118,11 +121,7 @@
    */
   @Override
   public void start() throws Exception {
-    try {
-      reRegisterServices();
-    } catch (KeeperException e) {
-      log.error("Could not register instances - will try again later", e);
-    }
+    reRegisterServices(false);
     client.getConnectionStateListenable().addListener(connectionStateListener);
   }
 
@@ -368,10 +367,57 @@
     return (entry != null) ? entry.service : null;
   }
 
-  private void reRegisterServices() throws Exception {
-    for (final Entry<T> entry : services.values()) {
-      synchronized (entry) {
-        internalRegisterService(entry.service);
+  private void reRegisterServices(final boolean concurrent) {
+    synchronized (reRegisterExecutor) {
+      if (reRegisterFuture != null) {
+        reRegisterFuture.cancel(true);
+        log.warn("Re-register restarting.");
+        reRegisterRetryCount++;
+      }
+      reRegisterFuture = reRegisterExecutor.submit(new Runnable() {
+        @Override
+        public void run() {
+          int count = 0;
+          try {
+            if (reRegisterRetryCount > 0) {
+              int secToWait = reRegisterRetryCount * 5;
+              secToWait = secToWait < 180 ? secToWait : 180;
+              log.info("Re-register attempt {} will start in {}s", reRegisterRetryCount, secToWait);
+              Thread.sleep(secToWait * 1000);
+            } else {
+              log.info("Re-register will start immediately");
+            }
+            for (final Entry<T> entry : services.values()) {
+              synchronized (entry) {
+                if (Thread.interrupted()) {
+                  throw new InterruptedException();
+                }
+                internalRegisterService(entry.service);
+              }
+              count++;
+            }
+            log.warn(
+                    "Re-registered {} services."
+                            + (reRegisterRetryCount > 0 ? " After {} retries." : ""),
+                    count, reRegisterRetryCount);
+            synchronized (reRegisterExecutor) {
+              reRegisterRetryCount = 0;
+              reRegisterFuture = null;
+            }
+          } catch (InterruptedException ie) {
+            log.warn("Re-register interrupted. After registering {} services.", count);
+          } catch (Exception e) {
+            ThreadUtils.checkInterrupted(e);
+            log.error("Could not re-register instances after reconnection", e);
+          }
+        }
+      });
+    }
+    if (concurrent == false) {
+      try {
+        reRegisterFuture.get();
+      } catch (Exception e) {
+        log.error("Re-register execution exception:", e);
       }
     }
   }
{code}
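For reference, the wait computation in the patch above is a linear backoff capped at 180 seconds. A minimal standalone sketch of just that computation (the class and method names here are mine, not Curator's):

```java
public class BackoffSketch {
    // Mirrors the wait computation in the workaround:
    // 5 seconds per prior retry, capped at 180 seconds.
    static int secondsToWait(int retryCount) {
        int secToWait = retryCount * 5;
        return Math.min(secToWait, 180);
    }

    public static void main(String[] args) {
        System.out.println(secondsToWait(0));   // 0   -> first attempt starts immediately
        System.out.println(secondsToWait(3));   // 15
        System.out.println(secondsToWait(100)); // 180 -> capped
    }
}
```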



> Discovery reregister triggered even if retry policy suceeds. Connection looping condition.
> ------------------------------------------------------------------------------------------
>
>                 Key: CURATOR-320
>                 URL: https://issues.apache.org/jira/browse/CURATOR-320
>             Project: Apache Curator
>          Issue Type: Bug
>          Components: Client, Framework
>    Affects Versions: TBD, 2.10.0
>         Environment: 3 server Quorum running on individual AWS boxes.
> Session timeout set to 1-2 min on most clients.
>            Reporter: Running Fly
>             Fix For: TBD
>
>
>     ServiceDiscoveryImpl.reRegisterServices() can be triggered on the ConnectionState events
> RECONNECTED and CONNECTED, causing the reRegisterServices() method to run on the
> ConnectionStateManager thread. If a connection drops while reRegisterServices() is running,
> it will be recovered by the retry policy. However, the ConnectionState SUSPENDED and subsequent
> RECONNECTED events will be queued but not fired until reRegisterServices() completes (the
> ConnectionStateManager thread fires these events but is in use). When it does complete, the
> RECONNECTED event in the queue fires and reRegisterServices() reruns.
>     When the ZooKeeper server connection is interrupted, all of the clients simultaneously
> call reRegisterServices(). This overloads the server with requests, causing connections to
> time out and reset, thus queuing up more RECONNECTED events. This state can persist indefinitely.
>     Because reRegisterServices() will most likely receive a NodeExistsException, it deletes
> and recreates the node, effectively causing the services to thrash up and down and wreaking
> havoc on our service dependency chain.
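The starvation the description outlines can be modeled with a deterministic toy sketch (this is an illustration of the event-queue interaction, not Curator code; all names here are hypothetical):

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model of the loop: state events are delivered on one thread, and a
// long-running listener delays queued events until it finishes -- at which
// point a stale RECONNECTED retriggers the listener.
public class EventLoopSketch {
    enum State { RECONNECTED, SUSPENDED }

    static final Queue<State> pending = new ArrayDeque<>();
    static int reRegisterRuns = 0;

    // Simulates reRegisterServices(): while it runs, the connection drops and
    // recovers, queuing SUSPENDED + RECONNECTED behind the current event.
    static void reRegister(boolean connectionDropsDuringRun) {
        reRegisterRuns++;
        if (connectionDropsDuringRun) {
            pending.add(State.SUSPENDED);
            pending.add(State.RECONNECTED);
        }
    }

    public static void main(String[] args) {
        pending.add(State.RECONNECTED); // initial reconnection event
        boolean firstRun = true;
        while (!pending.isEmpty()) {
            State s = pending.poll();
            if (s == State.RECONNECTED) {
                // The connection drops only during the first run in this toy;
                // in the real failure mode it can drop every time, so the
                // queue never drains and the loop persists indefinitely.
                reRegister(firstRun);
                firstRun = false;
            }
        }
        System.out.println(reRegisterRuns); // 2: the queued stale event re-ran it
    }
}
```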



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
