curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From randg...@apache.org
Subject git commit: CURATOR-126: Fix race condition in CuratorFrameworkImpl.close()
Date Mon, 28 Jul 2014 21:31:16 GMT
Repository: curator
Updated Branches:
  refs/heads/CURATOR-126 [created] 785e9f6c8


CURATOR-126: Fix race condition in CuratorFrameworkImpl.close()


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/785e9f6c
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/785e9f6c
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/785e9f6c

Branch: refs/heads/CURATOR-126
Commit: 785e9f6c8a528d0c07652450471dcb71a5717776
Parents: 15a0aac
Author: Scott Blum <scottb@squareup.com>
Authored: Mon Jul 28 14:10:37 2014 -0400
Committer: Scott Blum <scottb@squareup.com>
Committed: Mon Jul 28 17:13:17 2014 -0400

----------------------------------------------------------------------
 .../framework/CuratorFrameworkFactory.java      | 17 +++++++++++++++
 .../framework/imps/CuratorFrameworkImpl.java    | 22 +++++++++++++++-----
 2 files changed, 34 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/785e9f6c/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
index 2d21fb7..8ef2580 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
@@ -52,6 +52,7 @@ public class CuratorFrameworkFactory
     private static final DefaultZookeeperFactory    DEFAULT_ZOOKEEPER_FACTORY = new DefaultZookeeperFactory();
     private static final DefaultACLProvider         DEFAULT_ACL_PROVIDER = new DefaultACLProvider();
     private static final long                       DEFAULT_INACTIVE_THRESHOLD_MS = (int)TimeUnit.MINUTES.toMillis(3);
+    private static final int                        DEFAULT_CLOSE_WAIT_MS = (int)TimeUnit.SECONDS.toMillis(1);
 
     /**
      * Return a new builder that builds a CuratorFramework
@@ -101,6 +102,7 @@ public class CuratorFrameworkFactory
         private EnsembleProvider    ensembleProvider;
         private int                 sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT_MS;
         private int                 connectionTimeoutMs = DEFAULT_CONNECTION_TIMEOUT_MS;
+        private int                 maxCloseWaitMs = DEFAULT_CLOSE_WAIT_MS;
         private RetryPolicy         retryPolicy;
         private ThreadFactory       threadFactory = null;
         private String              namespace;
@@ -239,6 +241,16 @@ public class CuratorFrameworkFactory
         }
 
         /**
+         * @param maxCloseWaitMs time to wait during close to join background threads
+         * @return this
+         */
+        public Builder maxCloseWaitMs(int maxCloseWaitMs)
+        {
+            this.maxCloseWaitMs = maxCloseWaitMs;
+            return this;
+        }
+
+        /**
          * @param retryPolicy retry policy to use
          * @return this
          */
@@ -336,6 +348,11 @@ public class CuratorFrameworkFactory
             return connectionTimeoutMs;
         }
 
+        public int getMaxCloseWaitMs()
+        {
+            return maxCloseWaitMs;
+        }
+
         public RetryPolicy getRetryPolicy()
         {
             return retryPolicy;

http://git-wip-us.apache.org/repos/asf/curator/blob/785e9f6c/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 23a3248..7f7cc98 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -63,6 +63,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
     private final ListenerContainer<CuratorListener> listeners;
     private final ListenerContainer<UnhandledErrorListener> unhandledErrorListeners;
     private final ThreadFactory threadFactory;
+    private final int maxCloseWaitMs;
     private final BlockingQueue<OperationAndData<?>> backgroundOperations;
     private final NamespaceImpl namespace;
     private final ConnectionStateManager connectionStateManager;
@@ -127,6 +128,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         backgroundOperations = new DelayQueue<OperationAndData<?>>();
         namespace = new NamespaceImpl(this, builder.getNamespace());
         threadFactory = getThreadFactory(builder);
+        maxCloseWaitMs = builder.getMaxCloseWaitMs();
         connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory());
         compressionProvider = builder.getCompressionProvider();
         aclProvider = builder.getAclProvider();
@@ -179,6 +181,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         listeners = parent.listeners;
         unhandledErrorListeners = parent.unhandledErrorListeners;
         threadFactory = parent.threadFactory;
+        maxCloseWaitMs = parent.maxCloseWaitMs;
         backgroundOperations = parent.backgroundOperations;
         connectionStateManager = parent.connectionStateManager;
         defaultData = parent.defaultData;
@@ -297,15 +300,24 @@ public class CuratorFrameworkImpl implements CuratorFramework
                     }
                 });
 
+            if ( executorService != null )
+            {
+                executorService.shutdownNow();
+                try
+                {
+                    executorService.awaitTermination(maxCloseWaitMs, TimeUnit.MILLISECONDS);
+                }
+                catch ( InterruptedException e )
+                {
+                    // Interrupted while interrupting; I give up.
+                    Thread.currentThread().interrupt();
+                }
+            }
             listeners.clear();
             unhandledErrorListeners.clear();
             connectionStateManager.close();
             client.close();
             namespaceWatcherMap.close();
-            if ( executorService != null )
-            {
-                executorService.shutdownNow();
-            }
         }
     }
 
@@ -759,7 +771,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
 
     private void backgroundOperationsLoop()
     {
-        while ( !Thread.interrupted() )
+        while ( !Thread.currentThread().isInterrupted() )
         {
             OperationAndData<?> operationAndData;
             try


Mime
View raw message