brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From henev...@apache.org
Subject [07/16] git commit: fix leak of ssh pool cache and cleanup logic
Date Sun, 19 Oct 2014 00:59:54 GMT
fix leak of ssh pool cache and cleanup logic


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/8b838c31
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/8b838c31
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/8b838c31

Branch: refs/heads/master
Commit: 8b838c3115dd10050ce3f821edd6e04f963c3327
Parents: 81b027b
Author: Alex Heneveld <alex.heneveld@cloudsoftcorp.com>
Authored: Sat Oct 18 04:00:46 2014 +0100
Committer: Alex Heneveld <alex.heneveld@cloudsoftcorp.com>
Committed: Sat Oct 18 04:00:46 2014 +0100

----------------------------------------------------------------------
 .../location/basic/SshMachineLocation.java      | 61 +++++++++++++-------
 1 file changed, 39 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/8b838c31/core/src/main/java/brooklyn/location/basic/SshMachineLocation.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/location/basic/SshMachineLocation.java b/core/src/main/java/brooklyn/location/basic/SshMachineLocation.java
index c732e58..a381e34 100644
--- a/core/src/main/java/brooklyn/location/basic/SshMachineLocation.java
+++ b/core/src/main/java/brooklyn/location/basic/SshMachineLocation.java
@@ -52,6 +52,7 @@ import brooklyn.config.ConfigKey;
 import brooklyn.config.ConfigKey.HasConfigKey;
 import brooklyn.config.ConfigUtils;
 import brooklyn.entity.basic.BrooklynConfigKeys;
+import brooklyn.entity.basic.BrooklynTaskTags;
 import brooklyn.entity.basic.ConfigKeys;
 import brooklyn.event.basic.BasicConfigKey;
 import brooklyn.event.basic.MapConfigKey;
@@ -84,7 +85,6 @@ import brooklyn.util.ssh.BashCommands;
 import brooklyn.util.stream.KnownSizeInputStream;
 import brooklyn.util.stream.ReaderInputStream;
 import brooklyn.util.stream.StreamGobbler;
-import brooklyn.util.task.BasicTask;
 import brooklyn.util.task.ScheduledTask;
 import brooklyn.util.task.Tasks;
 import brooklyn.util.task.system.internal.ExecWithLoggingHelpers;
@@ -208,7 +208,7 @@ public class SshMachineLocation extends AbstractLocation implements MachineLocat
             }));
 
     private Task<?> cleanupTask;
-    private transient LoadingCache<Map<String, ?>, Pool<SshTool>> sshPoolCache;
+    private transient LoadingCache<Map<String, ?>, Pool<SshTool>> sshPoolCacheOrNull;
 
     public SshMachineLocation() {
         this(MutableMap.of());
@@ -219,6 +219,22 @@ public class SshMachineLocation extends AbstractLocation implements MachineLocat
         usedPorts = (usedPorts != null) ? Sets.newLinkedHashSet(usedPorts) : Sets.<Integer>newLinkedHashSet();
     }
 
+    @Override
+    public void init() {
+        super.init();
+    }
+
+    private final transient Object poolCacheMutex = new Object();
+    private LoadingCache<Map<String, ?>, Pool<SshTool>> getSshPoolCache() {
+        synchronized (poolCacheMutex) {
+            if (sshPoolCacheOrNull==null) {
+                sshPoolCacheOrNull = buildSshToolPoolCacheLoader();
+                addSshPoolCacheCleanupTask();
+            }
+        }
+        return sshPoolCacheOrNull;
+    }
+    
     private LoadingCache<Map<String, ?>, Pool<SshTool>> buildSshToolPoolCacheLoader() {
         // TODO: Appropriate numbers for maximum size and expire after access
         // At the moment every SshMachineLocation instance creates its own pool.
@@ -261,7 +277,7 @@ public class SshMachineLocation extends AbstractLocation implements MachineLocat
                             LOG.debug("{} building ssh pool for {} with properties: {}",
                                     new Object[] {this, getSshHostAndPort(), properties});
                         }
-                        return buildPool(properties);
+                        return _buildPool(properties);
                     }
                 });
 
@@ -284,7 +300,7 @@ public class SshMachineLocation extends AbstractLocation implements MachineLocat
                 });
     }
 
-    private BasicPool<SshTool> buildPool(final Map<String, ?> properties) {
+    private BasicPool<SshTool> _buildPool(final Map<String, ?> properties) {
         return BasicPool.<SshTool>builder()
                 .name(getDisplayName()+"@"+address+
                         (hasConfig(SSH_HOST, true) ? "("+getConfig(SSH_HOST)+":"+getConfig(SSH_PORT)+")" : "")+
@@ -332,19 +348,23 @@ public class SshMachineLocation extends AbstractLocation implements MachineLocat
         }
         return this;
     }
-
-    @Override
-    public void init() {
-        super.init();
+    
+    protected void addSshPoolCacheCleanupTask() {
+        if (cleanupTask!=null && !cleanupTask.isDone()) {
+            return;
+        }
         
-        sshPoolCache = buildSshToolPoolCacheLoader();
-
         Callable<Task<?>> cleanupTaskFactory = new Callable<Task<?>>() {
             @Override public Task<Void> call() {
-                return Tasks.<Void>builder().dynamic(false).name("ssh-location cache cleaner").body(new Callable<Void>() {
+                return Tasks.<Void>builder().dynamic(false).tag(BrooklynTaskTags.TRANSIENT_TASK_TAG)
+                    .name("ssh-location cache cleaner").body(new Callable<Void>() {
                     @Override public Void call() {
                         try {
-                            if (sshPoolCache != null) sshPoolCache.cleanUp();
+                            if (sshPoolCacheOrNull != null) sshPoolCacheOrNull.cleanUp();
+                            if (!SshMachineLocation.this.isManaged()) {
+                                cleanupTask.cancel(false);
+                                sshPoolCacheOrNull = null;
+                            }
                             return null;
                         } catch (Exception e) {
                             // Don't rethrow: the behaviour of executionManager is different from a scheduledExecutorService,
@@ -368,15 +388,16 @@ public class SshMachineLocation extends AbstractLocation implements MachineLocat
     // we should probably expose a mechanism such as that in Entity (or re-use Entity for locations!)
     @Override
     public void close() throws IOException {
-        if (sshPoolCache != null) {
+        if (sshPoolCacheOrNull != null) {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("{} invalidating all entries in ssh pool cache. Final stats: {}", this, sshPoolCache.stats());
+                LOG.debug("{} invalidating all entries in ssh pool cache. Final stats: {}", this, sshPoolCacheOrNull.stats());
             }
-            sshPoolCache.invalidateAll();
+            sshPoolCacheOrNull.invalidateAll();
         }
         if (cleanupTask != null) {
             cleanupTask.cancel(false);
             cleanupTask = null;
+            sshPoolCacheOrNull = null;
         }
     }
 
@@ -436,10 +457,7 @@ public class SshMachineLocation extends AbstractLocation implements MachineLocat
     }
 
     protected <T> T execSsh(final Map<String, ?> props, final Function<ShellTool, T> task) {
-        if (sshPoolCache == null) {
-            // required for uses that instantiate SshMachineLocation directly, so init() will not have been called
-            sshPoolCache = buildSshToolPoolCacheLoader();
-        }
+        final LoadingCache<Map<String, ?>, Pool<SshTool>> sshPoolCache = getSshPoolCache();
         Pool<SshTool> pool = sshPoolCache.getUnchecked(props);
         if (LOG.isTraceEnabled()) {
             LOG.trace("{} execSsh got pool: {}", this, pool);
@@ -899,10 +917,9 @@ public class SshMachineLocation extends AbstractLocation implements MachineLocat
     //We want the SshMachineLocation to be serializable and therefore the pool needs to be dealt with correctly.
     //In this case we are not serializing the pool (we made the field transient) and create a new pool when deserialized.
     //This fix is currently needed for experiments, but isn't used in normal Brooklyn usage.
-    private void readObject(java.io.ObjectInputStream in)
-            throws IOException, ClassNotFoundException {
+    private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
         in.defaultReadObject();
-        sshPoolCache = buildSshToolPoolCacheLoader();
+        getSshPoolCache();
     }
 
     /** returns the un-passphrased key-pair info if a key is being used, or else null */


Mime
View raw message