airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dimuthu...@apache.org
Subject [airavata] branch staging updated: Adding monitor logic to identify and remove stale ssh connections in the pool and reducing default sessions per connection to 1
Date Tue, 10 Jul 2018 19:50:31 GMT
This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch staging
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/staging by this push:
     new 0461052  Adding monitor logic to identify and remove stale ssh connections in the pool and reducing default sessions per connection to 1
0461052 is described below

commit 04610525e308c190976e440068d68bfdabe7eaf9
Author: dimuthu <dimuthu.upeksha2@gmail.com>
AuthorDate: Tue Jul 10 15:50:25 2018 -0400

    Adding monitor logic to identify and remove stale ssh connections in the pool and reducing default sessions per connection to 1
---
 .../airavata/helix/adaptor/PoolingSSHJClient.java  | 48 ++++++++++++++++++----
 1 file changed, 41 insertions(+), 7 deletions(-)

diff --git a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java
index c207ef4..5918e1f 100644
--- a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java
+++ b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java
@@ -38,8 +38,10 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.*;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 
 /**
  * This class will keep a pool of {@link SSHClient} and scale them according to the number
of SSH requests.
@@ -61,7 +63,8 @@ public class PoolingSSHJClient extends SSHClient {
     private String host;
     private int port;
 
-    private int maxSessionsForConnection = 10;
+    private int maxSessionsForConnection = 1;
+    private int maxConnectionIdleTimeMS = 60000;
 
     public void addHostKeyVerifier(HostKeyVerifier verifier) {
         this.hostKeyVerifier = verifier;
@@ -76,6 +79,19 @@ public class PoolingSSHJClient extends SSHClient {
         this.config = config;
         this.host = host;
         this.port = port;
+
+        ScheduledExecutorService poolMonitoringService = Executors.newSingleThreadScheduledExecutor(r -> {
+            Thread thread = new Thread(r, "SSH-Pool-Monitor-" + host + "-" + port);
+            thread.setDaemon(true);
+            return thread;
+        });
+
+        poolMonitoringService.scheduleWithFixedDelay(new Runnable() {
+            @Override
+            public void run() {
+                removeStaleConnections();
+            }
+        }, 10, maxConnectionIdleTimeMS, TimeUnit.MILLISECONDS);
     }
 
     ////////////////// client specific operations ///////
@@ -115,7 +131,7 @@ public class PoolingSSHJClient extends SSHClient {
                         }
                     }
                 } catch (Exception e) {
-                    logger.warn("Failed to fetch max session count for " + host + ". Continuing
with default value 10. " + e.getMessage() );
+                    logger.warn("Failed to fetch max session count for " + host + ". Continuing
with default value 1. " + e.getMessage() );
                 }
                 return newClient;
 
@@ -126,12 +142,12 @@ public class PoolingSSHJClient extends SSHClient {
                     Map.Entry<SSHClient, SSHClientInfo> minEntry = minEntryOp.get();
                     // use the connection with least amount of sessions created.
 
-                    logger.debug("Session count for selected connection {} is {}. Threshold
{}",
-                            minEntry.getValue().getClientId(), minEntry.getValue().getSessionCount(),
maxSessionsForConnection);
+                    logger.debug("Session count for selected connection {} is {}. Threshold
{} for host {}",
+                            minEntry.getValue().getClientId(), minEntry.getValue().getSessionCount(),
maxSessionsForConnection, host);
                     if (minEntry.getValue().getSessionCount() >= maxSessionsForConnection)
{
                         // if it exceeds the maximum session count, create a new connection
                         logger.debug("Connection with least amount of sessions exceeds the
threshold. So creating a new connection. " +
-                                "Current connection count " + clientInfoMap.size());
+                                "Current connection count {} for host {}", clientInfoMap.size(),
host);
                         SSHClient newClient = createNewSSHClient();
                         SSHClientInfo info = new SSHClientInfo(1, System.currentTimeMillis(),
clientInfoMap.size());
                         clientInfoMap.put(newClient, info);
@@ -139,13 +155,13 @@ public class PoolingSSHJClient extends SSHClient {
 
                     } else {
                         // otherwise reuse the same connetion
-                        logger.debug("Reusing the same connection {} as it doesn't exceed
the threshold", minEntry.getValue().getClientId());
+                        logger.debug("Reusing the same connection {} as it doesn't exceed
the threshold for host {}", minEntry.getValue().getClientId(), host);
                         minEntry.getValue().setSessionCount(minEntry.getValue().getSessionCount() + 1);
                         minEntry.getValue().setLastAccessedTime(System.currentTimeMillis());
                         return minEntry.getKey();
                     }
                 } else {
-                    throw new Exception("Failed to find a connection in the pool for " +
host);
+                    throw new Exception("Failed to find a connection in the pool for host
" + host);
                 }
             }
 
@@ -159,6 +175,7 @@ public class PoolingSSHJClient extends SSHClient {
 
         try {
             if (clientInfoMap.containsKey(client)) {
+                logger.debug("Removing the disconnected connection {} for host {}", clientInfoMap.get(client).getClientId(),
host);
                 clientInfoMap.remove(client);
             }
 
@@ -172,6 +189,7 @@ public class PoolingSSHJClient extends SSHClient {
 
         try {
             if (clientInfoMap.containsKey(client)) {
+                logger.debug("Removing the session for connection {} for host {}", clientInfoMap.get(client).getClientId(),
host);
                 SSHClientInfo sshClientInfo = clientInfoMap.get(client);
                 sshClientInfo.setSessionCount(sshClientInfo.getSessionCount() - 1);
             }
@@ -181,6 +199,22 @@ public class PoolingSSHJClient extends SSHClient {
         }
     }
 
+    private void removeStaleConnections() {
+        lock.writeLock().lock();
+        logger.debug("Removing stale connections for host {}", host);
+        try {
+            List<Map.Entry<SSHClient, SSHClientInfo>> entriesTobeRemoved = clientInfoMap.entrySet().stream().filter(entry ->
+                    ((entry.getValue().getSessionCount() == 0) &&
+                            (entry.getValue().getLastAccessedTime() + maxConnectionIdleTimeMS < System.currentTimeMillis()))).collect(Collectors.toList());
+            entriesTobeRemoved.forEach(entry -> {
+                logger.debug("Removing connection {} due to inactivity for host {}", entry.getValue().getClientId(),
host);
+                clientInfoMap.remove(entry.getKey());
+            });
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
     private SSHClient createNewSSHClient() throws IOException {
 
         SSHClient sshClient;


Mime
View raw message