airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shame...@apache.org
Subject [2/2] airavata git commit: Fixed AIRAVATA-2284, using guave caching with time based eviction
Date Fri, 30 Dec 2016 20:50:53 GMT
Fixed AIRAVATA-2284, using guave caching with time based eviction


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/644bf896
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/644bf896
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/644bf896

Branch: refs/heads/develop
Commit: 644bf89629ac61ab35917bafa737ecbcbe547463
Parents: c50e3ec
Author: Shameera Rathnayaka <shameerainfo@gmail.com>
Authored: Fri Dec 30 15:50:38 2016 -0500
Committer: Shameera Rathnayaka <shameerainfo@gmail.com>
Committed: Fri Dec 30 15:50:38 2016 -0500

----------------------------------------------------------------------
 .../airavata/common/utils/ServerSettings.java   |  8 ++++
 .../main/resources/airavata-server.properties   |  2 +
 .../org/apache/airavata/gfac/impl/Factory.java  | 39 ++++++++++++++++----
 .../airavata/gfac/impl/HPCRemoteCluster.java    | 29 ++++++---------
 .../airavata/messaging/core/Subscriber.java     |  2 -
 5 files changed, 54 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/644bf896/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
index cdf49e0..7ab807e 100644
--- a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
+++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
@@ -127,6 +127,9 @@ public class ServerSettings extends ApplicationSettings {
     public static final String USER_PROFILE_SERVER_HOST = "user.profile.server.host";
     public static final String USER_PROFILE_SERVER_PORT = "user.profile.server.port";
 
+    /* Caching */
+    private static final String SESSION_CACHE_ACCESS_TIME_OUT = "ssh.session.cache.access.timeout";
+
     // todo until AIRAVATA-2066 is finished, keep server side list configurations here.
     private static Map<String, String[]> listConfigurations = new HashMap<>();
 
@@ -470,4 +473,9 @@ public class ServerSettings extends ApplicationSettings {
     public static int getAuroraSchedulerTimeout() throws ApplicationSettingsException {
     	return Integer.valueOf(getSetting(AURORA_SCHEDULER_CONNECT_TIMEOUT_MS));
     }
+
+    public static int getSessionCacheAccessTimeout() {
+        return Integer.valueOf(getSetting(SESSION_CACHE_ACCESS_TIME_OUT, "30"));
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/644bf896/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index ffc9ea8..defc1cc 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -170,6 +170,8 @@ gfac.server.port=8950
 gfac.thread.pool.size=50
 host.scheduler=org.apache.airavata.gfac.impl.DefaultHostScheduler
 
+# ssh session access timeout in minutes default is 30 minutes
+#ssh.session.cache.access.timeout=30
 
 ###########################################################################
 #  Registry Server Configurations

http://git-wip-us.apache.org/repos/asf/airavata/blob/644bf896/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
index bf06086..fda5386 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
@@ -20,6 +20,9 @@
  */
 package org.apache.airavata.gfac.impl;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
 import com.jcraft.jsch.Channel;
 import com.jcraft.jsch.ChannelExec;
 import com.jcraft.jsch.JSch;
@@ -33,7 +36,6 @@ import org.apache.airavata.credential.store.credential.Credential;
 import org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential;
 import org.apache.airavata.credential.store.store.CredentialReader;
 import org.apache.airavata.credential.store.store.CredentialStoreException;
-import org.apache.airavata.gfac.core.GFac;
 import org.apache.airavata.gfac.core.GFacEngine;
 import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.GFacUtils;
@@ -55,13 +57,21 @@ import org.apache.airavata.gfac.core.task.JobSubmissionTask;
 import org.apache.airavata.gfac.core.task.Task;
 import org.apache.airavata.gfac.core.watcher.CancelRequestWatcher;
 import org.apache.airavata.gfac.core.watcher.RedeliveryRequestWatcher;
-import org.apache.airavata.gfac.impl.job.*;
+import org.apache.airavata.gfac.impl.job.ForkJobConfiguration;
+import org.apache.airavata.gfac.impl.job.LSFJobConfiguration;
+import org.apache.airavata.gfac.impl.job.PBSJobConfiguration;
+import org.apache.airavata.gfac.impl.job.SlurmJobConfiguration;
+import org.apache.airavata.gfac.impl.job.UGEJobConfiguration;
 import org.apache.airavata.gfac.impl.task.ArchiveTask;
 import org.apache.airavata.gfac.impl.watcher.CancelRequestWatcherImpl;
 import org.apache.airavata.gfac.impl.watcher.RedeliveryRequestWatcherImpl;
 import org.apache.airavata.gfac.monitor.cloud.AuroraJobMonitor;
 import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor;
-import org.apache.airavata.messaging.core.*;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.messaging.core.MessagingFactory;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.messaging.core.Subscriber;
+import org.apache.airavata.messaging.core.Type;
 import org.apache.airavata.messaging.core.impl.RabbitMQPublisher;
 import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
 import org.apache.airavata.model.appcatalog.computeresource.MonitorMode;
@@ -81,7 +91,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.Constructor;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 public abstract class Factory {
 
@@ -106,6 +121,7 @@ public abstract class Factory {
 	private static Map<MonitorMode, JobMonitor> jobMonitorServices = new HashMap<>();
 	private static Subscriber processLaunchSubscriber;
 	private static Map<String, Session> sessionMap = new HashMap<>();
+	private static Cache<String,Session> sessionCache;
 
 	public static GFacEngine getGFacEngine() throws GFacException {
 		if (engine == null) {
@@ -422,6 +438,15 @@ public abstract class Factory {
 		} catch (Exception e) {
 			throw new GFacException("Gfac config issue", e);
 		}
+
+		sessionCache = CacheBuilder.newBuilder()
+				.expireAfterAccess(ServerSettings.getSessionCacheAccessTimeout(), TimeUnit.MINUTES)
+				.removalListener((RemovalListener<String, Session>) removalNotification -> {
+					if (removalNotification.getValue().isConnected()) {
+						removalNotification.getValue().disconnect();
+					}
+                })
+				.build();
 	}
 
 	public static JobMonitor getMonitorService(MonitorMode monitorMode) throws AiravataException,
GFacException {
@@ -479,7 +504,7 @@ public abstract class Factory {
 			throw new GFacException("Support ssh key authentication only");
 		}
 		String key = buildKey(serverInfo);
-		Session session = sessionMap.get(key);
+		Session session = sessionCache.getIfPresent(key);
 		boolean valid = isValidSession(session);
 		// FIXME - move following info logs to debug
 		if (valid) {
@@ -514,7 +539,7 @@ public abstract class Factory {
 					session.setConfig("StrictHostKeyChecking", "no");
 				}
 				session.connect(); // 0 connection timeout
-				sessionMap.put(key, session);
+				sessionCache.put(key, session);
 			} catch (JSchException e) {
 				throw new GFacException("JSch initialization error ", e);
 			}
@@ -522,7 +547,7 @@ public abstract class Factory {
 			// FIXME - move following info log to debug
 			log.info("Reuse SSH session for :" + key);
 		}
-		return sessionMap.get(key);
+		return session;
 
 	}
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/644bf896/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
index 5002a6b..262f7f7 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
@@ -55,7 +55,6 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
 	private static final int MAX_RETRY_COUNT = 3;
 	private final SSHKeyAuthentication authentication;
 	private final JSch jSch;
-	private Session session;
 
 	public HPCRemoteCluster(ServerInfo serverInfo, JobManagerConfiguration jobManagerConfiguration,
AuthenticationInfo
 			authenticationInfo) throws AiravataException, GFacException {
@@ -69,7 +68,6 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
 			jSch = new JSch();
 			jSch.addIdentity(UUID.randomUUID().toString(), authentication.getPrivateKey(), authentication.getPublicKey(),
 					authentication.getPassphrase().getBytes());
-			session = Factory.getSSHSession(authenticationInfo, serverInfo);
 		} catch (JSchException e) {
 			throw new AiravataException("JSch initialization error ", e);
 		}
@@ -120,13 +118,11 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
 		int retry = 3;
 		while (retry > 0) {
 			try {
-				session = Factory.getSSHSession(authenticationInfo, serverInfo);
 				log.info("Transferring localhost:" + localFile  + " to " + serverInfo.getHost() + ":"
+ remoteFile);
-				SSHUtils.scpTo(localFile, remoteFile, session);
+				SSHUtils.scpTo(localFile, remoteFile,  getSshSession());
 				retry = 0;
 			} catch (Exception e) {
 				retry--;
-				session = Factory.getSSHSession(authenticationInfo, serverInfo);
 				if (retry == 0) {
 					throw new GFacException("Failed to scp localhost:" + localFile + " to " + serverInfo.getHost()
+
 							":" + remoteFile, e);
@@ -138,18 +134,20 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
 		}
 	}
 
+	private Session getSshSession() throws GFacException {
+		return Factory.getSSHSession(authenticationInfo, serverInfo);
+	}
+
 	@Override
 	public void copyFrom(String remoteFile, String localFile) throws GFacException {
 		int retry = 3;
 		while(retry>0) {
 			try {
-				session = Factory.getSSHSession(authenticationInfo, serverInfo);
 				log.info("Transferring " + serverInfo.getHost() + ":" + remoteFile + " To localhost:"
+ localFile);
-				SSHUtils.scpFrom(remoteFile, localFile, session);
+				SSHUtils.scpFrom(remoteFile, localFile, getSession());
 				retry=0;
 			} catch (Exception e) {
 				retry--;
-				session = Factory.getSSHSession(authenticationInfo, serverInfo);
 				if (retry == 0) {
 					throw new GFacException("Failed to scp " + serverInfo.getHost() + ":" + remoteFile +
" to " +
 							"localhost:" + localFile, e);
@@ -170,13 +168,12 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
 		try {
 			while (retryCount < MAX_RETRY_COUNT) {
 				retryCount++;
-				session = Factory.getSSHSession(authenticationInfo, serverInfo);
 				log.info("Transferring from:" + sourceFile + " To: " + destinationFile);
 				try {
 					if (direction == DIRECTION.FROM) {
-                        SSHUtils.scpThirdParty(sourceFile, session, destinationFile, clientSession,
ignoreEmptyFile);
+                        SSHUtils.scpThirdParty(sourceFile, getSession(), destinationFile,
clientSession, ignoreEmptyFile);
                     } else {
-                        SSHUtils.scpThirdParty(sourceFile, clientSession, destinationFile,
session, ignoreEmptyFile);
+                        SSHUtils.scpThirdParty(sourceFile, clientSession, destinationFile,
getSession(), ignoreEmptyFile);
                     }
 					break; // exit while loop
 				} catch (JSchException e) {
@@ -200,10 +197,9 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
 		try {
 			while (retryCount < MAX_RETRY_COUNT) {
 				retryCount++;
-				session = Factory.getSSHSession(authenticationInfo, serverInfo);
 				log.info("Creating directory: " + serverInfo.getHost() + ":" + directoryPath);
 				try {
-					SSHUtils.makeDirectory(directoryPath, session);
+					SSHUtils.makeDirectory(directoryPath, getSession());
 					break;  // Exit while loop
 				} catch (JSchException e) {
 					if (retryCount == MAX_RETRY_COUNT) {
@@ -260,9 +256,8 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
 	@Override
 	public List<String> listDirectory(String directoryPath) throws GFacException {
 		try {
-			session = Factory.getSSHSession(authenticationInfo, serverInfo);
 			log.info("Creating directory: " + serverInfo.getHost() + ":" + directoryPath);
-			return SSHUtils.listDirectory(directoryPath, session);
+			return SSHUtils.listDirectory(directoryPath, getSession());
 		} catch (JSchException | IOException e) {
 			throw new GFacException("Failed to list directory " + serverInfo.getHost() + ":" + directoryPath,
e);
 		}
@@ -277,7 +272,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
 
 	@Override
 	public Session getSession() throws GFacException {
-		return Factory.getSSHSession(authenticationInfo, serverInfo);
+		return getSshSession();
 	}
 
 	@Override
@@ -314,7 +309,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
 			while (retryCount < MAX_RETRY_COUNT) {
 				retryCount++;
 				try {
-					session = Factory.getSSHSession(authenticationInfo, serverInfo);
+					Session session = getSshSession();
 					channelExec = ((ChannelExec) session.openChannel("exec"));
 					channelExec.setCommand(command);
 					channelExec.setInputStream(null);

http://git-wip-us.apache.org/repos/asf/airavata/blob/644bf896/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java
index cc357a0..5c8a65f 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java
@@ -26,10 +26,8 @@ import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.Consumer;
 import org.apache.airavata.common.exception.AiravataException;
 
-import javax.annotation.Nonnull;
 import java.util.List;
 import java.util.function.BiFunction;
-import java.util.function.Function;
 
 /**
  * This is the basic consumer


Mime
View raw message